Speed up push rule evaluation

This commit is contained in:
Erik Johnston 2022-08-17 17:10:27 +01:00
parent 82a0752f32
commit 115970a494
4 changed files with 323 additions and 64 deletions

179
eval.py Normal file
View file

@ -0,0 +1,179 @@
from time import time
from typing import Any, Collection, Dict, List
from synapse.api.constants import EventTypes
from synapse.api.room_versions import RoomVersions
from synapse.config.experimental import ExperimentalConfig
from synapse.events import EventBase, make_event_from_dict
from synapse.push.baserules import FilteredPushRules, PushRules
from synapse.push.push_rule_evaluator import PushRuleEvaluatorForEvent
def compute_push_actions(
experimental_config: ExperimentalConfig,
evaluator: PushRuleEvaluatorForEvent,
event: EventBase,
rules_by_user: Dict[str, FilteredPushRules],
profiles: Dict[str, Any],
count_as_unread: bool,
uids_with_visibility: Collection[str],
) -> Dict[str, List]:
actions_by_user = {}
default_rules = FilteredPushRules(PushRules(), {}, experimental_config)
matching_default_rule = None
for rule, _ in default_rules:
if not rule.default_enabled:
continue
matches = evaluator.check_conditions(rule.conditions, "uid", None)
if matches:
matching_default_rule = rule
break
joining_user = None
if event.type == EventTypes.Member:
joining_user = event.state_key
for uid, rules in rules_by_user.items():
if event.sender == uid:
try:
actions_by_user.pop(uid)
except KeyError:
pass
continue
if uid not in uids_with_visibility:
try:
actions_by_user.pop(uid)
except KeyError:
pass
continue
display_name = None
profile = profiles.get(uid)
if profile:
display_name = profile.display_name
if not display_name and joining_user:
# Handle the case where we are pushing a membership event to
# that user, as they might not be already joined.
if joining_user == uid:
display_name = event.content.get("displayname", None)
if not isinstance(display_name, str):
display_name = None
if count_as_unread:
# Add an element for the current user if the event needs to be marked as
# unread, so that add_push_actions_to_staging iterates over it.
# If the event shouldn't be marked as unread but should notify the
# current user, it'll be added to the dict later.
actions_by_user[uid] = []
matched_default = False
if matching_default_rule:
if not rules.enabled_map.get(matching_default_rule.rule_id, True):
continue
matched_default = True
override = rules.push_rules.overriden_base_rules.get(
matching_default_rule.rule_id
)
if override:
actions = override.actions
else:
actions = matching_default_rule.actions
actions = [x for x in actions if x != "dont_notify"]
if actions and "notify" in actions:
actions_by_user[uid] = matching_default_rule.actions
for rule, enabled in rules.user_specific_rules():
if not enabled:
continue
if (
matched_default
and rule.priority_class < matching_default_rule.priority_class
):
break
matches = evaluator.check_conditions(rule.conditions, uid, display_name)
if matches:
actions = [x for x in rule.actions if x != "dont_notify"]
if actions and "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions
else:
try:
actions_by_user.pop(uid)
except KeyError:
pass
break
return actions_by_user
if __name__ == "__main__":
event = make_event_from_dict(
{
"auth_events": [
"$Y6V1n3kQq_G2Q2gqma4tXbS0TtZQYne-zk8EGymcErI",
"$RWzLUHmF5Hc6kr5hJuCY7gcDt3bVXS2JL6oJD7lTEdo",
"$uIZRw93tT3lXnpMj40J8aPbnDkXeaWtgJWBVrfeQsYs",
],
"prev_events": ["$6lCOe9WyCBREZrvfdShVHO7OgBZ3HA82AN-TsGzsj94"],
"type": "m.room.message",
"room_id": "!mWlQLVyRcFtLrKOgEl:localhost:8448",
"sender": "@user-nn87-main:localhost:8448",
"content": {
"org.matrix.msc1767.text": "test",
"body": "test",
"msgtype": "m.text",
},
"depth": 5006,
"prev_state": [],
"origin": "localhost:8448",
"origin_server_ts": 1660738396696,
"hashes": {"sha256": "j2X9zgQU6jUqARb9blCdX5UL8SKKJgG1cTxb7uZOiLI"},
"signatures": {
"localhost:8448": {
"ed25519:a_ERAh": "BsToq2Bf2DqksU5i7vsMN2hxgRBmou+5++IK4+Af8GLt46E9Po1L5Iv1JLxe4eN/zN/jYW03ULGdrzzJkCzaDA"
}
},
"unsigned": {"age_ts": 1660738396696},
},
RoomVersions.V10,
)
evaluator = PushRuleEvaluatorForEvent(event, 5000, 0, {}, {}, False)
experimental_config = ExperimentalConfig()
experimental_config.read_config({})
rules_by_user = {
f"@user-{i}:localhost": FilteredPushRules(PushRules(), {}, experimental_config)
for i in range(5000)
}
uids_with_visibility = set(rules_by_user)
start = time()
number = 100
for _ in range(number):
result = compute_push_actions(
experimental_config,
evaluator,
event,
rules_by_user,
{},
True,
uids_with_visibility,
)
end = time()
print(f"Average time: {(end - start)*1000/number:.3}ms")

