Compare commits

...

3 commits

Author SHA1 Message Date
Eric Eastwood 81c06bec20 Detect state resets 2024-06-27 11:50:18 -05:00
Eric Eastwood 15fcead2a5 Slight clean-up 2024-06-27 11:33:41 -05:00
Eric Eastwood 830e09d2de Grab prev_membership to see whether the server left the room (fixes tests)
See https://github.com/element-hq/synapse/pull/17320#discussion_r1657170493

`prev_membership` helps determine whether we should include the `event_id=null` row because
we can check whether we have already left.

 - When we leave the room causing the server to leave the room, the `prev_event_id` will be our join event
 - When the server leaves the room after us, the `prev_event_id` will be leave event
 - In the state reset case, `prev_event_id` will be our join event
2024-06-27 10:20:33 -05:00
2 changed files with 65 additions and 42 deletions

View file

@ -123,6 +123,8 @@ class CurrentStateDeltaMembership:
room_id: The room ID of the membership event. room_id: The room ID of the membership event.
membership: The membership state of the user in the room membership: The membership state of the user in the room
sender: The person who sent the membership event sender: The person who sent the membership event
state_reset: Whether the membership in the room was changed without a
corresponding event (state reset).
""" """
event_id: Optional[str] event_id: Optional[str]
@ -131,6 +133,7 @@ class CurrentStateDeltaMembership:
room_id: str room_id: str
membership: str membership: str
sender: Optional[str] sender: Optional[str]
state_reset: bool
def generate_pagination_where_clause( def generate_pagination_where_clause(
@ -846,7 +849,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
min_from_id = from_key.stream min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos() max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [min_from_id, max_to_id, user_id, EventTypes.Member] args: List[Any] = [
EventTypes.Member,
user_id,
user_id,
min_from_id,
max_to_id,
EventTypes.Member,
user_id,
]
# TODO: It would be good to assert that the `from_token`/`to_token` is >= # TODO: It would be good to assert that the `from_token`/`to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're # the first row in `current_state_delta_stream` for the rooms we're
@ -859,38 +870,44 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `stream_ordering` is unique across the Synapse instance so this should # `stream_ordering` is unique across the Synapse instance so this should
# work fine. # work fine.
# #
# We `COALESCE` the `instance_name` and `stream_ordering` because we prefer # We `COALESCE` the `stream_ordering` because we prefer the source of truth
# the source of truth from the events table. This gives slightly more # from the `events` table. This gives slightly more accurate results when
# accurate results when available since `current_state_delta_stream` only # available since `current_state_delta_stream` only tracks that the current
# tracks that the current state is at this stream position (not what stream # state is at this stream position (not what stream position the state event
# position the state event was added) and batches events at the same # was added) and uses the *minimum* stream position for batches of events.
# `stream_id` in certain cases.
# #
# TODO: We need to add indexes for `current_state_delta_stream.event_id` and # The extra `LEFT JOIN` by stream position are only needed to tell a state
# `current_state_delta_stream.state_key`/`current_state_delta_stream.type` # reset from the server leaving the room. Both cases have `event_id = null`
# for this to be efficient. # but if we can find a corresponding event at that stream position, then we
# know it was just the server leaving the room.
sql = """ sql = """
SELECT SELECT
e.event_id, COALESCE(e.event_id, e_by_stream.event_id) AS event_id,
s.prev_event_id, s.prev_event_id,
s.room_id, s.room_id,
COALESCE(e.instance_name, s.instance_name), s.instance_name,
COALESCE(e.stream_ordering, s.stream_id), COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering,
m.membership, COALESCE(m.membership, m_by_stream.membership) AS membership,
e.sender COALESCE(e.sender, e_by_stream.sender) AS sender,
m_prev.membership AS prev_membership
FROM current_state_delta_stream AS s FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
AND e_by_stream.type = ?
AND e_by_stream.state_key = ?
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
AND m_by_stream.user_id = ?
WHERE s.stream_id > ? AND s.stream_id <= ? WHERE s.stream_id > ? AND s.stream_id <= ?
AND s.state_key = ?
AND s.type = ? AND s.type = ?
AND s.state_key = ?
ORDER BY s.stream_id ASC ORDER BY s.stream_id ASC
""" """
txn.execute(sql, args) txn.execute(sql, args)
membership_changes: List[CurrentStateDeltaMembership] = [] membership_changes: List[CurrentStateDeltaMembership] = []
membership_change_map: Dict[str, CurrentStateDeltaMembership] = {}
for ( for (
event_id, event_id,
prev_event_id, prev_event_id,
@ -899,6 +916,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
stream_ordering, stream_ordering,
membership, membership,
sender, sender,
prev_membership,
) in txn: ) in txn:
assert room_id is not None assert room_id is not None
assert instance_name is not None assert instance_name is not None
@ -914,20 +932,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `event_id = null` for all current state. This means we might # `event_id = null` for all current state. This means we might
# already have a row for the leave event and then another for the # already have a row for the leave event and then another for the
# same leave where the `event_id=null` but the `prev_event_id` is # same leave where the `event_id=null` but the `prev_event_id` is
# pointing back at the earlier leave event. Since we're assuming the # pointing back at the earlier leave event. We don't want to report
# `event_id = null` row is a `leave` and we don't want duplicate # the leave, if we already have a leave event.
# membership changes in our results, let's get rid of those if event_id is None and prev_membership == Membership.LEAVE:
# (deduplicate) (see `test_server_left_after_us_room`). continue
if event_id is None:
already_tracked_membership_change = membership_change_map.get( # We can detect a state reset if there was a membership change
prev_event_id # without a corresponding event.
) state_reset = False
if ( if event_id is None and membership != prev_membership:
already_tracked_membership_change is not None state_reset = True
and already_tracked_membership_change.membership
== Membership.LEAVE
):
continue
membership_change = CurrentStateDeltaMembership( membership_change = CurrentStateDeltaMembership(
event_id=event_id, event_id=event_id,
@ -941,11 +955,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
membership if membership is not None else Membership.LEAVE membership if membership is not None else Membership.LEAVE
), ),
sender=sender, sender=sender,
state_reset=state_reset,
) )
membership_changes.append(membership_change) membership_changes.append(membership_change)
if event_id:
membership_change_map[event_id] = membership_change
return membership_changes return membership_changes

