Compare commits

...

3 commits

Author SHA1 Message Date
Eric Eastwood 81c06bec20 Detect state resets 2024-06-27 11:50:18 -05:00
Eric Eastwood 15fcead2a5 Slight clean-up 2024-06-27 11:33:41 -05:00
Eric Eastwood 830e09d2de Grab prev_membership to see whether the server left the room (fixes tests)
See https://github.com/element-hq/synapse/pull/17320#discussion_r1657170493

`prev_membership` helps determine whether we should include the `event_id=null` row because
we can check whether we have already left.

 - When we leave the room causing the server to leave the room, the `prev_event_id` will be our join event
 - When the server leaves the room after us, the `prev_event_id` will be our leave event
 - In the state reset case, `prev_event_id` will be our join event
2024-06-27 10:20:33 -05:00
2 changed files with 65 additions and 42 deletions

View file

@ -123,6 +123,8 @@ class CurrentStateDeltaMembership:
room_id: The room ID of the membership event.
membership: The membership state of the user in the room
sender: The person who sent the membership event
state_reset: Whether the membership in the room was changed without a
corresponding event (state reset).
"""
event_id: Optional[str]
@ -131,6 +133,7 @@ class CurrentStateDeltaMembership:
room_id: str
membership: str
sender: Optional[str]
state_reset: bool
def generate_pagination_where_clause(
@ -846,7 +849,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [min_from_id, max_to_id, user_id, EventTypes.Member]
args: List[Any] = [
EventTypes.Member,
user_id,
user_id,
min_from_id,
max_to_id,
EventTypes.Member,
user_id,
]
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're
@ -859,38 +870,44 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `stream_ordering` is unique across the Synapse instance so this should
# work fine.
#
# We `COALESCE` the `instance_name` and `stream_ordering` because we prefer
# the source of truth from the events table. This gives slightly more
# accurate results when available since `current_state_delta_stream` only
# tracks that the current state is at this stream position (not what stream
# position the state event was added) and batches events at the same
# `stream_id` in certain cases.
# We `COALESCE` the `stream_ordering` because we prefer the source of truth
# from the `events` table. This gives slightly more accurate results when
# available since `current_state_delta_stream` only tracks that the current
# state is at this stream position (not what stream position the state event
# was added) and uses the *minimum* stream position for batches of events.
#
# TODO: We need to add indexes for `current_state_delta_stream.event_id` and
# `current_state_delta_stream.state_key`/`current_state_delta_stream.type`
# for this to be efficient.
# The extra `LEFT JOIN`s by stream position are only needed to tell a state
# reset from the server leaving the room. Both cases have `event_id = null`
# but if we can find a corresponding event at that stream position, then we
# know it was just the server leaving the room.
sql = """
SELECT
e.event_id,
COALESCE(e.event_id, e_by_stream.event_id) AS event_id,
s.prev_event_id,
s.room_id,
COALESCE(e.instance_name, s.instance_name),
COALESCE(e.stream_ordering, s.stream_id),
m.membership,
e.sender
s.instance_name,
COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering,
COALESCE(m.membership, m_by_stream.membership) AS membership,
COALESCE(e.sender, e_by_stream.sender) AS sender,
m_prev.membership AS prev_membership
FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
AND e_by_stream.type = ?
AND e_by_stream.state_key = ?
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
AND m_by_stream.user_id = ?
WHERE s.stream_id > ? AND s.stream_id <= ?
AND s.state_key = ?
AND s.type = ?
AND s.state_key = ?
ORDER BY s.stream_id ASC
"""
txn.execute(sql, args)
membership_changes: List[CurrentStateDeltaMembership] = []
membership_change_map: Dict[str, CurrentStateDeltaMembership] = {}
for (
event_id,
prev_event_id,
@ -899,6 +916,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
stream_ordering,
membership,
sender,
prev_membership,
) in txn:
assert room_id is not None
assert instance_name is not None
@ -914,20 +932,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `event_id = null` for all current state. This means we might
# already have a row for the leave event and then another for the
# same leave where the `event_id=null` but the `prev_event_id` is
# pointing back at the earlier leave event. Since we're assuming the
# `event_id = null` row is a `leave` and we don't want duplicate
# membership changes in our results, let's get rid of those
# (deduplicate) (see `test_server_left_after_us_room`).
if event_id is None:
already_tracked_membership_change = membership_change_map.get(
prev_event_id
)
if (
already_tracked_membership_change is not None
and already_tracked_membership_change.membership
== Membership.LEAVE
):
continue
# pointing back at the earlier leave event. We don't want to report
# the leave again if we already have a leave event.
if event_id is None and prev_membership == Membership.LEAVE:
continue
# We can detect a state reset if there was a membership change
# without a corresponding event.
state_reset = False
if event_id is None and membership != prev_membership:
state_reset = True
membership_change = CurrentStateDeltaMembership(
event_id=event_id,
@ -941,11 +955,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
membership if membership is not None else Membership.LEAVE
),
sender=sender,
state_reset=state_reset,
)
membership_changes.append(membership_change)
if event_id:
membership_change_map[event_id] = membership_change
return membership_changes

View file

@ -615,6 +615,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1,
membership="join",
sender=user1_id,
state_reset=False,
)
],
)
@ -716,6 +717,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1,
membership="join",
sender=user1_id,
state_reset=False,
),
CurrentStateDeltaMembership(
event_id=leave_response1["event_id"],
@ -724,6 +726,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1,
membership="leave",
sender=user1_id,
state_reset=False,
),
],
)
@ -785,9 +788,9 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
def test_we_cause_server_left_room(self) -> None:
"""
Test that when probing over part of the DAG where we leave the room causing the
server to leave the room (because we were the last local user in the room), we
still see the join and leave changes.
Test that when probing over part of the DAG where the user leaves the room
causing the server to leave the room (because we were the last local user in the
room), we still see the join and leave changes.
This is to make sure we play nicely with this behavior: When the server leaves a
room, it will insert new rows with `event_id = null` into the
@ -882,14 +885,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1,
membership="join",
sender=user1_id,
state_reset=False,
),
CurrentStateDeltaMembership(
event_id=None, # leave_response1["event_id"],
event_id=leave_response1["event_id"],
event_pos=leave_pos1,
prev_event_id=join_response1["event_id"],
room_id=room_id1,
membership="leave",
sender=None, # user1_id,
sender=user1_id,
state_reset=False,
),
],
)
@ -1004,6 +1009,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1,
membership="join",
sender=user3_id,
state_reset=False,
),
],
)
@ -1019,7 +1025,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
before_reset_token = self.event_sources.get_current_token()
@ -1056,8 +1062,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
"type": EventTypes.Member,
"state_key": user1_id,
"event_id": None,
# FIXME: I'm not sure if a state reset should have a prev_event_id
"prev_event_id": None,
"prev_event_id": join_response1["event_id"],
"instance_name": dummy_state_pos.instance_name,
},
desc="state reset user in current_state_delta_stream",
@ -1088,10 +1093,11 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
CurrentStateDeltaMembership(
event_id=None,
event_pos=dummy_state_pos,
prev_event_id=None,
prev_event_id=join_response1["event_id"],
room_id=room_id1,
membership="leave",
sender=None, # user1_id,
state_reset=True,
),
],
)
@ -1142,6 +1148,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1,
membership="join",
sender=user1_id,
state_reset=False,
),
CurrentStateDeltaMembership(
event_id=join_response2["event_id"],
@ -1150,6 +1157,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id2,
membership="join",
sender=user1_id,
state_reset=False,
),
],
)
@ -1176,6 +1184,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1,
membership="join",
sender=user1_id,
state_reset=False,
)
],
)
@ -1369,6 +1378,7 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
room_id=intially_unjoined_room_id,
membership="join",
sender=user1_id,
state_reset=False,
),
],
)