From 63c7b5017ad82ee20bc2ae5898b051a2660cf188 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 27 Jun 2024 16:34:41 -0500 Subject: [PATCH] (doesn't work) Add test for batch persisting multiple member events for the same user --- tests/storage/test_stream.py | 121 +++++++++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 6 deletions(-) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 4f8f919a24..53a58bd82a 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -46,7 +46,7 @@ from synapse.types import ( from synapse.util import Clock from tests.test_utils.event_injection import create_event -from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase +from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase, skip_unless logger = logging.getLogger(__name__) @@ -894,12 +894,12 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): ], ) - def test_membership_persisted_in_same_batch(self) -> None: + def test_different_user_membership_persisted_in_same_batch(self) -> None: """ - Test batch of membership events being processed at once. This will result in all - of the memberships being stored in the `current_state_delta_stream` table with - the same `stream_ordering` even though the individual events have different - `stream_ordering`s. + Test batch of membership events from different users being processed at once. + This will result in all of the memberships being stored in the + `current_state_delta_stream` table with the same `stream_ordering` even though + the individual events have different `stream_ordering`s. """ user1_id = self.register_user("user1", "pass") _user1_tok = self.login(user1_id, "pass") @@ -1012,6 +1012,115 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): ], ) + @skip_unless( + False, + "persist code does not support multiple membership events for the same user in the same batch", + ) + def test_membership_persisted_in_same_batch(self) -> None: + """ + Test batch of membership events for the same user being processed at once. + + This *should* (doesn't happen currently) result in all of the memberships being + stored in the `current_state_delta_stream` table with the same `stream_ordering` + even though the individual events have different `stream_ordering`s. + + FIXME: Currently, only the `join_event` is recorded in the `current_state_delta_stream` + table. + """ + user1_id = self.register_user("user1", "pass") + _user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # User2 is just the designated person to create the room (we do this across the + # tests to be consistent) + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + + # Persist a timeline event sandwiched between two membership events so they end + # up in the `current_state_delta_stream` table with the same `stream_id`. + join_event, join_event_context = self.get_success( + create_event( + self.hs, + sender=user1_id, + type=EventTypes.Member, + state_key=user1_id, + content={"membership": "join"}, + room_id=room_id1, + ) + ) + timeline_event, timeline_event_context = self.get_success( + create_event( + self.hs, + sender=user1_id, + type=EventTypes.Message, + state_key=user1_id, + content={"body": "foo bar", "msgtype": "m.text"}, + room_id=room_id1, + ) + ) + leave_event, leave_event_context = self.get_success( + create_event( + self.hs, + sender=user1_id, + type=EventTypes.Member, + state_key=user1_id, + content={"membership": "leave"}, + room_id=room_id1, + ) + ) + self.get_success( + self.persistence.persist_events( + [ + (join_event, join_event_context), + (timeline_event, timeline_event_context), + (leave_event, leave_event_context), + ] + ) + ) + + after_room1_token = self.event_sources.get_current_token() + + # Get the membership changes for the user. + # + # At this point, the `current_state_delta_stream` table should look like (notice + # those three memberships at the end with `stream_id=7` because we persisted + # them in the same batch): + # + # TODO: DB rows to better see what's going on. + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_room1_token.room_key, + to_key=after_room1_token.room_key, + ) + ) + + join_pos = self.get_success( + self.store.get_position_for_event(join_event.event_id) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + event_id=leave_event.event_id, + # Ideally, this would be `leave_pos` (to match the `event_id`) but + # when events are persisted in a batch, they are all stored in the + # `current_state_delta_stream` table with the minimum + # `stream_ordering` from the batch. + event_pos=join_pos, # leave_pos, + prev_event_id=None, + room_id=room_id1, + membership="leave", + sender=user1_id, + ), + ], + ) + def test_state_reset(self) -> None: """ Test a state reset scenario where the user gets removed from the room (when