View file

@ -615,6 +615,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -716,6 +717,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=leave_response1["event_id"], event_id=leave_response1["event_id"],
@ -724,6 +726,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -785,9 +788,9 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
def test_we_cause_server_left_room(self) -> None: def test_we_cause_server_left_room(self) -> None:
""" """
Test that when probing over part of the DAG where we leave the room causing the Test that when probing over part of the DAG where the user leaves the room
server to leave the room (because we were the last local user in the room), we causing the server to leave the room (because we were the last local user in the
still see the join and leave changes. room), we still see the join and leave changes.
This is to make sure we play nicely with this behavior: When the server leaves a This is to make sure we play nicely with this behavior: When the server leaves a
room, it will insert new rows with `event_id = null` into the room, it will insert new rows with `event_id = null` into the
@ -882,14 +885,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=None, # leave_response1["event_id"], event_id=leave_response1["event_id"],
event_pos=leave_pos1, event_pos=leave_pos1,
prev_event_id=join_response1["event_id"], prev_event_id=join_response1["event_id"],
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=None, # user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -1004,6 +1009,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user3_id, sender=user3_id,
state_reset=False,
), ),
], ],
) )
@ -1019,7 +1025,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
user2_tok = self.login(user2_id, "pass") user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok) join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
before_reset_token = self.event_sources.get_current_token() before_reset_token = self.event_sources.get_current_token()
@ -1056,8 +1062,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
"type": EventTypes.Member, "type": EventTypes.Member,
"state_key": user1_id, "state_key": user1_id,
"event_id": None, "event_id": None,
# FIXME: I'm not sure if a state reset should have a prev_event_id "prev_event_id": join_response1["event_id"],
"prev_event_id": None,
"instance_name": dummy_state_pos.instance_name, "instance_name": dummy_state_pos.instance_name,
}, },
desc="state reset user in current_state_delta_stream", desc="state reset user in current_state_delta_stream",
@ -1088,10 +1093,11 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=None, event_id=None,
event_pos=dummy_state_pos, event_pos=dummy_state_pos,
prev_event_id=None, prev_event_id=join_response1["event_id"],
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=None, # user1_id, sender=None, # user1_id,
state_reset=True,
), ),
], ],
) )
@ -1142,6 +1148,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=join_response2["event_id"], event_id=join_response2["event_id"],
@ -1150,6 +1157,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id2, room_id=room_id2,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -1176,6 +1184,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -1369,6 +1378,7 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
room_id=intially_unjoined_room_id, room_id=intially_unjoined_room_id,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )