diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index d99fc4bec0..369b23a108 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -199,7 +199,8 @@ class InitialSyncHandler: ) elif event.membership == Membership.LEAVE: room_end_token = RoomStreamToken( - stream=event.stream_ordering, + stream=event.event_pos.stream, + instance_map={event.event_pos: event.event_pos.stream}, ) deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 39009ef762..add7058389 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -323,7 +323,7 @@ class SlidingSyncHandler: room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( user_id=user_id, # We want to fetch any kind of membership (joined and left rooms) in order - # to get the `stream_ordering` of the latest room membership event for the + # to get the `event_pos` of the latest room membership event for the # user. # # We will filter out the rooms that the user has left below (see @@ -347,7 +347,7 @@ class SlidingSyncHandler: # Find the stream_ordering of the latest room membership event which will mark # the spot we queried up to. # - # TODO: With the new `GetRoomsForUserWithStreamOrdering` info, make a instance + # TODO: With the new `RoomsForUser.event_pos` info, make a instance # map to stream ordering and construct the new room key from that map, # `RoomStreamToken(stream=, instance_map=...)` max_stream_ordering_from_room_list = max( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 9fddbb2caf..642cf96af8 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) sql = """ - SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version + SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version FROM local_current_membership AS c INNER JOIN events AS e USING (room_id, event_id) INNER JOIN rooms AS r USING (room_id) @@ -488,7 +488,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) txn.execute(sql, (user_id, *args)) - results = [RoomsForUser(*r) for r in txn] + results = [ + RoomsForUser( + room_id, + sender, + membership, + event_id, + PersistedEventPosition(instance_name, stream_ordering), + room_version, + ) + for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn + ] return results diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 86c8f14d1b..3df7fb89b5 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -154,7 +154,9 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): USER_ID, "invite", event.event_id, - event.internal_metadata.stream_ordering, + PersistedEventPosition( + "master", event.internal_metadata.stream_ordering + ), RoomVersions.V1.identifier, ) ],