mirror of
https://github.com/element-hq/synapse
synced 2024-10-02 06:52:40 +00:00
Detect state resets
This commit is contained in:
parent
15fcead2a5
commit
81c06bec20
2 changed files with 50 additions and 16 deletions
|
@ -123,6 +123,8 @@ class CurrentStateDeltaMembership:
|
|||
room_id: The room ID of the membership event.
|
||||
membership: The membership state of the user in the room
|
||||
sender: The person who sent the membership event
|
||||
state_reset: Whether the membership in the room was changed without a
|
||||
corresponding event (state reset).
|
||||
"""
|
||||
|
||||
event_id: Optional[str]
|
||||
|
@ -131,6 +133,7 @@ class CurrentStateDeltaMembership:
|
|||
room_id: str
|
||||
membership: str
|
||||
sender: Optional[str]
|
||||
state_reset: bool
|
||||
|
||||
|
||||
def generate_pagination_where_clause(
|
||||
|
@ -846,7 +849,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
min_from_id = from_key.stream
|
||||
max_to_id = to_key.get_max_stream_pos()
|
||||
|
||||
args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id]
|
||||
args: List[Any] = [
|
||||
EventTypes.Member,
|
||||
user_id,
|
||||
user_id,
|
||||
min_from_id,
|
||||
max_to_id,
|
||||
EventTypes.Member,
|
||||
user_id,
|
||||
]
|
||||
|
||||
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
|
||||
# the first row in `current_state_delta_stream` for the rooms we're
|
||||
|
@ -859,30 +870,35 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
# `stream_ordering` is unique across the Synapse instance so this should
|
||||
# work fine.
|
||||
#
|
||||
# We `COALESCE` the `instance_name` and `stream_ordering` because we prefer
|
||||
# the source of truth from the events table. This gives slightly more
|
||||
# accurate results when available since `current_state_delta_stream` only
|
||||
# tracks that the current state is at this stream position (not what stream
|
||||
# position the state event was added) and batches events at the same
|
||||
# `stream_id` in certain cases.
|
||||
# We `COALESCE` the `stream_ordering` because we prefer the source of truth
|
||||
# from the `events` table. This gives slightly more accurate results when
|
||||
# available since `current_state_delta_stream` only tracks that the current
|
||||
# state is at this stream position (not what stream position the state event
|
||||
# was added) and uses the *minimum* stream position for batches of events.
|
||||
#
|
||||
# TODO: We need to add indexes for `current_state_delta_stream.event_id` and
|
||||
# `current_state_delta_stream.state_key`/`current_state_delta_stream.type`
|
||||
# for this to be efficient.
|
||||
# The extra `LEFT JOIN` by stream position are only needed to tell a state
|
||||
# reset from the server leaving the room. Both cases have `event_id = null`
|
||||
# but if we can find a corresponding event at that stream position, then we
|
||||
# know it was just the server leaving the room.
|
||||
sql = """
|
||||
SELECT
|
||||
e.event_id,
|
||||
COALESCE(e.event_id, e_by_stream.event_id) AS event_id,
|
||||
s.prev_event_id,
|
||||
s.room_id,
|
||||
s.instance_name,
|
||||
COALESCE(e.stream_ordering, s.stream_id),
|
||||
m.membership,
|
||||
e.sender,
|
||||
COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering,
|
||||
COALESCE(m.membership, m_by_stream.membership) AS membership,
|
||||
COALESCE(e.sender, e_by_stream.sender) AS sender,
|
||||
m_prev.membership AS prev_membership
|
||||
FROM current_state_delta_stream AS s
|
||||
LEFT JOIN events AS e ON e.event_id = s.event_id
|
||||
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
||||
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
|
||||
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
|
||||
AND e_by_stream.type = ?
|
||||
AND e_by_stream.state_key = ?
|
||||
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
|
||||
AND m_by_stream.user_id = ?
|
||||
WHERE s.stream_id > ? AND s.stream_id <= ?
|
||||
AND s.type = ?
|
||||
AND s.state_key = ?
|
||||
|
@ -921,6 +937,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
if event_id is None and prev_membership == Membership.LEAVE:
|
||||
continue
|
||||
|
||||
# We can detect a state reset if there was a membership change
|
||||
# without a corresponding event.
|
||||
state_reset = False
|
||||
if event_id is None and membership != prev_membership:
|
||||
state_reset = True
|
||||
|
||||
membership_change = CurrentStateDeltaMembership(
|
||||
event_id=event_id,
|
||||
event_pos=PersistedEventPosition(
|
||||
|
@ -933,6 +955,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
membership if membership is not None else Membership.LEAVE
|
||||
),
|
||||
sender=sender,
|
||||
state_reset=state_reset,
|
||||
)
|
||||
|
||||
membership_changes.append(membership_change)
|
||||
|
|
|
@ -615,6 +615,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="join",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
)
|
||||
],
|
||||
)
|
||||
|
@ -716,6 +717,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="join",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
),
|
||||
CurrentStateDeltaMembership(
|
||||
event_id=leave_response1["event_id"],
|
||||
|
@ -724,6 +726,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="leave",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
@ -882,14 +885,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="join",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
),
|
||||
CurrentStateDeltaMembership(
|
||||
event_id=None, # leave_response1["event_id"],
|
||||
event_id=leave_response1["event_id"],
|
||||
event_pos=leave_pos1,
|
||||
prev_event_id=join_response1["event_id"],
|
||||
room_id=room_id1,
|
||||
membership="leave",
|
||||
sender=None, # user1_id,
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
@ -1004,6 +1009,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="join",
|
||||
sender=user3_id,
|
||||
state_reset=False,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
@ -1091,6 +1097,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="leave",
|
||||
sender=None, # user1_id,
|
||||
state_reset=True,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
@ -1141,6 +1148,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="join",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
),
|
||||
CurrentStateDeltaMembership(
|
||||
event_id=join_response2["event_id"],
|
||||
|
@ -1149,6 +1157,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id2,
|
||||
membership="join",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
@ -1175,6 +1184,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
|||
room_id=room_id1,
|
||||
membership="join",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
)
|
||||
],
|
||||
)
|
||||
|
@ -1368,6 +1378,7 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
|
|||
room_id=intially_unjoined_room_id,
|
||||
membership="join",
|
||||
sender=user1_id,
|
||||
state_reset=False,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue