Iterating

This commit is contained in:
Eric Eastwood 2024-05-15 14:53:53 -05:00
parent c8256b6cbc
commit ee6baba7b6
3 changed files with 318 additions and 267 deletions

View file

@ -10,102 +10,141 @@ from typing import (
import attr
from synapse._pydantic_compat import HAS_PYDANTIC_V2
if TYPE_CHECKING or HAS_PYDANTIC_V2:
from pydantic.v1 import Extra
else:
from pydantic import Extra
from synapse.events import EventBase
from synapse.rest.client.models import SlidingSyncBody
from synapse.types import (
JsonMapping,
Requester,
StreamToken,
UserID,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomResult:
    """
    Immutable (frozen, slotted) attrs record holding the data for a single room.

    Attributes:
        name: Room name or calculated room name.
        avatar: Room avatar
        heroes: List of stripped membership events (containing `user_id` and optionally
            `avatar_url` and `displayname`) for the users used to calculate the room name.
        initial: Flag which is set when this is the first time the server is sending this
            data on this connection. Clients can use this flag to replace or update
            their local state. When there is an update, servers MUST omit this flag
            entirely and NOT send "initial":false as this is wasteful on bandwidth. The
            absence of this flag means 'false'.
        required_state: The current state of the room
        timeline: Latest events in the room. The last event is the most recent
        is_dm: Flag to specify whether the room is a direct-message room (most likely
            between two people).
        invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
            in sync v2, absent on joined/left rooms
        prev_batch: A token that can be passed as a start parameter to the
            `/rooms/<room_id>/messages` API to retrieve earlier messages.
        limited: True if there are more events than fit between the given position and now.
            Sync again to get more.
        joined_count: The number of users with membership of join, including the client's
            own user ID. (same as sync v2 `m.joined_member_count`)
        invited_count: The number of users with membership of invite. (same as sync v2
            `m.invited_member_count`)
        notification_count: The total number of unread notifications for this room. (same
            as sync v2)
        highlight_count: The number of unread notifications for this room with the highlight
            flag set. (same as sync v2)
        num_live: The number of timeline events which have just occurred and are not historical.
            The last N events are 'live' and should be treated as such. This is mostly
            useful to determine whether a given @mention event should make a noise or not.
            Clients cannot rely solely on the absence of `initial: true` to determine live
            events because if a room not in the sliding window bumps into the window because
            of an @mention it will have `initial: true` yet contain a single live event
            (with potentially other old events in the timeline).
    """

    name: str
    avatar: Optional[str]
    heroes: Optional[List[EventBase]]
    initial: bool
    required_state: List[EventBase]
    timeline: List[EventBase]
    is_dm: bool
    # NOTE(review): documented above as "absent on joined/left rooms" but typed
    # as a non-optional list -- confirm whether an empty list is the intended
    # representation of "absent".
    invite_state: List[EventBase]
    prev_batch: StreamToken
    limited: bool
    joined_count: int
    invited_count: int
    notification_count: int
    highlight_count: int
    num_live: int
logger = logging.getLogger(__name__)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
    # Placeholder: no fields are defined yet. This class is the value type of
    # the `lists` mapping on `SlidingSyncResult`.
    # TODO
    pass
class SlidingSyncConfig(SlidingSyncBody):
    """
    A `SlidingSyncBody` extended with the user and device it applies to.

    Attributes:
        user: The `UserID` this sync config belongs to.
        device_id: A device ID string -- presumably the requesting device of
            `user`; confirm against the code that constructs this config.
    """

    user: UserID
    device_id: str

    class Config:
        # By default, ignore fields that we don't recognise.
        extra = Extra.ignore
        # By default, don't allow fields to be reassigned after parsing.
        allow_mutation = False
        # Allow custom types like `UserID` to be used in the model
        arbitrary_types_allowed = True
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncResult:
"""
Attributes:
pos: The next position in the sliding window to request (next_pos, next_batch).
next_pos: The next position token in the sliding window to request (next_batch).
lists: Sliding window API. A map of list key to list results.
rooms: Room subscription API. A map of room ID to room results.
extensions: TODO
"""
pos: str
@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomResult:
    """
    Immutable (frozen, slotted) attrs record holding the data for a single room.

    Attributes:
        name: Room name or calculated room name.
        avatar: Room avatar
        heroes: List of stripped membership events (containing `user_id` and optionally
            `avatar_url` and `displayname`) for the users used to calculate the room name.
        initial: Flag which is set when this is the first time the server is sending this
            data on this connection. Clients can use this flag to replace or update
            their local state. When there is an update, servers MUST omit this flag
            entirely and NOT send "initial":false as this is wasteful on bandwidth. The
            absence of this flag means 'false'.
        required_state: The current state of the room
        timeline: Latest events in the room. The last event is the most recent
        is_dm: Flag to specify whether the room is a direct-message room (most likely
            between two people).
        invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
            in sync v2, absent on joined/left rooms
        prev_batch: A token that can be passed as a start parameter to the
            `/rooms/<room_id>/messages` API to retrieve earlier messages.
        limited: True if there are more events than fit between the given position and now.
            Sync again to get more.
        joined_count: The number of users with membership of join, including the client's
            own user ID. (same as sync v2 `m.joined_member_count`)
        invited_count: The number of users with membership of invite. (same as sync v2
            `m.invited_member_count`)
        notification_count: The total number of unread notifications for this room. (same
            as sync v2)
        highlight_count: The number of unread notifications for this room with the highlight
            flag set. (same as sync v2)
        num_live: The number of timeline events which have just occurred and are not historical.
            The last N events are 'live' and should be treated as such. This is mostly
            useful to determine whether a given @mention event should make a noise or not.
            Clients cannot rely solely on the absence of `initial: true` to determine live
            events because if a room not in the sliding window bumps into the window because
            of an @mention it will have `initial: true` yet contain a single live event
            (with potentially other old events in the timeline).
    """

    name: str
    avatar: Optional[str]
    heroes: Optional[List[EventBase]]
    initial: bool
    required_state: List[EventBase]
    timeline: List[EventBase]
    is_dm: bool
    # NOTE(review): documented above as "absent on joined/left rooms" but typed
    # as a non-optional list -- confirm whether an empty list is the intended
    # representation of "absent".
    invite_state: List[EventBase]
    prev_batch: StreamToken
    limited: bool
    joined_count: int
    invited_count: int
    notification_count: int
    highlight_count: int
    num_live: int
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
    # Placeholder: no fields are defined yet. This class is the value type of
    # the `lists` mapping declared alongside it.
    # TODO
    pass
next_pos: str
lists: Dict[str, SlidingWindowList]
rooms: List[RoomResult]
extensions: JsonMapping
def __bool__(self) -> bool:
    """Report whether this result carries any updates.

    A result with no lists, rooms or extensions is falsy. The notifier uses
    this while polling to decide whether it still needs to wait for more
    events.
    """
    has_updates = any((self.lists, self.rooms, self.extensions))
    return has_updates
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
    # Keep references to the homeserver config and to the collaborators this
    # handler uses; all of them are obtained from the `HomeServer` instance.
    self.hs_config = hs.config
    self.store = hs.get_datastores().main
    self.auth_blocking = hs.get_auth_blocking()
    self.notifier = hs.get_notifier()
    self.event_sources = hs.get_event_sources()
async def wait_for_sync_for_user():
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SlidingSyncConfig,
from_token: Optional[StreamToken] = None,
timeout: int = 0,
):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@ -113,46 +152,44 @@ class SlidingSyncHandler:
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
# auth_blocking will occur)
user_id = sync_config.user.to_string()
await self.auth_blocking.check_auth_blocking(requester=requester)
# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
if since_token is not None:
since_stream_id = since_token.to_device_key
deleted = await self.store.delete_messages_for_device(
sync_config.user.to_string(),
sync_config.device_id,
since_stream_id,
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
)
# TODO: If the To-Device extension is enabled and we have a since token, delete
# any to-device messages before that token (since we now know that the device
# has received them). (see sync v2 for how to do this)
if timeout == 0 or since_token is None:
if timeout == 0 or from_token is None:
return await self.current_sync_for_user(
sync_config, sync_version, since_token
sync_config,
from_token=from_token,
to_token=self.event_sources.get_current_token(),
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> Union[SyncResult, E2eeSyncResult]:
) -> SlidingSyncResult:
return await self.current_sync_for_user(
sync_config, sync_version, since_token
sync_config,
from_token=from_token,
to_token=after_token,
)
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
sync_config.user,
timeout,
current_sync_callback,
from_token=since_token,
from_token=from_token,
)
pass
def assemble_response():
    # Unimplemented stub: takes no arguments and returns None.
    # ...
    pass
