diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ab592dcf15..19dba00a0f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -123,6 +123,8 @@ class CurrentStateDeltaMembership: room_id: The room ID of the membership event. membership: The membership state of the user in the room sender: The person who sent the membership event + state_reset: Whether the membership in the room was changed without a + corresponding event (state reset). """ event_id: Optional[str] @@ -131,6 +133,7 @@ class CurrentStateDeltaMembership: room_id: str membership: str sender: Optional[str] + state_reset: bool def generate_pagination_where_clause( @@ -846,7 +849,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): min_from_id = from_key.stream max_to_id = to_key.get_max_stream_pos() - args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id] + args: List[Any] = [ + EventTypes.Member, + user_id, + user_id, + min_from_id, + max_to_id, + EventTypes.Member, + user_id, + ] # TODO: It would be good to assert that the `from_token`/`to_token` is >= # the first row in `current_state_delta_stream` for the rooms we're @@ -859,30 +870,35 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # `stream_ordering` is unique across the Synapse instance so this should # work fine. # - # We `COALESCE` the `instance_name` and `stream_ordering` because we prefer - # the source of truth from the events table. This gives slightly more - # accurate results when available since `current_state_delta_stream` only - # tracks that the current state is at this stream position (not what stream - # position the state event was added) and batches events at the same - # `stream_id` in certain cases. + # We `COALESCE` the `stream_ordering` because we prefer the source of truth + # from the `events` table. This gives slightly more accurate results when + # available since `current_state_delta_stream` only tracks that the current + # state is at this stream position (not what stream position the state event + # was added) and uses the *minimum* stream position for batches of events. # - # TODO: We need to add indexes for `current_state_delta_stream.event_id` and - # `current_state_delta_stream.state_key`/`current_state_delta_stream.type` - # for this to be efficient. + # The extra `LEFT JOIN` by stream position are only needed to tell a state + # reset from the server leaving the room. Both cases have `event_id = null` + # but if we can find a corresponding event at that stream position, then we + # know it was just the server leaving the room. sql = """ SELECT - e.event_id, + COALESCE(e.event_id, e_by_stream.event_id) AS event_id, s.prev_event_id, s.room_id, s.instance_name, - COALESCE(e.stream_ordering, s.stream_id), - m.membership, - e.sender, + COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering, + COALESCE(m.membership, m_by_stream.membership) AS membership, + COALESCE(e.sender, e_by_stream.sender) AS sender, m_prev.membership AS prev_membership FROM current_state_delta_stream AS s LEFT JOIN events AS e ON e.event_id = s.event_id LEFT JOIN room_memberships AS m ON m.event_id = s.event_id LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id + LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id + AND e_by_stream.type = ? + AND e_by_stream.state_key = ? + LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id + AND m_by_stream.user_id = ? WHERE s.stream_id > ? AND s.stream_id <= ? AND s.type = ? AND s.state_key = ? @@ -921,6 +937,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if event_id is None and prev_membership == Membership.LEAVE: continue + # We can detect a state reset if there was a membership change + # without a corresponding event. + state_reset = False + if event_id is None and membership != prev_membership: + state_reset = True + membership_change = CurrentStateDeltaMembership( event_id=event_id, event_pos=PersistedEventPosition( @@ -933,6 +955,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): membership if membership is not None else Membership.LEAVE ), sender=sender, + state_reset=state_reset, ) membership_changes.append(membership_change) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 5a054d7f2e..acb2f0e429 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -615,6 +615,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, + state_reset=False, ) ], ) @@ -716,6 +717,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, + state_reset=False, ), CurrentStateDeltaMembership( event_id=leave_response1["event_id"], @@ -724,6 +726,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="leave", sender=user1_id, + state_reset=False, ), ], ) @@ -882,14 +885,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, + state_reset=False, ), CurrentStateDeltaMembership( - event_id=None, # leave_response1["event_id"], + event_id=leave_response1["event_id"], event_pos=leave_pos1, prev_event_id=join_response1["event_id"], room_id=room_id1, membership="leave", - sender=None, # user1_id, + sender=user1_id, + state_reset=False, ), ], ) @@ -1004,6 +1009,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user3_id, + state_reset=False, ), ], ) @@ -1091,6 +1097,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="leave", sender=None, # user1_id, + state_reset=True, ), ], ) @@ -1141,6 +1148,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, + state_reset=False, ), CurrentStateDeltaMembership( event_id=join_response2["event_id"], @@ -1149,6 +1157,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id2, membership="join", sender=user1_id, + state_reset=False, ), ], ) @@ -1175,6 +1184,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, + state_reset=False, ) ], ) @@ -1368,6 +1378,7 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase( room_id=intially_unjoined_room_id, membership="join", sender=user1_id, + state_reset=False, ), ], )