send batched events over replication

This commit is contained in:
H. Shay 2022-09-19 11:03:46 -07:00
parent abc497bae6
commit 031d2aa543
3 changed files with 312 additions and 1 deletions

View file

@ -56,6 +56,7 @@ from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
@ -66,7 +67,7 @@ from synapse.types import (
StateMap,
StreamToken,
UserID,
create_requester,
create_requester, PersistedEventPosition,
)
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results
@ -493,6 +494,7 @@ class EventCreationHandler:
self.membership_types_to_include_profile_data_in.add(Membership.INVITE)
self.send_event = ReplicationSendEventRestServlet.make_client(hs)
self.send_events = ReplicationSendEventsRestServlet.make_client(hs)
self.request_ratelimiter = hs.get_request_ratelimiter()
@ -1316,6 +1318,148 @@ class EventCreationHandler:
400, "Cannot start threads from an event with a relation"
)
async def handle_create_room_events(
self,
requester: Requester,
events_and_ctx: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True,
) -> EventBase:
"""
Process a batch of room creation events. For each event in the list it checks
the authorization and that the event can be serialized. Returns the last event in the
list once it has been persisted.
Args:
requester: the room creator
events_and_ctx: a set of events and their associated contexts to persist
ratelimit: whether to ratelimit this request
"""
for event, context in events_and_ctx:
try:
validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(
event, context
)
except AuthError as err:
logger.warning("Denying new event %r because %s", event, err)
raise err
# Ensure that we can round trip before trying to persist in db
try:
dump = json_encoder.encode(event.content)
json_decoder.decode(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
# We now persist the events
try:
result = await self._persist_events_batch(
requester, events_and_ctx, ratelimit
)
except Exception as e:
logger.info(f"Encountered an error persisting events: {e}")
return result
async def _persist_events_batch(
self,
requester: Requester,
events_and_ctx: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True,
) -> EventBase:
"""
Processes the push actions and adds them to the push staging area before attempting to
persist the batch of events.
See handle_create_room_events for arguments
Returns the last event in the list if persisted successfully
"""
for event, context in events_and_ctx:
with opentracing.start_active_span("calculate_push_actions"):
await self._bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)
try:
# If we're a worker we need to hit out to the master.
writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
try:
result = await self.send_events(
instance_name=writer_instance,
store=self.store,
requester=requester,
events_and_ctx=events_and_ctx,
ratelimit=ratelimit,
)
except SynapseError as e:
if e.code == HTTPStatus.CONFLICT:
raise PartialStateConflictError()
raise
stream_id = result["stream_id"]
# If we newly persisted the event then we need to update its
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
return event
last_event = await self.persist_and_notify_batched_events(
requester, events_and_ctx, ratelimit
)
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
for event, _ in events_and_ctx:
await self.store.remove_push_actions_from_staging(event.event_id)
raise
return last_event
async def persist_and_notify_batched_events(
self,
requester: Requester,
events_and_ctx: List[Tuple[EventBase, EventContext]],
ratelimit: bool = True,
) -> EventBase:
"""
Handles the actual persisting of a batch of events to the DB, and sends the appropriate
notifications when this is done.
Args:
requester: the room creator
events_and_ctx: list of events and their associated contexts to persist
ratelimit: whether to apply ratelimiting to this request
"""
if ratelimit:
await self.request_ratelimiter.ratelimit(requester)
for event, context in events_and_ctx:
await self._actions_by_event_type(event, context)
assert self._storage_controllers.persistence is not None
(
persisted_events,
max_stream_token,
) = await self._storage_controllers.persistence.persist_events(events_and_ctx)
stream_ordering = persisted_events[-1].internal_metadata.stream_ordering
assert stream_ordering is not None
pos = PersistedEventPosition(self._instance_name, stream_ordering)
async def _notify() -> None:
try:
await self.notifier.on_new_room_event(
persisted_events[-1], pos, max_stream_token
)
except Exception:
logger.exception(
"Error notifying about new room event %s",
event.event_id,
)
run_in_background(_notify)
return persisted_events[-1]
@measure_func("handle_new_client_event")
async def handle_new_client_event(
self,

View file

@ -25,6 +25,7 @@ from synapse.replication.http import (
push,
register,
send_event,
send_events,
state,
streams,
)
@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs: "HomeServer") -> None:
send_event.register_servlets(hs, self)
send_events.register_servlets(hs, self)
federation.register_servlets(hs, self)
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)

View file

@ -0,0 +1,165 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Tuple
from twisted.web.server import Request
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.http.server import HttpServer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict, Requester
from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.databases.main import DataStore
logger = logging.getLogger(__name__)
class ReplicationSendEventsRestServlet(ReplicationEndpoint):
"""Handles batches of newly created events on workers, including persisting and
notifying.
The API looks like:
POST /_synapse/replication/send_events/:txn_id
{
"events": [{
"event": { .. serialized event .. },
"room_version": .., // "1", "2", "3", etc: the version of the room
// containing the event
"event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
"outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
"requester": { .. serialized requester .. },
"ratelimit": true,
}]
}
200 OK
{ "stream_id": 12345, "event_id": "$abcdef..." }
Responds with a 409 when a `PartialStateConflictError` is raised due to an event
context that needs to be recomputed due to the un-partial stating of a room.
"""
NAME = "send_events"
PATH_ARGS = ()
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
@staticmethod
async def _serialize_payload( # type: ignore[override]
store: "DataStore",
events_and_ctx: List[Tuple[EventBase, EventContext]],
requester: Requester,
ratelimit: bool,
) -> JsonDict:
"""
Args:
store
requester
events_and_ctx
ratelimit
"""
serialized_events = []
for event, context in events_and_ctx:
serialized_context = await context.serialize(event, store)
serialized_event = {
"event": event.get_pdu_json(),
"room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
}
serialized_events.append(serialized_event)
payload = {"events": serialized_events}
return payload
async def _handle_request( # type: ignore[override]
self, request: Request
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_events_parse"):
payload = parse_json_object_from_request(request)
events_and_ctx = []
events = payload["events"]
for event_payload in events:
event_dict = event_payload["event"]
room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason
)
event.internal_metadata.outlier = event_payload["outlier"]
requester = Requester.deserialize(
self.store, event_payload["requester"]
)
context = EventContext.deserialize(
self._storage_controllers, event_payload["context"]
)
ratelimit = event_payload["ratelimit"]
events_and_ctx.append((event, context))
logger.info(
"Got batch of events to send, last ID of batch is: %s, sending into room: %s",
event.event_id,
event.room_id,
)
last_event = (
await self.event_creation_handler.persist_and_notify_batched_events(
requester, events_and_ctx, ratelimit
)
)
return (
200,
{
"stream_id": last_event.internal_metadata.stream_ordering,
"event_id": last_event.event_id,
},
)
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationSendEventsRestServlet(hs).register(http_server)