(doesn't work) Add test for batch persisting multiple member events for the same user

This commit is contained in:
Eric Eastwood 2024-06-27 16:34:41 -05:00
parent 325856e14b
commit 63c7b5017a

View file

@ -46,7 +46,7 @@ from synapse.types import (
from synapse.util import Clock from synapse.util import Clock
from tests.test_utils.event_injection import create_event from tests.test_utils.event_injection import create_event
from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase, skip_unless
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -894,12 +894,12 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
], ],
) )
def test_membership_persisted_in_same_batch(self) -> None: def test_different_user_membership_persisted_in_same_batch(self) -> None:
""" """
Test batch of membership events being processed at once. This will result in all Test batch of membership events from different users being processed at once.
of the memberships being stored in the `current_state_delta_stream` table with This will result in all of the memberships being stored in the
the same `stream_ordering` even though the individual events have different `current_state_delta_stream` table with the same `stream_ordering` even though
`stream_ordering`s. the individual events have different `stream_ordering`s.
""" """
user1_id = self.register_user("user1", "pass") user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass") _user1_tok = self.login(user1_id, "pass")
@ -1012,6 +1012,115 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
], ],
) )
@skip_unless(
False,
"persist code does not support multiple membership events for the same user in the same batch",
)
def test_membership_persisted_in_same_batch(self) -> None:
"""
Test batch of membership events for the same user being processed at once.
This *should* (doesn't happen currently) result in all of the memberships being
stored in the `current_state_delta_stream` table with the same `stream_ordering`
even though the individual events have different `stream_ordering`s.
FIXME: Currently, only the `join_event` is recorded in the `current_state_delta_stream`
table.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
# User2 is just the designated person to create the room (we do this across the
# tests to be consistent)
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
# Persist a timeline event sandwiched between two membership events so they end
# up in the `current_state_delta_stream` table with the same `stream_id`.
join_event, join_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "join"},
room_id=room_id1,
)
)
timeline_event, timeline_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Message,
state_key=user1_id,
content={"body": "foo bar", "msgtype": "m.text"},
room_id=room_id1,
)
)
leave_event, leave_event_context = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "leave"},
room_id=room_id1,
)
)
self.get_success(
self.persistence.persist_events(
[
(join_event, join_event_context),
(timeline_event, timeline_event_context),
(leave_event, leave_event_context),
]
)
)
after_room1_token = self.event_sources.get_current_token()
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like (notice
# those three memberships at the end with `stream_id=7` because we persisted
# them in the same batch):
#
# TODO: DB rows to better see what's going on.
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
join_pos = self.get_success(
self.store.get_position_for_event(join_event.event_id)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
event_id=leave_event.event_id,
# Ideally, this would be `leave_pos` (to match the `event_id`) but
# when events are persisted in a batch, they are all stored in the
# `current_state_delta_stream` table with the minimum
# `stream_ordering` from the batch.
event_pos=join_pos, # leave_pos,
prev_event_id=None,
room_id=room_id1,
membership="leave",
sender=user1_id,
),
],
)
def test_state_reset(self) -> None: def test_state_reset(self) -> None:
""" """
Test a state reset scenario where the user gets removed from the room (when Test a state reset scenario where the user gets removed from the room (when