synapse/tests/storage/test_events.py
Eric Eastwood 1a6b718f8c
Sliding Sync: Pre-populate room data for quick filtering/sorting (#17512)
Pre-populate room data for quick filtering/sorting in the Sliding Sync
API

Spawning from
https://github.com/element-hq/synapse/pull/17450#discussion_r1697335578

This PR is acting as the Synapse version `N+1` step in the gradual
migration being tracked by
https://github.com/element-hq/synapse/issues/17623

Adding two new database tables:

- `sliding_sync_joined_rooms`: A table for storing room meta data that
the local server is still participating in. The info here can be shared
across all `Membership.JOIN`. Keyed on `(room_id)` and updated when the
relevant room current state changes or a new event is sent in the room.
- `sliding_sync_membership_snapshots`: A table for storing a snapshot of
room meta data at the time of the local user's membership. Keyed on
`(room_id, user_id)` and only updated when a user's membership in a room
changes.

Also adds background updates to populate these tables with all of the
existing data.


We want to have the guarantee that if a row exists in the sliding sync
tables, we are able to rely on it (accurate data). And if a row doesn't
exist, we use a fallback to get the same info until the background
updates fill in the rows or a new event comes in triggering it to be
fully inserted. This means we need a couple extra things in place until
we bump `SCHEMA_COMPAT_VERSION` and run the foreground update in the
`N+2` part of the gradual migration. For context on why we can't rely on
the tables without these things see [1].

1. On start-up, block until we clear out any rows for the rooms that
have had events since the max-`stream_ordering` of the
`sliding_sync_joined_rooms` table (compare to max-`stream_ordering` of
the `events` table). For `sliding_sync_membership_snapshots`, we can
compare to the max-`stream_ordering` of `local_current_membership`
- This accounts for when someone downgrades their Synapse version and
then upgrades it again. This will ensure that we don't have any
stale/out-of-date data in the
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables
since any new events sent in rooms would have also needed to be written
to the sliding sync tables. For example a new event needs to bump
`event_stream_ordering` in `sliding_sync_joined_rooms` table or some
state in the room changing (like the room name). Or another example of
someone's membership changing in a room affecting
`sliding_sync_membership_snapshots`.
1. Add another background update that will catch-up with any rows that
were just deleted from the sliding sync tables (based on the activity in
the `events`/`local_current_membership`). The rooms that need
recalculating are added to the
`sliding_sync_joined_rooms_to_recalculate` table.
1. Making sure rows are fully inserted. Instead of partially inserting,
we need to check if the row already exists and fully insert all data if
not.

All of this extra functionality can be removed once the
`SCHEMA_COMPAT_VERSION` is bumped with support for the new sliding sync
tables so people can no longer downgrade (the `N+2` part of the gradual
migration).


<details>
<summary><sup>[1]</sup></summary>

For `sliding_sync_joined_rooms`, since we partially insert rows as state
comes in, we can't rely on the existence of the row for a given
`room_id`. We can't even rely on looking at whether the background
update has finished. There could still be partial rows from when someone
reverted their Synapse version after the background update finished, had
some state changes (or new rooms), then upgraded again and more state
changes happen leaving a partial row.

For `sliding_sync_membership_snapshots`, we insert items as a whole
except for the `forgotten` column ~~so we can rely on rows existing and
just need to always use a fallback for the `forgotten` data. We can't
use the `forgotten` column in the table for the same reasons above about
`sliding_sync_joined_rooms`.~~ We could have an out-of-date membership
from when someone reverted their Synapse version. (same problems as
outlined for `sliding_sync_joined_rooms` above)

Discussed in an [internal
meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.dz5x6ef4mxz7)

</details>


### TODO

 - [x] Update `stream_ordering`/`bump_stamp`
 - [x] Handle remote invites
 - [x] Handle state resets
- [x] Consider adding `sender` so we can filter `LEAVE` memberships and
distinguish from kicks.
     - [x] We should add it to be able to tell leaves from kicks 
- [x] Consider adding `tombstone` state to help address
https://github.com/element-hq/synapse/issues/17540
     - [x] We should add it `tombstone_successor_room_id`
- [x] Consider adding `forgotten` status to avoid extra
lookup/table-join on `room_memberships`
    - [x] We should add it
- [x] Background update to fill in values for all joined rooms and
non-join membership
 - [x] Clean-up tables when room is deleted
 - [ ] Make sure tables are useful to our use case
- First explored in
https://github.com/element-hq/synapse/compare/erikj/ss_use_new_tables
- Also explored in
76b5a576eb
 - [x] Plan for how can we use this with a fallback
     - See plan discussed above in main area of the issue description
- Discussed in an [internal
meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.dz5x6ef4mxz7)
 - [x] Plan for how we can rely on this new table without a fallback
- Synapse version `N+1`: (this PR) Bump `SCHEMA_VERSION` to `87`. Add
new tables and background update to backfill all rows. Since this is a
new table, we don't have to add any `NOT VALID` constraints and validate
them when the background update completes. Read from new tables with a
fallback in cases where the rows aren't filled in yet.
- Synapse version `N+2`: Bump `SCHEMA_VERSION` to `88` and bump
`SCHEMA_COMPAT_VERSION` to `87` because we don't want people to
downgrade and miss writes while they are on an older version. Add a
foreground update to finish off the backfill so we can read from new
tables without the fallback. Application code can now rely on the new
tables being populated.
- Discussed in an [internal
meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.hh7shg4cxdhj)




### Dev notes

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.storage.test_events.SlidingSyncPrePopulatedTablesTestCase

SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.storage.test_events.SlidingSyncPrePopulatedTablesTestCase
```

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.FilterRoomsTestCase
```

Reference:

- [Development docs on background updates and worked examples of gradual
migrations

](1dfa59b238/docs/development/database_schema.md (background-updates))
- A real example of a gradual migration:
https://github.com/matrix-org/synapse/pull/15649#discussion_r1213779514
- Adding `rooms.creator` field that needed a background update to
backfill data, https://github.com/matrix-org/synapse/pull/10697
- Adding `rooms.room_version` that needed a background update to
backfill data, https://github.com/matrix-org/synapse/pull/6729
- Adding `room_stats_state.room_type` that needed a background update to
backfill data, https://github.com/matrix-org/synapse/pull/13031
- Tables from MSC2716: `insertion_events`, `insertion_event_edges`,
`insertion_event_extremities`, `batch_events`
- `current_state_events` updated in
`synapse/storage/databases/main/events.py`

---

```
persist_event (adds to queue)
_persist_event_batch
_persist_events_and_state_updates (assigns `stream_ordering` to events)
_persist_events_txn
	_store_event_txn
        _update_metadata_tables_txn
            _store_room_members_txn
	_update_current_state_txn
```

---

> Concatenated Indexes [...] (also known as multi-column, composite or
combined index)
>
> [...] key consists of multiple columns.
> 
> We can take advantage of the fact that the first index column is
always usable for searching
>
> *--
https://use-the-index-luke.com/sql/where-clause/the-equals-operator/concatenated-keys*

---

Dealing with `portdb` (`synapse/_scripts/synapse_port_db.py`),
https://github.com/element-hq/synapse/pull/17512#discussion_r1725998219

---

<details>
<summary>SQL queries:</summary>

Both of these are equivalent and work in SQLite and Postgres

Options 1:
```sql
WITH data_table (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)}) AS (
    VALUES (
        ?, ?, ?,
        (SELECT membership FROM room_memberships WHERE event_id = ?),
        (SELECT stream_ordering FROM events WHERE event_id = ?),
        {", ".join("?" for _ in insert_values)}
    )
)
INSERT INTO sliding_sync_non_join_memberships
    (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)})
SELECT * FROM data_table
WHERE membership != ?
ON CONFLICT (room_id, user_id)
DO UPDATE SET
    membership_event_id = EXCLUDED.membership_event_id,
    membership = EXCLUDED.membership,
    event_stream_ordering = EXCLUDED.event_stream_ordering,
    {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
```

Option 2:
```sql
INSERT INTO sliding_sync_non_join_memberships
    (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)})
SELECT 
    column1 as room_id,
    column2 as user_id,
    column3 as membership_event_id,
    column4 as membership,
    column5 as event_stream_ordering,
    {", ".join("column" + str(i) for i in range(6, 6 + len(insert_keys)))}
FROM (
    VALUES (
        ?, ?, ?,
        (SELECT membership FROM room_memberships WHERE event_id = ?),
        (SELECT stream_ordering FROM events WHERE event_id = ?),
        {", ".join("?" for _ in insert_values)}
    )
) as v
WHERE membership != ?
ON CONFLICT (room_id, user_id)
DO UPDATE SET
    membership_event_id = EXCLUDED.membership_event_id,
    membership = EXCLUDED.membership,
    event_stream_ordering = EXCLUDED.event_stream_ordering,
    {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
```

If we don't need the `membership` condition, we could use:

```sql
INSERT INTO sliding_sync_non_join_memberships
    (room_id, membership_event_id, user_id, membership, event_stream_ordering, {", ".join(insert_keys)})
VALUES (
    ?, ?, ?,
    (SELECT membership FROM room_memberships WHERE event_id = ?),
    (SELECT stream_ordering FROM events WHERE event_id = ?),
    {", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id, user_id)
DO UPDATE SET
    membership_event_id = EXCLUDED.membership_event_id,
    membership = EXCLUDED.membership,
    event_stream_ordering = EXCLUDED.event_stream_ordering,
    {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
```

</details>

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <erik@matrix.org>
2024-08-29 16:09:51 +01:00

486 lines
18 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2020 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import List, Optional
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase
from synapse.federation.federation_base import event_from_pdu_json
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import StateMap
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class ExtremPruneTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.state = self.hs.get_state_handler()
persistence = self.hs.get_storage_controllers().persistence
assert persistence is not None
self._persistence = persistence
self._state_storage_controller = self.hs.get_storage_controllers().state
self.store = self.hs.get_datastores().main
self.register_user("user", "pass")
self.token = self.login("user", "pass")
self.room_id = self.helper.create_room_as(
"user", room_version=RoomVersions.V6.identifier, tok=self.token
)
body = self.helper.send(self.room_id, body="Test", tok=self.token)
local_message_event_id = body["event_id"]
# Fudge a remote event and persist it. This will be the extremity before
# the gap.
self.remote_event_1 = event_from_pdu_json(
{
"type": EventTypes.Message,
"state_key": "@user:other",
"content": {},
"room_id": self.room_id,
"sender": "@user:other",
"depth": 5,
"prev_events": [local_message_event_id],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
self.persist_event(self.remote_event_1)
# Check that the current extremities is the remote event.
self.assert_extremities([self.remote_event_1.event_id])
def persist_event(
self, event: EventBase, state: Optional[StateMap[str]] = None
) -> None:
"""Persist the event, with optional state"""
context = self.get_success(
self.state.compute_event_context(
event,
state_ids_before_event=state,
partial_state=None if state is None else False,
)
)
self.get_success(self._persistence.persist_event(event, context))
def assert_extremities(self, expected_extremities: List[str]) -> None:
"""Assert the current extremities for the room"""
extremities = self.get_success(
self.store.get_prev_events_for_room(self.room_id)
)
self.assertCountEqual(extremities, expected_extremities)
def test_prune_gap(self) -> None:
"""Test that we drop extremities after a gap when we see an event from
the same domain.
"""
# Fudge a second event which points to an event we don't have. This is a
# state event so that the state changes (otherwise we won't prune the
# extremity as they'll have the same state group).
remote_event_2 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": "@user:other",
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
"sender": "@user:other",
"depth": 50,
"prev_events": ["$some_unknown_message"],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
state_before_gap = self.get_success(
self._state_storage_controller.get_current_state_ids(self.room_id)
)
self.persist_event(remote_event_2, state=state_before_gap)
# Check the new extremity is just the new remote event.
self.assert_extremities([remote_event_2.event_id])
def test_do_not_prune_gap_if_state_different(self) -> None:
"""Test that we don't prune extremities after a gap if the resolved
state is different.
"""
# Fudge a second event which points to an event we don't have.
remote_event_2 = event_from_pdu_json(
{
"type": EventTypes.Message,
"state_key": "@user:other",
"content": {},
"room_id": self.room_id,
"sender": "@user:other",
"depth": 10,
"prev_events": ["$some_unknown_message"],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
# Now we persist it with state with a dropped history visibility
# setting. The state resolution across the old and new event will then
# include it, and so the resolved state won't match the new state.
state_before_gap = dict(
self.get_success(
self._state_storage_controller.get_current_state_ids(self.room_id)
)
)
state_before_gap.pop(("m.room.history_visibility", ""))
context = self.get_success(
self.state.compute_event_context(
remote_event_2,
state_ids_before_event=state_before_gap,
partial_state=False,
)
)
self.get_success(self._persistence.persist_event(remote_event_2, context))
# Check that we haven't dropped the old extremity.
self.assert_extremities([self.remote_event_1.event_id, remote_event_2.event_id])
def test_prune_gap_if_old(self) -> None:
"""Test that we drop extremities after a gap when the previous extremity
is "old"
"""
# Advance the clock for many days to make the old extremity "old". We
# also set the depth to "lots".
self.reactor.advance(7 * 24 * 60 * 60)
# Fudge a second event which points to an event we don't have. This is a
# state event so that the state changes (otherwise we won't prune the
# extremity as they'll have the same state group).
remote_event_2 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": "@user:other2",
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
"sender": "@user:other2",
"depth": 10000,
"prev_events": ["$some_unknown_message"],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
state_before_gap = self.get_success(
self._state_storage_controller.get_current_state_ids(self.room_id)
)
self.persist_event(remote_event_2, state=state_before_gap)
# Check the new extremity is just the new remote event.
self.assert_extremities([remote_event_2.event_id])
def test_do_not_prune_gap_if_other_server(self) -> None:
"""Test that we do not drop extremities after a gap when we see an event
from a different domain.
"""
# Fudge a second event which points to an event we don't have. This is a
# state event so that the state changes (otherwise we won't prune the
# extremity as they'll have the same state group).
remote_event_2 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": "@user:other2",
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
"sender": "@user:other2",
"depth": 10,
"prev_events": ["$some_unknown_message"],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
state_before_gap = self.get_success(
self._state_storage_controller.get_current_state_ids(self.room_id)
)
self.persist_event(remote_event_2, state=state_before_gap)
# Check the new extremity is just the new remote event.
self.assert_extremities([self.remote_event_1.event_id, remote_event_2.event_id])
def test_prune_gap_if_dummy_remote(self) -> None:
"""Test that we drop extremities after a gap when the previous extremity
is a local dummy event and only points to remote events.
"""
body = self.helper.send_event(
self.room_id, type=EventTypes.Dummy, content={}, tok=self.token
)
local_message_event_id = body["event_id"]
self.assert_extremities([local_message_event_id])
# Advance the clock for many days to make the old extremity "old". We
# also set the depth to "lots".
self.reactor.advance(7 * 24 * 60 * 60)
# Fudge a second event which points to an event we don't have. This is a
# state event so that the state changes (otherwise we won't prune the
# extremity as they'll have the same state group).
remote_event_2 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": "@user:other2",
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
"sender": "@user:other2",
"depth": 10000,
"prev_events": ["$some_unknown_message"],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
state_before_gap = self.get_success(
self._state_storage_controller.get_current_state_ids(self.room_id)
)
self.persist_event(remote_event_2, state=state_before_gap)
# Check the new extremity is just the new remote event.
self.assert_extremities([remote_event_2.event_id])
def test_prune_gap_if_dummy_local(self) -> None:
"""Test that we don't drop extremities after a gap when the previous
extremity is a local dummy event and points to local events.
"""
body = self.helper.send(self.room_id, body="Test", tok=self.token)
body = self.helper.send_event(
self.room_id, type=EventTypes.Dummy, content={}, tok=self.token
)
local_message_event_id = body["event_id"]
self.assert_extremities([local_message_event_id])
# Advance the clock for many days to make the old extremity "old". We
# also set the depth to "lots".
self.reactor.advance(7 * 24 * 60 * 60)
# Fudge a second event which points to an event we don't have. This is a
# state event so that the state changes (otherwise we won't prune the
# extremity as they'll have the same state group).
remote_event_2 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": "@user:other2",
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
"sender": "@user:other2",
"depth": 10000,
"prev_events": ["$some_unknown_message"],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
state_before_gap = self.get_success(
self._state_storage_controller.get_current_state_ids(self.room_id)
)
self.persist_event(remote_event_2, state=state_before_gap)
# Check the new extremity is just the new remote event.
self.assert_extremities([remote_event_2.event_id, local_message_event_id])
def test_do_not_prune_gap_if_not_dummy(self) -> None:
"""Test that we do not drop extremities after a gap when the previous extremity
is not a dummy event.
"""
body = self.helper.send(self.room_id, body="test", tok=self.token)
local_message_event_id = body["event_id"]
self.assert_extremities([local_message_event_id])
# Fudge a second event which points to an event we don't have. This is a
# state event so that the state changes (otherwise we won't prune the
# extremity as they'll have the same state group).
remote_event_2 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": "@user:other2",
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
"sender": "@user:other2",
"depth": 10000,
"prev_events": ["$some_unknown_message"],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
state_before_gap = self.get_success(
self._state_storage_controller.get_current_state_ids(self.room_id)
)
self.persist_event(remote_event_2, state=state_before_gap)
# Check the new extremity is just the new remote event.
self.assert_extremities([local_message_event_id, remote_event_2.event_id])
class InvalideUsersInRoomCacheTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.state = self.hs.get_state_handler()
persistence = self.hs.get_storage_controllers().persistence
assert persistence is not None
self._persistence = persistence
self.store = self.hs.get_datastores().main
def test_remote_user_rooms_cache_invalidated(self) -> None:
"""Test that if the server leaves a room the `get_rooms_for_user` cache
is invalidated for remote users.
"""
# Set up a room with a local and remote user in it.
user_id = self.register_user("user", "pass")
token = self.login("user", "pass")
room_id = self.helper.create_room_as(
"user", room_version=RoomVersions.V6.identifier, tok=token
)
body = self.helper.send(room_id, body="Test", tok=token)
local_message_event_id = body["event_id"]
# Fudge a join event for a remote user.
remote_user = "@user:other"
remote_event_1 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": remote_user,
"content": {"membership": Membership.JOIN},
"room_id": room_id,
"sender": remote_user,
"depth": 5,
"prev_events": [local_message_event_id],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
context = self.get_success(self.state.compute_event_context(remote_event_1))
self.get_success(self._persistence.persist_event(remote_event_1, context))
# Call `get_rooms_for_user` to add the remote user to the cache
rooms = self.get_success(self.store.get_rooms_for_user(remote_user))
self.assertEqual(set(rooms), {room_id})
# Now we have the local server leave the room, and check that calling
# `get_user_in_room` for the remote user no longer includes the room.
self.helper.leave(room_id, user_id, tok=token)
rooms = self.get_success(self.store.get_rooms_for_user(remote_user))
self.assertEqual(set(rooms), set())
def test_room_remote_user_cache_invalidated(self) -> None:
"""Test that if the server leaves a room the `get_users_in_room` cache
is invalidated for remote users.
"""
# Set up a room with a local and remote user in it.
user_id = self.register_user("user", "pass")
token = self.login("user", "pass")
room_id = self.helper.create_room_as(
"user", room_version=RoomVersions.V6.identifier, tok=token
)
body = self.helper.send(room_id, body="Test", tok=token)
local_message_event_id = body["event_id"]
# Fudge a join event for a remote user.
remote_user = "@user:other"
remote_event_1 = event_from_pdu_json(
{
"type": EventTypes.Member,
"state_key": remote_user,
"content": {"membership": Membership.JOIN},
"room_id": room_id,
"sender": remote_user,
"depth": 5,
"prev_events": [local_message_event_id],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V6,
)
context = self.get_success(self.state.compute_event_context(remote_event_1))
self.get_success(self._persistence.persist_event(remote_event_1, context))
# Call `get_users_in_room` to add the remote user to the cache
users = self.get_success(self.store.get_users_in_room(room_id))
self.assertEqual(set(users), {user_id, remote_user})
# Now we have the local server leave the room, and check that calling
# `get_user_in_room` for the remote user no longer includes the room.
self.helper.leave(room_id, user_id, tok=token)
users = self.get_success(self.store.get_users_in_room(room_id))
self.assertEqual(users, [])