diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 9e94cb08f6..d94b9366ab 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -877,10 +877,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): COALESCE(e.instance_name, s.instance_name), COALESCE(e.stream_ordering, s.stream_id), m.membership, - e.sender + e.sender, + m_prev.membership AS prev_membership FROM current_state_delta_stream AS s LEFT JOIN events AS e ON e.event_id = s.event_id LEFT JOIN room_memberships AS m ON m.event_id = s.event_id + LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id WHERE s.stream_id > ? AND s.stream_id <= ? AND s.state_key = ? AND s.type = ? @@ -890,7 +892,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn.execute(sql, args) membership_changes: List[CurrentStateDeltaMembership] = [] - membership_change_map: Dict[str, CurrentStateDeltaMembership] = {} for ( event_id, prev_event_id, @@ -899,6 +900,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): stream_ordering, membership, sender, + prev_membership, ) in txn: assert room_id is not None assert instance_name is not None @@ -918,16 +920,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # `event_id = null` row is a `leave` and we don't want duplicate # membership changes in our results, let's get rid of those # (deduplicate) (see `test_server_left_after_us_room`). - if event_id is None: - already_tracked_membership_change = membership_change_map.get( - prev_event_id - ) - if ( - already_tracked_membership_change is not None - and already_tracked_membership_change.membership - == Membership.LEAVE - ): - continue + if event_id is None and prev_membership == Membership.LEAVE: + continue membership_change = CurrentStateDeltaMembership( event_id=event_id, @@ -944,8 +938,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) membership_changes.append(membership_change) - if event_id: - membership_change_map[event_id] = membership_change return membership_changes diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 0082132474..1342794d37 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -1019,7 +1019,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): user2_tok = self.login(user2_id, "pass") room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) - self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) before_reset_token = self.event_sources.get_current_token() @@ -1056,8 +1056,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): "type": EventTypes.Member, "state_key": user1_id, "event_id": None, - # FIXME: I'm not sure if a state reset should have a prev_event_id - "prev_event_id": None, + "prev_event_id": join_response1["event_id"], "instance_name": dummy_state_pos.instance_name, }, desc="state reset user in current_state_delta_stream", @@ -1088,7 +1087,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): CurrentStateDeltaMembership( event_id=None, event_pos=dummy_state_pos, - prev_event_id=None, + prev_event_id=join_response1["event_id"], room_id=room_id1, membership="leave", sender=None, # user1_id,