Compare commits

...

11 commits

Author SHA1 Message Date
dependabot[bot] 559548344c
Merge cb036eb79c into 2c36a679ae 2024-06-14 10:21:43 +01:00
Richard van der Hoff 2c36a679ae
Include user membership on events (#17282)
MSC4115 has now completed FCP, so we can enable it by default and switch
to the stable identifier.
2024-06-13 21:45:54 +00:00
Eric Eastwood c12ee0d5ba
Add is_dm filtering to Sliding Sync /sync (#17277)
Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync
2024-06-13 13:56:58 -05:00
Eric Eastwood 8aaff851b1
Fix newly_left rooms not appearing if we returned early (Sliding Sync) (#17301)
Fix `newly_left` rooms not appearing if we returned early when `membership_snapshot_token.is_before_or_eq(to_token.room_key)`. 

Introduced in https://github.com/element-hq/synapse/pull/17187 (part of Sliding Sync)

The tests didn't catch it because they had a small typo in it `room_id1` vs `room_id2`.

Found while working on https://github.com/element-hq/synapse/pull/17293
2024-06-13 11:36:57 -05:00
Eric Eastwood 8c58eb7f17
Add event.internal_metadata.instance_name (#17300)
Add `event.internal_metadata.instance_name` (the worker instance that persisted the event) to go alongside the existing `event.internal_metadata.stream_ordering`.

`instance_name` is useful to properly compare and query for events with a token since you need to compare both the `stream_ordering` and `instance_name` against the vector clock/`instance_map` in the `RoomStreamToken`.

This is pre-requisite work and may be used in https://github.com/element-hq/synapse/pull/17293

Adding `event.internal_metadata.instance_name` was first mentioned in the initial Sliding Sync PR while pairing with @erikjohnston, see 09609cb0db (diff-5cd773fb307aa754bd3948871ba118b1ef0303f4d72d42a2d21e38242bf4e096R405-R410)
2024-06-13 11:32:50 -05:00
Eric Eastwood ebdce69f6a
Fix get_last_event_in_room_before_stream_ordering(...) finding the wrong last event (#17295)
PR where this was introduced: https://github.com/matrix-org/synapse/pull/14817

### What does this affect?

`get_last_event_in_room_before_stream_ordering(...)` is used in Sync v2 in a lot of different state calculations.

`get_last_event_in_room_before_stream_ordering(...)`  is also used in `/rooms/{roomId}/members`
2024-06-13 11:00:52 -05:00
Erik Johnston cb036eb79c Try setting DIST_EXTRA_CONFIG in CI 2024-04-30 11:41:00 +01:00
Erik Johnston 2f45986ce8 Fix typo 2024-04-30 10:42:47 +01:00
Erik Johnston 901647e91c Build ABI3 wheels in CI 2024-04-30 10:37:28 +01:00
Erik Johnston 8410856411 Remove deprecated py_limited flag 2024-04-30 10:30:09 +01:00
dependabot[bot] 9c01f2cc11
Bump setuptools-rust from 1.8.1 to 1.9.0
Bumps [setuptools-rust](https://github.com/PyO3/setuptools-rust) from 1.8.1 to 1.9.0.
- [Release notes](https://github.com/PyO3/setuptools-rust/releases)
- [Changelog](https://github.com/PyO3/setuptools-rust/blob/main/CHANGELOG.md)
- [Commits](https://github.com/PyO3/setuptools-rust/compare/v1.8.1...v1.9.0)

---
updated-dependencies:
- dependency-name: setuptools-rust
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-04-22 10:43:13 +00:00
40 changed files with 766 additions and 94 deletions

View file

@ -14,7 +14,6 @@ def build(setup_kwargs: Dict[str, Any]) -> None:
target="synapse.synapse_rust",
path=cargo_toml_path,
binding=Binding.PyO3,
py_limited_api=True,
# We force always building in release mode, as we can't tell the
# difference between using `poetry` in development vs production.
debug=False,

View file

@ -0,0 +1 @@
Add `is_dm` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View file

@ -0,0 +1 @@
Include user membership in events served to clients, per MSC4115.

1
changelog.d/17295.bugfix Normal file
View file

@ -0,0 +1 @@
Fix edge case in `/sync` returning the wrong the state when using sharded event persisters.

1
changelog.d/17300.misc Normal file
View file

@ -0,0 +1 @@
Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.

1
changelog.d/17301.bugfix Normal file
View file

@ -0,0 +1 @@
Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View file

@ -105,8 +105,6 @@ experimental_features:
# Expose a room summary for public rooms
msc3266_enabled: true
msc4115_membership_on_events: true
server_notices:
system_mxid_localpart: _server
system_mxid_display_name: "Server Alert"

6
poetry.lock generated
View file

@ -2477,13 +2477,13 @@ testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (
[[package]]
name = "setuptools-rust"
version = "1.8.1"
version = "1.9.0"
description = "Setuptools Rust extension plugin"
optional = false
python-versions = ">=3.8"
files = [
{file = "setuptools-rust-1.8.1.tar.gz", hash = "sha256:94b1dd5d5308b3138d5b933c3a2b55e6d6927d1a22632e509fcea9ddd0f7e486"},
{file = "setuptools_rust-1.8.1-py3-none-any.whl", hash = "sha256:b5324493949ccd6aa0c03890c5f6b5f02de4512e3ac1697d02e9a6c02b18aa8e"},
{file = "setuptools-rust-1.9.0.tar.gz", hash = "sha256:704df0948f2e4cc60c2596ad6e840ea679f4f43e58ed4ad0c1857807240eab96"},
{file = "setuptools_rust-1.9.0-py3-none-any.whl", hash = "sha256:409caf49dcf7ad9bd510b4bf4011fbad504e745fae98f57fe1c06f3a97719638"},
]
[package.dependencies]

View file

@ -369,7 +369,7 @@ tomli = ">=1.2.3"
# runtime errors caused by build system changes.
# We are happy to raise these upper bounds upon request,
# provided we check that it's safe to do so (i.e. that CI passes).
requires = ["poetry-core>=1.1.0,<=1.9.0", "setuptools_rust>=1.3,<=1.8.1"]
requires = ["poetry-core>=1.1.0,<=1.9.0", "setuptools_rust>=1.3,<=1.8.1", "wheel"]
build-backend = "poetry.core.masonry.api"
@ -387,13 +387,13 @@ skip = "cp36* cp37* pp37* *-musllinux_i686 pp*aarch64 *-musllinux_aarch64"
# We need a rust compiler
before-all = "curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y --profile minimal"
environment= { PATH = "$PATH:$HOME/.cargo/bin" }
environment = { PATH = "$PATH:$HOME/.cargo/bin", DIST_EXTRA_CONFIG = "/tmp/bdist.ini" }
# For some reason if we don't manually clean the build directory we
# can end up polluting the next build with a .so that is for the wrong
# Python version.
before-build = "rm -rf {project}/build"
build-frontend = "build"
before-build = "rm -rf {project}/build && echo '[bdist_wheel]\npy_limited_api=cp38\n' > $DIST_EXTRA_CONFIG"
build-frontend = "build; args: --config-setting=--build-option=--py-limited-api=cp38" # Build ABI3 wheels for 3.8+
test-command = "python -c 'from synapse.synapse_rust import sum_as_string; print(sum_as_string(1, 2))'"

View file

@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
#[pyo3(get, set)]
instance_name: Option<String>,
/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
@ -232,6 +234,7 @@ impl EventInternalMetadata {
Ok(EventInternalMetadata {
data,
stream_ordering: None,
instance_name: None,
outlier: false,
})
}

View file

@ -223,7 +223,6 @@ test_packages=(
./tests/msc3930
./tests/msc3902
./tests/msc3967
./tests/msc4115
)
# Enable dirty runs, so tests will reuse the same container where possible.

View file

@ -238,7 +238,7 @@ class EventUnsignedContentFields:
"""Fields found inside the 'unsigned' data on events"""
# Requesting user's membership, per MSC4115
MSC4115_MEMBERSHIP: Final = "io.element.msc4115.membership"
MEMBERSHIP: Final = "membership"
class RoomTypes:

View file

@ -436,10 +436,6 @@ class ExperimentalConfig(Config):
("experimental", "msc4108_delegation_endpoint"),
)
self.msc4115_membership_on_events = experimental.get(
"msc4115_membership_on_events", False
)
self.msc3916_authenticated_media_enabled = experimental.get(
"msc3916_authenticated_media_enabled", False
)

View file

@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
# Mark the event as redacted
@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier
return new_event

View file

@ -42,7 +42,6 @@ class AdminHandler:
self._device_handler = hs.get_device_handler()
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._hs_config = hs.config
self._msc3866_enabled = hs.config.experimental.msc3866.enabled
async def get_whois(self, user: UserID) -> JsonMapping:
@ -215,7 +214,6 @@ class AdminHandler:
self._storage_controllers,
user_id,
events,
msc4115_membership_on_events=self._hs_config.experimental.msc4115_membership_on_events,
)
writer.write_events(room_id, events)

View file

@ -148,7 +148,6 @@ class EventHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
async def get_event(
self,
@ -194,7 +193,6 @@ class EventHandler:
user.to_string(),
[event],
is_peeking=is_peeking,
msc4115_membership_on_events=self._config.experimental.msc4115_membership_on_events,
)
if not filtered:

View file

@ -224,7 +224,6 @@ class InitialSyncHandler:
self._storage_controllers,
user_id,
messages,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
@ -383,7 +382,6 @@ class InitialSyncHandler:
requester.user.to_string(),
messages,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
@ -498,7 +496,6 @@ class InitialSyncHandler:
requester.user.to_string(),
messages,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)

View file

@ -1551,6 +1551,7 @@ class EventCreationHandler:
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
event.internal_metadata.instance_name = writer_instance
return event

View file

@ -623,7 +623,6 @@ class PaginationHandler:
user_id,
events,
is_peeking=(member_event_id is None),
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
# if after the filter applied there are no more events

View file

@ -95,7 +95,6 @@ class RelationsHandler:
self._event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()
self._event_creation_handler = hs.get_event_creation_handler()
self._config = hs.config
async def get_relations(
self,
@ -164,7 +163,6 @@ class RelationsHandler:
user_id,
events,
is_peeking=(member_event_id is None),
msc4115_membership_on_events=self._config.experimental.msc4115_membership_on_events,
)
# The relations returned for the requested event do include their
@ -610,7 +608,6 @@ class RelationsHandler:
user_id,
events,
is_peeking=(member_event_id is None),
msc4115_membership_on_events=self._config.experimental.msc4115_membership_on_events,
)
aggregations = await self.get_bundled_aggregations(

View file

@ -1476,7 +1476,6 @@ class RoomContextHandler:
user.to_string(),
events,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
event = await self.store.get_event(

View file

@ -483,7 +483,6 @@ class SearchHandler:
self._storage_controllers,
user.to_string(),
filtered_events,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
events.sort(key=lambda e: -rank_map[e.event_id])
@ -585,7 +584,6 @@ class SearchHandler:
self._storage_controllers,
user.to_string(),
filtered_events,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
room_events.extend(events)
@ -673,14 +671,12 @@ class SearchHandler:
self._storage_controllers,
user.to_string(),
res.events_before,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
events_after = await filter_events_for_client(
self._storage_controllers,
user.to_string(),
res.events_after,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
context: JsonDict = {

View file

@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
from immutabledict import immutabledict
from synapse.api.constants import Membership
from synapse.api.constants import AccountDataTypes, Membership
from synapse.events import EventBase
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
@ -69,9 +69,19 @@ class SlidingSyncHandler:
from_token: Optional[StreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
"""
Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Args:
requester: The user making the request
sync_config: Sync configuration
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
timeout_ms: The time in milliseconds to wait for new data to arrive. If 0,
we will immediately but there might not be any new data so we just return an
empty response.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@ -143,6 +153,14 @@ class SlidingSyncHandler:
"""
Generates the response body of a Sliding Sync result, represented as a
`SlidingSyncResult`.
We fetch data according to the token range (> `from_token` and <= `to_token`).
Args:
sync_config: Sync configuration
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
"""
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
@ -163,11 +181,12 @@ class SlidingSyncHandler:
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
if sync_config.lists:
for list_key, list_config in sync_config.lists.items():
# TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
# Apply filters
filtered_room_ids = room_id_set
if list_config.filters is not None:
filtered_room_ids = await self.filter_rooms(
sync_config.user, room_id_set, list_config.filters, to_token
)
# TODO: Apply sorts
sorted_room_ids = sorted(filtered_room_ids)
@ -217,6 +236,12 @@ class SlidingSyncHandler:
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.
Args:
user: User to fetch rooms for
to_token: The token to fetch rooms up to.
from_token: The point in the stream to sync from.
"""
user_id = user.to_string()
@ -275,12 +300,6 @@ class SlidingSyncHandler:
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we can just straight-up return the room list (nothing has
# changed)
if membership_snapshot_token.is_before_or_eq(to_token.room_key):
return sync_room_id_set
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
@ -300,14 +319,20 @@ class SlidingSyncHandler:
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
#
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
membership_change_events_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
)
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
@ -439,3 +464,84 @@ class SlidingSyncHandler:
sync_room_id_set.add(room_id)
return sync_room_id_set
async def filter_rooms(
self,
user: UserID,
room_id_set: AbstractSet[str],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> AbstractSet[str]:
"""
Filter rooms based on the sync request.
Args:
user: User to filter rooms for
room_id_set: Set of room IDs to filter down
filters: Filters to apply
to_token: We filter based on the state of the room at this token
"""
user_id = user.to_string()
# TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
filtered_room_id_set = set(room_id_set)
# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
# We're using global account data (`m.direct`) instead of checking for
# `is_direct` on membership events because that property only appears for
# the invitee membership event (doesn't show up for the inviter). Account
# data is set by the client so it needs to be scrutinized.
#
# We're unable to take `to_token` into account for global account data since
# we only keep track of the latest account data for the user.
dm_map = await self.store.get_global_account_data_by_type_for_user(
user_id, AccountDataTypes.DIRECT
)
# Flatten out the map
dm_room_id_set = set()
if dm_map:
for room_ids in dm_map.values():
# Account data should be a list of room IDs. Ignore anything else
if isinstance(room_ids, list):
for room_id in room_ids:
if isinstance(room_id, str):
dm_room_id_set.add(room_id)
if filters.is_dm:
# Only DM rooms please
filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
else:
# Only non-DM rooms please
filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)
if filters.spaces:
raise NotImplementedError()
if filters.is_encrypted:
raise NotImplementedError()
if filters.is_invite:
raise NotImplementedError()
if filters.room_types:
raise NotImplementedError()
if filters.not_room_types:
raise NotImplementedError()
if filters.room_name_like:
raise NotImplementedError()
if filters.tags:
raise NotImplementedError()
if filters.not_tags:
raise NotImplementedError()
return filtered_room_id_set

View file

@ -844,7 +844,6 @@ class SyncHandler:
sync_config.user.to_string(),
recents,
always_include_ids=current_state_ids,
msc4115_membership_on_events=self.hs_config.experimental.msc4115_membership_on_events,
)
log_kv({"recents_after_visibility_filtering": len(recents)})
else:
@ -930,7 +929,6 @@ class SyncHandler:
sync_config.user.to_string(),
loaded_recents,
always_include_ids=current_state_ids,
msc4115_membership_on_events=self.hs_config.experimental.msc4115_membership_on_events,
)
loaded_recents = []

View file

@ -721,7 +721,6 @@ class Notifier:
user.to_string(),
new_events,
is_peeking=is_peeking,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
elif keyname == StreamKeyType.PRESENCE:
now = self.clock.time_msec()

View file

@ -532,7 +532,6 @@ class Mailer:
self._storage_controllers,
user_id,
results.events_before,
msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
)
the_events.append(notif_event)

View file

@ -207,6 +207,7 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
await self.db_pool.runInteraction(
"persist_events",

View file

@ -156,6 +156,7 @@ class _EventRow:
event_id: str
stream_ordering: int
instance_name: str
json: str
internal_metadata: str
format_version: Optional[int]
@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.instance_name = row.instance_name
original_ev.internal_metadata.outlier = row.outlier
# Consistency check: if the content of the event has been modified in the
@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore):
SELECT
e.event_id,
e.stream_ordering,
e.instance_name,
ej.internal_metadata,
ej.json,
ej.format_version,
@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore):
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
internal_metadata=row[2],
json=row[3],
format_version=row[4],
room_version_id=row[5],
rejected_reason=row[6],
instance_name=row[2],
internal_metadata=row[3],
json=row[4],
format_version=row[5],
room_version_id=row[6],
rejected_reason=row[7],
redactions=[],
outlier=bool(row[7]), # This is an int in SQLite3
outlier=bool(row[8]), # This is an int in SQLite3
)
# check for redactions

View file

@ -914,12 +914,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_last_event_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
) -> Optional[str]:
# We need to handle the fact that the stream tokens can be vector
# clocks. We do this by getting all rows between the minimum and
# maximum stream ordering in the token, plus one row less than the
# minimum stream ordering. We then filter the results against the
# token and return the first row that matches.
# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
# (sharded event persisters). The first subquery handles the events that
# would be within the vector clock and gets all rows between the minimum and
# maximum stream ordering in the token which need to be filtered against the
# `instance_map`. The second subquery handles the "before" case and finds
# the first row before the token. We then filter out any results past the
# token's vector clock and return the first row that matches.
min_stream = end_token.stream
max_stream = end_token.get_max_stream_pos()
# We use `union all` because we don't need any of the deduplication logic
# (`union` is really a union + distinct). `UNION ALL` does preserve the
# ordering of the operand queries but there is no actual gurantee that it
# has this behavior in all scenarios so we need the extra `ORDER BY` at the
# bottom.
sql = """
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
@ -931,7 +942,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
AND rejections.event_id IS NULL
ORDER BY stream_ordering DESC
) AS a
UNION
UNION ALL
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
FROM events
@ -943,15 +954,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ORDER BY stream_ordering DESC
LIMIT 1
) AS b
ORDER BY stream_ordering DESC
"""
txn.execute(
sql,
(
room_id,
end_token.stream,
end_token.get_max_stream_pos(),
min_stream,
max_stream,
room_id,
end_token.stream,
min_stream,
),
)

View file

@ -19,6 +19,8 @@ class EventInternalMetadata:
stream_ordering: Optional[int]
"""the stream ordering of this event. None, until it has been persisted."""
instance_name: Optional[str]
"""the instance name of the server that persisted this event. None, until it has been persisted."""
outlier: bool
"""whether this event is an outlier (ie, whether we have the state at that

View file

@ -238,6 +238,53 @@ class SlidingSyncBody(RequestBodyModel):
"""
class Filters(RequestBodyModel):
"""
All fields are applied with AND operators, hence if `is_dm: True` and
`is_encrypted: True` then only Encrypted DM rooms will be returned. The
absence of fields implies no filter on that criteria: it does NOT imply
`False`. These fields may be expanded through use of extensions.
Attributes:
is_dm: Flag which only returns rooms present (or not) in the DM section
of account data. If unset, both DM rooms and non-DM rooms are returned.
If False, only non-DM rooms are returned. If True, only DM rooms are
returned.
spaces: Filter the room based on the space they belong to according to
`m.space.child` state events. If multiple spaces are present, a room can
be part of any one of the listed spaces (OR'd). The server will inspect
the `m.space.child` state events for the JOINED space room IDs given.
Servers MUST NOT navigate subspaces. It is up to the client to give a
complete list of spaces to navigate. Only rooms directly mentioned as
`m.space.child` events in these spaces will be returned. Unknown spaces
or spaces the user is not joined to will be ignored.
is_encrypted: Flag which only returns rooms which have an
`m.room.encryption` state event. If unset, both encrypted and
unencrypted rooms are returned. If `False`, only unencrypted rooms are
returned. If `True`, only encrypted rooms are returned.
is_invite: Flag which only returns rooms the user is currently invited
to. If unset, both invited and joined rooms are returned. If `False`, no
invited rooms are returned. If `True`, only invited rooms are returned.
room_types: If specified, only rooms where the `m.room.create` event has
a `type` matching one of the strings in this array will be returned. If
this field is unset, all rooms are returned regardless of type. This can
be used to get the initial set of spaces for an account. For rooms which
do not have a room type, use `null`/`None` to include them.
not_room_types: Same as `room_types` but inverted. This can be used to
filter out spaces from the room list. If a type is in both `room_types`
and `not_room_types`, then `not_room_types` wins and they are not included
in the result.
room_name_like: Filter the room name. Case-insensitive partial matching
e.g 'foo' matches 'abFooab'. The term 'like' is inspired by SQL 'LIKE',
and the text here is similar to '%foo%'.
tags: Filter the room based on its room tags. If multiple tags are
present, a room can have any one of the listed tags (OR'd).
not_tags: Filter the room based on its room tags. Takes priority over
`tags`. For example, a room with tags A and B with filters `tags: [A]`
`not_tags: [B]` would NOT be included because `not_tags` takes priority over
`tags`. This filter is useful if your rooms list does NOT include the
list of favourite rooms again.
"""
is_dm: Optional[StrictBool] = None
spaces: Optional[List[StrictStr]] = None
is_encrypted: Optional[StrictBool] = None

View file

@ -82,7 +82,6 @@ async def filter_events_for_client(
is_peeking: bool = False,
always_include_ids: FrozenSet[str] = frozenset(),
filter_send_to_client: bool = True,
msc4115_membership_on_events: bool = False,
) -> List[EventBase]:
"""
Check which events a user is allowed to see. If the user can see the event but its
@ -101,12 +100,10 @@ async def filter_events_for_client(
filter_send_to_client: Whether we're checking an event that's going to be
sent to a client. This might not always be the case since this function can
also be called to check whether a user can see the state at a given point.
msc4115_membership_on_events: Whether to include the requesting user's
membership in the "unsigned" data, per MSC4115.
Returns:
The filtered events. If `msc4115_membership_on_events` is true, the `unsigned`
data is annotated with the membership state of `user_id` at each event.
The filtered events. The `unsigned` data is annotated with the membership state
of `user_id` at each event.
"""
# Filter out events that have been soft failed so that we don't relay them
# to clients.
@ -159,9 +156,6 @@ async def filter_events_for_client(
if filtered is None:
return None
if not msc4115_membership_on_events:
return filtered
# Annotate the event with the user's membership after the event.
#
# Normally we just look in `state_after_event`, but if the event is an outlier
@ -186,7 +180,7 @@ async def filter_events_for_client(
# Copy the event before updating the unsigned data: this shouldn't be persisted
# to the cache!
cloned = clone_event(filtered)
cloned.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP] = user_membership
cloned.unsigned[EventUnsignedContentFields.MEMBERSHIP] = user_membership
return cloned

View file

@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
)
original.internal_metadata.stream_ordering = 1234
self.assertEqual(original.internal_metadata.stream_ordering, 1234)
original.internal_metadata.instance_name = "worker1"
self.assertEqual(original.internal_metadata.instance_name, "worker1")
cloned = clone_event(original)
cloned.unsigned["b"] = 3
@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
self.assertEqual(original.unsigned, {"a": 1, "b": 2})
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
self.assertEqual(cloned.internal_metadata.txn_id, "txn")

View file

@ -22,8 +22,9 @@ from unittest.mock import patch
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
from synapse.api.room_versions import RoomVersions
from synapse.handlers.sliding_sync import SlidingSyncConfig
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
@ -326,7 +327,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Leave during the from_token/to_token range (newly_left)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.helper.leave(room_id1, user1_id, tok=user1_tok)
self.helper.leave(room_id2, user1_id, tok=user1_tok)
after_room2_token = self.event_sources.get_current_token()
@ -1116,3 +1117,130 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
room_id3,
},
)
class FilterRoomsTestCase(HomeserverTestCase):
"""
Tests Sliding Sync handler `filter_rooms()` to make sure it includes/excludes rooms
correctly.
"""
servlets = [
admin.register_servlets,
knock.register_servlets,
login.register_servlets,
room.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
# Enable sliding sync
config["experimental_features"] = {"msc3575_enabled": True}
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
self.store = self.hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def _create_dm_room(
self,
inviter_user_id: str,
inviter_tok: str,
invitee_user_id: str,
invitee_tok: str,
) -> str:
"""
Helper to create a DM room as the "inviter" and invite the "invitee" user to the room. The
"invitee" user also will join the room. The `m.direct` account data will be set
for both users.
"""
# Create a room and send an invite the other user
room_id = self.helper.create_room_as(
inviter_user_id,
is_public=False,
tok=inviter_tok,
)
self.helper.invite(
room_id,
src=inviter_user_id,
targ=invitee_user_id,
tok=inviter_tok,
extra_data={"is_direct": True},
)
# Person that was invited joins the room
self.helper.join(room_id, invitee_user_id, tok=invitee_tok)
# Mimic the client setting the room as a direct message in the global account
# data
self.get_success(
self.store.add_account_data_for_user(
invitee_user_id,
AccountDataTypes.DIRECT,
{inviter_user_id: [room_id]},
)
)
self.get_success(
self.store.add_account_data_for_user(
inviter_user_id,
AccountDataTypes.DIRECT,
{invitee_user_id: [room_id]},
)
)
return room_id
def test_filter_dm_rooms(self) -> None:
"""
Test `filter.is_dm` for DM rooms
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
# Create a normal room
room_id = self.helper.create_room_as(
user1_id,
is_public=False,
tok=user1_tok,
)
# Create a DM room
dm_room_id = self._create_dm_room(
inviter_user_id=user1_id,
inviter_tok=user1_tok,
invitee_user_id=user2_id,
invitee_tok=user2_tok,
)
after_rooms_token = self.event_sources.get_current_token()
# Try with `is_dm=True`
truthy_filtered_room_ids = self.get_success(
self.sliding_sync_handler.filter_rooms(
UserID.from_string(user1_id),
{room_id, dm_room_id},
SlidingSyncConfig.SlidingSyncList.Filters(
is_dm=True,
),
after_rooms_token,
)
)
self.assertEqual(truthy_filtered_room_ids, {dm_room_id})
# Try with `is_dm=False`
falsy_filtered_room_ids = self.get_success(
self.sliding_sync_handler.filter_rooms(
UserID.from_string(user1_id),
{room_id, dm_room_id},
SlidingSyncConfig.SlidingSyncList.Filters(
is_dm=False,
),
after_rooms_token,
)
)
self.assertEqual(falsy_filtered_room_ids, {room_id})

View file

@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
assert event.internal_metadata.instance_name is not None
assert event.internal_metadata.stream_ordering is not None
self.replicate()
@ -155,7 +156,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
"invite",
event.event_id,
PersistedEventPosition(
self.hs.get_instance_name(),
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
@ -232,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
self.replicate()
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
)
self.check(
"get_rooms_for_user_with_stream_ordering",
@ -288,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
msg, msgctx = self.build_event()
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
self.replicate()
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
event_source = RoomEventSource(self.hs)
@ -329,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
# joined_rooms list.
if membership_changes:
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name,
j2.internal_metadata.stream_ordering,
)
self.assertEqual(
joined_rooms,

View file

@ -167,7 +167,6 @@ class RetentionTestCase(unittest.HomeserverTestCase):
storage_controllers,
self.user_id,
events,
msc4115_membership_on_events=True,
)
)

View file

@ -27,6 +27,7 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import (
AccountDataTypes,
EventContentFields,
EventTypes,
ReceiptTypes,
@ -1226,10 +1227,59 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync"
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def _create_dm_room(
self,
inviter_user_id: str,
inviter_tok: str,
invitee_user_id: str,
invitee_tok: str,
) -> str:
"""
Helper to create a DM room as the "inviter" and invite the "invitee" user to the
room. The "invitee" user also will join the room. The `m.direct` account data
will be set for both users.
"""
# Create a room and send an invite the other user
room_id = self.helper.create_room_as(
inviter_user_id,
is_public=False,
tok=inviter_tok,
)
self.helper.invite(
room_id,
src=inviter_user_id,
targ=invitee_user_id,
tok=inviter_tok,
extra_data={"is_direct": True},
)
# Person that was invited joins the room
self.helper.join(room_id, invitee_user_id, tok=invitee_tok)
# Mimic the client setting the room as a direct message in the global account
# data
self.get_success(
self.store.add_account_data_for_user(
invitee_user_id,
AccountDataTypes.DIRECT,
{inviter_user_id: [room_id]},
)
)
self.get_success(
self.store.add_account_data_for_user(
inviter_user_id,
AccountDataTypes.DIRECT,
{invitee_user_id: [room_id]},
)
)
return room_id
def test_sync_list(self) -> None:
"""
Test that room IDs show up in the Sliding Sync lists
@ -1336,3 +1386,80 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
self.assertEqual(
channel.json_body["next_pos"], future_position_token_serialized
)
def test_filter_list(self) -> None:
"""
Test that filters apply to lists
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
# Create a DM room
dm_room_id = self._create_dm_room(
inviter_user_id=user1_id,
inviter_tok=user1_tok,
invitee_user_id=user2_id,
invitee_tok=user2_tok,
)
# Create a normal room
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"dms": {
"ranges": [[0, 99]],
"sort": ["by_recency"],
"required_state": [],
"timeline_limit": 1,
"filters": {"is_dm": True},
},
"foo-list": {
"ranges": [[0, 99]],
"sort": ["by_recency"],
"required_state": [],
"timeline_limit": 1,
"filters": {"is_dm": False},
},
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure it has the foo-list we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
["dms", "foo-list"],
channel.json_body["lists"].keys(),
)
# Make sure the list includes the room we are joined to
self.assertListEqual(
list(channel.json_body["lists"]["dms"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 99],
"room_ids": [dm_room_id],
}
],
list(channel.json_body["lists"]["dms"]),
)
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 99],
"room_ids": [room_id],
}
],
list(channel.json_body["lists"]["foo-list"]),
)

View file

@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
for e in events:
e.internal_metadata.stream_ordering = self._next_stream_ordering
e.internal_metadata.instance_name = self.hs.get_instance_name()
self._next_stream_ordering += 1
def _persist(txn: LoggingTransaction) -> None:

View file

@ -19,7 +19,10 @@
#
#
from typing import List
import logging
from typing import List, Tuple
from immutabledict import immutabledict
from twisted.test.proto_helpers import MemoryReactor
@ -28,11 +31,13 @@ from synapse.api.filtering import Filter
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class PaginationTestCase(HomeserverTestCase):
"""
@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase):
}
chunk = self._filter_messages(filter)
self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
"""
Test `get_last_event_in_room_before_stream_ordering(...)`
"""
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def _update_persisted_instance_name_for_event(
self, event_id: str, instance_name: str
) -> None:
"""
Update the `instance_name` that persisted the the event in the database.
"""
return self.get_success(
self.store.db_pool.simple_update_one(
"events",
keyvalues={"event_id": event_id},
updatevalues={"instance_name": instance_name},
)
)
def _send_event_on_instance(
self, instance_name: str, room_id: str, access_token: str
) -> Tuple[JsonDict, PersistedEventPosition]:
"""
Send an event in a room and mimic that it was persisted by a specific
instance/worker.
"""
event_response = self.helper.send(
room_id, f"{instance_name} message", tok=access_token
)
self._update_persisted_instance_name_for_event(
event_response["event_id"], instance_name
)
event_pos = self.get_success(
self.store.get_position_for_event(event_response["event_id"])
)
return event_response, event_pos
def test_before_room_created(self) -> None:
"""
Test that no event is returned if we are using a token before the room was even created
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
before_room_token = self.event_sources.get_current_token()
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id,
end_token=before_room_token.room_key,
)
)
self.assertIsNone(last_event)
def test_after_room_created(self) -> None:
"""
Test that an event is returned if we are using a token after the room was created
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id,
end_token=after_room_token.room_key,
)
)
self.assertIsNotNone(last_event)
def test_activity_in_other_rooms(self) -> None:
"""
Test to make sure that the last event in the room is returned even if the
`stream_ordering` has advanced from activity in other rooms.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
# Create another room to advance the stream_ordering
self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
# Make sure it's the event we expect (which also means we know it's from the
# correct room)
self.assertEqual(last_event, event_response["event_id"])
def test_activity_after_token_has_no_effect(self) -> None:
"""
Test to make sure we return the last event before the token even if there is
activity after it.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
after_room_token = self.event_sources.get_current_token()
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
# Make sure it's the last event before the token
self.assertEqual(last_event, event_response["event_id"])
def test_last_event_within_sharded_token(self) -> None:
"""
Test to make sure we can find the last event that that is *within* the sharded
token (a token that has an `instance_map` and looks like
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing
that we can find an event within the tokens minimum and instance
`stream_ordering`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response1, event_pos1 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response2, event_pos2 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response3, event_pos3 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
# Create another room to advance the `stream_ordering` on the same worker
# so we can sandwich event3 in the middle of the token
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response4, event_pos4 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
# Assemble a token that encompasses event1 -> event4 on worker1
end_token = RoomStreamToken(
stream=event_pos2.stream,
instance_map=immutabledict({"worker1": event_pos4.stream}),
)
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
# Should find closest event at/before the token in room1
self.assertEqual(
last_event,
event_response3["event_id"],
f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],
"event2": event_response2["event_id"],
"event3": event_response3["event_id"],
}
),
)
def test_last_event_before_sharded_token(self) -> None:
"""
Test to make sure we can find the last event that is *before* the sharded token
(a token that has an `instance_map` and looks like
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response1, event_pos1 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response2, event_pos2 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
# Create another room to advance the `stream_ordering` on the same worker
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response3, event_pos3 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
event_response4, event_pos4 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
# Assemble a token that encompasses event3 -> event4 on worker1
end_token = RoomStreamToken(
stream=event_pos3.stream,
instance_map=immutabledict({"worker1": event_pos4.stream}),
)
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
# Should find closest event at/before the token in room1
self.assertEqual(
last_event,
event_response2["event_id"],
f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],
"event2": event_response2["event_id"],
}
),
)

View file

@ -336,7 +336,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.hs.get_storage_controllers(),
"@joiner:test",
events_to_filter,
msc4115_membership_on_events=True,
)
)
resident_filtered_events = self.get_success(
@ -344,7 +343,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.hs.get_storage_controllers(),
"@resident:test",
events_to_filter,
msc4115_membership_on_events=True,
)
)
@ -357,7 +355,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.assertEqual(
["join", "join", "leave"],
[
e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
for e in joiner_filtered_events
],
)
@ -379,7 +377,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
self.assertEqual(
["join", "join", "join", "join", "join"],
[
e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
for e in resident_filtered_events
],
)
@ -441,7 +439,6 @@ class FilterEventsOutOfBandEventsForClientTestCase(
self.hs.get_storage_controllers(),
"@user:test",
[invite_event, reject_event],
msc4115_membership_on_events=True,
)
)
self.assertEqual(
@ -451,7 +448,7 @@ class FilterEventsOutOfBandEventsForClientTestCase(
self.assertEqual(
["invite", "leave"],
[
e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
for e in filtered_events
],
)
@ -463,7 +460,6 @@ class FilterEventsOutOfBandEventsForClientTestCase(
self.hs.get_storage_controllers(),
"@other:test",
[invite_event, reject_event],
msc4115_membership_on_events=True,
)
),
[],