Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter-dms

This commit is contained in:
Eric Eastwood 2024-06-13 12:44:28 -05:00
commit eaaf4089ec
22 changed files with 427 additions and 75 deletions

1
changelog.d/17295.bugfix Normal file
View file

@ -0,0 +1 @@
Fix edge case in `/sync` returning the wrong the state when using sharded event persisters.

View file

@ -0,0 +1 @@
Add support for the unstable [MSC4151](https://github.com/matrix-org/matrix-spec-proposals/pull/4151) report room API.

1
changelog.d/17297.misc Normal file
View file

@ -0,0 +1 @@
Bump `mypy` from 1.8.0 to 1.9.0.

1
changelog.d/17300.misc Normal file
View file

@ -0,0 +1 @@
Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.

1
changelog.d/17301.bugfix Normal file
View file

@ -0,0 +1 @@
Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

56
poetry.lock generated
View file

@ -1384,38 +1384,38 @@ files = [
[[package]]
name = "mypy"
version = "1.8.0"
version = "1.9.0"
description = "Optional static typing for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "mypy-1.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:485a8942f671120f76afffff70f259e1cd0f0cfe08f81c05d8816d958d4577d3"},
{file = "mypy-1.8.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:df9824ac11deaf007443e7ed2a4a26bebff98d2bc43c6da21b2b64185da011c4"},
{file = "mypy-1.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2afecd6354bbfb6e0160f4e4ad9ba6e4e003b767dd80d85516e71f2e955ab50d"},
{file = "mypy-1.8.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8963b83d53ee733a6e4196954502b33567ad07dfd74851f32be18eb932fb1cb9"},
{file = "mypy-1.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:e46f44b54ebddbeedbd3d5b289a893219065ef805d95094d16a0af6630f5d410"},
{file = "mypy-1.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:855fe27b80375e5c5878492f0729540db47b186509c98dae341254c8f45f42ae"},
{file = "mypy-1.8.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4c886c6cce2d070bd7df4ec4a05a13ee20c0aa60cb587e8d1265b6c03cf91da3"},
{file = "mypy-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d19c413b3c07cbecf1f991e2221746b0d2a9410b59cb3f4fb9557f0365a1a817"},
{file = "mypy-1.8.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9261ed810972061388918c83c3f5cd46079d875026ba97380f3e3978a72f503d"},
{file = "mypy-1.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:51720c776d148bad2372ca21ca29256ed483aa9a4cdefefcef49006dff2a6835"},
{file = "mypy-1.8.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:52825b01f5c4c1c4eb0db253ec09c7aa17e1a7304d247c48b6f3599ef40db8bd"},
{file = "mypy-1.8.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f5ac9a4eeb1ec0f1ccdc6f326bcdb464de5f80eb07fb38b5ddd7b0de6bc61e55"},
{file = "mypy-1.8.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afe3fe972c645b4632c563d3f3eff1cdca2fa058f730df2b93a35e3b0c538218"},
{file = "mypy-1.8.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:42c6680d256ab35637ef88891c6bd02514ccb7e1122133ac96055ff458f93fc3"},
{file = "mypy-1.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:720a5ca70e136b675af3af63db533c1c8c9181314d207568bbe79051f122669e"},
{file = "mypy-1.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:028cf9f2cae89e202d7b6593cd98db6759379f17a319b5faf4f9978d7084cdc6"},
{file = "mypy-1.8.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4e6d97288757e1ddba10dd9549ac27982e3e74a49d8d0179fc14d4365c7add66"},
{file = "mypy-1.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f1478736fcebb90f97e40aff11a5f253af890c845ee0c850fe80aa060a267c6"},
{file = "mypy-1.8.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:42419861b43e6962a649068a61f4a4839205a3ef525b858377a960b9e2de6e0d"},
{file = "mypy-1.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:2b5b6c721bd4aabaadead3a5e6fa85c11c6c795e0c81a7215776ef8afc66de02"},
{file = "mypy-1.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5c1538c38584029352878a0466f03a8ee7547d7bd9f641f57a0f3017a7c905b8"},
{file = "mypy-1.8.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4ef4be7baf08a203170f29e89d79064463b7fc7a0908b9d0d5114e8009c3a259"},
{file = "mypy-1.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7178def594014aa6c35a8ff411cf37d682f428b3b5617ca79029d8ae72f5402b"},
{file = "mypy-1.8.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:ab3c84fa13c04aeeeabb2a7f67a25ef5d77ac9d6486ff33ded762ef353aa5592"},
{file = "mypy-1.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:99b00bc72855812a60d253420d8a2eae839b0afa4938f09f4d2aa9bb4654263a"},
{file = "mypy-1.8.0-py3-none-any.whl", hash = "sha256:538fd81bb5e430cc1381a443971c0475582ff9f434c16cd46d2c66763ce85d9d"},
{file = "mypy-1.8.0.tar.gz", hash = "sha256:6ff8b244d7085a0b425b56d327b480c3b29cafbd2eff27316a004f9a7391ae07"},
{file = "mypy-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f8a67616990062232ee4c3952f41c779afac41405806042a8126fe96e098419f"},
{file = "mypy-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d357423fa57a489e8c47b7c85dfb96698caba13d66e086b412298a1a0ea3b0ed"},
{file = "mypy-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49c87c15aed320de9b438ae7b00c1ac91cd393c1b854c2ce538e2a72d55df150"},
{file = "mypy-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:48533cdd345c3c2e5ef48ba3b0d3880b257b423e7995dada04248725c6f77374"},
{file = "mypy-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:4d3dbd346cfec7cb98e6cbb6e0f3c23618af826316188d587d1c1bc34f0ede03"},
{file = "mypy-1.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:653265f9a2784db65bfca694d1edd23093ce49740b2244cde583aeb134c008f3"},
{file = "mypy-1.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3a3c007ff3ee90f69cf0a15cbcdf0995749569b86b6d2f327af01fd1b8aee9dc"},
{file = "mypy-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2418488264eb41f69cc64a69a745fad4a8f86649af4b1041a4c64ee61fc61129"},
{file = "mypy-1.9.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:68edad3dc7d70f2f17ae4c6c1b9471a56138ca22722487eebacfd1eb5321d612"},
{file = "mypy-1.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:85ca5fcc24f0b4aeedc1d02f93707bccc04733f21d41c88334c5482219b1ccb3"},
{file = "mypy-1.9.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aceb1db093b04db5cd390821464504111b8ec3e351eb85afd1433490163d60cd"},
{file = "mypy-1.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0235391f1c6f6ce487b23b9dbd1327b4ec33bb93934aa986efe8a9563d9349e6"},
{file = "mypy-1.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d4d5ddc13421ba3e2e082a6c2d74c2ddb3979c39b582dacd53dd5d9431237185"},
{file = "mypy-1.9.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:190da1ee69b427d7efa8aa0d5e5ccd67a4fb04038c380237a0d96829cb157913"},
{file = "mypy-1.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:fe28657de3bfec596bbeef01cb219833ad9d38dd5393fc649f4b366840baefe6"},
{file = "mypy-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e54396d70be04b34f31d2edf3362c1edd023246c82f1730bbf8768c28db5361b"},
{file = "mypy-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5e6061f44f2313b94f920e91b204ec600982961e07a17e0f6cd83371cb23f5c2"},
{file = "mypy-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:81a10926e5473c5fc3da8abb04119a1f5811a236dc3a38d92015cb1e6ba4cb9e"},
{file = "mypy-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b685154e22e4e9199fc95f298661deea28aaede5ae16ccc8cbb1045e716b3e04"},
{file = "mypy-1.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:5d741d3fc7c4da608764073089e5f58ef6352bedc223ff58f2f038c2c4698a89"},
{file = "mypy-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:587ce887f75dd9700252a3abbc9c97bbe165a4a630597845c61279cf32dfbf02"},
{file = "mypy-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f88566144752999351725ac623471661c9d1cd8caa0134ff98cceeea181789f4"},
{file = "mypy-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:61758fabd58ce4b0720ae1e2fea5cfd4431591d6d590b197775329264f86311d"},
{file = "mypy-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e49499be624dead83927e70c756970a0bc8240e9f769389cdf5714b0784ca6bf"},
{file = "mypy-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:571741dc4194b4f82d344b15e8837e8c5fcc462d66d076748142327626a1b6e9"},
{file = "mypy-1.9.0-py3-none-any.whl", hash = "sha256:a260627a570559181a9ea5de61ac6297aa5af202f06fd7ab093ce74e7181e43e"},
{file = "mypy-1.9.0.tar.gz", hash = "sha256:3cc5da0127e6a478cddd906068496a97a7618a21ce9b54bde5bf7e539c7af974"},
]
[package.dependencies]

View file

@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
#[pyo3(get, set)]
instance_name: Option<String>,
/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
@ -232,6 +234,7 @@ impl EventInternalMetadata {
Ok(EventInternalMetadata {
data,
stream_ordering: None,
instance_name: None,
outlier: false,
})
}

View file

@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
# Mark the event as redacted
@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier
return new_event

View file

@ -1551,6 +1551,7 @@ class EventCreationHandler:
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
event.internal_metadata.instance_name = writer_instance
return event

View file

@ -300,12 +300,6 @@ class SlidingSyncHandler:
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we can just straight-up return the room list (nothing has
# changed)
if membership_snapshot_token.is_before_or_eq(to_token.room_key):
return sync_room_id_set
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
@ -325,14 +319,20 @@ class SlidingSyncHandler:
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
#
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
membership_change_events_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
membership_change_events_after_to_token = (
await self.store.get_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_rooms=self.rooms_to_exclude_globally,
)
)
)
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only

