diff --git a/changelog.d/13222.misc b/changelog.d/13222.misc new file mode 100644 index 0000000000..0bab1aed70 --- /dev/null +++ b/changelog.d/13222.misc @@ -0,0 +1 @@ +Improve memory usage of calculating push actions for events in large rooms. diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 561621a285..87f82bd9a5 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -39,6 +39,7 @@ from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.server import HomeServer +from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.storage.databases.main.room import RoomWorkerStore from synapse.types import StateMap from synapse.util import SYNAPSE_VERSION @@ -60,7 +61,17 @@ class AdminCmdSlavedStore( BaseSlavedStore, RoomWorkerStore, ): - pass + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + # Annoyingly `filter_events_for_client` assumes that this exists. We + # should refactor it to take a `Clock` directly. + self.clock = hs.get_clock() class AdminCmdServer(HomeServer): diff --git a/synapse/visibility.py b/synapse/visibility.py index 8aaa8c709f..9abbaa5a64 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -13,16 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from enum import Enum, auto from typing import Collection, Dict, FrozenSet, List, Optional, Tuple +import attr from typing_extensions import Final from synapse.api.constants import EventTypes, HistoryVisibility, Membership from synapse.events import EventBase +from synapse.events.snapshot import EventContext from synapse.events.utils import prune_event from synapse.storage.controllers import StorageControllers +from synapse.storage.databases.main import DataStore from synapse.storage.state import StateFilter from synapse.types import RetentionPolicy, StateMap, get_domain_from_id +from synapse.util import Clock logger = logging.getLogger(__name__) @@ -102,153 +107,18 @@ async def filter_events_for_client( ] = await storage.main.get_retention_policy_for_room(room_id) def allowed(event: EventBase) -> Optional[EventBase]: - """ - Args: - event: event to check - - Returns: - None if the user cannot see this event at all - - a redacted copy of the event if they can only see a redacted - version - - the original event if they can see it as normal. - """ - # Only run some checks if these events aren't about to be sent to clients. This is - # because, if this is not the case, we're probably only checking if the users can - # see events in the room at that point in the DAG, and that shouldn't be decided - # on those checks. - if filter_send_to_client: - if event.type == EventTypes.Dummy: - return None - - if not event.is_state() and event.sender in ignore_list: - return None - - # Until MSC2261 has landed we can't redact malicious alias events, so for - # now we temporarily filter out m.room.aliases entirely to mitigate - # abuse, while we spec a better solution to advertising aliases - # on rooms. - if event.type == EventTypes.Aliases: - return None - - # Don't try to apply the room's retention policy if the event is a state - # event, as MSC1763 states that retention is only considered for non-state - # events. - if not event.is_state(): - retention_policy = retention_policies[event.room_id] - max_lifetime = retention_policy.max_lifetime - - if max_lifetime is not None: - oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime - - if event.origin_server_ts < oldest_allowed_ts: - return None - - if event.event_id in always_include_ids: - return event - - # we need to handle outliers separately, since we don't have the room state. - if event.internal_metadata.outlier: - # Normally these can't be seen by clients, but we make an exception for - # for out-of-band membership events (eg, incoming invites, or rejections of - # said invite) for the user themselves. - if event.type == EventTypes.Member and event.state_key == user_id: - logger.debug("Returning out-of-band-membership event %s", event) - return event - - return None - - state = event_id_to_state[event.event_id] - - # get the room_visibility at the time of the event. - visibility = get_effective_room_visibility_from_state(state) - - # Always allow history visibility events on boundaries. This is done - # by setting the effective visibility to the least restrictive - # of the old vs new. - if event.type == EventTypes.RoomHistoryVisibility: - prev_content = event.unsigned.get("prev_content", {}) - prev_visibility = prev_content.get("history_visibility", None) - - if prev_visibility not in VISIBILITY_PRIORITY: - prev_visibility = HistoryVisibility.SHARED - - new_priority = VISIBILITY_PRIORITY.index(visibility) - old_priority = VISIBILITY_PRIORITY.index(prev_visibility) - if old_priority < new_priority: - visibility = prev_visibility - - # likewise, if the event is the user's own membership event, use - # the 'most joined' membership - membership = None - if event.type == EventTypes.Member and event.state_key == user_id: - membership = event.content.get("membership", None) - if membership not in MEMBERSHIP_PRIORITY: - membership = "leave" - - prev_content = event.unsigned.get("prev_content", {}) - prev_membership = prev_content.get("membership", None) - if prev_membership not in MEMBERSHIP_PRIORITY: - prev_membership = "leave" - - # Always allow the user to see their own leave events, otherwise - # they won't see the room disappear if they reject the invite - # - # (Note this doesn't work for out-of-band invite rejections, which don't - # have prev_state populated. They are handled above in the outlier code.) - if membership == "leave" and ( - prev_membership == "join" or prev_membership == "invite" - ): - return event - - new_priority = MEMBERSHIP_PRIORITY.index(membership) - old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) - if old_priority < new_priority: - membership = prev_membership - - # otherwise, get the user's membership at the time of the event. - if membership is None: - membership_event = state.get((EventTypes.Member, user_id), None) - if membership_event: - membership = membership_event.membership - - # if the user was a member of the room at the time of the event, - # they can see it. - if membership == Membership.JOIN: - return event - - # otherwise, it depends on the room visibility. - - if visibility == HistoryVisibility.JOINED: - # we weren't a member at the time of the event, so we can't - # see this event. - return None - - elif visibility == HistoryVisibility.INVITED: - # user can also see the event if they were *invited* at the time - # of the event. - return event if membership == Membership.INVITE else None - - elif visibility == HistoryVisibility.SHARED and is_peeking: - # if the visibility is shared, users cannot see the event unless - # they have *subsequently* joined the room (or were members at the - # time, of course) - # - # XXX: if the user has subsequently joined and then left again, - # ideally we would share history up to the point they left. But - # we don't know when they left. We just treat it as though they - # never joined, and restrict access. - return None - - # the visibility is either shared or world_readable, and the user was - # not a member at the time. We allow it, provided the original sender - # has not requested their data to be erased, in which case, we return - # a redacted version. - if erased_senders[event.sender]: - return prune_event(event) - - return event + return _check_client_allowed_to_see_event( + user_id=user_id, + event=event, + clock=storage.main.clock, + filter_send_to_client=filter_send_to_client, + sender_ignored=event.sender in ignore_list, + always_include_ids=always_include_ids, + retention_policy=retention_policies[room_id], + state=event_id_to_state.get(event.event_id), + is_peeking=is_peeking, + sender_erased=erased_senders.get(event.sender, False), + ) # Check each event: gives an iterable of None or (a potentially modified) # EventBase. @@ -258,9 +128,389 @@ async def filter_events_for_client( return [ev for ev in filtered_events if ev] +async def filter_event_for_clients_with_state( + store: DataStore, + user_ids: Collection[str], + event: EventBase, + context: EventContext, + is_peeking: bool = False, + filter_send_to_client: bool = True, +) -> Collection[str]: + """ + Checks to see if an event is visible to the users in the list at the time of + the event. + + Note: This does *not* check if the sender of the event was erased. + + Args: + store: databases + user_ids: user_ids to be checked + event: the event to be checked + context: EventContext for the event to be checked + is_peeking: Whether the users are peeking into the room, ie not + currently joined + filter_send_to_client: Whether we're checking an event that's going to be + sent to a client. This might not always be the case since this function can + also be called to check whether a user can see the state at a given point. + + Returns: + Collection of user IDs for whom the event is visible + """ + # None of the users should see the event if it is soft_failed + if event.internal_metadata.is_soft_failed(): + return [] + + # Make a set for all user IDs that haven't been filtered out by a check. + allowed_user_ids = set(user_ids) + + # Only run some checks if these events aren't about to be sent to clients. This is + # because, if this is not the case, we're probably only checking if the users can + # see events in the room at that point in the DAG, and that shouldn't be decided + # on those checks. + if filter_send_to_client: + ignored_by = await store.ignored_by(event.sender) + retention_policy = await store.get_retention_policy_for_room(event.room_id) + + for user_id in user_ids: + if ( + _check_filter_send_to_client( + event, + store.clock, + retention_policy, + sender_ignored=user_id in ignored_by, + ) + == _CheckFilter.DENIED + ): + allowed_user_ids.discard(user_id) + + if event.internal_metadata.outlier: + # Normally these can't be seen by clients, but we make an exception for + # for out-of-band membership events (eg, incoming invites, or rejections of + # said invite) for the user themselves. + if event.type == EventTypes.Member and event.state_key in allowed_user_ids: + logger.debug("Returning out-of-band-membership event %s", event) + return {event.state_key} + + return set() + + # First we get just the history visibility in case its shared/world-readable + # room. + visibility_state_map = await _get_state_map( + store, event, context, StateFilter.from_types([_HISTORY_VIS_KEY]) + ) + + visibility = get_effective_room_visibility_from_state(visibility_state_map) + if ( + _check_history_visibility(event, visibility, is_peeking=is_peeking) + == _CheckVisibility.ALLOWED + ): + return allowed_user_ids + + # The history visibility isn't lax, so we now need to fetch the membership + # events of all the users. + + filter_list = [] + for user_id in allowed_user_ids: + filter_list.append((EventTypes.Member, user_id)) + filter_list.append((EventTypes.RoomHistoryVisibility, "")) + + state_filter = StateFilter.from_types(filter_list) + state_map = await _get_state_map(store, event, context, state_filter) + + # Now we check whether the membership allows each user to see the event. + return { + user_id + for user_id in allowed_user_ids + if _check_membership(user_id, event, visibility, state_map, is_peeking).allowed + } + + +async def _get_state_map( + store: DataStore, event: EventBase, context: EventContext, state_filter: StateFilter +) -> StateMap[EventBase]: + """Helper function for getting a `StateMap[EventBase]` from an `EventContext`""" + state_map = await context.get_prev_state_ids(state_filter) + + # Use events rather than event ids as content from the events are needed in + # _check_visibility + event_map = await store.get_events(state_map.values(), get_prev_content=False) + + updated_state_map = {} + for state_key, event_id in state_map.items(): + state_event = event_map.get(event_id) + if state_event: + updated_state_map[state_key] = state_event + + if event.is_state(): + current_state_key = (event.type, event.state_key) + # Add current event to updated_state_map, we need to do this here as it + # may not have been persisted to the db yet + updated_state_map[current_state_key] = event + + return updated_state_map + + +def _check_client_allowed_to_see_event( + user_id: str, + event: EventBase, + clock: Clock, + filter_send_to_client: bool, + is_peeking: bool, + always_include_ids: FrozenSet[str], + sender_ignored: bool, + retention_policy: RetentionPolicy, + state: Optional[StateMap[EventBase]], + sender_erased: bool, +) -> Optional[EventBase]: + """Check with the given user is allowed to see the given event + + See `filter_events_for_client` for details about args + + Args: + user_id + event + clock + filter_send_to_client + is_peeking + always_include_ids + sender_ignored: Whether the user is ignoring the event sender + retention_policy: The retention policy of the room + state: The state at the event, unless its an outlier + sender_erased: Whether the event sender has been marked as "erased" + + Returns: + None if the user cannot see this event at all + + a redacted copy of the event if they can only see a redacted + version + + the original event if they can see it as normal. + """ + # Only run some checks if these events aren't about to be sent to clients. This is + # because, if this is not the case, we're probably only checking if the users can + # see events in the room at that point in the DAG, and that shouldn't be decided + # on those checks. + if filter_send_to_client: + if ( + _check_filter_send_to_client(event, clock, retention_policy, sender_ignored) + == _CheckFilter.DENIED + ): + return None + + if event.event_id in always_include_ids: + return event + + # we need to handle outliers separately, since we don't have the room state. + if event.internal_metadata.outlier: + # Normally these can't be seen by clients, but we make an exception for + # for out-of-band membership events (eg, incoming invites, or rejections of + # said invite) for the user themselves. + if event.type == EventTypes.Member and event.state_key == user_id: + logger.debug("Returning out-of-band-membership event %s", event) + return event + + return None + + if state is None: + raise Exception("Missing state for non-outlier event") + + # get the room_visibility at the time of the event. + visibility = get_effective_room_visibility_from_state(state) + + # Check if the room has lax history visibility, allowing us to skip + # membership checks. + # + # We can only do this check if the sender has *not* been erased, as if they + # have we need to check the user's membership. + if ( + not sender_erased + and _check_history_visibility(event, visibility, is_peeking) + == _CheckVisibility.ALLOWED + ): + return event + + membership_result = _check_membership(user_id, event, visibility, state, is_peeking) + if not membership_result.allowed: + return None + + # If the sender has been erased and the user was not joined at the time, we + # must only return the redacted form. + if sender_erased and not membership_result.joined: + event = prune_event(event) + + return event + + +@attr.s(frozen=True, slots=True, auto_attribs=True) +class _CheckMembershipReturn: + "Return value of _check_membership" + allowed: bool + joined: bool + + +def _check_membership( + user_id: str, + event: EventBase, + visibility: str, + state: StateMap[EventBase], + is_peeking: bool, +) -> _CheckMembershipReturn: + """Check whether the user can see the event due to their membership + + Returns: + True if they can, False if they can't, plus the membership of the user + at the event. + """ + # If the event is the user's own membership event, use the 'most joined' + # membership + membership = None + if event.type == EventTypes.Member and event.state_key == user_id: + membership = event.content.get("membership", None) + if membership not in MEMBERSHIP_PRIORITY: + membership = "leave" + + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership not in MEMBERSHIP_PRIORITY: + prev_membership = "leave" + + # Always allow the user to see their own leave events, otherwise + # they won't see the room disappear if they reject the invite + # + # (Note this doesn't work for out-of-band invite rejections, which don't + # have prev_state populated. They are handled above in the outlier code.) + if membership == "leave" and ( + prev_membership == "join" or prev_membership == "invite" + ): + return _CheckMembershipReturn(True, membership == Membership.JOIN) + + new_priority = MEMBERSHIP_PRIORITY.index(membership) + old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) + if old_priority < new_priority: + membership = prev_membership + + # otherwise, get the user's membership at the time of the event. + if membership is None: + membership_event = state.get((EventTypes.Member, user_id), None) + if membership_event: + membership = membership_event.membership + + # if the user was a member of the room at the time of the event, + # they can see it. + if membership == Membership.JOIN: + return _CheckMembershipReturn(True, True) + + # otherwise, it depends on the room visibility. + + if visibility == HistoryVisibility.JOINED: + # we weren't a member at the time of the event, so we can't + # see this event. + return _CheckMembershipReturn(False, False) + + elif visibility == HistoryVisibility.INVITED: + # user can also see the event if they were *invited* at the time + # of the event. + return _CheckMembershipReturn(membership == Membership.INVITE, False) + + elif visibility == HistoryVisibility.SHARED and is_peeking: + # if the visibility is shared, users cannot see the event unless + # they have *subsequently* joined the room (or were members at the + # time, of course) + # + # XXX: if the user has subsequently joined and then left again, + # ideally we would share history up to the point they left. But + # we don't know when they left. We just treat it as though they + # never joined, and restrict access. + return _CheckMembershipReturn(False, False) + + # The visibility is either shared or world_readable, and the user was + # not a member at the time. We allow it. + return _CheckMembershipReturn(True, False) + + +class _CheckFilter(Enum): + MAYBE_ALLOWED = auto() + DENIED = auto() + + +def _check_filter_send_to_client( + event: EventBase, + clock: Clock, + retention_policy: RetentionPolicy, + sender_ignored: bool, +) -> _CheckFilter: + """Apply checks for sending events to client + + Returns: + True if might be allowed to be sent to clients, False if definitely not. + """ + + if event.type == EventTypes.Dummy: + return _CheckFilter.DENIED + + if not event.is_state() and sender_ignored: + return _CheckFilter.DENIED + + # Until MSC2261 has landed we can't redact malicious alias events, so for + # now we temporarily filter out m.room.aliases entirely to mitigate + # abuse, while we spec a better solution to advertising aliases + # on rooms. + if event.type == EventTypes.Aliases: + return _CheckFilter.DENIED + + # Don't try to apply the room's retention policy if the event is a state + # event, as MSC1763 states that retention is only considered for non-state + # events. + if not event.is_state(): + max_lifetime = retention_policy.max_lifetime + + if max_lifetime is not None: + oldest_allowed_ts = clock.time_msec() - max_lifetime + + if event.origin_server_ts < oldest_allowed_ts: + return _CheckFilter.DENIED + + return _CheckFilter.MAYBE_ALLOWED + + +class _CheckVisibility(Enum): + ALLOWED = auto() + MAYBE_DENIED = auto() + + +def _check_history_visibility( + event: EventBase, visibility: str, is_peeking: bool +) -> _CheckVisibility: + """Check if event is allowed to be seen due to lax history visibility. + + Returns: + True if user can definitely see the event, False if maybe not. + """ + # Always allow history visibility events on boundaries. This is done + # by setting the effective visibility to the least restrictive + # of the old vs new. + if event.type == EventTypes.RoomHistoryVisibility: + prev_content = event.unsigned.get("prev_content", {}) + prev_visibility = prev_content.get("history_visibility", None) + + if prev_visibility not in VISIBILITY_PRIORITY: + prev_visibility = HistoryVisibility.SHARED + + new_priority = VISIBILITY_PRIORITY.index(visibility) + old_priority = VISIBILITY_PRIORITY.index(prev_visibility) + if old_priority < new_priority: + visibility = prev_visibility + + if visibility == HistoryVisibility.SHARED and not is_peeking: + return _CheckVisibility.ALLOWED + elif visibility == HistoryVisibility.WORLD_READABLE: + return _CheckVisibility.ALLOWED + + return _CheckVisibility.MAYBE_DENIED + + def get_effective_room_visibility_from_state(state: StateMap[EventBase]) -> str: """Get the actual history vis, from a state map including the history_visibility event - Handles missing and invalid history visibility events. """ visibility_event = state.get(_HISTORY_VIS_KEY, None)