diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 19dba00a0f..c128eb5d5b 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -123,8 +123,6 @@ class CurrentStateDeltaMembership: room_id: The room ID of the membership event. membership: The membership state of the user in the room sender: The person who sent the membership event - state_reset: Whether the membership in the room was changed without a - corresponding event (state reset). """ event_id: Optional[str] @@ -133,7 +131,6 @@ class CurrentStateDeltaMembership: room_id: str membership: str sender: Optional[str] - state_reset: bool def generate_pagination_where_clause( @@ -849,56 +846,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): min_from_id = from_key.stream max_to_id = to_key.get_max_stream_pos() - args: List[Any] = [ - EventTypes.Member, - user_id, - user_id, - min_from_id, - max_to_id, - EventTypes.Member, - user_id, - ] + args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id] # TODO: It would be good to assert that the `from_token`/`to_token` is >= # the first row in `current_state_delta_stream` for the rooms we're # interested in. Otherwise, we will end up with empty results and not know # it. - # We have to look-up events by `stream_ordering` because - # `current_state_delta_stream.event_id` can be `null` if the server is no - # longer in the room or a state reset happened and it was unset. - # `stream_ordering` is unique across the Synapse instance so this should - # work fine. + # We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate + # stream positioning when available but given our usages, we can avoid the + # complexity. Between two (valid) stream tokens, we will still get all of + # the state changes. Since those events are persisted in a batch, valid + # tokens will either be before or after the batch of events. 
# - # We `COALESCE` the `stream_ordering` because we prefer the source of truth - # from the `events` table. This gives slightly more accurate results when - # available since `current_state_delta_stream` only tracks that the current + # `stream_ordering` from the `events` table is more accurate when available + # since the `current_state_delta_stream` table only tracks that the current # state is at this stream position (not what stream position the state event # was added) and uses the *minimum* stream position for batches of events. - # - # The extra `LEFT JOIN` by stream position are only needed to tell a state - # reset from the server leaving the room. Both cases have `event_id = null` - # but if we can find a corresponding event at that stream position, then we - # know it was just the server leaving the room. sql = """ SELECT - COALESCE(e.event_id, e_by_stream.event_id) AS event_id, + e.event_id, s.prev_event_id, s.room_id, s.instance_name, - COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering, - COALESCE(m.membership, m_by_stream.membership) AS membership, - COALESCE(e.sender, e_by_stream.sender) AS sender, + s.stream_id, + m.membership, + e.sender, m_prev.membership AS prev_membership FROM current_state_delta_stream AS s LEFT JOIN events AS e ON e.event_id = s.event_id LEFT JOIN room_memberships AS m ON m.event_id = s.event_id LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id - LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id - AND e_by_stream.type = ? - AND e_by_stream.state_key = ? - LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id - AND m_by_stream.user_id = ? WHERE s.stream_id > ? AND s.stream_id <= ? AND s.type = ? AND s.state_key = ? 
@@ -937,12 +915,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if event_id is None and prev_membership == Membership.LEAVE: continue - # We can detect a state reset if there was a membership change - # without a corresponding event. - state_reset = False - if event_id is None and membership != prev_membership: - state_reset = True - membership_change = CurrentStateDeltaMembership( event_id=event_id, event_pos=PersistedEventPosition( @@ -955,7 +927,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): membership if membership is not None else Membership.LEAVE ), sender=sender, - state_reset=state_reset, ) membership_changes.append(membership_change) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index acb2f0e429..4f8f919a24 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -615,7 +615,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, - state_reset=False, ) ], ) @@ -717,7 +716,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, - state_reset=False, ), CurrentStateDeltaMembership( event_id=leave_response1["event_id"], @@ -726,7 +724,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="leave", sender=user1_id, - state_reset=False, ), ], ) @@ -885,16 +882,14 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, - state_reset=False, ), CurrentStateDeltaMembership( - event_id=leave_response1["event_id"], + event_id=None, # leave_response1["event_id"], event_pos=leave_pos1, prev_event_id=join_response1["event_id"], room_id=room_id1, membership="leave", - sender=user1_id, - state_reset=False, + sender=None, # user1_id, ), ], ) @@ -924,16 +919,6 @@ class 
GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): # Persist the user1, user3, and user4 join events in the same batch so they all # end up in the `current_state_delta_stream` table with the same # stream_ordering. - join_event1, join_event_context1 = self.get_success( - create_event( - self.hs, - sender=user1_id, - type=EventTypes.Member, - state_key=user1_id, - content={"membership": "join"}, - room_id=room_id1, - ) - ) join_event3, join_event_context3 = self.get_success( create_event( self.hs, @@ -944,6 +929,19 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, ) ) + # We want to put user1 in the middle of the batch. This way, regardless of the + # implementation that inserts rows into `current_state_delta_stream` (whether it + # be minimum/maximum of stream position of the batch), we will still catch bugs. + join_event1, join_event_context1 = self.get_success( + create_event( + self.hs, + sender=user1_id, + type=EventTypes.Member, + state_key=user1_id, + content={"membership": "join"}, + room_id=room_id1, + ) + ) join_event4, join_event_context4 = self.get_success( create_event( self.hs, @@ -957,8 +955,8 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): self.get_success( self.persistence.persist_events( [ - (join_event1, join_event_context1), (join_event3, join_event_context3), + (join_event1, join_event_context1), (join_event4, join_event_context4), ] ) @@ -966,10 +964,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): after_room1_token = self.event_sources.get_current_token() - # Let's get membership changes from user3's perspective because it was in the - # middle of the batch. This way, if rows in` current_state_delta_stream` are - # stored with the first or last event's `stream_ordering`, we will still catch - # bugs. + # Get the membership changes for the user. 
# # At this point, the `current_state_delta_stream` table should look like (notice # those three memberships at the end with `stream_id=7` because we persisted @@ -987,7 +982,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): # | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None | membership_changes = self.get_success( self.store.get_current_state_delta_membership_changes_for_user( - user3_id, + user1_id, from_key=before_room1_token.room_key, to_key=after_room1_token.room_key, ) @@ -1003,13 +998,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): membership_changes, [ CurrentStateDeltaMembership( - event_id=join_event3.event_id, + event_id=join_event1.event_id, + # Ideally, this would be `join_pos1` (to match the `event_id`) but + # when events are persisted in a batch, they are all stored in the + # `current_state_delta_stream` table with the minimum + # `stream_ordering` from the batch. event_pos=join_pos3, prev_event_id=None, room_id=room_id1, membership="join", - sender=user3_id, - state_reset=False, + sender=user1_id, ), ], ) @@ -1097,7 +1095,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="leave", sender=None, # user1_id, - state_reset=True, ), ], ) @@ -1148,7 +1145,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, - state_reset=False, ), CurrentStateDeltaMembership( event_id=join_response2["event_id"], @@ -1157,7 +1153,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id2, membership="join", sender=user1_id, - state_reset=False, ), ], ) @@ -1184,7 +1179,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): room_id=room_id1, membership="join", sender=user1_id, - state_reset=False, ) ], ) @@ -1378,7 +1372,6 @@ class 
GetCurrentStateDeltaMembershipChangesForUserFederationTestCase( room_id=intially_unjoined_room_id, membership="join", sender=user1_id, - state_reset=False, ), ], )