View file

@ -109,9 +109,10 @@ class PushRules:
for rule in itertools.chain(
BASE_PREPEND_OVERRIDE_RULES,
self.override,
BASE_APPEND_OVERRIDE_RULES_USER_SPECIFIC,
BASE_APPEND_OVERRIDE_RULES,
self.content,
BASE_APPEND_CONTENT_RULES,
BASE_APPEND_CONTENT_RULES_USER_SPECIFIC,
self.room,
self.sender,
self.underride,
@ -125,6 +126,24 @@ class PushRules:
else:
yield rule
def user_specific_rules(self) -> Iterator[PushRule]:
for rule in itertools.chain(
self.override,
BASE_APPEND_OVERRIDE_RULES_USER_SPECIFIC,
self.content,
BASE_APPEND_CONTENT_RULES_USER_SPECIFIC,
self.room,
self.sender,
self.underride,
):
if rule.default:
override_rule = self.overriden_base_rules.get(rule.rule_id)
if override_rule:
yield override_rule
continue
yield rule
def __len__(self) -> int:
# The length is mostly used by caches to get a sense of "size" / amount
# of memory this object is using, so we only count the number of custom
@ -160,6 +179,17 @@ class FilteredPushRules:
yield rule, enabled
def user_specific_rules(self) -> Iterator[PushRule]:
for rule in self.push_rules.user_specific_rules():
if rule.default and not _is_experimental_rule_enabled(
rule.rule_id, self.experimental_config
):
continue
enabled = self.enabled_map.get(rule.rule_id, rule.default_enabled)
yield rule, enabled
def __len__(self) -> int:
return len(self.push_rules)
@ -237,7 +267,7 @@ def _is_experimental_rule_enabled(
return True
BASE_APPEND_CONTENT_RULES = [
BASE_APPEND_CONTENT_RULES_USER_SPECIFIC = [
PushRule(
default=True,
priority_class=PRIORITY_CLASS_MAP["content"],
@ -271,21 +301,7 @@ BASE_PREPEND_OVERRIDE_RULES = [
]
BASE_APPEND_OVERRIDE_RULES = [
PushRule(
default=True,
priority_class=PRIORITY_CLASS_MAP["override"],
rule_id="global/override/.m.rule.suppress_notices",
conditions=[
{
"kind": "event_match",
"key": "content.msgtype",
"pattern": "m.notice",
"_cache_key": "_suppress_notices",
}
],
actions=["dont_notify"],
),
BASE_APPEND_OVERRIDE_RULES_USER_SPECIFIC = [
# NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event
# otherwise invites will be matched by .m.rule.member_event
PushRule(
@ -314,6 +330,38 @@ BASE_APPEND_OVERRIDE_RULES = [
{"set_tweak": "highlight", "value": False},
],
),
# This was changed from underride to override so it's closer in priority
# to the content rules where the user name highlight rule lives. This
# way a room rule is lower priority than both but a custom override rule
# is higher priority than both.
PushRule(
default=True,
priority_class=PRIORITY_CLASS_MAP["override"],
rule_id="global/override/.m.rule.contains_display_name",
conditions=[{"kind": "contains_display_name"}],
actions=[
"notify",
{"set_tweak": "sound", "value": "default"},
{"set_tweak": "highlight"},
],
),
]
BASE_APPEND_OVERRIDE_RULES = [
PushRule(
default=True,
priority_class=PRIORITY_CLASS_MAP["override"],
rule_id="global/override/.m.rule.suppress_notices",
conditions=[
{
"kind": "event_match",
"key": "content.msgtype",
"pattern": "m.notice",
"_cache_key": "_suppress_notices",
}
],
actions=["dont_notify"],
),
# Will we sometimes want to know about people joining and leaving?
# Perhaps: if so, this could be expanded upon. Seems the most usual case
# is that we don't though. We add this override rule so that even if
@ -334,21 +382,6 @@ BASE_APPEND_OVERRIDE_RULES = [
],
actions=["dont_notify"],
),
# This was changed from underride to override so it's closer in priority
# to the content rules where the user name highlight rule lives. This
# way a room rule is lower priority than both but a custom override rule
# is higher priority than both.
PushRule(
default=True,
priority_class=PRIORITY_CLASS_MAP["override"],
rule_id="global/override/.m.rule.contains_display_name",
conditions=[{"kind": "contains_display_name"}],
actions=[
"notify",
{"set_tweak": "sound", "value": "default"},
{"set_tweak": "highlight"},
],
),
PushRule(
default=True,
priority_class=PRIORITY_CLASS_MAP["override"],
@ -566,18 +599,11 @@ BASE_RULE_IDS = set()
BASE_RULES_BY_ID: Dict[str, PushRule] = {}
for r in BASE_APPEND_CONTENT_RULES:
BASE_RULE_IDS.add(r.rule_id)
BASE_RULES_BY_ID[r.rule_id] = r
for r in BASE_PREPEND_OVERRIDE_RULES:
BASE_RULE_IDS.add(r.rule_id)
BASE_RULES_BY_ID[r.rule_id] = r
for r in BASE_APPEND_OVERRIDE_RULES:
BASE_RULE_IDS.add(r.rule_id)
BASE_RULES_BY_ID[r.rule_id] = r
for r in BASE_APPEND_UNDERRIDE_RULES:
for r in itertools.chain(
BASE_APPEND_OVERRIDE_RULES_USER_SPECIFIC,
BASE_PREPEND_OVERRIDE_RULES,
BASE_APPEND_CONTENT_RULES_USER_SPECIFIC,
BASE_APPEND_UNDERRIDE_RULES,
):
BASE_RULE_IDS.add(r.rule_id)
BASE_RULES_BY_ID[r.rule_id] = r

View file

@ -41,7 +41,7 @@ from synapse.util.caches import register_cache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_event_for_clients_with_state
from .baserules import FilteredPushRules, PushRule
from .baserules import FilteredPushRules, PushRule, PushRules
from .push_rule_evaluator import PushRuleEvaluatorForEvent
if TYPE_CHECKING:
@ -301,11 +301,39 @@ class BulkPushRuleEvaluator:
self.store, users, event, context
)
actions_by_user = {}
default_rules = FilteredPushRules(PushRules(), {}, self.hs.config.experimental)
matching_default_rule = None
for rule, _ in default_rules:
if not rule.default_enabled:
continue
matches = evaluator.check_conditions(rule.conditions, "uid", None)
if matches:
matching_default_rule = rule
break
logger.info("ACTIONS found matching rule %s", rule)
joining_user = None
if event.type == EventTypes.Member:
joining_user = event.state_key
for uid, rules in rules_by_user.items():
if event.sender == uid:
try:
actions_by_user.pop(uid)
except KeyError:
pass
continue
if uid not in uids_with_visibility:
try:
actions_by_user.pop(uid)
except KeyError:
pass
continue
display_name = None
@ -313,10 +341,10 @@ class BulkPushRuleEvaluator:
if profile:
display_name = profile.display_name
if not display_name:
if not display_name and joining_user:
# Handle the case where we are pushing a membership event to
# that user, as they might not be already joined.
if event.type == EventTypes.Member and event.state_key == uid:
if joining_user == uid:
display_name = event.content.get("displayname", None)
if not isinstance(display_name, str):
display_name = None
@ -328,18 +356,51 @@ class BulkPushRuleEvaluator:
# current user, it'll be added to the dict later.
actions_by_user[uid] = []
for rule, enabled in rules:
matched_default = False
if matching_default_rule:
if not rules.enabled_map.get(matching_default_rule.rule_id, True):
continue
matched_default = True
override = rules.push_rules.overriden_base_rules.get(
matching_default_rule.rule_id
)
if override:
actions = override.actions
else:
actions = matching_default_rule.actions
actions = [x for x in actions if x != "dont_notify"]
if actions and "notify" in actions:
actions_by_user[uid] = matching_default_rule.actions
for rule, enabled in rules.user_specific_rules():
if not enabled:
continue
if (
matched_default
and rule.priority_class < matching_default_rule.priority_class
):
break
matches = evaluator.check_conditions(rule.conditions, uid, display_name)
if matches:
actions = [x for x in rule.actions if x != "dont_notify"]
if actions and "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions
else:
try:
actions_by_user.pop(uid)
except KeyError:
pass
break
logger.info("ACTIONS %s", actions_by_user)
# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)

View file

@ -144,6 +144,8 @@ class PushRuleEvaluatorForEvent:
# Maps strings of e.g. 'content.body' -> event["content"]["body"]
self._value_cache = _flatten_dict(event)
self._body_split = re.split(r"\b", self._value_cache.get("content.body", ""))
# Maps cache keys to final values.
self._condition_cache: Dict[str, bool] = {}
@ -233,7 +235,7 @@ class PushRuleEvaluatorForEvent:
if pattern_type == "user_id":
pattern = user_id
elif pattern_type == "user_localpart":
pattern = UserID.from_string(user_id).localpart
pattern = user_id[1:].split(":", 1)[0]
if not pattern:
logger.warning("event_match condition with no pattern")
@ -241,11 +243,7 @@ class PushRuleEvaluatorForEvent:
# XXX: optimisation: cache our pattern regexps
if condition["key"] == "content.body":
body = self._event.content.get("body", None)
if not body or not isinstance(body, str):
return False
return _glob_matches(pattern, body, word_boundary=True)
return any(pattern == b for b in self._body_split)
else:
haystack = self._value_cache.get(condition["key"], None)
if haystack is None:
@ -270,15 +268,7 @@ class PushRuleEvaluatorForEvent:
if not body or not isinstance(body, str):
return False
# Similar to _glob_matches, but do not treat display_name as a glob.
r = regex_cache.get((display_name, False, True), None)
if not r:
r1 = re.escape(display_name)
r1 = to_word_pattern(r1)
r = re.compile(r1, flags=re.IGNORECASE)
regex_cache[(display_name, False, True)] = r
return bool(r.search(body))
return display_name in self._body_split
def _relation_match(self, condition: Mapping, user_id: str) -> bool:
"""
@ -332,6 +322,9 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
string. Defaults to False.
"""
if not IS_GLOB.search(glob):
return glob == value
try:
r = regex_cache.get((glob, True, word_boundary), None)
if not r: