Return some room timeline data in Sliding Sync

This commit is contained in:
Eric Eastwood 2024-06-17 18:03:02 -05:00
parent a5485437cf
commit 079194c547
4 changed files with 275 additions and 30 deletions

View file

@ -18,22 +18,25 @@
#
#
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
import attr
from immutabledict import immutabledict
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
StreamKeyType,
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.state import StateFilter
from synapse.types.state import StateFilter, StateKey
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -82,6 +85,18 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
return membership != Membership.LEAVE or sender != user_id
# We can't freeze this class because we want to update it in place with the
# de-duplicated data.
@attr.s(slots=True, auto_attribs=True)
class RoomSyncConfig:
"""
Holds the config for what data we should fetch for a room in the sync response.
"""
timeline_limit: int
required_state: Set[StateKey]
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
@ -201,6 +216,7 @@ class SlidingSyncHandler:
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
relevant_room_map: Dict[str, RoomSyncConfig] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
@ -225,29 +241,66 @@ class SlidingSyncHandler:
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
for range in list_config.ranges:
room_id_set = {
room_id
for room_id, _ in sorted_room_info[range[0] : range[1]]
}
ops.append(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
room_ids=[
room_id
for room_id, _ in sorted_room_info[
range[0] : range[1]
]
],
room_ids=list(room_id_set),
)
)
# Update the relevant room map
for room_id in room_id_set:
if relevant_room_map.get(room_id) is not None:
# Take the highest timeline limit
if (
relevant_room_map[room_id].timeline_limit
< list_config.timeline_limit
):
relevant_room_map[room_id].timeline_limit = (
list_config.timeline_limit
)
# Union the required state
relevant_room_map[room_id].required_state.update(
list_config.required_state
)
else:
relevant_room_map[room_id] = RoomSyncConfig(
timeline_limit=list_config.timeline_limit,
required_state=set(list_config.required_state),
)
lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_info),
ops=ops,
)
# TODO: if (sync_config.room_subscriptions):
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
for room_id, room_sync_config in relevant_room_map.items():
room_sync_result = await self.get_room_sync_data(
user=sync_config.user,
room_id=room_id,
room_sync_config=room_sync_config,
rooms_for_user_membership_at_to_token=sync_room_map[room_id],
from_token=from_token,
to_token=to_token,
)
rooms[room_id] = room_sync_result
return SlidingSyncResult(
next_pos=to_token,
lists=lists,
# TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
rooms={},
rooms=rooms,
extensions={},
)
@ -665,3 +718,130 @@ class SlidingSyncHandler:
# We want descending order
reverse=True,
)
async def get_room_sync_data(
self,
user: UserID,
room_id: str,
room_sync_config: RoomSyncConfig,
rooms_for_user_membership_at_to_token: RoomsForUser,
from_token: Optional[StreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for a room.
We fetch data according to the token range (> `from_token` and <= `to_token`).
Args:
user: User to fetch data for
room_id: The room ID to fetch data for
room_sync_config: Config for what data we should fetch for a room in the
sync response.
rooms_for_user_membership_at_to_token: Membership information for the user
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""
timeline_events: List[EventBase] = []
limited = False
# We want to use `to_token` (vs `from_token`) because we look backwards from the
# `to_token` up to the `timeline_limit` and we might not reach `from_token`
# before we hit the limit. We will update the room stream position once we've
# fetched the events.
prev_batch_token = to_token
if room_sync_config.timeline_limit > 0:
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
# We're going to paginate backwards from the `to_token`
from_key=to_token.room_key,
to_key=from_token.room_key if from_token is not None else None,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
event_filter=None,
)
# We want to return the events in ascending order (the last event is the
# most recent).
timeline_events.reverse()
timeline_events = await filter_events_for_client(
self.storage_controllers,
user.to_string(),
timeline_events,
is_peeking=rooms_for_user_membership_at_to_token.membership
!= Membership.JOIN,
filter_send_to_client=True,
)
# Determine our `limited` status
if len(timeline_events) > room_sync_config.timeline_limit:
limited = True
# Get rid of that extra "+ 1" event because we only used it to determine
# if we hit the limit or not
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
assert timeline_events[0].internal_metadata.stream_ordering
new_room_key = RoomStreamToken(
stream=timeline_events[0].internal_metadata.stream_ordering - 1
)
prev_batch_token = prev_batch_token.copy_and_replace(
StreamKeyType.ROOM, new_room_key
)
# Figure out any stripped state events for invite/knocks
stripped_state: List[EventBase] = []
if rooms_for_user_membership_at_to_token.membership in {
Membership.INVITE,
Membership.KNOCK,
}:
invite_or_knock_event = await self.store.get_event(
rooms_for_user_membership_at_to_token.event_id
)
stripped_state = []
if invite_or_knock_event.membership == Membership.INVITE:
stripped_state = invite_or_knock_event.unsigned.get(
"invite_room_state", []
)
elif invite_or_knock_event.membership == Membership.KNOCK:
stripped_state = invite_or_knock_event.unsigned.get(
"knock_room_state", []
)
stripped_state.append(invite_or_knock_event)
return SlidingSyncResult.RoomResult(
# TODO: Dummy value
name="TODO",
# TODO: Dummy value
avatar=None,
# TODO: Dummy value
heroes=None,
# Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial=True,
# TODO: Dummy value
required_state=[],
timeline=timeline_events,
# TODO: Dummy value
is_dm=False,
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
# TODO: Dummy values
joined_count=0,
invited_count=0,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
# TODO: Dummy value
num_live=0,
)

View file

@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet):
"lists": {
"foo-list": {
"ranges": [ [0, 99] ],
"sort": [ "by_notification_level", "by_recency", "by_name" ],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet):
"filters": {
"is_dm": true
},
"bump_event_types": [ "m.room.message", "m.room.encrypted" ],
}
},
// Room Subscriptions API
@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet):
"!sub1:bar": {
"required_state": [ ["*","*"] ],
"timeline_limit": 10,
"include_old_rooms": {
"timeline_limit": 1,
"required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
}
}
},
// Extensions API
@ -871,10 +865,11 @@ class SlidingSyncRestServlet(RestServlet):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
self.sliding_sync_handler = hs.get_sliding_sync_handler()
self.event_serializer = hs.get_event_client_serializer()
# TODO: Update this to `on_GET` once we figure out how we want to handle params
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user = requester.user
@ -920,13 +915,14 @@ class SlidingSyncRestServlet(RestServlet):
logger.info("Client has disconnected; not serializing response.")
return 200, {}
response_content = await self.encode_response(sliding_sync_results)
response_content = await self.encode_response(requester, sliding_sync_results)
return 200, response_content
# TODO: Is there a better way to encode things?
async def encode_response(
self,
requester: Requester,
sliding_sync_result: SlidingSyncResult,
) -> JsonDict:
response: JsonDict = defaultdict(dict)
@ -935,7 +931,9 @@ class SlidingSyncRestServlet(RestServlet):
serialized_lists = self.encode_lists(sliding_sync_result.lists)
if serialized_lists:
response["lists"] = serialized_lists
response["rooms"] = {} # TODO: sliding_sync_result.rooms
response["rooms"] = await self.encode_rooms(
requester, sliding_sync_result.rooms
)
response["extensions"] = {} # TODO: sliding_sync_result.extensions
return response
@ -961,6 +959,79 @@ class SlidingSyncRestServlet(RestServlet):
return serialized_lists
async def encode_rooms(
self,
requester: Requester,
rooms: Dict[str, SlidingSyncResult.RoomResult],
) -> JsonDict:
time_now = self.clock.time_msec()
serialize_options = SerializeEventConfig(
event_format=format_event_for_client_v2_without_room_id,
requester=requester,
)
serialized_rooms = {}
for room_id, room_result in rooms.items():
serialized_timeline = await self.event_serializer.serialize_events(
room_result.timeline,
time_now,
config=serialize_options,
# TODO
# bundle_aggregations=room.timeline.bundled_aggregations,
)
serialized_required_state = await self.event_serializer.serialize_events(
room_result.required_state,
time_now,
config=serialize_options,
)
serialized_rooms[room_id] = {
"name": room_result.name,
"required_state": serialized_required_state,
"timeline": serialized_timeline,
"prev_batch": await room_result.prev_batch.to_string(self.store),
"limited": room_result.limited,
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
"num_live": room_result.num_live,
}
if room_result.avatar:
serialized_rooms[room_id]["avatar"] = room_result.avatar
if room_result.heroes:
serialized_rooms[room_id]["heroes"] = room_result.heroes
# We should only include the `initial` key if it's `True` to save bandwidth.
# The absense of this flag means `False`.
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial
# Field should be absent on non-DM rooms
if room_result.is_dm:
serialized_rooms[room_id]["is_dm"] = room_result.is_dm
# Stripped state only applies to invite/knock rooms
if room_result.stripped_state:
serialized_stripped_state = (
await self.event_serializer.serialize_events(
room_result.stripped_state,
time_now,
config=serialize_options,
)
)
# TODO: Would be good to rename this to `stripped_state` so it can be
# shared between invite and knock rooms, see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919
serialized_rooms[room_id]["invite_state"] = serialized_stripped_state
return serialized_rooms
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)

View file

@ -162,8 +162,9 @@ class SlidingSyncResult:
timeline: Latest events in the room. The last event is the most recent
is_dm: Flag to specify whether the room is a direct-message room (most likely
between two people).
invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
in sync v2, absent on joined/left rooms
stripped_state: Stripped state events (for rooms where the usre is
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
absent on joined/left rooms
prev_batch: A token that can be passed as a start parameter to the
`/rooms/<room_id>/messages` API to retrieve earlier messages.
limited: True if their are more events than fit between the given position and now.
@ -192,7 +193,7 @@ class SlidingSyncResult:
required_state: List[EventBase]
timeline: List[EventBase]
is_dm: bool
invite_state: List[EventBase]
stripped_state: Optional[List[EventBase]]
prev_batch: StreamToken
limited: bool
joined_count: int

View file

@ -152,9 +152,6 @@ class SlidingSyncBody(RequestBodyModel):
anyway.
timeline_limit: The maximum number of timeline events to return per response.
(Max 1000 messages)
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
"""
class IncludeOldRooms(RequestBodyModel):
@ -167,7 +164,6 @@ class SlidingSyncBody(RequestBodyModel):
timeline_limit: int
else:
timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type]
include_old_rooms: Optional[IncludeOldRooms] = None
class SlidingSyncList(CommonRoomParameters):
"""
@ -208,9 +204,6 @@ class SlidingSyncBody(RequestBodyModel):
}
timeline_limit: The maximum number of timeline events to return per response.
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
include_heroes: Return a stripped variant of membership events (containing
`user_id` and optionally `avatar_url` and `displayname`) for the users used
to calculate the room name.