mirror of
https://github.com/element-hq/synapse
synced 2024-10-05 13:02:41 +00:00
Compare commits
No commits in common. "81c06bec20d2f6732100672853a140a6e19ff67d" and "956f20ef748b6e3caf76f91623e72b9a617ae235" have entirely different histories.
81c06bec20
...
956f20ef74
2 changed files with 42 additions and 65 deletions
|
@ -123,8 +123,6 @@ class CurrentStateDeltaMembership:
|
||||||
room_id: The room ID of the membership event.
|
room_id: The room ID of the membership event.
|
||||||
membership: The membership state of the user in the room
|
membership: The membership state of the user in the room
|
||||||
sender: The person who sent the membership event
|
sender: The person who sent the membership event
|
||||||
state_reset: Whether the membership in the room was changed without a
|
|
||||||
corresponding event (state reset).
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
event_id: Optional[str]
|
event_id: Optional[str]
|
||||||
|
@ -133,7 +131,6 @@ class CurrentStateDeltaMembership:
|
||||||
room_id: str
|
room_id: str
|
||||||
membership: str
|
membership: str
|
||||||
sender: Optional[str]
|
sender: Optional[str]
|
||||||
state_reset: bool
|
|
||||||
|
|
||||||
|
|
||||||
def generate_pagination_where_clause(
|
def generate_pagination_where_clause(
|
||||||
|
@ -849,15 +846,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
min_from_id = from_key.stream
|
min_from_id = from_key.stream
|
||||||
max_to_id = to_key.get_max_stream_pos()
|
max_to_id = to_key.get_max_stream_pos()
|
||||||
|
|
||||||
args: List[Any] = [
|
args: List[Any] = [min_from_id, max_to_id, user_id, EventTypes.Member]
|
||||||
EventTypes.Member,
|
|
||||||
user_id,
|
|
||||||
user_id,
|
|
||||||
min_from_id,
|
|
||||||
max_to_id,
|
|
||||||
EventTypes.Member,
|
|
||||||
user_id,
|
|
||||||
]
|
|
||||||
|
|
||||||
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
|
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
|
||||||
# the first row in `current_state_delta_stream` for the rooms we're
|
# the first row in `current_state_delta_stream` for the rooms we're
|
||||||
|
@ -870,44 +859,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# `stream_ordering` is unique across the Synapse instance so this should
|
# `stream_ordering` is unique across the Synapse instance so this should
|
||||||
# work fine.
|
# work fine.
|
||||||
#
|
#
|
||||||
# We `COALESCE` the `stream_ordering` because we prefer the source of truth
|
# We `COALESCE` the `instance_name` and `stream_ordering` because we prefer
|
||||||
# from the `events` table. This gives slightly more accurate results when
|
# the source of truth from the events table. This gives slightly more
|
||||||
# available since `current_state_delta_stream` only tracks that the current
|
# accurate results when available since `current_state_delta_stream` only
|
||||||
# state is at this stream position (not what stream position the state event
|
# tracks that the current state is at this stream position (not what stream
|
||||||
# was added) and uses the *minimum* stream position for batches of events.
|
# position the state event was added) and batches events at the same
|
||||||
|
# `stream_id` in certain cases.
|
||||||
#
|
#
|
||||||
# The extra `LEFT JOIN` by stream position are only needed to tell a state
|
# TODO: We need to add indexes for `current_state_delta_stream.event_id` and
|
||||||
# reset from the server leaving the room. Both cases have `event_id = null`
|
# `current_state_delta_stream.state_key`/`current_state_delta_stream.type`
|
||||||
# but if we can find a corresponding event at that stream position, then we
|
# for this to be efficient.
|
||||||
# know it was just the server leaving the room.
|
|
||||||
sql = """
|
sql = """
|
||||||
SELECT
|
SELECT
|
||||||
COALESCE(e.event_id, e_by_stream.event_id) AS event_id,
|
e.event_id,
|
||||||
s.prev_event_id,
|
s.prev_event_id,
|
||||||
s.room_id,
|
s.room_id,
|
||||||
s.instance_name,
|
COALESCE(e.instance_name, s.instance_name),
|
||||||
COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering,
|
COALESCE(e.stream_ordering, s.stream_id),
|
||||||
COALESCE(m.membership, m_by_stream.membership) AS membership,
|
m.membership,
|
||||||
COALESCE(e.sender, e_by_stream.sender) AS sender,
|
e.sender
|
||||||
m_prev.membership AS prev_membership
|
|
||||||
FROM current_state_delta_stream AS s
|
FROM current_state_delta_stream AS s
|
||||||
LEFT JOIN events AS e ON e.event_id = s.event_id
|
LEFT JOIN events AS e ON e.event_id = s.event_id
|
||||||
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
||||||
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
|
|
||||||
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
|
|
||||||
AND e_by_stream.type = ?
|
|
||||||
AND e_by_stream.state_key = ?
|
|
||||||
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
|
|
||||||
AND m_by_stream.user_id = ?
|
|
||||||
WHERE s.stream_id > ? AND s.stream_id <= ?
|
WHERE s.stream_id > ? AND s.stream_id <= ?
|
||||||
AND s.type = ?
|
|
||||||
AND s.state_key = ?
|
AND s.state_key = ?
|
||||||
|
AND s.type = ?
|
||||||
ORDER BY s.stream_id ASC
|
ORDER BY s.stream_id ASC
|
||||||
"""
|
"""
|
||||||
|
|
||||||
txn.execute(sql, args)
|
txn.execute(sql, args)
|
||||||
|
|
||||||
membership_changes: List[CurrentStateDeltaMembership] = []
|
membership_changes: List[CurrentStateDeltaMembership] = []
|
||||||
|
membership_change_map: Dict[str, CurrentStateDeltaMembership] = {}
|
||||||
for (
|
for (
|
||||||
event_id,
|
event_id,
|
||||||
prev_event_id,
|
prev_event_id,
|
||||||
|
@ -916,7 +899,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
stream_ordering,
|
stream_ordering,
|
||||||
membership,
|
membership,
|
||||||
sender,
|
sender,
|
||||||
prev_membership,
|
|
||||||
) in txn:
|
) in txn:
|
||||||
assert room_id is not None
|
assert room_id is not None
|
||||||
assert instance_name is not None
|
assert instance_name is not None
|
||||||
|
@ -932,17 +914,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# `event_id = null` for all current state. This means we might
|
# `event_id = null` for all current state. This means we might
|
||||||
# already have a row for the leave event and then another for the
|
# already have a row for the leave event and then another for the
|
||||||
# same leave where the `event_id=null` but the `prev_event_id` is
|
# same leave where the `event_id=null` but the `prev_event_id` is
|
||||||
# pointing back at the earlier leave event. We don't want to report
|
# pointing back at the earlier leave event. Since we're assuming the
|
||||||
# the leave, if we already have a leave event.
|
# `event_id = null` row is a `leave` and we don't want duplicate
|
||||||
if event_id is None and prev_membership == Membership.LEAVE:
|
# membership changes in our results, let's get rid of those
|
||||||
|
# (deduplicate) (see `test_server_left_after_us_room`).
|
||||||
|
if event_id is None:
|
||||||
|
already_tracked_membership_change = membership_change_map.get(
|
||||||
|
prev_event_id
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
already_tracked_membership_change is not None
|
||||||
|
and already_tracked_membership_change.membership
|
||||||
|
== Membership.LEAVE
|
||||||
|
):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# We can detect a state reset if there was a membership change
|
|
||||||
# without a corresponding event.
|
|
||||||
state_reset = False
|
|
||||||
if event_id is None and membership != prev_membership:
|
|
||||||
state_reset = True
|
|
||||||
|
|
||||||
membership_change = CurrentStateDeltaMembership(
|
membership_change = CurrentStateDeltaMembership(
|
||||||
event_id=event_id,
|
event_id=event_id,
|
||||||
event_pos=PersistedEventPosition(
|
event_pos=PersistedEventPosition(
|
||||||
|
@ -955,10 +941,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
membership if membership is not None else Membership.LEAVE
|
membership if membership is not None else Membership.LEAVE
|
||||||
),
|
),
|
||||||
sender=sender,
|
sender=sender,
|
||||||
state_reset=state_reset,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
membership_changes.append(membership_change)
|
membership_changes.append(membership_change)
|
||||||
|
if event_id:
|
||||||
|
membership_change_map[event_id] = membership_change
|
||||||
|
|
||||||
return membership_changes
|
return membership_changes
|
||||||
|
|
||||||
|
|
|
@ -615,7 +615,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -717,7 +716,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=leave_response1["event_id"],
|
event_id=leave_response1["event_id"],
|
||||||
|
@ -726,7 +724,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -788,9 +785,9 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
|
|
||||||
def test_we_cause_server_left_room(self) -> None:
|
def test_we_cause_server_left_room(self) -> None:
|
||||||
"""
|
"""
|
||||||
Test that when probing over part of the DAG where the user leaves the room
|
Test that when probing over part of the DAG where we leave the room causing the
|
||||||
causing the server to leave the room (because we were the last local user in the
|
server to leave the room (because we were the last local user in the room), we
|
||||||
room), we still see the join and leave changes.
|
still see the join and leave changes.
|
||||||
|
|
||||||
This is to make sure we play nicely with this behavior: When the server leaves a
|
This is to make sure we play nicely with this behavior: When the server leaves a
|
||||||
room, it will insert new rows with `event_id = null` into the
|
room, it will insert new rows with `event_id = null` into the
|
||||||
|
@ -885,16 +882,14 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=leave_response1["event_id"],
|
event_id=None, # leave_response1["event_id"],
|
||||||
event_pos=leave_pos1,
|
event_pos=leave_pos1,
|
||||||
prev_event_id=join_response1["event_id"],
|
prev_event_id=join_response1["event_id"],
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=user1_id,
|
sender=None, # user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1009,7 +1004,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user3_id,
|
sender=user3_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1025,7 +1019,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
user2_tok = self.login(user2_id, "pass")
|
user2_tok = self.login(user2_id, "pass")
|
||||||
|
|
||||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||||
join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
|
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||||
|
|
||||||
before_reset_token = self.event_sources.get_current_token()
|
before_reset_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
@ -1062,7 +1056,8 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
"type": EventTypes.Member,
|
"type": EventTypes.Member,
|
||||||
"state_key": user1_id,
|
"state_key": user1_id,
|
||||||
"event_id": None,
|
"event_id": None,
|
||||||
"prev_event_id": join_response1["event_id"],
|
# FIXME: I'm not sure if a state reset should have a prev_event_id
|
||||||
|
"prev_event_id": None,
|
||||||
"instance_name": dummy_state_pos.instance_name,
|
"instance_name": dummy_state_pos.instance_name,
|
||||||
},
|
},
|
||||||
desc="state reset user in current_state_delta_stream",
|
desc="state reset user in current_state_delta_stream",
|
||||||
|
@ -1093,11 +1088,10 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=None,
|
event_id=None,
|
||||||
event_pos=dummy_state_pos,
|
event_pos=dummy_state_pos,
|
||||||
prev_event_id=join_response1["event_id"],
|
prev_event_id=None,
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=None, # user1_id,
|
sender=None, # user1_id,
|
||||||
state_reset=True,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1148,7 +1142,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=join_response2["event_id"],
|
event_id=join_response2["event_id"],
|
||||||
|
@ -1157,7 +1150,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id2,
|
room_id=room_id2,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1184,7 +1176,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1378,7 +1369,6 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
|
||||||
room_id=intially_unjoined_room_id,
|
room_id=intially_unjoined_room_id,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue