mirror of
https://github.com/element-hq/synapse
synced 2024-09-15 10:55:11 +00:00
Shared logic for get_sync_result_builder()
This commit is contained in:
parent
c60a4f84ac
commit
10ffae6c50
1 changed files with 143 additions and 134 deletions
|
@ -1673,128 +1673,17 @@ class SyncHandler:
|
||||||
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
# Note: we get the users room list *before* we get the current token, this
|
sync_result_builder = await self.get_sync_result_builder(
|
||||||
# avoids checking back in history if rooms are joined after the token is fetched.
|
sync_config,
|
||||||
token_before_rooms = self.event_sources.get_current_token()
|
since_token,
|
||||||
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
|
full_state,
|
||||||
|
|
||||||
# NB: The now_token gets changed by some of the generate_sync_* methods,
|
|
||||||
# this is due to some of the underlying streams not supporting the ability
|
|
||||||
# to query up to a given point.
|
|
||||||
# Always use the `now_token` in `SyncResultBuilder`
|
|
||||||
now_token = self.event_sources.get_current_token()
|
|
||||||
log_kv({"now_token": now_token})
|
|
||||||
|
|
||||||
# Since we fetched the users room list before the token, there's a small window
|
|
||||||
# during which membership events may have been persisted, so we fetch these now
|
|
||||||
# and modify the joined room list for any changes between the get_rooms_for_user
|
|
||||||
# call and the get_current_token call.
|
|
||||||
membership_change_events = []
|
|
||||||
if since_token:
|
|
||||||
membership_change_events = await self.store.get_membership_changes_for_user(
|
|
||||||
user_id,
|
|
||||||
since_token.room_key,
|
|
||||||
now_token.room_key,
|
|
||||||
self.rooms_to_exclude_globally,
|
|
||||||
)
|
|
||||||
|
|
||||||
mem_last_change_by_room_id: Dict[str, EventBase] = {}
|
|
||||||
for event in membership_change_events:
|
|
||||||
mem_last_change_by_room_id[event.room_id] = event
|
|
||||||
|
|
||||||
# For the latest membership event in each room found, add/remove the room ID
|
|
||||||
# from the joined room list accordingly. In this case we only care if the
|
|
||||||
# latest change is JOIN.
|
|
||||||
|
|
||||||
for room_id, event in mem_last_change_by_room_id.items():
|
|
||||||
assert event.internal_metadata.stream_ordering
|
|
||||||
if (
|
|
||||||
event.internal_metadata.stream_ordering
|
|
||||||
< token_before_rooms.room_key.stream
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"User membership change between getting rooms and current token: %s %s %s",
|
|
||||||
user_id,
|
|
||||||
event.membership,
|
|
||||||
room_id,
|
|
||||||
)
|
|
||||||
# User joined a room - we have to then check the room state to ensure we
|
|
||||||
# respect any bans if there's a race between the join and ban events.
|
|
||||||
if event.membership == Membership.JOIN:
|
|
||||||
user_ids_in_room = await self.store.get_users_in_room(room_id)
|
|
||||||
if user_id in user_ids_in_room:
|
|
||||||
mutable_joined_room_ids.add(room_id)
|
|
||||||
# The user left the room, or left and was re-invited but not joined yet
|
|
||||||
else:
|
|
||||||
mutable_joined_room_ids.discard(room_id)
|
|
||||||
|
|
||||||
# Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
|
|
||||||
mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
|
|
||||||
if not sync_config.filter_collection.lazy_load_members():
|
|
||||||
# Non-lazy syncs should never include partially stated rooms.
|
|
||||||
# Exclude all partially stated rooms from this sync.
|
|
||||||
results = await self.store.is_partial_state_room_batched(
|
|
||||||
mutable_joined_room_ids
|
|
||||||
)
|
|
||||||
mutable_rooms_to_exclude.update(
|
|
||||||
room_id
|
|
||||||
for room_id, is_partial_state in results.items()
|
|
||||||
if is_partial_state
|
|
||||||
)
|
|
||||||
membership_change_events = [
|
|
||||||
event
|
|
||||||
for event in membership_change_events
|
|
||||||
if not results.get(event.room_id, False)
|
|
||||||
]
|
|
||||||
|
|
||||||
# Incremental eager syncs should additionally include rooms that
|
|
||||||
# - we are joined to
|
|
||||||
# - are full-stated
|
|
||||||
# - became fully-stated at some point during the sync period
|
|
||||||
# (These rooms will have been omitted during a previous eager sync.)
|
|
||||||
forced_newly_joined_room_ids: Set[str] = set()
|
|
||||||
if since_token and not sync_config.filter_collection.lazy_load_members():
|
|
||||||
un_partial_stated_rooms = (
|
|
||||||
await self.store.get_un_partial_stated_rooms_between(
|
|
||||||
since_token.un_partial_stated_rooms_key,
|
|
||||||
now_token.un_partial_stated_rooms_key,
|
|
||||||
mutable_joined_room_ids,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
results = await self.store.is_partial_state_room_batched(
|
|
||||||
un_partial_stated_rooms
|
|
||||||
)
|
|
||||||
forced_newly_joined_room_ids.update(
|
|
||||||
room_id
|
|
||||||
for room_id, is_partial_state in results.items()
|
|
||||||
if not is_partial_state
|
|
||||||
)
|
|
||||||
|
|
||||||
# Now we have our list of joined room IDs, exclude as configured and freeze
|
|
||||||
joined_room_ids = frozenset(
|
|
||||||
room_id
|
|
||||||
for room_id in mutable_joined_room_ids
|
|
||||||
if room_id not in mutable_rooms_to_exclude
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Calculating sync response for %r between %s and %s",
|
"Calculating sync response for %r between %s and %s",
|
||||||
sync_config.user,
|
sync_config.user,
|
||||||
since_token,
|
sync_result_builder.since_token,
|
||||||
now_token,
|
sync_result_builder.now_token,
|
||||||
)
|
|
||||||
|
|
||||||
sync_result_builder = SyncResultBuilder(
|
|
||||||
sync_config,
|
|
||||||
full_state,
|
|
||||||
since_token=since_token,
|
|
||||||
now_token=now_token,
|
|
||||||
joined_room_ids=joined_room_ids,
|
|
||||||
excluded_room_ids=frozenset(mutable_rooms_to_exclude),
|
|
||||||
forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
|
|
||||||
membership_change_events=membership_change_events,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Fetching account data")
|
logger.debug("Fetching account data")
|
||||||
|
@ -1918,37 +1807,157 @@ class SyncHandler:
|
||||||
# them since the appservice doesn't have to make a massive initial sync.
|
# them since the appservice doesn't have to make a massive initial sync.
|
||||||
# (related to https://github.com/matrix-org/matrix-doc/issues/1144)
|
# (related to https://github.com/matrix-org/matrix-doc/issues/1144)
|
||||||
|
|
||||||
# NB: The now_token gets changed by some of the generate_sync_* methods,
|
sync_result_builder = await self.get_sync_result_builder(
|
||||||
|
sync_config,
|
||||||
|
since_token,
|
||||||
|
full_state=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 1. Calculate `device_lists`
|
||||||
|
device_lists = await self._generate_sync_entry_for_device_list(
|
||||||
|
sync_result_builder
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2. Calculate `to_device` events
|
||||||
|
await self._generate_sync_entry_for_to_device(sync_result_builder)
|
||||||
|
|
||||||
|
return E2eeSyncResult(
|
||||||
|
to_device=sync_result_builder.to_device,
|
||||||
|
device_lists=device_lists,
|
||||||
|
# device_one_time_keys_count: JsonMapping
|
||||||
|
# device_unused_fallback_key_types: List[str]
|
||||||
|
next_batch=sync_result_builder.now_token,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_sync_result_builder(
|
||||||
|
self,
|
||||||
|
sync_config: SyncConfig,
|
||||||
|
since_token: Optional[StreamToken] = None,
|
||||||
|
full_state: bool = False,
|
||||||
|
) -> "SyncResultBuilder":
|
||||||
|
"""
|
||||||
|
Assemble a `SyncResultBuilder` with all of the necessary context
|
||||||
|
"""
|
||||||
|
user_id = sync_config.user.to_string()
|
||||||
|
|
||||||
|
# NB: The `now_token` gets changed by some of the `generate_sync_*` methods,
|
||||||
# this is due to some of the underlying streams not supporting the ability
|
# this is due to some of the underlying streams not supporting the ability
|
||||||
# to query up to a given point.
|
# to query up to a given point.
|
||||||
# Always use the `now_token` in `SyncResultBuilder`
|
# Always use the `now_token` in `SyncResultBuilder`
|
||||||
now_token = self.event_sources.get_current_token()
|
now_token = self.event_sources.get_current_token()
|
||||||
log_kv({"now_token": now_token})
|
log_kv({"now_token": now_token})
|
||||||
|
|
||||||
joined_room_ids = await self.store.get_rooms_for_user(user_id)
|
# Note: we get the users room list *before* we get the current token, this
|
||||||
|
# avoids checking back in history if rooms are joined after the token is fetched.
|
||||||
|
token_before_rooms = self.event_sources.get_current_token()
|
||||||
|
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
|
||||||
|
|
||||||
|
# Since we fetched the users room list before the token, there's a small window
|
||||||
|
# during which membership events may have been persisted, so we fetch these now
|
||||||
|
# and modify the joined room list for any changes between the get_rooms_for_user
|
||||||
|
# call and the get_current_token call.
|
||||||
|
membership_change_events = []
|
||||||
|
if since_token:
|
||||||
|
membership_change_events = await self.store.get_membership_changes_for_user(
|
||||||
|
user_id,
|
||||||
|
since_token.room_key,
|
||||||
|
now_token.room_key,
|
||||||
|
self.rooms_to_exclude_globally,
|
||||||
|
)
|
||||||
|
|
||||||
|
mem_last_change_by_room_id: Dict[str, EventBase] = {}
|
||||||
|
for event in membership_change_events:
|
||||||
|
mem_last_change_by_room_id[event.room_id] = event
|
||||||
|
|
||||||
|
# For the latest membership event in each room found, add/remove the room ID
|
||||||
|
# from the joined room list accordingly. In this case we only care if the
|
||||||
|
# latest change is JOIN.
|
||||||
|
|
||||||
|
for room_id, event in mem_last_change_by_room_id.items():
|
||||||
|
assert event.internal_metadata.stream_ordering
|
||||||
|
if (
|
||||||
|
event.internal_metadata.stream_ordering
|
||||||
|
< token_before_rooms.room_key.stream
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"User membership change between getting rooms and current token: %s %s %s",
|
||||||
|
user_id,
|
||||||
|
event.membership,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
# User joined a room - we have to then check the room state to ensure we
|
||||||
|
# respect any bans if there's a race between the join and ban events.
|
||||||
|
if event.membership == Membership.JOIN:
|
||||||
|
user_ids_in_room = await self.store.get_users_in_room(room_id)
|
||||||
|
if user_id in user_ids_in_room:
|
||||||
|
mutable_joined_room_ids.add(room_id)
|
||||||
|
# The user left the room, or left and was re-invited but not joined yet
|
||||||
|
else:
|
||||||
|
mutable_joined_room_ids.discard(room_id)
|
||||||
|
|
||||||
|
# Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
|
||||||
|
mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
|
||||||
|
if not sync_config.filter_collection.lazy_load_members():
|
||||||
|
# Non-lazy syncs should never include partially stated rooms.
|
||||||
|
# Exclude all partially stated rooms from this sync.
|
||||||
|
results = await self.store.is_partial_state_room_batched(
|
||||||
|
mutable_joined_room_ids
|
||||||
|
)
|
||||||
|
mutable_rooms_to_exclude.update(
|
||||||
|
room_id
|
||||||
|
for room_id, is_partial_state in results.items()
|
||||||
|
if is_partial_state
|
||||||
|
)
|
||||||
|
membership_change_events = [
|
||||||
|
event
|
||||||
|
for event in membership_change_events
|
||||||
|
if not results.get(event.room_id, False)
|
||||||
|
]
|
||||||
|
|
||||||
|
# Incremental eager syncs should additionally include rooms that
|
||||||
|
# - we are joined to
|
||||||
|
# - are full-stated
|
||||||
|
# - became fully-stated at some point during the sync period
|
||||||
|
# (These rooms will have been omitted during a previous eager sync.)
|
||||||
|
forced_newly_joined_room_ids: Set[str] = set()
|
||||||
|
if since_token and not sync_config.filter_collection.lazy_load_members():
|
||||||
|
un_partial_stated_rooms = (
|
||||||
|
await self.store.get_un_partial_stated_rooms_between(
|
||||||
|
since_token.un_partial_stated_rooms_key,
|
||||||
|
now_token.un_partial_stated_rooms_key,
|
||||||
|
mutable_joined_room_ids,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
results = await self.store.is_partial_state_room_batched(
|
||||||
|
un_partial_stated_rooms
|
||||||
|
)
|
||||||
|
forced_newly_joined_room_ids.update(
|
||||||
|
room_id
|
||||||
|
for room_id, is_partial_state in results.items()
|
||||||
|
if not is_partial_state
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now we have our list of joined room IDs, exclude as configured and freeze
|
||||||
|
joined_room_ids = frozenset(
|
||||||
|
room_id
|
||||||
|
for room_id in mutable_joined_room_ids
|
||||||
|
if room_id not in mutable_rooms_to_exclude
|
||||||
|
)
|
||||||
|
|
||||||
sync_result_builder = SyncResultBuilder(
|
sync_result_builder = SyncResultBuilder(
|
||||||
sync_config,
|
sync_config,
|
||||||
full_state=False,
|
full_state,
|
||||||
since_token=since_token,
|
since_token=since_token,
|
||||||
now_token=now_token,
|
now_token=now_token,
|
||||||
joined_room_ids=joined_room_ids,
|
joined_room_ids=joined_room_ids,
|
||||||
# Dummy values to fill out `SyncResultBuilder`
|
excluded_room_ids=frozenset(mutable_rooms_to_exclude),
|
||||||
excluded_room_ids=frozenset({}),
|
forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
|
||||||
forced_newly_joined_room_ids=frozenset({}),
|
membership_change_events=membership_change_events,
|
||||||
membership_change_events=[],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
await self._generate_sync_entry_for_to_device(sync_result_builder)
|
return sync_result_builder
|
||||||
|
|
||||||
return E2eeSyncResult(
|
|
||||||
to_device=sync_result_builder.to_device,
|
|
||||||
# to_device: List[JsonDict]
|
|
||||||
# device_lists: DeviceListUpdates
|
|
||||||
# device_one_time_keys_count: JsonMapping
|
|
||||||
# device_unused_fallback_key_types: List[str]
|
|
||||||
next_batch=sync_result_builder.now_token,
|
|
||||||
)
|
|
||||||
|
|
||||||
@measure_func("_generate_sync_entry_for_device_list")
|
@measure_func("_generate_sync_entry_for_device_list")
|
||||||
async def _generate_sync_entry_for_device_list(
|
async def _generate_sync_entry_for_device_list(
|
||||||
|
|
Loading…
Reference in a new issue