View file

@ -107,7 +107,15 @@ class ReportEventRestServlet(RestServlet):
class ReportRoomRestServlet(RestServlet):
# https://github.com/matrix-org/matrix-spec-proposals/pull/4151
"""This endpoint lets clients report a room for abuse.
Whilst MSC4151 is not yet merged, this unstable endpoint is enabled on matrix.org
for content moderation purposes, and therefore backwards compatibility should be
carefully considered when changing anything on this endpoint.
More details on the MSC: https://github.com/matrix-org/matrix-spec-proposals/pull/4151
"""
PATTERNS = client_patterns(
"/org.matrix.msc4151/rooms/(?P<room_id>[^/]*)/report$",
releases=[],

View file

@ -207,6 +207,7 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
await self.db_pool.runInteraction(
"persist_events",

View file

@ -156,6 +156,7 @@ class _EventRow:
event_id: str
stream_ordering: int
instance_name: str
json: str
internal_metadata: str
format_version: Optional[int]
@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.instance_name = row.instance_name
original_ev.internal_metadata.outlier = row.outlier
# Consistency check: if the content of the event has been modified in the
@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore):
SELECT
e.event_id,
e.stream_ordering,
e.instance_name,
ej.internal_metadata,
ej.json,
ej.format_version,
@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore):
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
internal_metadata=row[2],
json=row[3],
format_version=row[4],
room_version_id=row[5],
rejected_reason=row[6],
instance_name=row[2],
internal_metadata=row[3],
json=row[4],
format_version=row[5],
room_version_id=row[6],
rejected_reason=row[7],
redactions=[],
outlier=bool(row[7]), # This is an int in SQLite3
outlier=bool(row[8]), # This is an int in SQLite3
)
# check for redactions