async def current_sync_for_user(
sync_config: SlidingSyncConfig,
from_token: Optional[StreamToken] = None,
to_token: Optional[StreamToken] = None,
):
user_id = sync_config.user.to_string()
# TODO: Should we exclude app services here? There could be an argument to allow
# them since the appservice doesn't have to make a massive initial sync.
# (related to https://github.com/matrix-org/matrix-doc/issues/1144)

View file

@ -18,14 +18,30 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Dict, Optional, List, Optional, Tuple, Union
from synapse._pydantic_compat import HAS_PYDANTIC_V2
if TYPE_CHECKING or HAS_PYDANTIC_V2:
from pydantic.v1 import Extra, StrictInt, StrictStr, constr, validator
from pydantic.v1 import (
Extra,
StrictBool,
StrictInt,
StrictStr,
conint,
constr,
validator,
)
else:
from pydantic import Extra, StrictInt, StrictStr, constr, validator
from pydantic import (
Extra,
StrictBool,
StrictInt,
StrictStr,
conint,
constr,
validator,
)
from synapse.rest.models import RequestBodyModel
from synapse.util.threepids import validate_email
@ -97,3 +113,155 @@ else:
class MsisdnRequestTokenBody(ThreepidRequestTokenBody):
country: ISO3116_1_Alpha_2
phone_number: StrictStr
class SlidingSyncBody(RequestBodyModel):
"""
Attributes:
lists: Sliding window API. A map of list key to list information
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be
arbitrary strings which the client is using to refer to the list. Keep this
small as it needs to be sent a lot. Max length: 64 bytes.
room_subscriptions: Room subscription API. A map of room ID to room subscription
information. Used to subscribe to a specific room. Sometimes clients know
exactly which room they want to get information about e.g by following a
permalink or by refreshing a webapp currently viewing a specific room. The
sliding window API alone is insufficient for this use case because there's
no way to say "please track this room explicitly".
extensions: TODO
"""
class CommonRoomParameters(RequestBodyModel):
"""
Common parameters shared between the sliding window and room subscription APIs.
Attributes:
required_state: Required state for each room returned. An array of event
type and state key tuples. Elements in this array are ORd together to
produce the final set of state events to return. One unique exception is
when you request all state events via `["*", "*"]`. When used, all state
events are returned by default, and additional entries FILTER OUT the
returned set of state events. These additional entries cannot use `*`
themselves. For example, `["*", "*"], ["m.room.member",
"@alice:example.com"]` will *exclude* every `m.room.member` event
*except* for `@alice:example.com`, and include every other state event.
In addition, `["*", "*"], ["m.space.child", "*"]` is an error, the
`m.space.child` filter is not required as it would have been returned
anyway.
timeline_limit: The maximum number of timeline events to return per response.
(Max 1000 messages)
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
"""
class IncludeOldRooms(RequestBodyModel):
timeline_limit: StrictInt
required_state: List[Tuple[StrictStr, StrictStr]]
required_state: List[Tuple[StrictStr, StrictStr]]
timeline_limit: conint(le=1000, strict=True)
include_old_rooms: Optional[IncludeOldRooms]
class SlidingSyncList(CommonRoomParameters):
"""
Attributes:
ranges: Sliding window ranges. If this field is missing, no sliding window
is used and all rooms are returned in this list. Integers are
*inclusive*.
sort: How the list should be sorted on the server. The first value is
applied first, then tiebreaks are performed with each subsequent sort
listed.
FIXME: Furthermore, it's not currently defined how servers should behave
if they encounter a filter or sort operation they do not recognise. If
the server rejects the request with an HTTP 400 then that will break
backwards compatibility with new clients vs old servers. However, the
client would be otherwise unaware that only some of the sort/filter
operations have taken effect. We may need to include a "warnings"
section to indicate which sort/filter operations are unrecognised,
allowing for some form of graceful degradation of service.
-- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions
slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
sliding windows). When true, the `ranges` and `sort` fields are ignored.
required_state: Required state for each room returned. An array of event
type and state key tuples. Elements in this array are ORd together to
produce the final set of state events to return.
One unique exception is when you request all state events via `["*",
"*"]`. When used, all state events are returned by default, and
additional entries FILTER OUT the returned set of state events. These
additional entries cannot use `*` themselves. For example, `["*", "*"],
["m.room.member", "@alice:example.com"]` will *exclude* every
`m.room.member` event *except* for `@alice:example.com`, and include
every other state event. In addition, `["*", "*"], ["m.space.child",
"*"]` is an error, the `m.space.child` filter is not required as it
would have been returned anyway.
Room members can be lazily-loaded by using the special `$LAZY` state key
(`["m.room.member", "$LAZY"]`). Typically, when you view a room, you
want to retrieve all state events except for m.room.member events which
you want to lazily load. To get this behaviour, clients can send the
following::
{
"required_state": [
// activate lazy loading
["m.room.member", "$LAZY"],
// request all state events _except_ for m.room.member
// events which are lazily loaded
["*", "*"]
]
}
timeline_limit: The maximum number of timeline events to return per response.
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
include_heroes: Return a stripped variant of membership events (containing
`user_id` and optionally `avatar_url` and `displayname`) for the users used
to calculate the room name.
filters: Filters to apply to the list before sorting.
bump_event_types: Allowlist of event types which should be considered recent activity
when sorting `by_recency`. By omitting event types from this field,
clients can ensure that uninteresting events (e.g. a profile rename) do
not cause a room to jump to the top of its list(s). Empty or omitted
`bump_event_types` have no effect—all events in a room will be
considered recent activity.
"""
class Filters(RequestBodyModel):
is_dm: Optional[StrictBool]
spaces: Optional[List[StrictStr]]
is_encrypted: Optional[StrictBool]
is_invite: Optional[StrictBool]
room_types: Optional[List[Union[StrictStr, None]]]
not_room_types: Optional[List[StrictStr]]
room_name_like: Optional[StrictStr]
tags: Optional[List[StrictStr]]
not_tags: Optional[List[StrictStr]]
ranges: Optional[List[Tuple[StrictInt, StrictInt]]]
sort: Optional[List[StrictStr]]
slow_get_all_rooms: Optional[StrictBool] = False
include_heroes: Optional[StrictBool] = False
filters: Optional[Filters]
bump_event_types: Optional[List[StrictStr]]
class RoomSubscription(CommonRoomParameters):
pass
class Extension(RequestBodyModel):
enabled: Optional[StrictBool] = False
lists: Optional[List[StrictStr]]
rooms: Optional[List[StrictStr]]
lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]]
room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]]
extensions: Optional[Dict[StrictStr, Extension]]
@validator("lists")
def lists_length_check(
    cls, v: Optional[Dict[str, "SlidingSyncList"]]
) -> Optional[Dict[str, "SlidingSyncList"]]:
    """Reject requests that define more than 100 lists.

    `lists` is optional, and pydantic v1 still runs whole-field validators when
    the client sends an explicit `null`. `None` is therefore passed through
    untouched instead of being handed to `len()`.

    Args:
        v: The parsed `lists` mapping, or `None` when the field was `null`.

    Returns:
        `v`, unchanged.

    Raises:
        ValueError: If more than 100 lists were supplied. Pydantic reports
            this as a validation error on the `lists` field.
    """
    if v is not None and len(v) > 100:
        # Raise instead of `assert` so the limit is still enforced when Python
        # runs with `-O`, which strips assert statements.
        raise ValueError(f"Max lists: 100 but saw {len(v)}")
    return v

