Compare commits

..

No commits in common. "81c06bec20d2f6732100672853a140a6e19ff67d" and "956f20ef748b6e3caf76f91623e72b9a617ae235" have entirely different histories.

2 changed files with 42 additions and 65 deletions

View file

@ -123,8 +123,6 @@ class CurrentStateDeltaMembership:
room_id: The room ID of the membership event. room_id: The room ID of the membership event.
membership: The membership state of the user in the room membership: The membership state of the user in the room
sender: The person who sent the membership event sender: The person who sent the membership event
state_reset: Whether the membership in the room was changed without a
corresponding event (state reset).
""" """
event_id: Optional[str] event_id: Optional[str]
@ -133,7 +131,6 @@ class CurrentStateDeltaMembership:
room_id: str room_id: str
membership: str membership: str
sender: Optional[str] sender: Optional[str]
state_reset: bool
def generate_pagination_where_clause( def generate_pagination_where_clause(
@ -849,15 +846,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
min_from_id = from_key.stream min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos() max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [ args: List[Any] = [min_from_id, max_to_id, user_id, EventTypes.Member]
EventTypes.Member,
user_id,
user_id,
min_from_id,
max_to_id,
EventTypes.Member,
user_id,
]
# TODO: It would be good to assert that the `from_token`/`to_token` is >= # TODO: It would be good to assert that the `from_token`/`to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're # the first row in `current_state_delta_stream` for the rooms we're
@ -870,44 +859,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `stream_ordering` is unique across the Synapse instance so this should # `stream_ordering` is unique across the Synapse instance so this should
# work fine. # work fine.
# #
# We `COALESCE` the `stream_ordering` because we prefer the source of truth # We `COALESCE` the `instance_name` and `stream_ordering` because we prefer
# from the `events` table. This gives slightly more accurate results when # the source of truth from the events table. This gives slightly more
# available since `current_state_delta_stream` only tracks that the current # accurate results when available since `current_state_delta_stream` only
# state is at this stream position (not what stream position the state event # tracks that the current state is at this stream position (not what stream
# was added) and uses the *minimum* stream position for batches of events. # position the state event was added) and batches events at the same
# `stream_id` in certain cases.
# #
# The extra `LEFT JOIN` by stream position are only needed to tell a state # TODO: We need to add indexes for `current_state_delta_stream.event_id` and
# reset from the server leaving the room. Both cases have `event_id = null` # `current_state_delta_stream.state_key`/`current_state_delta_stream.type`
# but if we can find a corresponding event at that stream position, then we # for this to be efficient.
# know it was just the server leaving the room.
sql = """ sql = """
SELECT SELECT
COALESCE(e.event_id, e_by_stream.event_id) AS event_id, e.event_id,
s.prev_event_id, s.prev_event_id,
s.room_id, s.room_id,
s.instance_name, COALESCE(e.instance_name, s.instance_name),
COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering, COALESCE(e.stream_ordering, s.stream_id),
COALESCE(m.membership, m_by_stream.membership) AS membership, m.membership,
COALESCE(e.sender, e_by_stream.sender) AS sender, e.sender
m_prev.membership AS prev_membership
FROM current_state_delta_stream AS s FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
AND e_by_stream.type = ?
AND e_by_stream.state_key = ?
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
AND m_by_stream.user_id = ?
WHERE s.stream_id > ? AND s.stream_id <= ? WHERE s.stream_id > ? AND s.stream_id <= ?
AND s.type = ?
AND s.state_key = ? AND s.state_key = ?
AND s.type = ?
ORDER BY s.stream_id ASC ORDER BY s.stream_id ASC
""" """
txn.execute(sql, args) txn.execute(sql, args)
membership_changes: List[CurrentStateDeltaMembership] = [] membership_changes: List[CurrentStateDeltaMembership] = []
membership_change_map: Dict[str, CurrentStateDeltaMembership] = {}
for ( for (
event_id, event_id,
prev_event_id, prev_event_id,
@ -916,7 +899,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
stream_ordering, stream_ordering,
membership, membership,
sender, sender,
prev_membership,
) in txn: ) in txn:
assert room_id is not None assert room_id is not None
assert instance_name is not None assert instance_name is not None
@ -932,17 +914,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `event_id = null` for all current state. This means we might # `event_id = null` for all current state. This means we might
# already have a row for the leave event and then another for the # already have a row for the leave event and then another for the
# same leave where the `event_id=null` but the `prev_event_id` is # same leave where the `event_id=null` but the `prev_event_id` is
# pointing back at the earlier leave event. We don't want to report # pointing back at the earlier leave event. Since we're assuming the
# the leave, if we already have a leave event. # `event_id = null` row is a `leave` and we don't want duplicate
if event_id is None and prev_membership == Membership.LEAVE: # membership changes in our results, let's get rid of those
# (deduplicate) (see `test_server_left_after_us_room`).
if event_id is None:
already_tracked_membership_change = membership_change_map.get(
prev_event_id
)
if (
already_tracked_membership_change is not None
and already_tracked_membership_change.membership
== Membership.LEAVE
):
continue continue
# We can detect a state reset if there was a membership change
# without a corresponding event.
state_reset = False
if event_id is None and membership != prev_membership:
state_reset = True
membership_change = CurrentStateDeltaMembership( membership_change = CurrentStateDeltaMembership(
event_id=event_id, event_id=event_id,
event_pos=PersistedEventPosition( event_pos=PersistedEventPosition(
@ -955,10 +941,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
membership if membership is not None else Membership.LEAVE membership if membership is not None else Membership.LEAVE
), ),
sender=sender, sender=sender,
state_reset=state_reset,
) )
membership_changes.append(membership_change) membership_changes.append(membership_change)
if event_id:
membership_change_map[event_id] = membership_change
return membership_changes return membership_changes

View file

@ -615,7 +615,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -717,7 +716,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=leave_response1["event_id"], event_id=leave_response1["event_id"],
@ -726,7 +724,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -788,9 +785,9 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
def test_we_cause_server_left_room(self) -> None: def test_we_cause_server_left_room(self) -> None:
""" """
Test that when probing over part of the DAG where the user leaves the room Test that when probing over part of the DAG where we leave the room causing the
causing the server to leave the room (because we were the last local user in the server to leave the room (because we were the last local user in the room), we
room), we still see the join and leave changes. still see the join and leave changes.
This is to make sure we play nicely with this behavior: When the server leaves a This is to make sure we play nicely with this behavior: When the server leaves a
room, it will insert new rows with `event_id = null` into the room, it will insert new rows with `event_id = null` into the
@ -885,16 +882,14 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=leave_response1["event_id"], event_id=None, # leave_response1["event_id"],
event_pos=leave_pos1, event_pos=leave_pos1,
prev_event_id=join_response1["event_id"], prev_event_id=join_response1["event_id"],
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=user1_id, sender=None, # user1_id,
state_reset=False,
), ),
], ],
) )
@ -1009,7 +1004,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user3_id, sender=user3_id,
state_reset=False,
), ),
], ],
) )
@ -1025,7 +1019,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
user2_tok = self.login(user2_id, "pass") user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) self.helper.join(room_id1, user1_id, tok=user1_tok)
before_reset_token = self.event_sources.get_current_token() before_reset_token = self.event_sources.get_current_token()
@ -1062,7 +1056,8 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
"type": EventTypes.Member, "type": EventTypes.Member,
"state_key": user1_id, "state_key": user1_id,
"event_id": None, "event_id": None,
"prev_event_id": join_response1["event_id"], # FIXME: I'm not sure if a state reset should have a prev_event_id
"prev_event_id": None,
"instance_name": dummy_state_pos.instance_name, "instance_name": dummy_state_pos.instance_name,
}, },
desc="state reset user in current_state_delta_stream", desc="state reset user in current_state_delta_stream",
@ -1093,11 +1088,10 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=None, event_id=None,
event_pos=dummy_state_pos, event_pos=dummy_state_pos,
prev_event_id=join_response1["event_id"], prev_event_id=None,
room_id=room_id1, room_id=room_id1,
membership="leave", membership="leave",
sender=None, # user1_id, sender=None, # user1_id,
state_reset=True,
), ),
], ],
) )
@ -1148,7 +1142,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
CurrentStateDeltaMembership( CurrentStateDeltaMembership(
event_id=join_response2["event_id"], event_id=join_response2["event_id"],
@ -1157,7 +1150,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id2, room_id=room_id2,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )
@ -1184,7 +1176,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
room_id=room_id1, room_id=room_id1,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
) )
], ],
) )
@ -1378,7 +1369,6 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
room_id=intially_unjoined_room_id, room_id=intially_unjoined_room_id,
membership="join", membership="join",
sender=user1_id, sender=user1_id,
state_reset=False,
), ),
], ],
) )