View file

@ -914,12 +914,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_last_event_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
) -> Optional[str]:
# We need to handle the fact that the stream tokens can be vector
# clocks. We do this by getting all rows between the minimum and
# maximum stream ordering in the token, plus one row less than the
# minimum stream ordering. We then filter the results against the
# token and return the first row that matches.
# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
# (sharded event persisters). The first subquery handles the events that
# would be within the vector clock and gets all rows between the minimum and
# maximum stream ordering in the token which need to be filtered against the
# `instance_map`. The second subquery handles the "before" case and finds
# the first row before the token. We then filter out any results past the
# token's vector clock and return the first row that matches.
min_stream = end_token.stream
max_stream = end_token.get_max_stream_pos()
# We use `union all` because we don't need any of the deduplication logic
# (`union` is really a union + distinct). `UNION ALL` does preserve the
# ordering of the operand queries but there is no actual gurantee that it
# has this behavior in all scenarios so we need the extra `ORDER BY` at the
# bottom.
sql = """
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
@ -931,7 +942,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
AND rejections.event_id IS NULL
ORDER BY stream_ordering DESC
) AS a
UNION
UNION ALL
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
FROM events
@ -943,15 +954,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ORDER BY stream_ordering DESC
LIMIT 1
) AS b
ORDER BY stream_ordering DESC
"""
txn.execute(
sql,
(
room_id,
end_token.stream,
end_token.get_max_stream_pos(),
min_stream,
max_stream,
room_id,
end_token.stream,
min_stream,
),
)

View file