View file

@ -25,35 +25,18 @@ from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing_extensions import Annotated
from synapse._pydantic_compat import HAS_PYDANTIC_V2
if TYPE_CHECKING or HAS_PYDANTIC_V2:
from pydantic.v1 import (
StrictBool,
StrictInt,
StrictStr,
constr,
validator,
)
else:
from pydantic import (
StrictBool,
StrictInt,
StrictStr,
constr,
validator,
)
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.rest.client.models import SlidingSyncBody
from synapse.events.utils import (
SerializeEventConfig,
format_event_for_client_v2_without_room_id,
format_event_raw,
)
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.sliding_sync import SlidingSyncConfig
from synapse.handlers.sync import (
ArchivedSyncResult,
InvitedSyncResult,
@ -709,164 +692,13 @@ class SlidingSyncE2eeRestServlet(RestServlet):
return 200, response
class SlidingSyncBody(RequestBodyModel):
"""
Attributes:
lists: Sliding window API. A map of list key to list information
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be
arbitrary strings which the client is using to refer to the list. Keep this
small as it needs to be sent a lot. Max length: 64 bytes.
room_subscriptions: Room subscription API. A map of room ID to room subscription
information. Used to subscribe to a specific room. Sometimes clients know
exactly which room they want to get information about e.g by following a
permalink or by refreshing a webapp currently viewing a specific room. The
sliding window API alone is insufficient for this use case because there's
no way to say "please track this room explicitly".
extensions: TODO
"""
class CommonRoomParameters(RequestBodyModel):
"""
Common parameters shared between the sliding window and room subscription APIs.
Attributes:
required_state: Required state for each room returned. An array of event
type and state key tuples. Elements in this array are ORd together to
produce the final set of state events to return. One unique exception is
when you request all state events via `["*", "*"]`. When used, all state
events are returned by default, and additional entries FILTER OUT the
returned set of state events. These additional entries cannot use `*`
themselves. For example, `["*", "*"], ["m.room.member",
"@alice:example.com"]` will *exclude* every `m.room.member` event
*except* for `@alice:example.com`, and include every other state event.
In addition, `["*", "*"], ["m.space.child", "*"]` is an error, the
`m.space.child` filter is not required as it would have been returned
anyway.
timeline_limit: The maximum number of timeline events to return per response.
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
"""
class IncludeOldRooms(RequestBodyModel):
timeline_limit: StrictInt
required_state: List[Tuple[StrictStr, StrictStr]]
required_state: List[Tuple[StrictStr, StrictStr]]
timeline_limit: StrictInt
include_old_rooms: Optional[IncludeOldRooms]
class SlidingSyncList(CommonRoomParameters):
"""
Attributes:
ranges: Sliding window ranges. If this field is missing, no sliding window
is used and all rooms are returned in this list. Integers are
*inclusive*.
sort: How the list should be sorted on the server. The first value is
applied first, then tiebreaks are performed with each subsequent sort
listed.
FIXME: Furthermore, it's not currently defined how servers should behave
if they encounter a filter or sort operation they do not recognise. If
the server rejects the request with an HTTP 400 then that will break
backwards compatibility with new clients vs old servers. However, the
client would be otherwise unaware that only some of the sort/filter
operations have taken effect. We may need to include a "warnings"
section to indicate which sort/filter operations are unrecognised,
allowing for some form of graceful degradation of service.
-- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions
slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
sliding windows). When true, the `ranges` and `sort` fields are ignored.
required_state: Required state for each room returned. An array of event
type and state key tuples. Elements in this array are ORd together to
produce the final set of state events to return.
One unique exception is when you request all state events via `["*",
"*"]`. When used, all state events are returned by default, and
additional entries FILTER OUT the returned set of state events. These
additional entries cannot use `*` themselves. For example, `["*", "*"],
["m.room.member", "@alice:example.com"]` will *exclude* every
`m.room.member` event *except* for `@alice:example.com`, and include
every other state event. In addition, `["*", "*"], ["m.space.child",
"*"]` is an error, the `m.space.child` filter is not required as it
would have been returned anyway.
Room members can be lazily-loaded by using the special `$LAZY` state key
(`["m.room.member", "$LAZY"]`). Typically, when you view a room, you
want to retrieve all state events except for m.room.member events which
you want to lazily load. To get this behaviour, clients can send the
following::
{
"required_state": [
// activate lazy loading
["m.room.member", "$LAZY"],
// request all state events _except_ for m.room.member
// events which are lazily loaded
["*", "*"]
]
}
timeline_limit: The maximum number of timeline events to return per response.
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
include_heroes: Return a stripped variant of membership events (containing
`user_id` and optionally `avatar_url` and `displayname`) for the users used
to calculate the room name.
filters: Filters to apply to the list before sorting.
bump_event_types: Allowlist of event types which should be considered recent activity
when sorting `by_recency`. By omitting event types from this field,
clients can ensure that uninteresting events (e.g. a profile rename) do
not cause a room to jump to the top of its list(s). Empty or omitted
`bump_event_types` have no effect—all events in a room will be
considered recent activity.
"""
class Filters(RequestBodyModel):
is_dm: Optional[StrictBool]
spaces: Optional[List[StrictStr]]
is_encrypted: Optional[StrictBool]
is_invite: Optional[StrictBool]
room_types: Optional[List[Union[StrictStr, None]]]
not_room_types: Optional[List[StrictStr]]
room_name_like: Optional[StrictStr]
tags: Optional[List[StrictStr]]
not_tags: Optional[List[StrictStr]]
ranges: Optional[List[Tuple[StrictInt, StrictInt]]]
sort: Optional[List[StrictStr]]
slow_get_all_rooms: Optional[StrictBool] = False
include_heroes: Optional[StrictBool] = False
filters: Optional[Filters]
bump_event_types: Optional[List[StrictStr]]
class RoomSubscription(CommonRoomParameters):
pass
class Extension(RequestBodyModel):
enabled: Optional[StrictBool] = False
lists: Optional[List[StrictStr]]
rooms: Optional[List[StrictStr]]
lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]]
room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]]
extensions: Optional[Dict[StrictStr, Extension]]
@validator("lists")
def lists_length_check(
    cls, v: Optional[Dict[str, "SlidingSyncList"]]
) -> Optional[Dict[str, "SlidingSyncList"]]:
    """Reject requests that define more than 100 lists.

    `lists` is optional, and pydantic v1 still runs whole-field validators when
    the client sends an explicit `null`. `None` is therefore passed through
    untouched instead of being handed to `len()`.

    Args:
        v: The parsed `lists` mapping, or `None` when the field was `null`.

    Returns:
        `v`, unchanged.

    Raises:
        ValueError: If more than 100 lists were supplied. Pydantic reports
            this as a validation error on the `lists` field.
    """
    if v is not None and len(v) > 100:
        # Raise instead of `assert` so the limit is still enforced when Python
        # runs with `-O`, which strips assert statements.
        raise ValueError(f"Max lists: 100 but saw {len(v)}")
    return v
class SlidingSyncRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync`. TODO
GET parameters::
timeout(int): How long to wait for new events in milliseconds.
since(batch_token): Batch token when asking for incremental deltas.
timeout: How long to wait for new events in milliseconds.
pos: Stream position token when asking for incremental deltas.
Response JSON::
{
@ -891,18 +723,32 @@ class SlidingSyncRestServlet(RestServlet):
timeout = parse_integer(request, "timeout", default=0)
# Position in the stream
since_token = parse_string(request, "pos")
from_token_string = parse_string(request, "pos")
from_token = None
if from_token_string is not None:
from_token = await StreamToken.from_string(self.store, from_token_string)
# TODO: We currently don't know whether we're going to use sticky params or
# maybe some filters like sync v2 where they are built up once and referenced
# by filter ID. For now, we will just prototype with always passing everything
# in.
body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
sliding_sync_results = await wait_for_sync_for_user()
logger.info("Sliding sync request: %r", body)
sync_config = SlidingSyncConfig(
user=user,
device_id=device_id,
# TODO: Copy SlidingSyncBody fields into SlidingSyncConfig
)
sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user(
requester,
sync_config,
from_token,
timeout,
)
return 200, {"foo": "bar"}