Compare commits

...

11 commits

Author SHA1 Message Date
Alexander Fechler d404dab1bf
Merge b886ca4a89 into 2c36a679ae 2024-06-14 10:21:43 +01:00
Richard van der Hoff 2c36a679ae
Include user membership on events (#17282)
MSC4115 has now completed FCP, so we can enable it by default and switch
to the stable identifier.
2024-06-13 21:45:54 +00:00
Eric Eastwood c12ee0d5ba
Add is_dm filtering to Sliding Sync /sync (#17277)
Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync
2024-06-13 13:56:58 -05:00
Eric Eastwood 8aaff851b1
Fix newly_left rooms not appearing if we returned early (Sliding Sync) (#17301)
Fix `newly_left` rooms not appearing if we returned early when `membership_snapshot_token.is_before_or_eq(to_token.room_key)`. 

Introduced in https://github.com/element-hq/synapse/pull/17187 (part of Sliding Sync)

The tests didn't catch it because they had a small typo: `room_id1` vs `room_id2`.

Found while working on https://github.com/element-hq/synapse/pull/17293
2024-06-13 11:36:57 -05:00
Eric Eastwood 8c58eb7f17
Add event.internal_metadata.instance_name (#17300)
Add `event.internal_metadata.instance_name` (the worker instance that persisted the event) to go alongside the existing `event.internal_metadata.stream_ordering`.

`instance_name` is useful to properly compare and query for events with a token since you need to compare both the `stream_ordering` and `instance_name` against the vector clock/`instance_map` in the `RoomStreamToken`.

This is prerequisite work and may be used in https://github.com/element-hq/synapse/pull/17293

Adding `event.internal_metadata.instance_name` was first mentioned in the initial Sliding Sync PR while pairing with @erikjohnston, see 09609cb0db (diff-5cd773fb307aa754bd3948871ba118b1ef0303f4d72d42a2d21e38242bf4e096R405-R410)
2024-06-13 11:32:50 -05:00
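As a concrete illustration of that comparison (a sketch with hypothetical values; `PersistedEventPosition` and `RoomStreamToken` are the types exercised in the tests further down):

from immutabledict import immutabledict
from synapse.types import PersistedEventPosition, RoomStreamToken

# An event persisted by "worker1" at stream ordering 7, checked against a
# sharded token whose minimum stream position is 5 but whose vector-clock
# entry for worker1 has reached 9.
pos = PersistedEventPosition(instance_name="worker1", stream=7)
token = RoomStreamToken(stream=5, instance_map=immutabledict({"worker1": 9}))

# False: 7 <= 9 for worker1, so the event falls at or before the token.
print(pos.persisted_after(token))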
Eric Eastwood ebdce69f6a
Fix get_last_event_in_room_before_stream_ordering(...) finding the wrong last event (#17295)
PR where this was introduced: https://github.com/matrix-org/synapse/pull/14817

### What does this affect?

`get_last_event_in_room_before_stream_ordering(...)` is used in Sync v2 in many different state calculations.

`get_last_event_in_room_before_stream_ordering(...)` is also used in `/rooms/{roomId}/members`
2024-06-13 11:00:52 -05:00
Alexander Fechler b886ca4a89 Adjusted link to the Admin API. 2024-06-10 18:31:06 +02:00
Alexander Fechler c55b638259 - Renamed query parameter and adjusted description.
- Removed catch block with print statement.
2024-06-10 17:45:39 +02:00
Alexander Fechler f09d00020a - Fixed Postgres incompatibility.
- Added test for filter_empty_rooms.
2024-06-10 17:45:39 +02:00
Alexander Fechler 712e3d17c2 Added changelog. 2024-06-10 17:45:39 +02:00
Alexander Fechler 34454ad88c Added filter to Admin API GET /rooms 2024-06-10 17:45:39 +02:00
42 changed files with 890 additions and 101 deletions

View file

@ -0,0 +1 @@
Add filters for public and empty rooms to the Admin API [List Room API](https://element-hq.github.io/synapse/latest/admin_api/rooms.html#list-room-api).

View file

@ -0,0 +1 @@
Add `is_dm` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View file

@ -0,0 +1 @@
Include user membership in events served to clients, per MSC4115.

changelog.d/17295.bugfix (new file)
View file

@ -0,0 +1 @@
Fix edge case in `/sync` returning the wrong state when using sharded event persisters.

changelog.d/17300.misc (new file)
View file

@ -0,0 +1 @@
Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.

changelog.d/17301.bugfix (new file)
View file

@ -0,0 +1 @@
Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View file

@ -105,8 +105,6 @@ experimental_features:
# Expose a room summary for public rooms
msc3266_enabled: true
msc4115_membership_on_events: true
server_notices:
system_mxid_localpart: _server
system_mxid_display_name: "Server Alert"

View file

@ -36,6 +36,10 @@ The following query parameters are available:
- the room's name,
- the local part of the room's canonical alias, or
- the complete (local and server part) room's id (case sensitive).
* `public_rooms` - Optional flag to filter public rooms. If `true`, only public rooms are queried. If `false`, public rooms are excluded from
the query. When the flag is absent (the default), **both** public and non-public rooms are included in the search results.
* `empty_rooms` - Optional flag to filter empty rooms. A room is empty if `joined_members` is zero. If `true`, only empty rooms are queried. If `false`, empty rooms are excluded from
the query. When the flag is absent (the default), **both** empty and non-empty rooms are included in the search results.
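For example, to list only public rooms that currently have at least one joined member (a sketch; the homeserver URL and admin token are placeholders):

import requests

SERVER = "https://synapse.example.com"  # placeholder homeserver URL
ADMIN_TOKEN = "<admin access token>"    # placeholder admin access token

response = requests.get(
    f"{SERVER}/_synapse/admin/v1/rooms",
    params={"public_rooms": "true", "empty_rooms": "false"},
    headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
)
response.raise_for_status()
print(response.json()["total_rooms"])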

View file

@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
/// The instance name of the server that persisted this event. None, until
/// it has been persisted.
#[pyo3(get, set)]
instance_name: Option<String>,
/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
@ -232,6 +234,7 @@ impl EventInternalMetadata {
Ok(EventInternalMetadata {
data,
stream_ordering: None,
instance_name: None,
outlier: false,
})
}

View file

@ -223,7 +223,6 @@ test_packages=(
./tests/msc3930
./tests/msc3902
./tests/msc3967
./tests/msc4115
)
# Enable dirty runs, so tests will reuse the same container where possible.

View file

@ -238,7 +238,7 @@ class EventUnsignedContentFields:
"""Fields found inside the 'unsigned' data on events"""
# Requesting user's membership, per MSC4115
MSC4115_MEMBERSHIP: Final = "io.element.msc4115.membership"
MEMBERSHIP: Final = "membership"
class RoomTypes:

View file

@ -436,10 +436,6 @@ class ExperimentalConfig(Config):
("experimental", "msc4108_delegation_endpoint"),
)
self.msc4115_membership_on_events = experimental.get(
"msc4115_membership_on_events", False
)
self.msc3916_authenticated_media_enabled = experimental.get(
"msc3916_authenticated_media_enabled", False
)

View file

@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
# Mark the event as redacted
@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier
return new_event

View file

@ -42,7 +42,6 @@ class AdminHandler:
self._device_handler = hs.get_device_handler()
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._hs_config = hs.config
self._msc3866_enabled = hs.config.experimental.msc3866.enabled
async def get_whois(self, user: UserID) -> JsonMapping:
@ -215,7 +214,6 @@ class AdminHandler:
self._storage_controllers,
user_id,
events,
msc4115_membership_on_events=self._hs_config.experimental.msc4115_membership_on_events,
)
writer.write_events(room_id, events)

View file

@ -148,7 +148,6 @@ class EventHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
async def get_event(
self,
@ -194,7 +193,6 @@ class EventHandler:
user.to_string(),
[event],
is_peeking=is_peeking,
msc4115_membership_on_events=self._config.experimental.msc4115_membership_on_events,
)
if not filtered:

View file

@ -224,7 +224,6 @@ class InitialSyncHandler:
self._storage_controllers,
user_id,
messages,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
@ -383,7 +382,6 @@ class InitialSyncHandler:
requester.user.to_string(),
messages,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
@ -498,7 +496,6 @@ class InitialSyncHandler:
requester.user.to_string(),
messages,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)

View file

@ -1551,6 +1551,7 @@ class EventCreationHandler:
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
event.internal_metadata.instance_name = writer_instance
return event

View file

@ -623,7 +623,6 @@ class PaginationHandler:
user_id,
events,
is_peeking=(member_event_id is None),
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
# if after the filter applied there are no more events

View file

@ -95,7 +95,6 @@ class RelationsHandler:
self._event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()
self._event_creation_handler = hs.get_event_creation_handler()
self._config = hs.config
async def get_relations(
self,
@ -164,7 +163,6 @@ class RelationsHandler:
user_id,
events,
is_peeking=(member_event_id is None),
msc4115_membership_on_events=self._config.experimental.msc4115_membership_on_events,
)
# The relations returned for the requested event do include their
@ -610,7 +608,6 @@ class RelationsHandler:
user_id,
events,
is_peeking=(member_event_id is None),
msc4115_membership_on_events=self._config.experimental.msc4115_membership_on_events,
)
aggregations = await self.get_bundled_aggregations(

View file

@ -1476,7 +1476,6 @@ class RoomContextHandler:
user.to_string(),
events,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
event = await self.store.get_event(

View file

@ -483,7 +483,6 @@ class SearchHandler:
self._storage_controllers,
user.to_string(),
filtered_events,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
events.sort(key=lambda e: -rank_map[e.event_id])
@ -585,7 +584,6 @@ class SearchHandler:
self._storage_controllers,
user.to_string(),
filtered_events,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
room_events.extend(events)
@ -673,14 +671,12 @@ class SearchHandler:
self._storage_controllers,
user.to_string(),
res.events_before,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
events_after = await filter_events_for_client(
self._storage_controllers,
user.to_string(),
res.events_after,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
context: JsonDict = {

View file

@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
from immutabledict import immutabledict
from synapse.api.constants import Membership
from synapse.api.constants import AccountDataTypes, Membership
from synapse.events import EventBase
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
@ -69,9 +69,19 @@ class SlidingSyncHandler:
from_token: Optional[StreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
"""
Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Args:
requester: The user making the request
sync_config: Sync configuration
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
timeout_ms: The time in milliseconds to wait for new data to arrive. If 0,
we return immediately, but there might not be any new data, so we just return an
empty response.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@ -143,6 +153,14 @@ class SlidingSyncHandler:
"""
Generates the response body of a Sliding Sync result, represented as a
`SlidingSyncResult`.
We fetch data according to the token range (> `from_token` and <= `to_token`).
Args:
sync_config: Sync configuration
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
"""
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
@ -163,11 +181,12 @@ class SlidingSyncHandler:
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
if sync_config.lists:
for list_key, list_config in sync_config.lists.items():
# TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
# Apply filters
filtered_room_ids = room_id_set
if list_config.filters is not None:
filtered_room_ids = await self.filter_rooms(
sync_config.user, room_id_set, list_config.filters, to_token
)
# TODO: Apply sorts
sorted_room_ids = sorted(filtered_room_ids)
@ -217,6 +236,12 @@ class SlidingSyncHandler:
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.
Args:
user: User to fetch rooms for
to_token: The token to fetch rooms up to.
from_token: The point in the stream to sync from.
"""
user_id = user.to_string()
@ -275,12 +300,6 @@ class SlidingSyncHandler:
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we can just straight-up return the room list (nothing has
# changed)
if membership_snapshot_token.is_before_or_eq(to_token.room_key):
return sync_room_id_set
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
@ -300,14 +319,20 @@ class SlidingSyncHandler:
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
#
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
membership_change_events_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
)
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
@ -439,3 +464,84 @@ class SlidingSyncHandler:
sync_room_id_set.add(room_id)
return sync_room_id_set
async def filter_rooms(
self,
user: UserID,
room_id_set: AbstractSet[str],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> AbstractSet[str]:
"""
Filter rooms based on the sync request.
Args:
user: User to filter rooms for
room_id_set: Set of room IDs to filter down
filters: Filters to apply
to_token: We filter based on the state of the room at this token
"""
user_id = user.to_string()
# TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
filtered_room_id_set = set(room_id_set)
# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
# We're using global account data (`m.direct`) instead of checking for
# `is_direct` on membership events because that property only appears for
# the invitee membership event (doesn't show up for the inviter). Account
# data is set by the client so it needs to be scrutinized.
#
# We're unable to take `to_token` into account for global account data since
# we only keep track of the latest account data for the user.
dm_map = await self.store.get_global_account_data_by_type_for_user(
user_id, AccountDataTypes.DIRECT
)
# Flatten out the map
dm_room_id_set = set()
if dm_map:
for room_ids in dm_map.values():
# Account data should be a list of room IDs. Ignore anything else
if isinstance(room_ids, list):
for room_id in room_ids:
if isinstance(room_id, str):
dm_room_id_set.add(room_id)
if filters.is_dm:
# Only DM rooms please
filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
else:
# Only non-DM rooms please
filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)
if filters.spaces:
raise NotImplementedError()
if filters.is_encrypted:
raise NotImplementedError()
if filters.is_invite:
raise NotImplementedError()
if filters.room_types:
raise NotImplementedError()
if filters.not_room_types:
raise NotImplementedError()
if filters.room_name_like:
raise NotImplementedError()
if filters.tags:
raise NotImplementedError()
if filters.not_tags:
raise NotImplementedError()
return filtered_room_id_set
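For context, the global `m.direct` account data flattened above maps a key (by convention, a user ID) to a list of room IDs; a standalone sketch of the same flattening with made-up values:

# Illustrative `m.direct` global account data content (values made up):
dm_map = {
    "@user2:example.com": ["!dm1:example.com", "!dm2:example.com"],
}

# Flatten into a set of DM room IDs, ignoring malformed entries, since this
# account data is set by (untrusted) clients:
dm_room_id_set = {
    room_id
    for room_ids in dm_map.values()
    if isinstance(room_ids, list)
    for room_id in room_ids
    if isinstance(room_id, str)
}
print(sorted(dm_room_id_set))  # ['!dm1:example.com', '!dm2:example.com']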

View file

@ -844,7 +844,6 @@ class SyncHandler:
sync_config.user.to_string(),
recents,
always_include_ids=current_state_ids,
msc4115_membership_on_events=self.hs_config.experimental.msc4115_membership_on_events,
)
log_kv({"recents_after_visibility_filtering": len(recents)})
else:
@ -930,7 +929,6 @@ class SyncHandler:
sync_config.user.to_string(),
loaded_recents,
always_include_ids=current_state_ids,
msc4115_membership_on_events=self.hs_config.experimental.msc4115_membership_on_events,
)
loaded_recents = []

View file

@ -721,7 +721,6 @@ class Notifier:
user.to_string(),
new_events,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
elif keyname == StreamKeyType.PRESENCE:
now = self.clock.time_msec()

View file

@ -532,7 +532,6 @@ class Mailer:
self._storage_controllers,
user_id,
results.events_before,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
the_events.append(notif_event)

View file

@ -35,6 +35,7 @@ from synapse.http.servlet import (
ResolveRoomIdMixin,
RestServlet,
assert_params_in_dict,
parse_boolean,
parse_enum,
parse_integer,
parse_json,
@ -242,13 +243,23 @@ class ListRoomRestServlet(RestServlet):
errcode=Codes.INVALID_PARAM,
)
public_rooms = parse_boolean(request, "public_rooms")
empty_rooms = parse_boolean(request, "empty_rooms")
direction = parse_enum(request, "dir", Direction, default=Direction.FORWARDS)
reverse_order = True if direction == Direction.BACKWARDS else False
# Return list of rooms according to parameters
rooms, total_rooms = await self.store.get_rooms_paginate(
start, limit, order_by, reverse_order, search_term
start,
limit,
order_by,
reverse_order,
search_term,
public_rooms,
empty_rooms,
)
response = {
# next_token should be opaque, so return a value the client can parse
"offset": start,

View file

@ -207,6 +207,7 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
await self.db_pool.runInteraction(
"persist_events",

View file

@ -156,6 +156,7 @@ class _EventRow:
event_id: str
stream_ordering: int
instance_name: str
json: str
internal_metadata: str
format_version: Optional[int]
@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.instance_name = row.instance_name
original_ev.internal_metadata.outlier = row.outlier
# Consistency check: if the content of the event has been modified in the
@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore):
SELECT
e.event_id,
e.stream_ordering,
e.instance_name,
ej.internal_metadata,
ej.json,
ej.format_version,
@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore):
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
internal_metadata=row[2],
json=row[3],
format_version=row[4],
room_version_id=row[5],
rejected_reason=row[6],
instance_name=row[2],
internal_metadata=row[3],
json=row[4],
format_version=row[5],
room_version_id=row[6],
rejected_reason=row[7],
redactions=[],
outlier=bool(row[7]), # This is an int in SQLite3
outlier=bool(row[8]), # This is an int in SQLite3
)
# check for redactions

View file

@ -606,6 +606,8 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
order_by: str,
reverse_order: bool,
search_term: Optional[str],
public_rooms: Optional[bool],
empty_rooms: Optional[bool],
) -> Tuple[List[Dict[str, Any]], int]:
"""Function to retrieve a paginated list of rooms as json.
@ -617,30 +619,49 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
search_term: a string to filter room names,
canonical alias and room ids by.
Room ID must match exactly. Canonical alias must match a substring of the local part.
public_rooms: Optional flag to filter public and non-public rooms. If true,
only public rooms are queried. If false, public rooms are excluded from the
query. When it is None (the default), both public and non-public rooms are queried.
empty_rooms: Optional flag to filter empty and non-empty rooms.
A room is empty if joined_members is zero.
If true, only empty rooms are queried.
If false, empty rooms are excluded from the query. When it is None
(the default), both empty and non-empty rooms are queried.
Returns:
A list of room dicts and an integer representing the total number of
rooms that exist given this query
"""
# Filter room names by a string
where_statement = ""
search_pattern: List[object] = []
filter_ = []
where_args = []
if search_term:
where_statement = """
WHERE LOWER(state.name) LIKE ?
OR LOWER(state.canonical_alias) LIKE ?
OR state.room_id = ?
"""
filter_ = [
"LOWER(state.name) LIKE ? OR "
"LOWER(state.canonical_alias) LIKE ? OR "
"state.room_id = ?"
]
# Our postgres db driver converts ? -> %s in SQL strings as that's the
# placeholder for postgres.
# HOWEVER, if you put a % into your SQL then everything goes wibbly.
# To get around this, we're going to surround search_term with %'s
# before giving it to the database in python instead
search_pattern = [
"%" + search_term.lower() + "%",
"#%" + search_term.lower() + "%:%",
where_args = [
f"%{search_term.lower()}%",
f"#%{search_term.lower()}%:%",
search_term,
]
if public_rooms is not None:
filter_arg = "1" if public_rooms else "0"
filter_.append(f"rooms.is_public = '{filter_arg}'")
if empty_rooms is not None:
if empty_rooms:
filter_.append("curr.joined_members = 0")
else:
filter_.append("curr.joined_members <> 0")
where_clause = "WHERE " + " AND ".join(filter_) if len(filter_) > 0 else ""
# Set ordering
if RoomSortOrder(order_by) == RoomSortOrder.SIZE:
@ -717,7 +738,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
LIMIT ?
OFFSET ?
""".format(
where=where_statement,
where=where_clause,
order_by=order_by_column,
direction="ASC" if order_by_asc else "DESC",
)
@ -726,10 +747,12 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
count_sql = """
SELECT count(*) FROM (
SELECT room_id FROM room_stats_state state
INNER JOIN room_stats_current curr USING (room_id)
INNER JOIN rooms USING (room_id)
{where}
) AS get_room_ids
""".format(
where=where_statement,
where=where_clause,
)
def _get_rooms_paginate_txn(
@ -737,7 +760,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
) -> Tuple[List[Dict[str, Any]], int]:
# Add the search term into the WHERE clause
# and execute the data query
txn.execute(info_sql, search_pattern + [limit, start])
txn.execute(info_sql, where_args + [limit, start])
# Refactor room query data into a structured dictionary
rooms = []
@ -767,7 +790,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
# Execute the count query
# Add the search term into the WHERE clause if present
txn.execute(count_sql, search_pattern)
txn.execute(count_sql, where_args)
room_count = cast(Tuple[int], txn.fetchone())
return rooms, room_count[0]

View file

@ -914,12 +914,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_last_event_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
) -> Optional[str]:
# We need to handle the fact that the stream tokens can be vector
# clocks. We do this by getting all rows between the minimum and
# maximum stream ordering in the token, plus one row less than the
# minimum stream ordering. We then filter the results against the
# token and return the first row that matches.
# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
# (sharded event persisters). The first subquery handles the events that
# would be within the vector clock and gets all rows between the minimum and
# maximum stream ordering in the token which need to be filtered against the
# `instance_map`. The second subquery handles the "before" case and finds
# the first row before the token. We then filter out any results past the
# token's vector clock and return the first row that matches.
min_stream = end_token.stream
max_stream = end_token.get_max_stream_pos()
# We use `union all` because we don't need any of the deduplication logic
# (`union` is really a union + distinct). `UNION ALL` does preserve the
# ordering of the operand queries but there is no actual guarantee that it
# has this behavior in all scenarios so we need the extra `ORDER BY` at the
# bottom.
sql = """
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
@ -931,7 +942,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
AND rejections.event_id IS NULL
ORDER BY stream_ordering DESC
) AS a
UNION
UNION ALL
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
FROM events
@ -943,15 +954,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ORDER BY stream_ordering DESC
LIMIT 1
) AS b
ORDER BY stream_ordering DESC
"""
txn.execute(
sql,
(
room_id,
end_token.stream,
end_token.get_max_stream_pos(),
min_stream,
max_stream,
room_id,
end_token.stream,
min_stream,
),
)
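A worked example of the selection logic described in the comment above (hypothetical values; pure Python rather than SQL):

# Token "m5~worker1.9": minimum stream position 5, instance_map {"worker1": 9}.
min_stream, instance_map = 5, {"worker1": 9}

rows = [
    ("worker1", 9),  # within the vector clock: worker1 may advance up to 9
    ("worker2", 7),  # past the token: worker2 falls back to the minimum, 5
    ("worker2", 4),  # found by the second ("before") subquery: 4 <= 5
]

# Keep rows at/before the token, newest first; the first survivor is the
# "last event before the token", mirroring the final ORDER BY above.
matches = sorted(
    (row for row in rows if row[1] <= instance_map.get(row[0], min_stream)),
    key=lambda row: row[1],
    reverse=True,
)
print(matches[0])  # ('worker1', 9)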

View file

@ -19,6 +19,8 @@ class EventInternalMetadata:
stream_ordering: Optional[int]
"""the stream ordering of this event. None, until it has been persisted."""
instance_name: Optional[str]
"""the instance name of the server that persisted this event. None, until it has been persisted."""
outlier: bool
"""whether this event is an outlier (ie, whether we have the state at that

View file

@ -238,6 +238,53 @@ class SlidingSyncBody(RequestBodyModel):
"""
class Filters(RequestBodyModel):
"""
All fields are applied with AND operators, hence if `is_dm: True` and
`is_encrypted: True` then only encrypted DM rooms will be returned. The
absence of a field implies no filter on that criterion: it does NOT imply
`False`. These fields may be expanded through use of extensions.
Attributes:
is_dm: Flag which only returns rooms present (or not) in the DM section
of account data. If unset, both DM rooms and non-DM rooms are returned.
If False, only non-DM rooms are returned. If True, only DM rooms are
returned.
spaces: Filter the room based on the space they belong to according to
`m.space.child` state events. If multiple spaces are present, a room can
be part of any one of the listed spaces (OR'd). The server will inspect
the `m.space.child` state events for the JOINED space room IDs given.
Servers MUST NOT navigate subspaces. It is up to the client to give a
complete list of spaces to navigate. Only rooms directly mentioned as
`m.space.child` events in these spaces will be returned. Unknown spaces
or spaces the user is not joined to will be ignored.
is_encrypted: Flag which only returns rooms which have an
`m.room.encryption` state event. If unset, both encrypted and
unencrypted rooms are returned. If `False`, only unencrypted rooms are
returned. If `True`, only encrypted rooms are returned.
is_invite: Flag which only returns rooms the user is currently invited
to. If unset, both invited and joined rooms are returned. If `False`, no
invited rooms are returned. If `True`, only invited rooms are returned.
room_types: If specified, only rooms where the `m.room.create` event has
a `type` matching one of the strings in this array will be returned. If
this field is unset, all rooms are returned regardless of type. This can
be used to get the initial set of spaces for an account. For rooms which
do not have a room type, use `null`/`None` to include them.
not_room_types: Same as `room_types` but inverted. This can be used to
filter out spaces from the room list. If a type is in both `room_types`
and `not_room_types`, then `not_room_types` wins and they are not included
in the result.
room_name_like: Filter the room name. Case-insensitive partial matching
e.g. 'foo' matches 'abFooab'. The term 'like' is inspired by SQL 'LIKE',
and the text here is similar to '%foo%'.
tags: Filter the room based on its room tags. If multiple tags are
present, a room can have any one of the listed tags (OR'd).
not_tags: Filter the room based on its room tags. Takes priority over
`tags`. For example, a room with tags A and B with filters `tags: [A]`
`not_tags: [B]` would NOT be included, because `not_tags` takes priority over
`tags`. This filter is useful, for example, to keep a general rooms list from
repeating rooms that already appear in a separate favourites list.
"""
is_dm: Optional[StrictBool] = None
spaces: Optional[List[StrictStr]] = None
is_encrypted: Optional[StrictBool] = None
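Pulling the semantics above together, a list definition combining two filters might look like this (a sketch modeled on the `test_filter_list` request body further down; of these filters only `is_dm` is implemented in this changeset, the rest still raise NotImplementedError):

# With both flags set, only encrypted DM rooms would match (AND semantics).
sliding_sync_list = {
    "ranges": [[0, 99]],
    "required_state": [],
    "timeline_limit": 1,
    "filters": {"is_dm": True, "is_encrypted": True},
}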

View file

@ -82,7 +82,6 @@ async def filter_events_for_client(
is_peeking: bool = False,
always_include_ids: FrozenSet[str] = frozenset(),
filter_send_to_client: bool = True,
msc4115_membership_on_events: bool = False,
) -> List[EventBase]:
"""
Check which events a user is allowed to see. If the user can see the event but its
@ -101,12 +100,10 @@ async def filter_events_for_client(
filter_send_to_client: Whether we're checking an event that's going to be
sent to a client. This might not always be the case since this function can
also be called to check whether a user can see the state at a given point.
msc4115_membership_on_events: Whether to include the requesting user's
membership in the "unsigned" data, per MSC4115.
Returns:
The filtered events. If `msc4115_membership_on_events` is true, the `unsigned`
data is annotated with the membership state of `user_id` at each event.
The filtered events. The `unsigned` data is annotated with the membership state
of `user_id` at each event.
"""
# Filter out events that have been soft failed so that we don't relay them
# to clients.
@ -159,9 +156,6 @@ async def filter_events_for_client(
if filtered is None:
return None
if not msc4115_membership_on_events:
return filtered
# Annotate the event with the user's membership after the event.
#
# Normally we just look in `state_after_event`, but if the event is an outlier
@ -186,7 +180,7 @@ async def filter_events_for_client(
# Copy the event before updating the unsigned data: this shouldn't be persisted
# to the cache!
cloned = clone_event(filtered)
cloned.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP] = user_membership
cloned.unsigned[EventUnsignedContentFields.MEMBERSHIP] = user_membership
return cloned
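With the stable identifier in place, clients now find the requesting user's membership under the plain `membership` key of the event's `unsigned` data; a sketch of the serialised result (values illustrative):

# Illustrative event JSON as served to a client after filtering (MSC4115):
event_json = {
    "type": "m.room.message",
    "unsigned": {
        # The requesting user's membership after the event:
        "membership": "join",
    },
}
print(event_json["unsigned"]["membership"])  # join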

View file

@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
)
original.internal_metadata.stream_ordering = 1234
self.assertEqual(original.internal_metadata.stream_ordering, 1234)
original.internal_metadata.instance_name = "worker1"
self.assertEqual(original.internal_metadata.instance_name, "worker1")
cloned = clone_event(original)
cloned.unsigned["b"] = 3
@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
self.assertEqual(original.unsigned, {"a": 1, "b": 2})
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
self.assertEqual(cloned.internal_metadata.txn_id, "txn")

View file

@ -22,8 +22,9 @@ from unittest.mock import patch
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
from synapse.api.room_versions import RoomVersions
from synapse.handlers.sliding_sync import SlidingSyncConfig
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
@ -326,7 +327,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Leave during the from_token/to_token range (newly_left)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.helper.leave(room_id1, user1_id, tok=user1_tok)
self.helper.leave(room_id2, user1_id, tok=user1_tok)
after_room2_token = self.event_sources.get_current_token()
@ -1116,3 +1117,130 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
room_id3,
},
)
class FilterRoomsTestCase(HomeserverTestCase):
"""
Tests Sliding Sync handler `filter_rooms()` to make sure it includes/excludes rooms
correctly.
"""
servlets = [
admin.register_servlets,
knock.register_servlets,
login.register_servlets,
room.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
# Enable sliding sync
config["experimental_features"] = {"msc3575_enabled": True}
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
self.store = self.hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def _create_dm_room(
self,
inviter_user_id: str,
inviter_tok: str,
invitee_user_id: str,
invitee_tok: str,
) -> str:
"""
Helper to create a DM room as the "inviter" and invite the "invitee" user to the room. The
"invitee" user will also join the room. The `m.direct` account data will be set
for both users.
"""
# Create a room and send an invite to the other user
room_id = self.helper.create_room_as(
inviter_user_id,
is_public=False,
tok=inviter_tok,
)
self.helper.invite(
room_id,
src=inviter_user_id,
targ=invitee_user_id,
tok=inviter_tok,
extra_data={"is_direct": True},
)
# Person that was invited joins the room
self.helper.join(room_id, invitee_user_id, tok=invitee_tok)
# Mimic the client setting the room as a direct message in the global account
# data
self.get_success(
self.store.add_account_data_for_user(
invitee_user_id,
AccountDataTypes.DIRECT,
{inviter_user_id: [room_id]},
)
)
self.get_success(
self.store.add_account_data_for_user(
inviter_user_id,
AccountDataTypes.DIRECT,
{invitee_user_id: [room_id]},
)
)
return room_id
def test_filter_dm_rooms(self) -> None:
"""
Test `filter.is_dm` for DM rooms
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
# Create a normal room
room_id = self.helper.create_room_as(
user1_id,
is_public=False,
tok=user1_tok,
)
# Create a DM room
dm_room_id = self._create_dm_room(
inviter_user_id=user1_id,
inviter_tok=user1_tok,
invitee_user_id=user2_id,
invitee_tok=user2_tok,
)
after_rooms_token = self.event_sources.get_current_token()
# Try with `is_dm=True`
truthy_filtered_room_ids = self.get_success(
self.sliding_sync_handler.filter_rooms(
UserID.from_string(user1_id),
{room_id, dm_room_id},
SlidingSyncConfig.SlidingSyncList.Filters(
is_dm=True,
),
after_rooms_token,
)
)
self.assertEqual(truthy_filtered_room_ids, {dm_room_id})
# Try with `is_dm=False`
falsy_filtered_room_ids = self.get_success(
self.sliding_sync_handler.filter_rooms(
UserID.from_string(user1_id),
{room_id, dm_room_id},
SlidingSyncConfig.SlidingSyncList.Filters(
is_dm=False,
),
after_rooms_token,
)
)
self.assertEqual(falsy_filtered_room_ids, {room_id})

View file

@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
assert event.internal_metadata.instance_name is not None
assert event.internal_metadata.stream_ordering is not None
self.replicate()
@ -155,7 +156,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
"invite",
event.event_id,
PersistedEventPosition(
self.hs.get_instance_name(),
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
@ -232,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
self.replicate()
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
)
self.check(
"get_rooms_for_user_with_stream_ordering",
@ -288,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
msg, msgctx = self.build_event()
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
self.replicate()
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
event_source = RoomEventSource(self.hs)
@ -329,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
# joined_rooms list.
if membership_changes:
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name,
j2.internal_metadata.stream_ordering,
)
self.assertEqual(
joined_rooms,

View file

@ -1795,6 +1795,83 @@ class RoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(room_id, channel.json_body["rooms"][0].get("room_id"))
self.assertEqual("ж", channel.json_body["rooms"][0].get("name"))
def test_filter_public_rooms(self) -> None:
"""Test that the `public_rooms` query parameter correctly filters public vs non-public rooms."""
self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok, is_public=True
)
self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok, is_public=True
)
self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok, is_public=False
)
response = self.make_request(
"GET",
"/_synapse/admin/v1/rooms",
access_token=self.admin_user_tok,
)
self.assertEqual(200, response.code, msg=response.json_body)
self.assertEqual(3, response.json_body["total_rooms"])
self.assertEqual(3, len(response.json_body["rooms"]))
response = self.make_request(
"GET",
"/_synapse/admin/v1/rooms?public_rooms=true",
access_token=self.admin_user_tok,
)
self.assertEqual(200, response.code, msg=response.json_body)
self.assertEqual(2, response.json_body["total_rooms"])
self.assertEqual(2, len(response.json_body["rooms"]))
response = self.make_request(
"GET",
"/_synapse/admin/v1/rooms?public_rooms=false",
access_token=self.admin_user_tok,
)
self.assertEqual(200, response.code, msg=response.json_body)
self.assertEqual(1, response.json_body["total_rooms"])
self.assertEqual(1, len(response.json_body["rooms"]))
def test_filter_empty_rooms(self) -> None:
"""Test that the `empty_rooms` query parameter correctly filters empty vs non-empty rooms."""
self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok, is_public=True
)
self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok, is_public=True
)
room_id = self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok, is_public=False
)
self.helper.leave(room_id, self.admin_user, tok=self.admin_user_tok)
response = self.make_request(
"GET",
"/_synapse/admin/v1/rooms",
access_token=self.admin_user_tok,
)
self.assertEqual(200, response.code, msg=response.json_body)
self.assertEqual(3, response.json_body["total_rooms"])
self.assertEqual(3, len(response.json_body["rooms"]))
response = self.make_request(
"GET",
"/_synapse/admin/v1/rooms?empty_rooms=false",
access_token=self.admin_user_tok,
)
self.assertEqual(200, response.code, msg=response.json_body)
self.assertEqual(2, response.json_body["total_rooms"])
self.assertEqual(2, len(response.json_body["rooms"]))
response = self.make_request(
"GET",
"/_synapse/admin/v1/rooms?empty_rooms=true",
access_token=self.admin_user_tok,
)
self.assertEqual(200, response.code, msg=response.json_body)
self.assertEqual(1, response.json_body["total_rooms"])
self.assertEqual(1, len(response.json_body["rooms"]))
def test_single_room(self) -> None:
"""Test that a single room can be requested correctly"""
# Create two test rooms

View file

@ -167,7 +167,6 @@ class RetentionTestCase(unittest.HomeserverTestCase):
storage_controllers,
self.user_id,
events,
msc4115_membership_on_events=True,
)
)

View file

@ -27,6 +27,7 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import (
AccountDataTypes,
EventContentFields,
EventTypes,
ReceiptTypes,
@ -1226,10 +1227,59 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync"
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def _create_dm_room(
self,
inviter_user_id: str,
inviter_tok: str,
invitee_user_id: str,
invitee_tok: str,
) -> str:
"""
Helper to create a DM room as the "inviter" and invite the "invitee" user to the
room. The "invitee" user also will join the room. The `m.direct` account data
will be set for both users.
"""
# Create a room and send an invite to the other user
room_id = self.helper.create_room_as(
inviter_user_id,
is_public=False,
tok=inviter_tok,
)
self.helper.invite(
room_id,
src=inviter_user_id,
targ=invitee_user_id,
tok=inviter_tok,
extra_data={"is_direct": True},
)
# Person that was invited joins the room
self.helper.join(room_id, invitee_user_id, tok=invitee_tok)
# Mimic the client setting the room as a direct message in the global account
# data
self.get_success(
self.store.add_account_data_for_user(
invitee_user_id,
AccountDataTypes.DIRECT,
{inviter_user_id: [room_id]},
)
)
self.get_success(
self.store.add_account_data_for_user(
inviter_user_id,
AccountDataTypes.DIRECT,
{invitee_user_id: [room_id]},
)
)
return room_id
def test_sync_list(self) -> None:
"""
Test that room IDs show up in the Sliding Sync lists
@ -1336,3 +1386,80 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
self.assertEqual(
channel.json_body["next_pos"], future_position_token_serialized
)
def test_filter_list(self) -> None:
"""
Test that filters apply to lists
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
# Create a DM room
dm_room_id = self._create_dm_room(
inviter_user_id=user1_id,
inviter_tok=user1_tok,
invitee_user_id=user2_id,
invitee_tok=user2_tok,
)
# Create a normal room
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"dms": {
"ranges": [[0, 99]],
"sort": ["by_recency"],
"required_state": [],
"timeline_limit": 1,
"filters": {"is_dm": True},
},
"foo-list": {
"ranges": [[0, 99]],
"sort": ["by_recency"],
"required_state": [],
"timeline_limit": 1,
"filters": {"is_dm": False},
},
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure it has the foo-list we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
["dms", "foo-list"],
channel.json_body["lists"].keys(),
)
# Make sure the list includes the room we are joined to
self.assertListEqual(
list(channel.json_body["lists"]["dms"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 99],
"room_ids": [dm_room_id],
}
],
list(channel.json_body["lists"]["dms"]),
)
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 99],
"room_ids": [room_id],
}
],
list(channel.json_body["lists"]["foo-list"]),
)

View file

@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
for e in events:
e.internal_metadata.stream_ordering = self._next_stream_ordering
e.internal_metadata.instance_name = self.hs.get_instance_name()
self._next_stream_ordering += 1
def _persist(txn: LoggingTransaction) -> None:

View file

@ -19,7 +19,10 @@
#
#
from typing import List
import logging
from typing import List, Tuple
from immutabledict import immutabledict
from twisted.test.proto_helpers import MemoryReactor
@ -28,11 +31,13 @@ from synapse.api.filtering import Filter
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class PaginationTestCase(HomeserverTestCase):
"""
@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase):
}
chunk = self._filter_messages(filter)
self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
"""
Test `get_last_event_in_room_before_stream_ordering(...)`
"""
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def _update_persisted_instance_name_for_event(
self, event_id: str, instance_name: str
) -> None:
"""
Update the `instance_name` that persisted the event in the database.
"""
return self.get_success(
self.store.db_pool.simple_update_one(
"events",
keyvalues={"event_id": event_id},
updatevalues={"instance_name": instance_name},
)
)
def _send_event_on_instance(
self, instance_name: str, room_id: str, access_token: str
) -> Tuple[JsonDict, PersistedEventPosition]:
"""
Send an event in a room and mimic that it was persisted by a specific
instance/worker.
"""
event_response = self.helper.send(
room_id, f"{instance_name} message", tok=access_token
)
self._update_persisted_instance_name_for_event(
event_response["event_id"], instance_name
)
event_pos = self.get_success(
self.store.get_position_for_event(event_response["event_id"])
)
return event_response, event_pos
def test_before_room_created(self) -> None:
"""
Test that no event is returned if we are using a token before the room was even created
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
before_room_token = self.event_sources.get_current_token()
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id,
end_token=before_room_token.room_key,
)
)
self.assertIsNone(last_event)
def test_after_room_created(self) -> None:
"""
Test that an event is returned if we are using a token after the room was created
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id,
end_token=after_room_token.room_key,
)
)
self.assertIsNotNone(last_event)
def test_activity_in_other_rooms(self) -> None:
"""
Test to make sure that the last event in the room is returned even if the
`stream_ordering` has advanced from activity in other rooms.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
# Create another room to advance the stream_ordering
self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
# Make sure it's the event we expect (which also means we know it's from the
# correct room)
self.assertEqual(last_event, event_response["event_id"])
def test_activity_after_token_has_no_effect(self) -> None:
"""
Test to make sure we return the last event before the token even if there is
activity after it.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
after_room_token = self.event_sources.get_current_token()
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
# Make sure it's the last event before the token
self.assertEqual(last_event, event_response["event_id"])
def test_last_event_within_sharded_token(self) -> None:
"""
Test to make sure we can find the last event that is *within* the sharded
token (a token that has an `instance_map` and looks like
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing
that we can find an event within the token's minimum and instance
`stream_ordering`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response1, event_pos1 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response2, event_pos2 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response3, event_pos3 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
# Create another room to advance the `stream_ordering` on the same worker
# so we can sandwich event3 in the middle of the token
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response4, event_pos4 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
# Assemble a token that encompasses event1 -> event4 on worker1
end_token = RoomStreamToken(
stream=event_pos2.stream,
instance_map=immutabledict({"worker1": event_pos4.stream}),
)
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
# Should find closest event at/before the token in room1
self.assertEqual(
last_event,
event_response3["event_id"],
f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],
"event2": event_response2["event_id"],
"event3": event_response3["event_id"],
}
),
)
def test_last_event_before_sharded_token(self) -> None:
"""
Test to make sure we can find the last event that is *before* the sharded token
(a token that has an `instance_map` and looks like
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response1, event_pos1 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response2, event_pos2 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
# Create another room to advance the `stream_ordering` on the same worker
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response3, event_pos3 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
event_response4, event_pos4 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
# Assemble a token that encompasses event3 -> event4 on worker1
end_token = RoomStreamToken(
stream=event_pos3.stream,
instance_map=immutabledict({"worker1": event_pos4.stream}),
)
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
# Should find closest event at/before the token in room1
self.assertEqual(
last_event,
event_response2["event_id"],
f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],
"event2": event_response2["event_id"],
}
),
)

View file

@ -336,7 +336,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.hs.get_storage_controllers(),
"@joiner:test",
events_to_filter,
msc4115_membership_on_events=True,
)
)
resident_filtered_events = self.get_success(
@ -344,7 +343,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.hs.get_storage_controllers(),
"@resident:test",
events_to_filter,
msc4115_membership_on_events=True,
)
)
@ -357,7 +355,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.assertEqual(
["join", "join", "leave"],
[
e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
for e in joiner_filtered_events
],
)
@ -379,7 +377,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.assertEqual(
["join", "join", "join", "join", "join"],
[
e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
for e in resident_filtered_events
],
)
@ -441,7 +439,6 @@ class FilterEventsOutOfBandEventsForClientTestCase(
self.hs.get_storage_controllers(),
"@user:test",
[invite_event, reject_event],
msc4115_membership_on_events=True,
)
)
self.assertEqual(
@ -451,7 +448,7 @@ class FilterEventsOutOfBandEventsForClientTestCase(
self.assertEqual(
["invite", "leave"],
[
e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
for e in filtered_events
],
)
@ -463,7 +460,6 @@ class FilterEventsOutOfBandEventsForClientTestCase(
self.hs.get_storage_controllers(),
"@other:test",
[invite_event, reject_event],
msc4115_membership_on_events=True,
)
),
[],