@ -19,6 +19,8 @@ class EventInternalMetadata:
stream_ordering: Optional[int]
"""the stream ordering of this event. None, until it has been persisted."""
instance_name: Optional[str]
"""the instance name of the server that persisted this event. None, until it has been persisted."""
outlier: bool
"""whether this event is an outlier (ie, whether we have the state at that

View file

@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
)
original.internal_metadata.stream_ordering = 1234
self.assertEqual(original.internal_metadata.stream_ordering, 1234)
original.internal_metadata.instance_name = "worker1"
self.assertEqual(original.internal_metadata.instance_name, "worker1")
cloned = clone_event(original)
cloned.unsigned["b"] = 3
@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
self.assertEqual(original.unsigned, {"a": 1, "b": 2})
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
self.assertEqual(cloned.internal_metadata.txn_id, "txn")

View file

@ -327,7 +327,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Leave during the from_token/to_token range (newly_left)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.helper.leave(room_id1, user1_id, tok=user1_tok)
self.helper.leave(room_id2, user1_id, tok=user1_tok)
after_room2_token = self.event_sources.get_current_token()

View file

@ -205,8 +205,24 @@ class EmailPusherTests(HomeserverTestCase):
# Multipart: plain text, base 64 encoded; html, base 64 encoded
multipart_msg = email.message_from_bytes(msg)
txt = multipart_msg.get_payload()[0].get_payload(decode=True).decode()
html = multipart_msg.get_payload()[1].get_payload(decode=True).decode()
# Extract the text (non-HTML) portion of the multipart Message,
# as a Message.
txt_message = multipart_msg.get_payload(i=0)
assert isinstance(txt_message, email.message.Message)
# Extract the actual bytes from the Message object, and decode them to a `str`.
txt_bytes = txt_message.get_payload(decode=True)
assert isinstance(txt_bytes, bytes)
txt = txt_bytes.decode()
# Do the same for the HTML portion of the multipart Message.
html_message = multipart_msg.get_payload(i=1)
assert isinstance(html_message, email.message.Message)
html_bytes = html_message.get_payload(decode=True)
assert isinstance(html_bytes, bytes)
html = html_bytes.decode()
self.assertIn("/_synapse/client/unsubscribe", txt)
self.assertIn("/_synapse/client/unsubscribe", html)
@ -347,12 +363,17 @@ class EmailPusherTests(HomeserverTestCase):
# That email should contain the room's avatar
msg: bytes = args[5]
# Multipart: plain text, base 64 encoded; html, base 64 encoded
html = (
email.message_from_bytes(msg)
.get_payload()[1]
.get_payload(decode=True)
.decode()
)
# Extract the html Message object from the Multipart Message.
# We need the asserts to convince mypy that this is OK.
html_message = email.message_from_bytes(msg).get_payload(i=1)
assert isinstance(html_message, email.message.Message)
# Extract the `bytes` from the html Message object, and decode to a `str`.
html = html_message.get_payload(decode=True)
assert isinstance(html, bytes)
html = html.decode()
self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html)
def test_empty_room(self) -> None:

View file

@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
assert event.internal_metadata.instance_name is not None
assert event.internal_metadata.stream_ordering is not None
self.replicate()
@ -155,7 +156,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
"invite",
event.event_id,
PersistedEventPosition(
self.hs.get_instance_name(),
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
@ -232,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
self.replicate()
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
)
self.check(
"get_rooms_for_user_with_stream_ordering",
@ -288,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
msg, msgctx = self.build_event()
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
self.replicate()
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
event_source = RoomEventSource(self.hs)
@ -329,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
# joined_rooms list.
if membership_changes:
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name,
j2.internal_metadata.stream_ordering,
)
self.assertEqual(
joined_rooms,

View file

@ -427,13 +427,23 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
text = None
for part in mail.walk():
if part.get_content_type() == "text/plain":
text = part.get_payload(decode=True).decode("UTF-8")
text = part.get_payload(decode=True)
if text is not None:
# According to the logic table in `get_payload`, we know that
# the result of `get_payload` will be `bytes`, but mypy doesn't
# know this and complains. Thus, we assert the type.
assert isinstance(text, bytes)
text = text.decode("UTF-8")
break
if not text:
self.fail("Could not find text portion of email to parse")
assert text is not None
# `text` must be a `str`, after being decoded and determined just above
# to not be `None` or an empty `str`.
assert isinstance(text, str)
match = re.search(r"https://example.com\S+", text)
assert match, "Could not find link in email"
@ -1209,13 +1219,23 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
text = None
for part in mail.walk():
if part.get_content_type() == "text/plain":
text = part.get_payload(decode=True).decode("UTF-8")
text = part.get_payload(decode=True)
if text is not None:
# According to the logic table in `get_payload`, we know that
# the result of `get_payload` will be `bytes`, but mypy doesn't
# know this and complains. Thus, we assert the type.
assert isinstance(text, bytes)
text = text.decode("UTF-8")
break
if not text:
self.fail("Could not find text portion of email to parse")
assert text is not None
# `text` must be a `str`, after being decoded and determined just above
# to not be `None` or an empty `str`.
assert isinstance(text, str)
match = re.search(r"https://example.com\S+", text)
assert match, "Could not find link in email"

View file

@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
for e in events:
e.internal_metadata.stream_ordering = self._next_stream_ordering
e.internal_metadata.instance_name = self.hs.get_instance_name()
self._next_stream_ordering += 1
def _persist(txn: LoggingTransaction) -> None:

View file

@ -19,7 +19,10 @@
#
#
from typing import List
import logging
from typing import List, Tuple
from immutabledict import immutabledict
from twisted.test.proto_helpers import MemoryReactor
@ -28,11 +31,13 @@ from synapse.api.filtering import Filter
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class PaginationTestCase(HomeserverTestCase):
"""
@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase):
}
chunk = self._filter_messages(filter)
self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
"""
Test `get_last_event_in_room_before_stream_ordering(...)`
"""
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def _update_persisted_instance_name_for_event(
self, event_id: str, instance_name: str
) -> None:
"""
Update the `instance_name` that persisted the the event in the database.
"""
return self.get_success(
self.store.db_pool.simple_update_one(
"events",
keyvalues={"event_id": event_id},
updatevalues={"instance_name": instance_name},
)
)
def _send_event_on_instance(
self, instance_name: str, room_id: str, access_token: str
) -> Tuple[JsonDict, PersistedEventPosition]:
"""
Send an event in a room and mimic that it was persisted by a specific
instance/worker.
"""
event_response = self.helper.send(
room_id, f"{instance_name} message", tok=access_token
)
self._update_persisted_instance_name_for_event(
event_response["event_id"], instance_name
)
event_pos = self.get_success(
self.store.get_position_for_event(event_response["event_id"])
)
return event_response, event_pos
def test_before_room_created(self) -> None:
"""
Test that no event is returned if we are using a token before the room was even created
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
before_room_token = self.event_sources.get_current_token()
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id,
end_token=before_room_token.room_key,
)
)
self.assertIsNone(last_event)
def test_after_room_created(self) -> None:
"""
Test that an event is returned if we are using a token after the room was created
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id,
end_token=after_room_token.room_key,
)
)
self.assertIsNotNone(last_event)
def test_activity_in_other_rooms(self) -> None:
"""
Test to make sure that the last event in the room is returned even if the
`stream_ordering` has advanced from activity in other rooms.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
# Create another room to advance the stream_ordering
self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
# Make sure it's the event we expect (which also means we know it's from the
# correct room)
self.assertEqual(last_event, event_response["event_id"])
def test_activity_after_token_has_no_effect(self) -> None:
"""
Test to make sure we return the last event before the token even if there is
activity after it.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
after_room_token = self.event_sources.get_current_token()
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
# Make sure it's the last event before the token
self.assertEqual(last_event, event_response["event_id"])
def test_last_event_within_sharded_token(self) -> None:
"""
Test to make sure we can find the last event that that is *within* the sharded
token (a token that has an `instance_map` and looks like
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing
that we can find an event within the tokens minimum and instance
`stream_ordering`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response1, event_pos1 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response2, event_pos2 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response3, event_pos3 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
# Create another room to advance the `stream_ordering` on the same worker
# so we can sandwich event3 in the middle of the token
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response4, event_pos4 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
# Assemble a token that encompasses event1 -> event4 on worker1
end_token = RoomStreamToken(
stream=event_pos2.stream,
instance_map=immutabledict({"worker1": event_pos4.stream}),
)
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
# Should find closest event at/before the token in room1
self.assertEqual(
last_event,
event_response3["event_id"],
f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],
"event2": event_response2["event_id"],
"event3": event_response3["event_id"],
}
),
)
def test_last_event_before_sharded_token(self) -> None:
"""
Test to make sure we can find the last event that is *before* the sharded token
(a token that has an `instance_map` and looks like
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response1, event_pos1 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
event_response2, event_pos2 = self._send_event_on_instance(
"worker1", room_id1, user1_tok
)
# Create another room to advance the `stream_ordering` on the same worker
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
event_response3, event_pos3 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
event_response4, event_pos4 = self._send_event_on_instance(
"worker1", room_id2, user1_tok
)
# Assemble a token that encompasses event3 -> event4 on worker1
end_token = RoomStreamToken(
stream=event_pos3.stream,
instance_map=immutabledict({"worker1": event_pos4.stream}),
)
# Send some events after the token
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
# Should find closest event at/before the token in room1
self.assertEqual(
last_event,
event_response2["event_id"],
f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],
"event2": event_response2["event_id"],
}
),
)