diff --git a/changelog.d/16558.bugfix b/changelog.d/16558.bugfix new file mode 100644 index 0000000000..64f419fd82 --- /dev/null +++ b/changelog.d/16558.bugfix @@ -0,0 +1 @@ +Fix ratelimiting of message sending when using workers, where the ratelimit would only be applied after most of the work has been done. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a0b4a93ae8..811a41f161 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -999,7 +999,26 @@ class EventCreationHandler: raise ShadowBanError() if ratelimit: - await self.request_ratelimiter.ratelimit(requester, update=False) + room_id = event_dict["room_id"] + try: + room_version = await self.store.get_room_version(room_id) + except NotFoundError: + # The room doesn't exist. + raise AuthError(403, f"User {requester.user} not in room {room_id}") + + if room_version.updated_redaction_rules: + redacts = event_dict["content"].get("redacts") + else: + redacts = event_dict.get("redacts") + + is_admin_redaction = await self.is_admin_redaction( + event_type=event_dict["type"], + sender=event_dict["sender"], + redacts=redacts, + ) + await self.request_ratelimiter.ratelimit( + requester, is_admin_redaction=is_admin_redaction, update=False + ) # We limit the number of concurrent event sends in a room so that we # don't fork the DAG too much. If we don't limit then we can end up in @@ -1508,6 +1527,18 @@ class EventCreationHandler: first_event.room_id ) if writer_instance != self._instance_name: + # Ratelimit before sending to the other event persister, to + # ensure that we correctly have ratelimits on both the event + # creators and event persisters. + if ratelimit: + for event, _ in events_and_context: + is_admin_redaction = await self.is_admin_redaction( + event.type, event.sender, event.redacts + ) + await self.request_ratelimiter.ratelimit( + requester, is_admin_redaction=is_admin_redaction + ) + try: result = await self.send_events( instance_name=writer_instance, @@ -1538,6 +1569,7 @@ class EventCreationHandler: # stream_ordering entry manually (as it was persisted on # another worker). event.internal_metadata.stream_ordering = stream_id + return event event = await self.persist_and_notify_client_events( @@ -1696,21 +1728,9 @@ class EventCreationHandler: # can apply different ratelimiting. We do this by simply checking # it's not a self-redaction (to avoid having to look up whether the # user is actually admin or not). - is_admin_redaction = False - if event.type == EventTypes.Redaction: - assert event.redacts is not None - - original_event = await self.store.get_event( - event.redacts, - redact_behaviour=EventRedactBehaviour.as_is, - get_prev_content=False, - allow_rejected=False, - allow_none=True, - ) - - is_admin_redaction = bool( - original_event and event.sender != original_event.sender - ) + is_admin_redaction = await self.is_admin_redaction( + event.type, event.sender, event.redacts + ) await self.request_ratelimiter.ratelimit( requester, is_admin_redaction=is_admin_redaction @@ -1930,6 +1950,27 @@ class EventCreationHandler: return persisted_events[-1] + async def is_admin_redaction( + self, event_type: str, sender: str, redacts: Optional[str] + ) -> bool: + """Return whether the event is a redaction made by an admin, and thus + should use a different ratelimiter. + """ + if event_type != EventTypes.Redaction: + return False + + assert redacts is not None + + original_event = await self.store.get_event( + redacts, + redact_behaviour=EventRedactBehaviour.as_is, + get_prev_content=False, + allow_rejected=False, + allow_none=True, + ) + + return bool(original_event and sender != original_event.sender) + async def _maybe_kick_guest_users( self, event: EventBase, context: EventContext ) -> None: