mirror of
https://github.com/element-hq/synapse
synced 2024-07-04 08:43:29 +00:00
Don't worry about state_reset
for now
See: - Why no `COALESCE` https://github.com/element-hq/synapse/pull/17320#discussion_r1657435662 - Don't worry about `state_reset` for now, https://github.com/element-hq/synapse/pull/17320#discussion_r1657562645
This commit is contained in:
parent
81c06bec20
commit
eb159c11cd
|
@ -123,8 +123,6 @@ class CurrentStateDeltaMembership:
|
||||||
room_id: The room ID of the membership event.
|
room_id: The room ID of the membership event.
|
||||||
membership: The membership state of the user in the room
|
membership: The membership state of the user in the room
|
||||||
sender: The person who sent the membership event
|
sender: The person who sent the membership event
|
||||||
state_reset: Whether the membership in the room was changed without a
|
|
||||||
corresponding event (state reset).
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
event_id: Optional[str]
|
event_id: Optional[str]
|
||||||
|
@ -133,7 +131,6 @@ class CurrentStateDeltaMembership:
|
||||||
room_id: str
|
room_id: str
|
||||||
membership: str
|
membership: str
|
||||||
sender: Optional[str]
|
sender: Optional[str]
|
||||||
state_reset: bool
|
|
||||||
|
|
||||||
|
|
||||||
def generate_pagination_where_clause(
|
def generate_pagination_where_clause(
|
||||||
|
@ -849,56 +846,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
min_from_id = from_key.stream
|
min_from_id = from_key.stream
|
||||||
max_to_id = to_key.get_max_stream_pos()
|
max_to_id = to_key.get_max_stream_pos()
|
||||||
|
|
||||||
args: List[Any] = [
|
args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id]
|
||||||
EventTypes.Member,
|
|
||||||
user_id,
|
|
||||||
user_id,
|
|
||||||
min_from_id,
|
|
||||||
max_to_id,
|
|
||||||
EventTypes.Member,
|
|
||||||
user_id,
|
|
||||||
]
|
|
||||||
|
|
||||||
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
|
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
|
||||||
# the first row in `current_state_delta_stream` for the rooms we're
|
# the first row in `current_state_delta_stream` for the rooms we're
|
||||||
# interested in. Otherwise, we will end up with empty results and not know
|
# interested in. Otherwise, we will end up with empty results and not know
|
||||||
# it.
|
# it.
|
||||||
|
|
||||||
# We have to look-up events by `stream_ordering` because
|
# We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate
|
||||||
# `current_state_delta_stream.event_id` can be `null` if the server is no
|
# stream positioning when available but given our usages, we can avoid the
|
||||||
# longer in the room or a state reset happened and it was unset.
|
# complexity. Between two (valid) stream tokens, we will still get all of
|
||||||
# `stream_ordering` is unique across the Synapse instance so this should
|
# the state changes. Since those events are persisted in a batch, valid
|
||||||
# work fine.
|
# tokens will either be before or after the batch of events.
|
||||||
#
|
#
|
||||||
# We `COALESCE` the `stream_ordering` because we prefer the source of truth
|
# `stream_ordering` from the `events` table is more accurate when available
|
||||||
# from the `events` table. This gives slightly more accurate results when
|
# since the `current_state_delta_stream` table only tracks that the current
|
||||||
# available since `current_state_delta_stream` only tracks that the current
|
|
||||||
# state is at this stream position (not what stream position the state event
|
# state is at this stream position (not what stream position the state event
|
||||||
# was added) and uses the *minimum* stream position for batches of events.
|
# was added) and uses the *minimum* stream position for batches of events.
|
||||||
#
|
|
||||||
# The extra `LEFT JOIN` by stream position are only needed to tell a state
|
|
||||||
# reset from the server leaving the room. Both cases have `event_id = null`
|
|
||||||
# but if we can find a corresponding event at that stream position, then we
|
|
||||||
# know it was just the server leaving the room.
|
|
||||||
sql = """
|
sql = """
|
||||||
SELECT
|
SELECT
|
||||||
COALESCE(e.event_id, e_by_stream.event_id) AS event_id,
|
e.event_id,
|
||||||
s.prev_event_id,
|
s.prev_event_id,
|
||||||
s.room_id,
|
s.room_id,
|
||||||
s.instance_name,
|
s.instance_name,
|
||||||
COALESCE(e.stream_ordering, e_by_stream.stream_ordering, s.stream_id) AS stream_ordering,
|
s.stream_id,
|
||||||
COALESCE(m.membership, m_by_stream.membership) AS membership,
|
m.membership,
|
||||||
COALESCE(e.sender, e_by_stream.sender) AS sender,
|
e.sender,
|
||||||
m_prev.membership AS prev_membership
|
m_prev.membership AS prev_membership
|
||||||
FROM current_state_delta_stream AS s
|
FROM current_state_delta_stream AS s
|
||||||
LEFT JOIN events AS e ON e.event_id = s.event_id
|
LEFT JOIN events AS e ON e.event_id = s.event_id
|
||||||
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
||||||
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
|
LEFT JOIN room_memberships AS m_prev ON s.prev_event_id = m_prev.event_id
|
||||||
LEFT JOIN events AS e_by_stream ON e_by_stream.stream_ordering = s.stream_id
|
|
||||||
AND e_by_stream.type = ?
|
|
||||||
AND e_by_stream.state_key = ?
|
|
||||||
LEFT JOIN room_memberships AS m_by_stream ON m_by_stream.event_stream_ordering = s.stream_id
|
|
||||||
AND m_by_stream.user_id = ?
|
|
||||||
WHERE s.stream_id > ? AND s.stream_id <= ?
|
WHERE s.stream_id > ? AND s.stream_id <= ?
|
||||||
AND s.type = ?
|
AND s.type = ?
|
||||||
AND s.state_key = ?
|
AND s.state_key = ?
|
||||||
|
@ -937,12 +915,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
if event_id is None and prev_membership == Membership.LEAVE:
|
if event_id is None and prev_membership == Membership.LEAVE:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# We can detect a state reset if there was a membership change
|
|
||||||
# without a corresponding event.
|
|
||||||
state_reset = False
|
|
||||||
if event_id is None and membership != prev_membership:
|
|
||||||
state_reset = True
|
|
||||||
|
|
||||||
membership_change = CurrentStateDeltaMembership(
|
membership_change = CurrentStateDeltaMembership(
|
||||||
event_id=event_id,
|
event_id=event_id,
|
||||||
event_pos=PersistedEventPosition(
|
event_pos=PersistedEventPosition(
|
||||||
|
@ -955,7 +927,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
membership if membership is not None else Membership.LEAVE
|
membership if membership is not None else Membership.LEAVE
|
||||||
),
|
),
|
||||||
sender=sender,
|
sender=sender,
|
||||||
state_reset=state_reset,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
membership_changes.append(membership_change)
|
membership_changes.append(membership_change)
|
||||||
|
|
|
@ -615,7 +615,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -717,7 +716,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=leave_response1["event_id"],
|
event_id=leave_response1["event_id"],
|
||||||
|
@ -726,7 +724,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -885,16 +882,14 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=leave_response1["event_id"],
|
event_id=None, # leave_response1["event_id"],
|
||||||
event_pos=leave_pos1,
|
event_pos=leave_pos1,
|
||||||
prev_event_id=join_response1["event_id"],
|
prev_event_id=join_response1["event_id"],
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=user1_id,
|
sender=None, # user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -924,16 +919,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
# Persist the user1, user3, and user4 join events in the same batch so they all
|
# Persist the user1, user3, and user4 join events in the same batch so they all
|
||||||
# end up in the `current_state_delta_stream` table with the same
|
# end up in the `current_state_delta_stream` table with the same
|
||||||
# stream_ordering.
|
# stream_ordering.
|
||||||
join_event1, join_event_context1 = self.get_success(
|
|
||||||
create_event(
|
|
||||||
self.hs,
|
|
||||||
sender=user1_id,
|
|
||||||
type=EventTypes.Member,
|
|
||||||
state_key=user1_id,
|
|
||||||
content={"membership": "join"},
|
|
||||||
room_id=room_id1,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
join_event3, join_event_context3 = self.get_success(
|
join_event3, join_event_context3 = self.get_success(
|
||||||
create_event(
|
create_event(
|
||||||
self.hs,
|
self.hs,
|
||||||
|
@ -944,6 +929,19 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
# We want to put user1 in the middle of the batch. This way, regardless of the
|
||||||
|
# implementation that inserts rows into current_state_delta_stream` (whether it
|
||||||
|
# be minimum/maximum of stream position of the batch), we will still catch bugs.
|
||||||
|
join_event1, join_event_context1 = self.get_success(
|
||||||
|
create_event(
|
||||||
|
self.hs,
|
||||||
|
sender=user1_id,
|
||||||
|
type=EventTypes.Member,
|
||||||
|
state_key=user1_id,
|
||||||
|
content={"membership": "join"},
|
||||||
|
room_id=room_id1,
|
||||||
|
)
|
||||||
|
)
|
||||||
join_event4, join_event_context4 = self.get_success(
|
join_event4, join_event_context4 = self.get_success(
|
||||||
create_event(
|
create_event(
|
||||||
self.hs,
|
self.hs,
|
||||||
|
@ -957,8 +955,8 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.persistence.persist_events(
|
self.persistence.persist_events(
|
||||||
[
|
[
|
||||||
(join_event1, join_event_context1),
|
|
||||||
(join_event3, join_event_context3),
|
(join_event3, join_event_context3),
|
||||||
|
(join_event1, join_event_context1),
|
||||||
(join_event4, join_event_context4),
|
(join_event4, join_event_context4),
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
@ -966,10 +964,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
|
|
||||||
after_room1_token = self.event_sources.get_current_token()
|
after_room1_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
# Let's get membership changes from user3's perspective because it was in the
|
# Get the membership changes for the user.
|
||||||
# middle of the batch. This way, if rows in` current_state_delta_stream` are
|
|
||||||
# stored with the first or last event's `stream_ordering`, we will still catch
|
|
||||||
# bugs.
|
|
||||||
#
|
#
|
||||||
# At this point, the `current_state_delta_stream` table should look like (notice
|
# At this point, the `current_state_delta_stream` table should look like (notice
|
||||||
# those three memberships at the end with `stream_id=7` because we persisted
|
# those three memberships at the end with `stream_id=7` because we persisted
|
||||||
|
@ -987,7 +982,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
# | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
|
# | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
|
||||||
membership_changes = self.get_success(
|
membership_changes = self.get_success(
|
||||||
self.store.get_current_state_delta_membership_changes_for_user(
|
self.store.get_current_state_delta_membership_changes_for_user(
|
||||||
user3_id,
|
user1_id,
|
||||||
from_key=before_room1_token.room_key,
|
from_key=before_room1_token.room_key,
|
||||||
to_key=after_room1_token.room_key,
|
to_key=after_room1_token.room_key,
|
||||||
)
|
)
|
||||||
|
@ -1003,13 +998,16 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
membership_changes,
|
membership_changes,
|
||||||
[
|
[
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=join_event3.event_id,
|
event_id=join_event1.event_id,
|
||||||
|
# Ideally, this would be `join_pos1` (to match the `event_id`) but
|
||||||
|
# when events are persisted in a batch, they are all stored in the
|
||||||
|
# `current_state_delta_stream` table with the minimum
|
||||||
|
# `stream_ordering` from the batch.
|
||||||
event_pos=join_pos3,
|
event_pos=join_pos3,
|
||||||
prev_event_id=None,
|
prev_event_id=None,
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user3_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1097,7 +1095,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="leave",
|
membership="leave",
|
||||||
sender=None, # user1_id,
|
sender=None, # user1_id,
|
||||||
state_reset=True,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1148,7 +1145,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
CurrentStateDeltaMembership(
|
CurrentStateDeltaMembership(
|
||||||
event_id=join_response2["event_id"],
|
event_id=join_response2["event_id"],
|
||||||
|
@ -1157,7 +1153,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id2,
|
room_id=room_id2,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1184,7 +1179,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||||
room_id=room_id1,
|
room_id=room_id1,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1378,7 +1372,6 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
|
||||||
room_id=intially_unjoined_room_id,
|
room_id=intially_unjoined_room_id,
|
||||||
membership="join",
|
membership="join",
|
||||||
sender=user1_id,
|
sender=user1_id,
|
||||||
state_reset=False,
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue