mirror of
https://github.com/element-hq/synapse
synced 2024-06-30 13:43:29 +00:00
Merge 0eb029472e
into adeedb7b7c
This commit is contained in:
commit
07d9cac9e5
1
changelog.d/17320.feature
Normal file
1
changelog.d/17320.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add `rooms` data to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
|
|
@ -836,3 +836,21 @@ def maybe_upsert_event_field(
|
|||
del container[key]
|
||||
|
||||
return upsert_okay
|
||||
|
||||
|
||||
def strip_event(event: EventBase) -> JsonDict:
|
||||
"""
|
||||
Used for "stripped state" events which provide a simplified view of the state of a
|
||||
room intended to help a potential joiner identify the room (relevant when the user
|
||||
is invited or knocked).
|
||||
|
||||
Stripped state events can only have the `sender`, `type`, `state_key` and `content`
|
||||
properties present.
|
||||
"""
|
||||
|
||||
return {
|
||||
"type": event.type,
|
||||
"state_key": event.state_key,
|
||||
"content": event.content,
|
||||
"sender": event.sender,
|
||||
}
|
||||
|
|
|
@ -18,22 +18,27 @@
|
|||
#
|
||||
#
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
|
||||
|
||||
import attr
|
||||
from immutabledict import immutabledict
|
||||
|
||||
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
|
||||
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import strip_event
|
||||
from synapse.storage.roommember import RoomsForUser
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
PersistedEventPosition,
|
||||
Requester,
|
||||
RoomStreamToken,
|
||||
StreamKeyType,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
@ -82,6 +87,25 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
|
|||
return membership != Membership.LEAVE or sender != user_id
|
||||
|
||||
|
||||
# We can't freeze this class because we want to update it in place with the
|
||||
# de-duplicated data.
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class RoomSyncConfig:
|
||||
"""
|
||||
Holds the config for what data we should fetch for a room in the sync response.
|
||||
|
||||
Attributes:
|
||||
timeline_limit: The maximum number of events to return in the timeline.
|
||||
required_state: The set of state events requested for the room. The
|
||||
values are close to `StateKey` but actually use a syntax where you can
|
||||
provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part
|
||||
of the tuple (type, state_key).
|
||||
"""
|
||||
|
||||
timeline_limit: int
|
||||
required_state: Set[Tuple[str, str]]
|
||||
|
||||
|
||||
class SlidingSyncHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.clock = hs.get_clock()
|
||||
|
@ -90,6 +114,7 @@ class SlidingSyncHandler:
|
|||
self.auth_blocking = hs.get_auth_blocking()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self.relations_handler = hs.get_relations_handler()
|
||||
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
|
||||
|
||||
async def wait_for_sync_for_user(
|
||||
|
@ -201,6 +226,7 @@ class SlidingSyncHandler:
|
|||
|
||||
# Assemble sliding window lists
|
||||
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
|
||||
relevant_room_map: Dict[str, RoomSyncConfig] = {}
|
||||
if sync_config.lists:
|
||||
# Get all of the room IDs that the user should be able to see in the sync
|
||||
# response
|
||||
|
@ -225,29 +251,66 @@ class SlidingSyncHandler:
|
|||
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
|
||||
if list_config.ranges:
|
||||
for range in list_config.ranges:
|
||||
sliced_room_ids = [
|
||||
room_id
|
||||
for room_id, _ in sorted_room_info[range[0] : range[1]]
|
||||
]
|
||||
|
||||
ops.append(
|
||||
SlidingSyncResult.SlidingWindowList.Operation(
|
||||
op=OperationType.SYNC,
|
||||
range=range,
|
||||
room_ids=[
|
||||
room_id
|
||||
for room_id, _ in sorted_room_info[
|
||||
range[0] : range[1]
|
||||
]
|
||||
],
|
||||
room_ids=sliced_room_ids,
|
||||
)
|
||||
)
|
||||
|
||||
# Take the superset of the `RoomSyncConfig` for each room
|
||||
for room_id in sliced_room_ids:
|
||||
if relevant_room_map.get(room_id) is not None:
|
||||
# Take the highest timeline limit
|
||||
if (
|
||||
relevant_room_map[room_id].timeline_limit
|
||||
< list_config.timeline_limit
|
||||
):
|
||||
relevant_room_map[room_id].timeline_limit = (
|
||||
list_config.timeline_limit
|
||||
)
|
||||
|
||||
# Union the required state
|
||||
relevant_room_map[room_id].required_state.update(
|
||||
list_config.required_state
|
||||
)
|
||||
else:
|
||||
relevant_room_map[room_id] = RoomSyncConfig(
|
||||
timeline_limit=list_config.timeline_limit,
|
||||
required_state=set(list_config.required_state),
|
||||
)
|
||||
|
||||
lists[list_key] = SlidingSyncResult.SlidingWindowList(
|
||||
count=len(sorted_room_info),
|
||||
ops=ops,
|
||||
)
|
||||
|
||||
# TODO: if (sync_config.room_subscriptions):
|
||||
|
||||
# Fetch room data
|
||||
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
||||
for room_id, room_sync_config in relevant_room_map.items():
|
||||
room_sync_result = await self.get_room_sync_data(
|
||||
user=sync_config.user,
|
||||
room_id=room_id,
|
||||
room_sync_config=room_sync_config,
|
||||
rooms_for_user_membership_at_to_token=sync_room_map[room_id],
|
||||
from_token=from_token,
|
||||
to_token=to_token,
|
||||
)
|
||||
|
||||
rooms[room_id] = room_sync_result
|
||||
|
||||
return SlidingSyncResult(
|
||||
next_pos=to_token,
|
||||
lists=lists,
|
||||
# TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
|
||||
rooms={},
|
||||
rooms=rooms,
|
||||
extensions={},
|
||||
)
|
||||
|
||||
|
@ -665,3 +728,217 @@ class SlidingSyncHandler:
|
|||
# We want descending order
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
async def get_room_sync_data(
|
||||
self,
|
||||
user: UserID,
|
||||
room_id: str,
|
||||
room_sync_config: RoomSyncConfig,
|
||||
rooms_for_user_membership_at_to_token: RoomsForUser,
|
||||
from_token: Optional[StreamToken],
|
||||
to_token: StreamToken,
|
||||
) -> SlidingSyncResult.RoomResult:
|
||||
"""
|
||||
Fetch room data for the sync response.
|
||||
|
||||
We fetch data according to the token range (> `from_token` and <= `to_token`).
|
||||
|
||||
Args:
|
||||
user: User to fetch data for
|
||||
room_id: The room ID to fetch data for
|
||||
room_sync_config: Config for what data we should fetch for a room in the
|
||||
sync response.
|
||||
rooms_for_user_membership_at_to_token: Membership information for the user
|
||||
in the room at the time of `to_token`.
|
||||
from_token: The point in the stream to sync from.
|
||||
to_token: The point in the stream to sync up to.
|
||||
"""
|
||||
|
||||
# Assemble the list of timeline events
|
||||
timeline_events: List[EventBase] = []
|
||||
limited = False
|
||||
# We want to start off using the `to_token` (vs `from_token`) because we look
|
||||
# backwards from the `to_token` up to the `timeline_limit` and we might not
|
||||
# reach the `from_token` before we hit the limit. We will update the room stream
|
||||
# position once we've fetched the events to point to the earliest event fetched.
|
||||
prev_batch_token = to_token
|
||||
if room_sync_config.timeline_limit > 0:
|
||||
newly_joined = False
|
||||
if (
|
||||
# We can only determine new-ness if we have a `from_token` to define our range
|
||||
from_token is not None
|
||||
and rooms_for_user_membership_at_to_token.membership == Membership.JOIN
|
||||
):
|
||||
newly_joined = (
|
||||
rooms_for_user_membership_at_to_token.event_pos.persisted_after(
|
||||
from_token.room_key
|
||||
)
|
||||
)
|
||||
|
||||
# We're going to paginate backwards from the `to_token`
|
||||
from_bound = to_token.room_key
|
||||
# People shouldn't see past their leave/ban event
|
||||
if rooms_for_user_membership_at_to_token.membership in (
|
||||
Membership.LEAVE,
|
||||
Membership.BAN,
|
||||
):
|
||||
from_bound = (
|
||||
rooms_for_user_membership_at_to_token.event_pos.to_room_stream_token()
|
||||
)
|
||||
|
||||
# Determine whether we should limit the timeline to the token range.
|
||||
#
|
||||
# We should return historical messages (before token range) in the
|
||||
# following cases because we want clients to be able to show a basic
|
||||
# screen of information:
|
||||
# - Initial sync (because no `from_token` to limit us anyway)
|
||||
# - When users `newly_joined`
|
||||
# - TODO: For an incremental sync where we haven't sent it down this
|
||||
# connection before
|
||||
to_bound = (
|
||||
from_token.room_key
|
||||
if from_token is not None and not newly_joined
|
||||
else None
|
||||
)
|
||||
|
||||
timeline_events, new_room_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_bound,
|
||||
to_key=to_bound,
|
||||
direction=Direction.BACKWARDS,
|
||||
# We add one so we can determine if there are enough events to saturate
|
||||
# the limit or not (see `limited`)
|
||||
limit=room_sync_config.timeline_limit + 1,
|
||||
event_filter=None,
|
||||
)
|
||||
|
||||
# We want to return the events in ascending order (the last event is the
|
||||
# most recent).
|
||||
timeline_events.reverse()
|
||||
|
||||
# Determine our `limited` status based on the timeline. We do this before
|
||||
# filtering the events so we can accurately determine if there is more to
|
||||
# paginate even if we filter out some/all events.
|
||||
if len(timeline_events) > room_sync_config.timeline_limit:
|
||||
limited = True
|
||||
# Get rid of that extra "+ 1" event because we only used it to determine
|
||||
# if we hit the limit or not
|
||||
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
|
||||
assert timeline_events[0].internal_metadata.stream_ordering
|
||||
new_room_key = RoomStreamToken(
|
||||
stream=timeline_events[0].internal_metadata.stream_ordering - 1
|
||||
)
|
||||
|
||||
# TODO: Does `newly_joined` affect `limited`? It does in sync v2 but I fail
|
||||
# to understand why.
|
||||
|
||||
# Make sure we don't expose any events that the client shouldn't see
|
||||
timeline_events = await filter_events_for_client(
|
||||
self.storage_controllers,
|
||||
user.to_string(),
|
||||
timeline_events,
|
||||
is_peeking=rooms_for_user_membership_at_to_token.membership
|
||||
!= Membership.JOIN,
|
||||
filter_send_to_client=True,
|
||||
)
|
||||
|
||||
# Determine how many "live" events we have (events within the given token range).
|
||||
#
|
||||
# This is mostly useful to determine whether a given @mention event should
|
||||
# make a noise or not. Clients cannot rely solely on the absence of
|
||||
# `initial: true` to determine live events because if a room not in the
|
||||
# sliding window bumps into the window because of an @mention it will have
|
||||
# `initial: true` yet contain a single live event (with potentially other
|
||||
# old events in the timeline)
|
||||
num_live = 0
|
||||
if from_token is not None:
|
||||
for timeline_event in reversed(timeline_events):
|
||||
# This fields should be present for all persisted events
|
||||
assert timeline_event.internal_metadata.stream_ordering is not None
|
||||
assert timeline_event.internal_metadata.instance_name is not None
|
||||
|
||||
persisted_position = PersistedEventPosition(
|
||||
instance_name=timeline_event.internal_metadata.instance_name,
|
||||
stream=timeline_event.internal_metadata.stream_ordering,
|
||||
)
|
||||
if persisted_position.persisted_after(from_token.room_key):
|
||||
num_live += 1
|
||||
else:
|
||||
# Since we're iterating over the timeline events in
|
||||
# reverse-chronological order, we can break once we hit an event
|
||||
# that's not live. In the future, we could potentially optimize
|
||||
# this more with a binary search (bisect).
|
||||
break
|
||||
|
||||
# Update the `prev_batch_token` to point to the position that allows us to
|
||||
# keep paginating backwards from the oldest event we return in the timeline.
|
||||
prev_batch_token = prev_batch_token.copy_and_replace(
|
||||
StreamKeyType.ROOM, new_room_key
|
||||
)
|
||||
|
||||
# Figure out any stripped state events for invite/knocks. This allows the
|
||||
# potential joiner to identify the room.
|
||||
stripped_state: List[JsonDict] = []
|
||||
if rooms_for_user_membership_at_to_token.membership in (
|
||||
Membership.INVITE,
|
||||
Membership.KNOCK,
|
||||
):
|
||||
invite_or_knock_event = await self.store.get_event(
|
||||
rooms_for_user_membership_at_to_token.event_id
|
||||
)
|
||||
|
||||
stripped_state = []
|
||||
if invite_or_knock_event.membership == Membership.INVITE:
|
||||
stripped_state.extend(
|
||||
invite_or_knock_event.unsigned.get("invite_room_state", [])
|
||||
)
|
||||
elif invite_or_knock_event.membership == Membership.KNOCK:
|
||||
stripped_state.extend(
|
||||
invite_or_knock_event.unsigned.get("knock_room_state", [])
|
||||
)
|
||||
|
||||
stripped_state.append(strip_event(invite_or_knock_event))
|
||||
|
||||
# TODO: Handle timeline gaps (`get_timeline_gaps()`)
|
||||
|
||||
# If the timeline is `limited=True`, the client does not have all events
|
||||
# necessary to calculate aggregations themselves.
|
||||
bundled_aggregations = None
|
||||
if limited:
|
||||
bundled_aggregations = (
|
||||
await self.relations_handler.get_bundled_aggregations(
|
||||
timeline_events, user.to_string()
|
||||
)
|
||||
)
|
||||
|
||||
return SlidingSyncResult.RoomResult(
|
||||
# TODO: Dummy value
|
||||
name=None,
|
||||
# TODO: Dummy value
|
||||
avatar=None,
|
||||
# TODO: Dummy value
|
||||
heroes=None,
|
||||
# TODO: Since we can't determine whether we've already sent a room down this
|
||||
# Sliding Sync connection before (we plan to add this optimization in the
|
||||
# future), we're always returning the requested room state instead of
|
||||
# updates.
|
||||
initial=True,
|
||||
# TODO: Dummy value
|
||||
required_state=[],
|
||||
timeline_events=timeline_events,
|
||||
bundled_aggregations=bundled_aggregations,
|
||||
# TODO: Dummy value
|
||||
is_dm=False,
|
||||
stripped_state=stripped_state,
|
||||
prev_batch=prev_batch_token,
|
||||
limited=limited,
|
||||
# TODO: Dummy values
|
||||
joined_count=0,
|
||||
invited_count=0,
|
||||
# TODO: These are just dummy values. We could potentially just remove these
|
||||
# since notifications can only really be done correctly on the client anyway
|
||||
# (encrypted rooms).
|
||||
notification_count=0,
|
||||
highlight_count=0,
|
||||
num_live=num_live,
|
||||
)
|
||||
|
|
|
@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [ [0, 99] ],
|
||||
"sort": [ "by_notification_level", "by_recency", "by_name" ],
|
||||
"required_state": [
|
||||
["m.room.join_rules", ""],
|
||||
["m.room.history_visibility", ""],
|
||||
|
@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
"filters": {
|
||||
"is_dm": true
|
||||
},
|
||||
"bump_event_types": [ "m.room.message", "m.room.encrypted" ],
|
||||
}
|
||||
},
|
||||
// Room Subscriptions API
|
||||
|
@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
"!sub1:bar": {
|
||||
"required_state": [ ["*","*"] ],
|
||||
"timeline_limit": 10,
|
||||
"include_old_rooms": {
|
||||
"timeline_limit": 1,
|
||||
"required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
|
||||
}
|
||||
}
|
||||
},
|
||||
// Extensions API
|
||||
|
@ -791,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
|
||||
Response JSON::
|
||||
{
|
||||
"next_pos": "s58_224_0_13_10_1_1_16_0_1",
|
||||
"pos": "s58_224_0_13_10_1_1_16_0_1",
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"count": 1337,
|
||||
|
@ -830,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
"joined_count": 41,
|
||||
"invited_count": 1,
|
||||
"notification_count": 1,
|
||||
"highlight_count": 0
|
||||
"highlight_count": 0,
|
||||
"num_live": 2"
|
||||
},
|
||||
// rooms from list
|
||||
"!foo:bar": {
|
||||
|
@ -855,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
"joined_count": 4,
|
||||
"invited_count": 0,
|
||||
"notification_count": 54,
|
||||
"highlight_count": 3
|
||||
"highlight_count": 3,
|
||||
"num_live": 1,
|
||||
},
|
||||
// ... 99 more items
|
||||
},
|
||||
|
@ -871,10 +867,11 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
super().__init__()
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastores().main
|
||||
self.clock = hs.get_clock()
|
||||
self.filtering = hs.get_filtering()
|
||||
self.sliding_sync_handler = hs.get_sliding_sync_handler()
|
||||
self.event_serializer = hs.get_event_client_serializer()
|
||||
|
||||
# TODO: Update this to `on_GET` once we figure out how we want to handle params
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
user = requester.user
|
||||
|
@ -920,22 +917,25 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
logger.info("Client has disconnected; not serializing response.")
|
||||
return 200, {}
|
||||
|
||||
response_content = await self.encode_response(sliding_sync_results)
|
||||
response_content = await self.encode_response(requester, sliding_sync_results)
|
||||
|
||||
return 200, response_content
|
||||
|
||||
# TODO: Is there a better way to encode things?
|
||||
async def encode_response(
|
||||
self,
|
||||
requester: Requester,
|
||||
sliding_sync_result: SlidingSyncResult,
|
||||
) -> JsonDict:
|
||||
response: JsonDict = defaultdict(dict)
|
||||
|
||||
response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
|
||||
response["pos"] = await sliding_sync_result.next_pos.to_string(self.store)
|
||||
serialized_lists = self.encode_lists(sliding_sync_result.lists)
|
||||
if serialized_lists:
|
||||
response["lists"] = serialized_lists
|
||||
response["rooms"] = {} # TODO: sliding_sync_result.rooms
|
||||
response["rooms"] = await self.encode_rooms(
|
||||
requester, sliding_sync_result.rooms
|
||||
)
|
||||
response["extensions"] = {} # TODO: sliding_sync_result.extensions
|
||||
|
||||
return response
|
||||
|
@ -961,6 +961,75 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
|
||||
return serialized_lists
|
||||
|
||||
async def encode_rooms(
|
||||
self,
|
||||
requester: Requester,
|
||||
rooms: Dict[str, SlidingSyncResult.RoomResult],
|
||||
) -> JsonDict:
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
serialize_options = SerializeEventConfig(
|
||||
event_format=format_event_for_client_v2_without_room_id,
|
||||
requester=requester,
|
||||
)
|
||||
|
||||
serialized_rooms = {}
|
||||
for room_id, room_result in rooms.items():
|
||||
serialized_timeline = await self.event_serializer.serialize_events(
|
||||
room_result.timeline_events,
|
||||
time_now,
|
||||
config=serialize_options,
|
||||
bundle_aggregations=room_result.bundled_aggregations,
|
||||
)
|
||||
|
||||
serialized_required_state = await self.event_serializer.serialize_events(
|
||||
room_result.required_state,
|
||||
time_now,
|
||||
config=serialize_options,
|
||||
)
|
||||
|
||||
serialized_rooms[room_id] = {
|
||||
"required_state": serialized_required_state,
|
||||
"timeline": serialized_timeline,
|
||||
"prev_batch": await room_result.prev_batch.to_string(self.store),
|
||||
"limited": room_result.limited,
|
||||
"joined_count": room_result.joined_count,
|
||||
"invited_count": room_result.invited_count,
|
||||
"notification_count": room_result.notification_count,
|
||||
"highlight_count": room_result.highlight_count,
|
||||
"num_live": room_result.num_live,
|
||||
}
|
||||
|
||||
if room_result.name:
|
||||
serialized_rooms[room_id]["name"] = room_result.name
|
||||
|
||||
if room_result.avatar:
|
||||
serialized_rooms[room_id]["avatar"] = room_result.avatar
|
||||
|
||||
if room_result.heroes:
|
||||
serialized_rooms[room_id]["heroes"] = room_result.heroes
|
||||
|
||||
# We should only include the `initial` key if it's `True` to save bandwidth.
|
||||
# The absense of this flag means `False`.
|
||||
if room_result.initial:
|
||||
serialized_rooms[room_id]["initial"] = room_result.initial
|
||||
|
||||
# Field should be absent on non-DM rooms
|
||||
if room_result.is_dm:
|
||||
serialized_rooms[room_id]["is_dm"] = room_result.is_dm
|
||||
|
||||
# Stripped state only applies to invite/knock rooms
|
||||
if room_result.stripped_state:
|
||||
# TODO: `knocked_state` but that isn't specced yet.
|
||||
#
|
||||
# TODO: Instead of adding `knocked_state`, it would be good to rename
|
||||
# this to `stripped_state` so it can be shared between invite and knock
|
||||
# rooms, see
|
||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919
|
||||
serialized_rooms[room_id]["invite_state"] = room_result.stripped_state
|
||||
|
||||
return serialized_rooms
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
SyncRestServlet(hs).register(http_server)
|
||||
|
|
|
@ -55,7 +55,7 @@ from synapse.api.room_versions import (
|
|||
)
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.events.utils import prune_event
|
||||
from synapse.events.utils import prune_event, strip_event
|
||||
from synapse.logging.context import (
|
||||
PreserveLoggingContext,
|
||||
current_context,
|
||||
|
@ -1025,15 +1025,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
state_to_include = await self.get_events(selected_state_ids.values())
|
||||
|
||||
return [
|
||||
{
|
||||
"type": e.type,
|
||||
"state_key": e.state_key,
|
||||
"content": e.content,
|
||||
"sender": e.sender,
|
||||
}
|
||||
for e in state_to_include.values()
|
||||
]
|
||||
return [strip_event(e) for e in state_to_include.values()]
|
||||
|
||||
def _maybe_start_fetch_thread(self) -> None:
|
||||
"""Starts an event fetch thread if we are not yet at the maximum number."""
|
||||
|
|
|
@ -1551,6 +1551,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
) -> Tuple[List[EventBase], RoomStreamToken]:
|
||||
"""Returns list of events before or after a given token.
|
||||
|
||||
When Direction.FORWARDS: from_key < x <= to_key
|
||||
When Direction.BACKWARDS: from_key >= x > to_key
|
||||
|
||||
Args:
|
||||
room_id
|
||||
from_key: The token used to stream from
|
||||
|
@ -1567,6 +1570,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
and `to_key`).
|
||||
"""
|
||||
|
||||
# We can bail early if we're looking forwards, and our `to_key` is already
|
||||
# before our `from_key`.
|
||||
if (
|
||||
direction == Direction.FORWARDS
|
||||
and to_key is not None
|
||||
and to_key.is_before_or_eq(from_key)
|
||||
):
|
||||
return [], from_key
|
||||
# Or vice-versa, if we're looking backwards and our `from_key` is already before
|
||||
# our `to_key`.
|
||||
elif (
|
||||
direction == Direction.BACKWARDS
|
||||
and to_key is not None
|
||||
and from_key.is_before_or_eq(to_key)
|
||||
):
|
||||
return [], from_key
|
||||
|
||||
rows, token = await self.db_pool.runInteraction(
|
||||
"paginate_room_events",
|
||||
self._paginate_room_events_txn,
|
||||
|
|
|
@ -1078,6 +1078,9 @@ class PersistedPosition:
|
|||
stream: int
|
||||
|
||||
def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool:
|
||||
"""
|
||||
Checks whether this position happened after the token
|
||||
"""
|
||||
return token.get_stream_pos_for_instance(self.instance_name) < self.stream
|
||||
|
||||
|
||||
|
|
|
@ -31,7 +31,8 @@ else:
|
|||
from pydantic import Extra
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import JsonMapping, StreamToken, UserID
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
|
||||
|
||||
|
@ -159,11 +160,16 @@ class SlidingSyncResult:
|
|||
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
|
||||
absence of this flag means 'false'.
|
||||
required_state: The current state of the room
|
||||
timeline: Latest events in the room. The last event is the most recent
|
||||
timeline: Latest events in the room. The last event is the most recent.
|
||||
bundled_aggregations: A mapping of event ID to the bundled aggregations for
|
||||
the timeline events above. This allows clients to show accurate reaction
|
||||
counts (or edits, threads), even if some of the reaction events were skipped
|
||||
over in a gappy sync.
|
||||
is_dm: Flag to specify whether the room is a direct-message room (most likely
|
||||
between two people).
|
||||
invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
|
||||
in sync v2, absent on joined/left rooms
|
||||
stripped_state: Stripped state events (for rooms where the usre is
|
||||
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
|
||||
absent on joined/left rooms
|
||||
prev_batch: A token that can be passed as a start parameter to the
|
||||
`/rooms/<room_id>/messages` API to retrieve earlier messages.
|
||||
limited: True if their are more events than fit between the given position and now.
|
||||
|
@ -185,14 +191,15 @@ class SlidingSyncResult:
|
|||
(with potentially other old events in the timeline).
|
||||
"""
|
||||
|
||||
name: str
|
||||
name: Optional[str]
|
||||
avatar: Optional[str]
|
||||
heroes: Optional[List[EventBase]]
|
||||
initial: bool
|
||||
required_state: List[EventBase]
|
||||
timeline: List[EventBase]
|
||||
timeline_events: List[EventBase]
|
||||
bundled_aggregations: Optional[Dict[str, BundledAggregations]]
|
||||
is_dm: bool
|
||||
invite_state: List[EventBase]
|
||||
stripped_state: Optional[List[JsonDict]]
|
||||
prev_batch: StreamToken
|
||||
limited: bool
|
||||
joined_count: int
|
||||
|
|
|
@ -152,22 +152,14 @@ class SlidingSyncBody(RequestBodyModel):
|
|||
anyway.
|
||||
timeline_limit: The maximum number of timeline events to return per response.
|
||||
(Max 1000 messages)
|
||||
include_old_rooms: Determines if `predecessor` rooms are included in the
|
||||
`rooms` response. The user MUST be joined to old rooms for them to show up
|
||||
in the response.
|
||||
"""
|
||||
|
||||
class IncludeOldRooms(RequestBodyModel):
|
||||
timeline_limit: StrictInt
|
||||
required_state: List[Tuple[StrictStr, StrictStr]]
|
||||
|
||||
required_state: List[Tuple[StrictStr, StrictStr]]
|
||||
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
|
||||
if TYPE_CHECKING:
|
||||
timeline_limit: int
|
||||
else:
|
||||
timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type]
|
||||
include_old_rooms: Optional[IncludeOldRooms] = None
|
||||
|
||||
class SlidingSyncList(CommonRoomParameters):
|
||||
"""
|
||||
|
@ -208,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel):
|
|||
}
|
||||
|
||||
timeline_limit: The maximum number of timeline events to return per response.
|
||||
include_old_rooms: Determines if `predecessor` rooms are included in the
|
||||
`rooms` response. The user MUST be joined to old rooms for them to show up
|
||||
in the response.
|
||||
include_heroes: Return a stripped variant of membership events (containing
|
||||
`user_id` and optionally `avatar_url` and `displayname`) for the users used
|
||||
to calculate the room name.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#
|
||||
#
|
||||
import json
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
from parameterized import parameterized, parameterized_class
|
||||
|
@ -30,12 +31,13 @@ from synapse.api.constants import (
|
|||
AccountDataTypes,
|
||||
EventContentFields,
|
||||
EventTypes,
|
||||
HistoryVisibility,
|
||||
ReceiptTypes,
|
||||
RelationTypes,
|
||||
)
|
||||
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType
|
||||
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests import unittest
|
||||
|
@ -44,6 +46,8 @@ from tests.federation.transport.test_knocking import (
|
|||
)
|
||||
from tests.server import TimedOutException
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FilterTestCase(unittest.HomeserverTestCase):
|
||||
user_id = "@apple:test"
|
||||
|
@ -1284,7 +1288,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
def test_sync_list(self) -> None:
|
||||
"""
|
||||
Test that room IDs show up in the Sliding Sync lists
|
||||
Test that room IDs show up in the Sliding Sync `lists`
|
||||
"""
|
||||
alice_user_id = self.register_user("alice", "correcthorse")
|
||||
alice_access_token = self.login(alice_user_id, "correcthorse")
|
||||
|
@ -1381,15 +1385,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
|||
channel.await_result(timeout_ms=200)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# We expect the `next_pos` in the result to be the same as what we requested
|
||||
# We expect the next `pos` in the result to be the same as what we requested
|
||||
# with because we weren't able to find anything new yet.
|
||||
self.assertEqual(
|
||||
channel.json_body["next_pos"], future_position_token_serialized
|
||||
)
|
||||
self.assertEqual(channel.json_body["pos"], future_position_token_serialized)
|
||||
|
||||
def test_filter_list(self) -> None:
|
||||
"""
|
||||
Test that filters apply to lists
|
||||
Test that filters apply to `lists`
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
@ -1464,7 +1466,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
def test_sort_list(self) -> None:
|
||||
"""
|
||||
Test that the lists are sorted by `stream_ordering`
|
||||
Test that the `lists` are sorted by `stream_ordering`
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
@ -1518,3 +1520,749 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
|||
],
|
||||
channel.json_body["lists"]["foo-list"],
|
||||
)
|
||||
|
||||
def test_rooms_limited_initial_sync(self) -> None:
|
||||
"""
|
||||
Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
|
||||
on initial sync.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity1", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity2", tok=user2_tok)
|
||||
event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok)
|
||||
event_pos3 = self.get_success(
|
||||
self.store.get_position_for_event(event_response3["event_id"])
|
||||
)
|
||||
event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok)
|
||||
event_pos4 = self.get_success(
|
||||
self.store.get_position_for_event(event_response4["event_id"])
|
||||
)
|
||||
event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok)
|
||||
user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 3,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# We expect to saturate the `timeline_limit` (there are more than 3 messages in the room)
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
True,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# Check to make sure the latest events are returned
|
||||
self.assertEqual(
|
||||
[
|
||||
event["event_id"]
|
||||
for event in channel.json_body["rooms"][room_id1]["timeline"]
|
||||
],
|
||||
[
|
||||
event_response4["event_id"],
|
||||
event_response5["event_id"],
|
||||
user1_join_response["event_id"],
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
|
||||
# Check to make sure the `prev_batch` points at the right place
|
||||
prev_batch_token = self.get_success(
|
||||
StreamToken.from_string(
|
||||
self.store, channel.json_body["rooms"][room_id1]["prev_batch"]
|
||||
)
|
||||
)
|
||||
prev_batch_room_stream_token_serialized = self.get_success(
|
||||
prev_batch_token.room_key.to_string(self.store)
|
||||
)
|
||||
# If we use the `prev_batch` token to look backwards, we should see `event3`
|
||||
# next so make sure the token encompasses it
|
||||
self.assertEqual(
|
||||
event_pos3.persisted_after(prev_batch_token.room_key),
|
||||
False,
|
||||
f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}",
|
||||
)
|
||||
# If we use the `prev_batch` token to look backwards, we shouldn't see `event4`
|
||||
# anymore since it was just returned in this response.
|
||||
self.assertEqual(
|
||||
event_pos4.persisted_after(prev_batch_token.room_key),
|
||||
True,
|
||||
f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}",
|
||||
)
|
||||
|
||||
# With no `from_token` (initial sync), it's all historical since there is no
|
||||
# "live" range
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
0,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
def test_rooms_not_limited_initial_sync(self) -> None:
|
||||
"""
|
||||
Test that we mark `rooms` as `limited=False` when there are no more events to
|
||||
paginate to.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity1", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity2", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity3", tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
timeline_limit = 100
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": timeline_limit,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# The timeline should be `limited=False` because we have all of the events (no
|
||||
# more to paginate to)
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
False,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
expected_number_of_events = 9
|
||||
# We're just looking to make sure we got all of the events before hitting the `timeline_limit`
|
||||
self.assertEqual(
|
||||
len(channel.json_body["rooms"][room_id1]["timeline"]),
|
||||
expected_number_of_events,
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
self.assertLessEqual(expected_number_of_events, timeline_limit)
|
||||
|
||||
# With no `from_token` (initial sync), it's all historical since there is no
|
||||
# "live" token range.
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
0,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
def test_rooms_incremental_sync(self) -> None:
|
||||
"""
|
||||
Test `rooms` data during an incremental sync after an initial sync.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
self.helper.send(room_id1, "activity before initial sync1", tok=user2_tok)
|
||||
|
||||
# Make an initial Sliding Sync request to grab a token. This is also a sanity
|
||||
# check that we can go from initial to incremental sync.
|
||||
sync_params = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
sync_params,
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
next_pos = channel.json_body["pos"]
|
||||
|
||||
# Send some events but don't send enough to saturate the `timeline_limit`.
|
||||
# We want to later test that we only get the new events since the `next_pos`
|
||||
event_response2 = self.helper.send(room_id1, "activity after2", tok=user2_tok)
|
||||
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
|
||||
# Make an incremental Sliding Sync request (what we're trying to test)
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint + f"?pos={next_pos}",
|
||||
sync_params,
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# We only expect to see the new events since the last sync which isn't enough to
|
||||
# fill up the `timeline_limit`.
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
False,
|
||||
f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} '
|
||||
+ f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
|
||||
+ str(channel.json_body["rooms"][room_id1]),
|
||||
)
|
||||
# Check to make sure the latest events are returned
|
||||
self.assertEqual(
|
||||
[
|
||||
event["event_id"]
|
||||
for event in channel.json_body["rooms"][room_id1]["timeline"]
|
||||
],
|
||||
[
|
||||
event_response2["event_id"],
|
||||
event_response3["event_id"],
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
|
||||
# All events are "live"
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
2,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
def test_rooms_newly_joined_incremental_sync(self) -> None:
|
||||
"""
|
||||
Test that when we make an incremental sync with a `newly_joined` `rooms`, we are
|
||||
able to see some historical events before the `from_token`.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity before token1", tok=user2_tok)
|
||||
event_response2 = self.helper.send(
|
||||
room_id1, "activity before token2", tok=user2_tok
|
||||
)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
|
||||
# Join the room after the `from_token` which will make us consider this room as
|
||||
# `newly_joined`.
|
||||
user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
# Send some events but don't send enough to saturate the `timeline_limit`.
|
||||
# We want to later test that we only get the new events since the `next_pos`
|
||||
event_response3 = self.helper.send(
|
||||
room_id1, "activity after token3", tok=user2_tok
|
||||
)
|
||||
event_response4 = self.helper.send(
|
||||
room_id1, "activity after token4", tok=user2_tok
|
||||
)
|
||||
|
||||
# The `timeline_limit` is set to 4 so we can at least see one historical event
|
||||
# before the `from_token`. We should see historical events because this is a
|
||||
# `newly_joined` room.
|
||||
timeline_limit = 4
|
||||
# Make an incremental Sliding Sync request (what we're trying to test)
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(
|
||||
from_token.to_string(self.store)
|
||||
)}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": timeline_limit,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# We should see the new events and the rest should be filled with historical
|
||||
# events which will make us `limited=True` since there are more to paginate to.
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
True,
|
||||
f"Our `timeline_limit` was {timeline_limit} "
|
||||
+ f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
|
||||
+ str(channel.json_body["rooms"][room_id1]),
|
||||
)
|
||||
# Check to make sure that the "live" and historical events are returned
|
||||
self.assertEqual(
|
||||
[
|
||||
event["event_id"]
|
||||
for event in channel.json_body["rooms"][room_id1]["timeline"]
|
||||
],
|
||||
[
|
||||
event_response2["event_id"],
|
||||
user1_join_response["event_id"],
|
||||
event_response3["event_id"],
|
||||
event_response4["event_id"],
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
|
||||
# Only events after the `from_token` are "live" (join, event3, event4)
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
3,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
def test_rooms_invite_shared_history_initial_sync(self) -> None:
|
||||
"""
|
||||
Test that `rooms` we are invited to have some stripped `invite_state` and that
|
||||
we can't see any timeline events because the history visiblity is `shared` and
|
||||
we haven't joined the room yet.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user1 = UserID.from_string(user1_id)
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
user2 = UserID.from_string(user2_id)
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
# Ensure we're testing with a room with `shared` history visibility which means
|
||||
# history visible until you actually join the room.
|
||||
history_visibility_response = self.helper.get_state(
|
||||
room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
|
||||
)
|
||||
self.assertEqual(
|
||||
history_visibility_response.get("history_visibility"),
|
||||
HistoryVisibility.SHARED,
|
||||
)
|
||||
|
||||
self.helper.send(room_id1, "activity before1", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity before2", tok=user2_tok)
|
||||
self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after4", tok=user2_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 3,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# Should not see anything (except maybe the invite event) because we haven't
|
||||
# joined yet (history visibility is `shared`) (`filter_events_for_client(...)`
|
||||
# is doing the work here)
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
[],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
# No "live" events in an initial sync (no `from_token` to define the "live"
|
||||
# range) and no events returned in the timeline anyway so nothing could be
|
||||
# "live".
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
0,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# Even though we don't get any timeline events because they are filtered out,
|
||||
# there is still more to paginate
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
True,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# We should have some `stripped_state` so the potential joiner can identify the
|
||||
# room (we don't care about the order).
|
||||
self.assertCountEqual(
|
||||
channel.json_body["rooms"][room_id1]["invite_state"],
|
||||
[
|
||||
{
|
||||
"content": {"creator": user2_id, "room_version": "10"},
|
||||
"sender": user2_id,
|
||||
"state_key": "",
|
||||
"type": "m.room.create",
|
||||
},
|
||||
{
|
||||
"content": {"join_rule": "public"},
|
||||
"sender": user2_id,
|
||||
"state_key": "",
|
||||
"type": "m.room.join_rules",
|
||||
},
|
||||
{
|
||||
"content": {"displayname": user2.localpart, "membership": "join"},
|
||||
"sender": user2_id,
|
||||
"state_key": user2_id,
|
||||
"type": "m.room.member",
|
||||
},
|
||||
{
|
||||
"content": {"displayname": user1.localpart, "membership": "invite"},
|
||||
"sender": user2_id,
|
||||
"state_key": user1_id,
|
||||
"type": "m.room.member",
|
||||
},
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["invite_state"],
|
||||
)
|
||||
|
||||
def test_rooms_invite_world_readable_history_initial_sync(self) -> None:
|
||||
"""
|
||||
Test that `rooms` we are invited to have some stripped `invite_state` and that
|
||||
we can't see any timeline events because the history visiblity is `shared` and
|
||||
we haven't joined the room yet.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user1 = UserID.from_string(user1_id)
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
user2 = UserID.from_string(user2_id)
|
||||
|
||||
room_id1 = self.helper.create_room_as(
|
||||
user2_id,
|
||||
tok=user2_tok,
|
||||
extra_content={
|
||||
"preset": "public_chat",
|
||||
"initial_state": [
|
||||
{
|
||||
"content": {
|
||||
"history_visibility": HistoryVisibility.WORLD_READABLE
|
||||
},
|
||||
"state_key": "",
|
||||
"type": EventTypes.RoomHistoryVisibility,
|
||||
}
|
||||
],
|
||||
},
|
||||
)
|
||||
# Ensure we're testing with a room with `world_readable` history visibility
|
||||
# which means events are visible to anyone even without membership.
|
||||
history_visibility_response = self.helper.get_state(
|
||||
room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
|
||||
)
|
||||
self.assertEqual(
|
||||
history_visibility_response.get("history_visibility"),
|
||||
HistoryVisibility.WORLD_READABLE,
|
||||
)
|
||||
|
||||
self.helper.send(room_id1, "activity before1", tok=user2_tok)
|
||||
event_response2 = self.helper.send(room_id1, "activity before2", tok=user2_tok)
|
||||
use1_invite_response = self.helper.invite(
|
||||
room_id1, src=user2_id, targ=user1_id, tok=user2_tok
|
||||
)
|
||||
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
# Large enough to see the latest events and before the invite
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# Should see the last 4 events in the room
|
||||
self.assertEqual(
|
||||
[
|
||||
event["event_id"]
|
||||
for event in channel.json_body["rooms"][room_id1]["timeline"]
|
||||
],
|
||||
[
|
||||
event_response2["event_id"],
|
||||
use1_invite_response["event_id"],
|
||||
event_response3["event_id"],
|
||||
event_response4["event_id"],
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
# No "live" events in an initial sync (no `from_token` to define the "live"
|
||||
# range)
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
0,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# There is still more to paginate
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
True,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# We should have some `stripped_state` so the potential joiner can identify the
|
||||
# room (we don't care about the order).
|
||||
self.assertCountEqual(
|
||||
channel.json_body["rooms"][room_id1]["invite_state"],
|
||||
[
|
||||
{
|
||||
"content": {"creator": user2_id, "room_version": "10"},
|
||||
"sender": user2_id,
|
||||
"state_key": "",
|
||||
"type": "m.room.create",
|
||||
},
|
||||
{
|
||||
"content": {"join_rule": "public"},
|
||||
"sender": user2_id,
|
||||
"state_key": "",
|
||||
"type": "m.room.join_rules",
|
||||
},
|
||||
{
|
||||
"content": {"displayname": user2.localpart, "membership": "join"},
|
||||
"sender": user2_id,
|
||||
"state_key": user2_id,
|
||||
"type": "m.room.member",
|
||||
},
|
||||
{
|
||||
"content": {"displayname": user1.localpart, "membership": "invite"},
|
||||
"sender": user2_id,
|
||||
"state_key": user1_id,
|
||||
"type": "m.room.member",
|
||||
},
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["invite_state"],
|
||||
)
|
||||
|
||||
def test_rooms_ban_initial_sync(self) -> None:
|
||||
"""
|
||||
Test that `rooms` we are banned from in an intial sync only allows us to see
|
||||
timeline events up to the ban event.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity before1", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity before2", tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
|
||||
user1_ban_response = self.helper.ban(
|
||||
room_id1, src=user2_id, targ=user1_id, tok=user2_tok
|
||||
)
|
||||
|
||||
self.helper.send(room_id1, "activity after5", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after6", tok=user2_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 3,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# We should see events before the ban but not after
|
||||
self.assertEqual(
|
||||
[
|
||||
event["event_id"]
|
||||
for event in channel.json_body["rooms"][room_id1]["timeline"]
|
||||
],
|
||||
[
|
||||
event_response3["event_id"],
|
||||
event_response4["event_id"],
|
||||
user1_ban_response["event_id"],
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
# No "live" events in an initial sync (no `from_token` to define the "live"
|
||||
# range)
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
0,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# There are more events to paginate to
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
True,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
def test_rooms_ban_incremental_sync1(self) -> None:
|
||||
"""
|
||||
Test that `rooms` we are banned from during the next incremental sync only
|
||||
allows us to see timeline events up to the ban event.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity before1", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity before2", tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
|
||||
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
|
||||
# The ban is within the token range (between the `from_token` and the sliding
|
||||
# sync request)
|
||||
user1_ban_response = self.helper.ban(
|
||||
room_id1, src=user2_id, targ=user1_id, tok=user2_tok
|
||||
)
|
||||
|
||||
self.helper.send(room_id1, "activity after5", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after6", tok=user2_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(
|
||||
from_token.to_string(self.store)
|
||||
)}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# We should see events before the ban but not after
|
||||
self.assertEqual(
|
||||
[
|
||||
event["event_id"]
|
||||
for event in channel.json_body["rooms"][room_id1]["timeline"]
|
||||
],
|
||||
[
|
||||
event_response3["event_id"],
|
||||
event_response4["event_id"],
|
||||
user1_ban_response["event_id"],
|
||||
],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
# All live events in the incremental sync
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
3,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# There aren't anymore events to paginate to in this range
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
False,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
|
||||
def test_rooms_ban_incremental_sync2(self) -> None:
|
||||
"""
|
||||
Test that `rooms` we are banned from before the incremental sync don't return
|
||||
any events in the timeline.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity before1", tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
self.helper.send(room_id1, "activity after2", tok=user2_tok)
|
||||
# The ban is before we get our `from_token`
|
||||
self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
|
||||
|
||||
self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
|
||||
self.helper.send(room_id1, "activity after4", tok=user2_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(
|
||||
from_token.to_string(self.store)
|
||||
)}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# Nothing to see for this banned user in the room in the token range
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
[],
|
||||
channel.json_body["rooms"][room_id1]["timeline"],
|
||||
)
|
||||
# No events returned in the timeline so nothing is "live"
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["num_live"],
|
||||
0,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
# There aren't anymore events to paginate to in this range
|
||||
self.assertEqual(
|
||||
channel.json_body["rooms"][room_id1]["limited"],
|
||||
False,
|
||||
channel.json_body["rooms"][room_id1],
|
||||
)
|
||||
|
|
|
@ -261,9 +261,9 @@ class RestHelper:
|
|||
targ: str,
|
||||
expect_code: int = HTTPStatus.OK,
|
||||
tok: Optional[str] = None,
|
||||
) -> None:
|
||||
) -> JsonDict:
|
||||
"""A convenience helper: `change_membership` with `membership` preset to "ban"."""
|
||||
self.change_membership(
|
||||
return self.change_membership(
|
||||
room=room,
|
||||
src=src,
|
||||
targ=targ,
|
||||
|
|
Loading…
Reference in a new issue