Merge remote-tracking branch 'origin/release-v1.110' into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2024-07-02 14:09:36 +01:00
commit 6e2453066d
6 changed files with 194 additions and 21 deletions

1
changelog.d/17371.misc Normal file
View file

@ -0,0 +1 @@
Limit size of presence EDUs to 50 entries.

1
changelog.d/17391.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.

View file

@ -21,6 +21,7 @@
#
import datetime
import logging
from collections import OrderedDict
from types import TracebackType
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type
@ -68,6 +69,10 @@ sent_edus_by_type = Counter(
# If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
# Limit how many presence states we add to each presence EDU, to ensure that
# they are bounded in size.
MAX_PRESENCE_STATES_PER_EDU = 50
class PerDestinationQueue:
"""
@ -144,7 +149,7 @@ class PerDestinationQueue:
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
self._pending_presence: Dict[str, UserPresenceState] = {}
self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict()
# List of room_id -> receipt_type -> user_id -> receipt_dict,
#
@ -399,7 +404,7 @@ class PerDestinationQueue:
# through another mechanism, because this is all volatile!
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
self._pending_presence.clear()
self._pending_receipt_edus = []
self._start_catching_up()
@ -721,22 +726,26 @@ class _TransactionQueueManager:
# Add presence EDU.
if self.queue._pending_presence:
# Only send max 50 presence entries in the EDU, to bound the amount
# of data we're sending.
presence_to_add: List[JsonDict] = []
while (
self.queue._pending_presence
and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
):
_, presence = self.queue._pending_presence.popitem(last=False)
presence_to_add.append(
format_user_presence_state(presence, self.queue._clock.time_msec())
)
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
content={"push": presence_to_add},
)
)
self.queue._pending_presence = {}
# Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))

View file

@ -19,6 +19,7 @@
#
#
import logging
from typing import TYPE_CHECKING, Sequence, Tuple
import attr
@ -41,6 +42,9 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _EventSourcesInner:
room: RoomEventSource
@ -139,9 +143,16 @@ class EventSources:
key
].get_max_allocated_token()
token = token.copy_and_replace(
key, token.room_key.bound_stream_token(max_token)
)
if max_token < token_value.get_max_stream_pos():
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(
key, token_value.bound_stream_token(max_token)
)
else:
assert isinstance(current_value, int)
if current_value < token_value:
@ -149,7 +160,14 @@ class EventSources:
key
].get_max_allocated_token()
token = token.copy_and_replace(key, min(token_value, max_token))
if max_token < token_value:
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(key, max_token)
return token

View file

@ -27,6 +27,8 @@ from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
from synapse.api.presence import UserPresenceState
from synapse.federation.sender.per_destination_queue import MAX_PRESENCE_STATES_PER_EDU
from synapse.federation.units import Transaction
from synapse.handlers.device import DeviceHandler
from synapse.rest import admin
@ -266,6 +268,123 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
)
class FederationSenderPresenceTestCases(HomeserverTestCase):
"""
Test federation sending for presence updates.
"""
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.federation_transport_client = Mock(spec=["send_transaction"])
self.federation_transport_client.send_transaction = AsyncMock()
hs = self.setup_test_homeserver(
federation_transport_client=self.federation_transport_client,
)
return hs
def default_config(self) -> JsonDict:
config = super().default_config()
config["federation_sender_instances"] = None
return config
def test_presence_simple(self) -> None:
"Test that sending a single presence update works"
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
self.get_success(
sender.send_presence_to_destinations(
[UserPresenceState.default("@user:test")],
["server"],
)
)
self.pump()
# expect a call to send_transaction
mock_send_transaction.assert_awaited_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(
data["edus"],
[
{
"edu_type": EduTypes.PRESENCE,
"content": {
"push": [
{
"presence": "offline",
"user_id": "@user:test",
}
]
},
}
],
)
def test_presence_batched(self) -> None:
"""Test that sending lots of presence updates to a destination are
batched, rather than having them all sent in one EDU."""
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
# We now send lots of presence updates to force the federation sender to
# batch the mup.
number_presence_updates_to_send = MAX_PRESENCE_STATES_PER_EDU * 2
self.get_success(
sender.send_presence_to_destinations(
[
UserPresenceState.default(f"@user{i}:test")
for i in range(number_presence_updates_to_send)
],
["server"],
)
)
self.pump()
# We should have seen at least one transcation be sent by now.
mock_send_transaction.assert_called()
# We don't want to specify exactly how the presence EDUs get sent out,
# could be one per transaction or multiple per transaction. We just want
# to assert that a) each presence EDU has bounded number of updates, and
# b) that all updates get sent out.
presence_edus = []
for transaction_call in mock_send_transaction.call_args_list:
json_cb = transaction_call[0][1]
data = json_cb()
for edu in data["edus"]:
self.assertEqual(edu.get("edu_type"), EduTypes.PRESENCE)
presence_edus.append(edu)
# A set of all user presence we see, this should end up matching the
# number we sent out above.
seen_users: Set[str] = set()
for edu in presence_edus:
presence_states = edu["content"]["push"]
# This is where we actually check that the number of presence
# updates is bounded.
self.assertLessEqual(len(presence_states), MAX_PRESENCE_STATES_PER_EDU)
seen_users.update(p["user_id"] for p in presence_states)
self.assertEqual(len(seen_users), number_presence_updates_to_send)
class FederationSenderDevicesTestCases(HomeserverTestCase):
"""
Test federation sending to update devices.

View file

@ -36,7 +36,14 @@ from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVe
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
from synapse.types import JsonDict, StreamKeyType, UserID, create_requester
from synapse.types import (
JsonDict,
MultiWriterStreamToken,
RoomStreamToken,
StreamKeyType,
UserID,
create_requester,
)
from synapse.util import Clock
import tests.unittest
@ -999,7 +1006,13 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.get_success(sync_d, by=1.0)
def test_wait_for_invalid_future_sync_token(self) -> None:
@parameterized.expand(
[(key,) for key in StreamKeyType.__members__.values()],
name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
)
def test_wait_for_invalid_future_sync_token(
self, stream_key: StreamKeyType
) -> None:
"""Like the previous test, except we give a token that has a stream
position ahead of what is in the DB, i.e. its invalid and we shouldn't
wait for the stream to advance (as it may never do so).
@ -1010,11 +1023,23 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
"""
user = self.register_user("alice", "password")
# Create a token and arbitrarily advance one of the streams.
# Create a token and advance one of the streams.
current_token = self.hs.get_event_sources().get_current_token()
since_token = current_token.copy_and_advance(
StreamKeyType.PRESENCE, current_token.presence_key + 1
)
token_value = current_token.get_field(stream_key)
# How we advance the streams depends on the type.
if isinstance(token_value, int):
since_token = current_token.copy_and_advance(stream_key, token_value + 1)
elif isinstance(token_value, MultiWriterStreamToken):
since_token = current_token.copy_and_advance(
stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
)
elif isinstance(token_value, RoomStreamToken):
since_token = current_token.copy_and_advance(
stream_key, RoomStreamToken(stream=token_value.stream + 1)
)
else:
raise Exception("Unreachable")
sync_d = defer.ensureDeferred(
self.sync_handler.wait_for_sync_for_user(