mirror of
https://github.com/element-hq/synapse
synced 2024-06-30 14:53:29 +00:00
Compare commits
5 commits
a5b7922de1
...
104a2db590
Author | SHA1 | Date | |
---|---|---|---|
|
104a2db590 | ||
|
6156923114 | ||
|
0c4580d688 | ||
|
cdfed1c4fa | ||
|
944d7f6727 |
1
changelog.d/17149.misc
Normal file
1
changelog.d/17149.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Small performance improvement to limited incremental sync in large rooms.
|
|
@ -148,6 +148,12 @@ class TimelineBatch:
|
||||||
prev_batch: StreamToken
|
prev_batch: StreamToken
|
||||||
events: Sequence[EventBase]
|
events: Sequence[EventBase]
|
||||||
limited: bool
|
limited: bool
|
||||||
|
|
||||||
|
# All the events that were fetched from the DB while loading the room. This
|
||||||
|
# is a superset of `events`.
|
||||||
|
fetched_events: Sequence[EventBase]
|
||||||
|
fetched_limited: bool # Whether there is a gap between the previous timeline batch
|
||||||
|
|
||||||
# A mapping of event ID to the bundled aggregations for the above events.
|
# A mapping of event ID to the bundled aggregations for the above events.
|
||||||
# This is only calculated if limited is true.
|
# This is only calculated if limited is true.
|
||||||
bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
|
bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
|
||||||
|
@ -861,7 +867,11 @@ class SyncHandler:
|
||||||
)
|
)
|
||||||
|
|
||||||
return TimelineBatch(
|
return TimelineBatch(
|
||||||
events=recents, prev_batch=prev_batch_token, limited=False
|
events=recents,
|
||||||
|
prev_batch=prev_batch_token,
|
||||||
|
limited=False,
|
||||||
|
fetched_events=recents,
|
||||||
|
fetched_limited=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
filtering_factor = 2
|
filtering_factor = 2
|
||||||
|
@ -878,6 +888,9 @@ class SyncHandler:
|
||||||
elif since_token and not newly_joined_room:
|
elif since_token and not newly_joined_room:
|
||||||
since_key = since_token.room_key
|
since_key = since_token.room_key
|
||||||
|
|
||||||
|
fetched_events: List[EventBase] = []
|
||||||
|
fetched_limited = True
|
||||||
|
|
||||||
while limited and len(recents) < timeline_limit and max_repeat:
|
while limited and len(recents) < timeline_limit and max_repeat:
|
||||||
# If we have a since_key then we are trying to get any events
|
# If we have a since_key then we are trying to get any events
|
||||||
# that have happened since `since_key` up to `end_key`, so we
|
# that have happened since `since_key` up to `end_key`, so we
|
||||||
|
@ -896,6 +909,10 @@ class SyncHandler:
|
||||||
room_id, limit=load_limit + 1, end_token=end_key
|
room_id, limit=load_limit + 1, end_token=end_key
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# We prepend as `fetched_events` is in ascending stream order,
|
||||||
|
# and `events` is from *before* the previously fetched events.
|
||||||
|
fetched_events = events + fetched_events
|
||||||
|
|
||||||
log_kv({"loaded_recents": len(events)})
|
log_kv({"loaded_recents": len(events)})
|
||||||
|
|
||||||
loaded_recents = (
|
loaded_recents = (
|
||||||
|
@ -947,6 +964,7 @@ class SyncHandler:
|
||||||
|
|
||||||
if len(events) <= load_limit:
|
if len(events) <= load_limit:
|
||||||
limited = False
|
limited = False
|
||||||
|
fetched_limited = False
|
||||||
break
|
break
|
||||||
max_repeat -= 1
|
max_repeat -= 1
|
||||||
|
|
||||||
|
@ -977,6 +995,8 @@ class SyncHandler:
|
||||||
# (to force client to paginate the gap).
|
# (to force client to paginate the gap).
|
||||||
limited=limited or newly_joined_room or gap_token is not None,
|
limited=limited or newly_joined_room or gap_token is not None,
|
||||||
bundled_aggregations=bundled_aggregations,
|
bundled_aggregations=bundled_aggregations,
|
||||||
|
fetched_events=fetched_events,
|
||||||
|
fetched_limited=fetched_limited,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_state_after_event(
|
async def get_state_after_event(
|
||||||
|
@ -1514,8 +1534,12 @@ class SyncHandler:
|
||||||
#
|
#
|
||||||
# c.f. #16941 for an example of why we can't do this for all non-gappy
|
# c.f. #16941 for an example of why we can't do this for all non-gappy
|
||||||
# syncs.
|
# syncs.
|
||||||
|
#
|
||||||
|
# We can apply a similar optimization for gappy syncs if we know the room
|
||||||
|
# has been linear in the gap, so instead of just looking at the
|
||||||
|
# `timeline.batch` we can look at `timeline.fetched_events`.
|
||||||
is_linear_timeline = True
|
is_linear_timeline = True
|
||||||
if batch.events:
|
if batch.fetched_events:
|
||||||
# We need to make sure the first event in our batch points to the
|
# We need to make sure the first event in our batch points to the
|
||||||
# last event in the previous batch.
|
# last event in the previous batch.
|
||||||
last_event_id_prev_batch = (
|
last_event_id_prev_batch = (
|
||||||
|
@ -1532,8 +1556,19 @@ class SyncHandler:
|
||||||
break
|
break
|
||||||
prev_event_id = e.event_id
|
prev_event_id = e.event_id
|
||||||
|
|
||||||
if is_linear_timeline and not batch.limited:
|
if is_linear_timeline and not batch.fetched_limited:
|
||||||
state_ids: StateMap[str] = {}
|
batch_state_ids: MutableStateMap[str] = {}
|
||||||
|
|
||||||
|
# If the returned batch is actually limited, we need to add the
|
||||||
|
# state events that happened in the batch.
|
||||||
|
if batch.limited:
|
||||||
|
timeline_events = {e.event_id for e in batch.events}
|
||||||
|
batch_state_ids = {
|
||||||
|
(e.type, e.state_key): e.event_id
|
||||||
|
for e in batch.fetched_events
|
||||||
|
if e.is_state() and e.event_id not in timeline_events
|
||||||
|
}
|
||||||
|
|
||||||
if lazy_load_members:
|
if lazy_load_members:
|
||||||
if members_to_fetch and batch.events:
|
if members_to_fetch and batch.events:
|
||||||
# We're lazy-loading, so the client might need some more
|
# We're lazy-loading, so the client might need some more
|
||||||
|
@ -1542,7 +1577,7 @@ class SyncHandler:
|
||||||
# timeline here. The caller will then dedupe any redundant
|
# timeline here. The caller will then dedupe any redundant
|
||||||
# ones.
|
# ones.
|
||||||
|
|
||||||
state_ids = await self._state_storage_controller.get_state_ids_for_event(
|
ll_state_ids = await self._state_storage_controller.get_state_ids_for_event(
|
||||||
batch.events[0].event_id,
|
batch.events[0].event_id,
|
||||||
# we only want members!
|
# we only want members!
|
||||||
state_filter=StateFilter.from_types(
|
state_filter=StateFilter.from_types(
|
||||||
|
@ -1550,7 +1585,8 @@ class SyncHandler:
|
||||||
),
|
),
|
||||||
await_full_state=False,
|
await_full_state=False,
|
||||||
)
|
)
|
||||||
return state_ids
|
batch_state_ids.update(ll_state_ids)
|
||||||
|
return batch_state_ids
|
||||||
|
|
||||||
if batch:
|
if batch:
|
||||||
state_at_timeline_start = (
|
state_at_timeline_start = (
|
||||||
|
|
Loading…
Reference in a new issue