Sliding sync: Add connection tracking to the account_data extension (#17695)

This is basically exactly the same logic as for receipts. Essentially we
just need to track which room account data we have and haven't sent down
to clients, and use that when we pull stuff out.

I think this just needs a couple of extra tests written

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
Erik Johnston 2024-09-19 21:51:51 +03:00 committed by GitHub
parent c2e5e9e67c
commit a851f6b237
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 748 additions and 73 deletions

1
changelog.d/17695.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug where room account data would not correctly be sent down sliding sync for old rooms.

View file

@ -19,7 +19,6 @@ from typing import (
AbstractSet,
ChainMap,
Dict,
List,
Mapping,
MutableMapping,
Optional,
@ -119,6 +118,8 @@ class SlidingSyncExtensionHandler:
if sync_config.extensions.account_data is not None:
account_data_response = await self.get_account_data_extension_response(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
account_data_request=sync_config.extensions.account_data,
@ -361,6 +362,8 @@ class SlidingSyncExtensionHandler:
async def get_account_data_extension_response(
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
@ -425,15 +428,7 @@ class SlidingSyncExtensionHandler:
# Fetch room account data
#
# List of -> Mapping from room_id to mapping of `type` to `content` of room
# account data events.
#
# This is is a list so we can avoid making copies of immutable data and instead
# just provide multiple maps that need to be combined. Normally, we could
# reach for `ChainMap` in this scenario, but this is a nested map and accessing
# the ChainMap by room_id won't combine the two maps for that room (we would
# need a new `NestedChainMap` type class).
account_data_by_room_maps: List[Mapping[str, Mapping[str, JsonMapping]]] = []
account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {}
relevant_room_ids = self.find_relevant_room_ids_for_extension(
requested_lists=account_data_request.lists,
requested_room_ids=account_data_request.rooms,
@ -441,9 +436,43 @@ class SlidingSyncExtensionHandler:
actual_room_ids=actual_room_ids,
)
if len(relevant_room_ids) > 0:
# We need to handle the different cases depending on if we have sent
# down account data previously or not, so we split the relevant
# rooms up into different collections based on status.
live_rooms = set()
previously_rooms: Dict[str, int] = {}
initial_rooms = set()
for room_id in relevant_room_ids:
if not from_token:
initial_rooms.add(room_id)
continue
room_status = previous_connection_state.account_data.have_sent_room(
room_id
)
if room_status.status == HaveSentRoomFlag.LIVE:
live_rooms.add(room_id)
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
assert room_status.last_token is not None
previously_rooms[room_id] = room_status.last_token
elif room_status.status == HaveSentRoomFlag.NEVER:
initial_rooms.add(room_id)
else:
assert_never(room_status.status)
# We fetch all room account data since the from_token. This is so
# that we can record which rooms have updates that haven't been sent
# down.
#
# Mapping from room_id to mapping of `type` to `content` of room account
# data events.
all_updates_since_the_from_token: Mapping[
str, Mapping[str, JsonMapping]
] = {}
if from_token is not None:
# TODO: This should take into account the `from_token` and `to_token`
account_data_by_room_map = (
all_updates_since_the_from_token = (
await self.store.get_updated_room_account_data_for_user(
user_id, from_token.stream_token.account_data_key
)
@ -456,58 +485,108 @@ class SlidingSyncExtensionHandler:
user_id, from_token.stream_token.account_data_key
)
for room_id, tags in tags_by_room.items():
account_data_by_room_map.setdefault(room_id, {})[
all_updates_since_the_from_token.setdefault(room_id, {})[
AccountDataTypes.TAG
] = {"tags": tags}
account_data_by_room_maps.append(account_data_by_room_map)
# For live rooms we just get the updates from `all_updates_since_the_from_token`
if live_rooms:
for room_id in all_updates_since_the_from_token.keys() & live_rooms:
account_data_by_room_map[room_id] = (
all_updates_since_the_from_token[room_id]
)
# For previously and initial rooms we query each room individually.
if previously_rooms or initial_rooms:
async def handle_previously(room_id: str) -> None:
# Either get updates or all account data in the room
# depending on if the room state is PREVIOUSLY or NEVER.
previous_token = previously_rooms.get(room_id)
if previous_token is not None:
room_account_data = await (
self.store.get_updated_room_account_data_for_user_for_room(
user_id=user_id,
room_id=room_id,
from_stream_id=previous_token,
to_stream_id=to_token.account_data_key,
)
)
# Add room tags
changed = await self.store.has_tags_changed_for_room(
user_id=user_id,
room_id=room_id,
from_stream_id=previous_token,
to_stream_id=to_token.account_data_key,
)
if changed:
# XXX: Ideally, this should take into account the `to_token`
# and return the set of tags at that time but we don't track
# changes to tags so we just have to return all tags for the
# room.
immutable_tag_map = await self.store.get_tags_for_room(
user_id, room_id
)
room_account_data[AccountDataTypes.TAG] = {
"tags": immutable_tag_map
}
# Only add an entry if there were any updates.
if room_account_data:
account_data_by_room_map[room_id] = room_account_data
else:
# TODO: This should take into account the `to_token`
immutable_account_data_by_room_map = (
await self.store.get_room_account_data_for_user(user_id)
immutable_room_account_data = (
await self.store.get_account_data_for_room(user_id, room_id)
)
account_data_by_room_maps.append(immutable_account_data_by_room_map)
# Add room tags
#
# TODO: This should take into account the `to_token`
tags_by_room = await self.store.get_tags_for_user(user_id)
account_data_by_room_maps.append(
{
room_id: {AccountDataTypes.TAG: {"tags": tags}}
for room_id, tags in tags_by_room.items()
}
# XXX: Ideally, this should take into account the `to_token`
# and return the set of tags at that time but we don't track
# changes to tags so we just have to return all tags for the
# room.
immutable_tag_map = await self.store.get_tags_for_room(
user_id, room_id
)
# Filter down to the relevant rooms ... and combine the maps
relevant_account_data_by_room_map: MutableMapping[
str, Mapping[str, JsonMapping]
] = {}
for room_id in relevant_room_ids:
# We want to avoid adding empty maps for relevant rooms that have no room
# account data so do a quick check to see if it's in any of the maps.
is_room_in_maps = False
for room_map in account_data_by_room_maps:
if room_id in room_map:
is_room_in_maps = True
break
# If we found the room in any of the maps, combine the maps for that room
if is_room_in_maps:
relevant_account_data_by_room_map[room_id] = ChainMap(
{},
*(
account_data_by_room_map[room_id] = ChainMap(
{AccountDataTypes.TAG: {"tags": immutable_tag_map}}
if immutable_tag_map
else {},
# Cast is safe because `ChainMap` only mutates the top-most map,
# see https://github.com/python/typeshed/issues/8430
cast(MutableMapping[str, JsonMapping], room_map[room_id])
for room_map in account_data_by_room_maps
if room_map.get(room_id)
cast(
MutableMapping[str, JsonMapping],
immutable_room_account_data,
),
)
# We handle these rooms concurrently to speed it up.
await concurrently_execute(
handle_previously,
previously_rooms.keys() | initial_rooms,
limit=20,
)
# Now record which rooms are now up to data, and which rooms have
# pending updates to send.
new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
missing_updates = (
all_updates_since_the_from_token.keys() - relevant_room_ids
)
if missing_updates:
# If we have missing updates then we must have had a from_token.
assert from_token is not None
new_connection_state.account_data.record_unsent_rooms(
missing_updates, from_token.stream_token.account_data_key
)
return SlidingSyncResult.Extensions.AccountDataExtension(
global_account_data_map=global_account_data_map,
account_data_by_room_map=relevant_account_data_by_room_map,
account_data_by_room_map=account_data_by_room_map,
)
@trace

View file

@ -467,6 +467,56 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
get_updated_room_account_data_for_user_txn,
)
async def get_updated_room_account_data_for_user_for_room(
self,
# Since there are multiple arguments with the same type, force keyword arguments
# so people don't accidentally swap the order
*,
user_id: str,
room_id: str,
from_stream_id: int,
to_stream_id: int,
) -> Dict[str, JsonMapping]:
"""Get the room account_data that's changed for a user in a room.
(> `from_stream_id` and <= `to_stream_id`)
Args:
user_id: The user to get the account_data for.
room_id: The room to check
from_stream_id: The point in the stream to fetch from
to_stream_id: The point in the stream to fetch to
Returns:
A dict of the room account data.
"""
def get_updated_room_account_data_for_user_for_room_txn(
txn: LoggingTransaction,
) -> Dict[str, JsonMapping]:
sql = """
SELECT account_data_type, content FROM room_account_data
WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ?
"""
txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id))
room_account_data: Dict[str, JsonMapping] = {}
for row in txn:
room_account_data[row[0]] = db_to_json(row[1])
return room_account_data
changed = self._account_data_stream_cache.has_entity_changed(
user_id, int(from_stream_id)
)
if not changed:
return {}
return await self.db_pool.runInteraction(
"get_updated_room_account_data_for_user_for_room",
get_updated_room_account_data_for_user_for_room_txn,
)
@cached(max_entries=5000, iterable=True)
async def ignored_by(self, user_id: str) -> FrozenSet[str]:
"""

View file

@ -267,6 +267,15 @@ class SlidingSyncStore(SQLBaseStore):
(have_sent_room.status.value, have_sent_room.last_token)
)
for (
room_id,
have_sent_room,
) in per_connection_state.account_data._statuses.items():
key_values.append((connection_position, "account_data", room_id))
value_values.append(
(have_sent_room.status.value, have_sent_room.last_token)
)
self.db_pool.simple_upsert_many_txn(
txn,
table="sliding_sync_connection_streams",
@ -407,6 +416,7 @@ class SlidingSyncStore(SQLBaseStore):
# Now look up the per-room stream data.
rooms: Dict[str, HaveSentRoom[str]] = {}
receipts: Dict[str, HaveSentRoom[str]] = {}
account_data: Dict[str, HaveSentRoom[str]] = {}
receipt_rows = self.db_pool.simple_select_list_txn(
txn,
@ -427,6 +437,8 @@ class SlidingSyncStore(SQLBaseStore):
rooms[room_id] = have_sent_room
elif stream == "receipts":
receipts[room_id] = have_sent_room
elif stream == "account_data":
account_data[room_id] = have_sent_room
else:
# For forwards compatibility we ignore unknown streams, as in
# future we want to be able to easily add more stream types.
@ -435,6 +447,7 @@ class SlidingSyncStore(SQLBaseStore):
return PerConnectionStateDB(
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
room_configs=room_configs,
)
@ -452,6 +465,7 @@ class PerConnectionStateDB:
rooms: "RoomStatusMap[str]"
receipts: "RoomStatusMap[str]"
account_data: "RoomStatusMap[str]"
room_configs: Mapping[str, "RoomSyncConfig"]
@ -484,10 +498,21 @@ class PerConnectionStateDB:
for room_id, status in per_connection_state.receipts.get_updates().items()
}
account_data = {
room_id: HaveSentRoom(
status=status.status,
last_token=(
str(status.last_token) if status.last_token is not None else None
),
)
for room_id, status in per_connection_state.account_data.get_updates().items()
}
log_kv(
{
"rooms": rooms,
"receipts": receipts,
"account_data": account_data,
"room_configs": per_connection_state.room_configs.maps[0],
}
)
@ -495,6 +520,7 @@ class PerConnectionStateDB:
return PerConnectionStateDB(
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
room_configs=per_connection_state.room_configs.maps[0],
)
@ -524,8 +550,19 @@ class PerConnectionStateDB:
for room_id, status in self.receipts._statuses.items()
}
account_data = {
room_id: HaveSentRoom(
status=status.status,
last_token=(
int(status.last_token) if status.last_token is not None else None
),
)
for room_id, status in self.account_data._statuses.items()
}
return PerConnectionState(
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
room_configs=self.room_configs,
)

View file

@ -158,6 +158,52 @@ class TagsWorkerStore(AccountDataWorkerStore):
return results
async def has_tags_changed_for_room(
self,
# Since there are multiple arguments with the same type, force keyword arguments
# so people don't accidentally swap the order
*,
user_id: str,
room_id: str,
from_stream_id: int,
to_stream_id: int,
) -> bool:
"""Check if the users tags for a room have been updated in the token range
(> `from_stream_id` and <= `to_stream_id`)
Args:
user_id: The user to get tags for
room_id: The room to get tags for
from_stream_id: The point in the stream to fetch from
to_stream_id: The point in the stream to fetch to
Returns:
A mapping of tags to tag content.
"""
# Shortcut if no room has changed for the user
changed = self._account_data_stream_cache.has_entity_changed(
user_id, int(from_stream_id)
)
if not changed:
return False
last_change_position_for_room = await self.db_pool.simple_select_one_onecol(
table="room_tags_revisions",
keyvalues={"user_id": user_id, "room_id": room_id},
retcol="stream_id",
allow_none=True,
)
if last_change_position_for_room is None:
return False
return (
last_change_position_for_room > from_stream_id
and last_change_position_for_room <= to_stream_id
)
@cached(num_args=2, tree=True)
async def get_tags_for_room(
self, user_id: str, room_id: str

View file

@ -675,7 +675,7 @@ class HaveSentRoomFlag(Enum):
LIVE = "live"
T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken, int)
@attr.s(auto_attribs=True, slots=True, frozen=True)
@ -823,6 +823,7 @@ class PerConnectionState:
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap)
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
@ -833,6 +834,7 @@ class PerConnectionState:
return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
account_data=self.account_data.get_mutable(),
room_configs=ChainMap({}, room_configs),
)
@ -840,6 +842,7 @@ class PerConnectionState:
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
account_data=self.account_data.copy(),
room_configs=dict(self.room_configs),
)
@ -853,6 +856,7 @@ class MutablePerConnectionState(PerConnectionState):
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
account_data: MutableRoomStatusMap[int]
room_configs: typing.ChainMap[str, RoomSyncConfig]
@ -860,6 +864,7 @@ class MutablePerConnectionState(PerConnectionState):
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.account_data.get_updates())
or bool(self.get_room_config_updates())
)

View file

@ -11,9 +11,11 @@
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import enum
import logging
from parameterized import parameterized_class
from parameterized import parameterized, parameterized_class
from typing_extensions import assert_never
from twisted.test.proto_helpers import MemoryReactor
@ -30,6 +32,11 @@ from tests.server import TimedOutException
logger = logging.getLogger(__name__)
class TagAction(enum.Enum):
ADD = enum.auto()
REMOVE = enum.auto()
# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
@ -350,10 +357,20 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
account_data_map[AccountDataTypes.TAG], {"tags": {"m.favourite": {}}}
)
def test_room_account_data_incremental_sync(self) -> None:
@parameterized.expand(
[
("add tags", TagAction.ADD),
("remove tags", TagAction.REMOVE),
]
)
def test_room_account_data_incremental_sync(
self, test_description: str, tag_action: TagAction
) -> None:
"""
On incremental sync, we return all account data for a given room but only for
rooms that we request and are being returned in the Sliding Sync response.
(HaveSentRoomFlag.LIVE)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@ -432,6 +449,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
content={"roo": "rar"},
)
)
if tag_action == TagAction.ADD:
# Add another room tag
self.get_success(
self.account_data_handler.add_tag_to_room(
@ -449,6 +467,24 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
content={},
)
)
elif tag_action == TagAction.REMOVE:
# Remove the room tag
self.get_success(
self.account_data_handler.remove_tag_from_room(
user_id=user1_id,
room_id=room_id1,
tag="m.favourite",
)
)
self.get_success(
self.account_data_handler.remove_tag_from_room(
user_id=user1_id,
room_id=room_id2,
tag="m.favourite",
)
)
else:
assert_never(tag_action)
# Make an incremental Sliding Sync request with the account_data extension enabled
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
@ -475,10 +511,431 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
exact=True,
)
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
if tag_action == TagAction.ADD:
self.assertEqual(
account_data_map[AccountDataTypes.TAG],
{"tags": {"m.favourite": {}, "m.server_notice": {}}},
)
elif tag_action == TagAction.REMOVE:
# If we previously showed the client that the room has tags, when it no
# longer has tags, we need to show them an empty map.
self.assertEqual(
account_data_map[AccountDataTypes.TAG],
{"tags": {}},
)
else:
assert_never(tag_action)
@parameterized.expand(
[
("add tags", TagAction.ADD),
("remove tags", TagAction.REMOVE),
]
)
def test_room_account_data_incremental_sync_out_of_range_never(
self, test_description: str, tag_action: TagAction
) -> None:
"""Tests that we don't return account data for rooms that are out of
range, but then do send all account data once they're in range.
(initial/HaveSentRoomFlag.NEVER)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Create a room and add some room account data
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Add a room tag to mark the room as a favourite
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id1,
tag="m.favourite",
content={},
)
)
# Create another room with some room account data
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Add a room tag to mark the room as a favourite
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id2,
tag="m.favourite",
content={},
)
)
# Now send a message into room1 so that it is at the top of the list
self.helper.send(room_id1, body="new event", tok=user1_tok)
# Make a SS request for only the top room.
sync_body = {
"lists": {
"main": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": 0,
}
},
"extensions": {
"account_data": {
"enabled": True,
"lists": ["main"],
}
},
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Only room1 should be in the response since it's the latest room with activity
# and our range only includes 1 room.
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id1},
exact=True,
)
# Add some other room account data
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz2",
content={"roo": "rar"},
)
)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz2",
content={"roo": "rar"},
)
)
if tag_action == TagAction.ADD:
# Add another room tag
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id1,
tag="m.server_notice",
content={},
)
)
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id2,
tag="m.server_notice",
content={},
)
)
elif tag_action == TagAction.REMOVE:
# Remove the room tag
self.get_success(
self.account_data_handler.remove_tag_from_room(
user_id=user1_id,
room_id=room_id1,
tag="m.favourite",
)
)
self.get_success(
self.account_data_handler.remove_tag_from_room(
user_id=user1_id,
room_id=room_id2,
tag="m.favourite",
)
)
else:
assert_never(tag_action)
# Move room2 into range.
self.helper.send(room_id2, body="new event", tok=user1_tok)
# Make an incremental Sliding Sync request with the account_data extension enabled
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
# We expect to see the account data of room2, as that has the most
# recent update.
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id2},
exact=True,
)
# Since this is the first time we're seeing room2 down sync, we should see all
# room account data for it.
account_data_map = {
event["type"]: event["content"]
for event in response_body["extensions"]["account_data"]
.get("rooms")
.get(room_id2)
}
expected_account_data_keys = {
"org.matrix.roorarraz",
"org.matrix.roorarraz2",
}
if tag_action == TagAction.ADD:
expected_account_data_keys.add(AccountDataTypes.TAG)
self.assertIncludes(
account_data_map.keys(),
expected_account_data_keys,
exact=True,
)
self.assertEqual(account_data_map["org.matrix.roorarraz"], {"roo": "rar"})
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
if tag_action == TagAction.ADD:
self.assertEqual(
account_data_map[AccountDataTypes.TAG],
{"tags": {"m.favourite": {}, "m.server_notice": {}}},
)
elif tag_action == TagAction.REMOVE:
# Since we never told the client about the room tags, we don't need to say
# anything if there are no tags now (the client doesn't need an update).
self.assertIsNone(
account_data_map.get(AccountDataTypes.TAG),
account_data_map,
)
else:
assert_never(tag_action)
@parameterized.expand(
[
("add tags", TagAction.ADD),
("remove tags", TagAction.REMOVE),
]
)
def test_room_account_data_incremental_sync_out_of_range_previously(
self, test_description: str, tag_action: TagAction
) -> None:
"""Tests that we don't return account data for rooms that fall out of
range, but then do send all account data that has changed they're back in range.
(HaveSentRoomFlag.PREVIOUSLY)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Create a room and add some room account data
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Add a room tag to mark the room as a favourite
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id1,
tag="m.favourite",
content={},
)
)
# Create another room with some room account data
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz",
content={"roo": "rar"},
)
)
# Add a room tag to mark the room as a favourite
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id2,
tag="m.favourite",
content={},
)
)
# Make an initial Sliding Sync request for only room1 and room2.
sync_body = {
"lists": {},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
},
room_id2: {
"required_state": [],
"timeline_limit": 0,
},
},
"extensions": {
"account_data": {
"enabled": True,
"rooms": [room_id1, room_id2],
}
},
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Both rooms show up because we have a room subscription for each and they're
# requested in the `account_data` extension.
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id1, room_id2},
exact=True,
)
# Add some other room account data
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id1,
account_data_type="org.matrix.roorarraz2",
content={"roo": "rar"},
)
)
self.get_success(
self.account_data_handler.add_account_data_to_room(
user_id=user1_id,
room_id=room_id2,
account_data_type="org.matrix.roorarraz2",
content={"roo": "rar"},
)
)
if tag_action == TagAction.ADD:
# Add another room tag
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id1,
tag="m.server_notice",
content={},
)
)
self.get_success(
self.account_data_handler.add_tag_to_room(
user_id=user1_id,
room_id=room_id2,
tag="m.server_notice",
content={},
)
)
elif tag_action == TagAction.REMOVE:
# Remove the room tag
self.get_success(
self.account_data_handler.remove_tag_from_room(
user_id=user1_id,
room_id=room_id1,
tag="m.favourite",
)
)
self.get_success(
self.account_data_handler.remove_tag_from_room(
user_id=user1_id,
room_id=room_id2,
tag="m.favourite",
)
)
else:
assert_never(tag_action)
# Make an incremental Sliding Sync request for just room1
response_body, from_token = self.do_sync(
{
**sync_body,
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
},
},
},
since=from_token,
tok=user1_tok,
)
# Only room1 shows up because we only have a room subscription for room1 now.
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id1},
exact=True,
)
# Make an incremental Sliding Sync request for just room2 now
response_body, from_token = self.do_sync(
{
**sync_body,
"room_subscriptions": {
room_id2: {
"required_state": [],
"timeline_limit": 0,
},
},
},
since=from_token,
tok=user1_tok,
)
# Only room2 shows up because we only have a room subscription for room2 now.
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id2},
exact=True,
)
self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
# Check for room account data for room2
self.assertIncludes(
response_body["extensions"]["account_data"].get("rooms").keys(),
{room_id2},
exact=True,
)
# We should see any room account data updates for room2 since the last
# time we saw it down sync
account_data_map = {
event["type"]: event["content"]
for event in response_body["extensions"]["account_data"]
.get("rooms")
.get(room_id2)
}
self.assertIncludes(
account_data_map.keys(),
{"org.matrix.roorarraz2", AccountDataTypes.TAG},
exact=True,
)
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
if tag_action == TagAction.ADD:
self.assertEqual(
account_data_map[AccountDataTypes.TAG],
{"tags": {"m.favourite": {}, "m.server_notice": {}}},
)
elif tag_action == TagAction.REMOVE:
# If we previously showed the client that the room has tags, when it no
# longer has tags, we need to show them an empty map.
self.assertEqual(
account_data_map[AccountDataTypes.TAG],
{"tags": {}},
)
else:
assert_never(tag_action)
def test_wait_for_new_data(self) -> None:
"""