Optimise notifier mk2

This commit is contained in:
Erik Johnston 2024-09-27 10:32:58 +01:00
parent 210f76f4b9
commit e52725b2ac

View file

@ -41,6 +41,7 @@ import attr
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.defer import Deferred
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
@ -61,7 +63,9 @@ from synapse.types import (
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
@ -90,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
return n
class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""
__slots__ = ["deferred"]
def __init__(self, deferred: "defer.Deferred"):
self.deferred = deferred
class _NotifierUserStream:
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
@ -114,11 +106,13 @@ class _NotifierUserStream:
def __init__(
self,
reactor: ISynapseReactor,
user_id: str,
rooms: StrCollection,
current_token: StreamToken,
time_now_ms: int,
):
self.reactor = reactor
self.user_id = user_id
self.rooms = set(rooms)
@ -130,28 +124,31 @@ class _NotifierUserStream:
self.current_token = current_token
self.last_notified_ms = time_now_ms
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)
# Set of listeners that we need to wake up when there has been a change.
self.listeners: Set[Deferred[StreamToken]] = set()
def notify(
def update_and_fetch_deferreds(
self,
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
event source.
) -> Collection["Deferred[StreamToken]"]:
"""Update the stream for this user because of a new event from an
event source, and return the set of deferreds to wake up.
Args:
current_token: The new current token.
time_now_ms: The current time in milliseconds.
Returns:
The set of deferreds that need to be called.
"""
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token)
listeners = self.listeners
self.listeners = set()
return listeners
def remove(self, notifier: "Notifier") -> None:
"""Remove this listener from all the indexes in the Notifier
@ -165,9 +162,9 @@ class _NotifierUserStream:
notifier.user_to_user_stream.pop(self.user_id)
def count_listeners(self) -> int:
return len(self.notify_deferred.observers())
return len(self.listeners)
def new_listener(self, token: StreamToken) -> _NotificationListener:
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
@ -177,10 +174,17 @@ class _NotifierUserStream:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.current_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
if token != self.current_token:
return defer.succeed(self.current_token)
# Create a new deferred and add it to the set of listeners. We add a
# cancel handler to remove it from the set again, to handle timeouts.
deferred: "Deferred[StreamToken]" = Deferred(
canceller=lambda d: self.listeners.discard(d)
)
self.listeners.add(deferred)
return deferred
@attr.s(slots=True, frozen=True, auto_attribs=True)
@ -233,6 +237,7 @@ class Notifier:
# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
self.reactor = hs.get_reactor()
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@ -329,12 +334,20 @@ class Notifier:
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(current_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
@ -538,12 +551,24 @@ class Notifier:
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(current_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(
current_token, time_now_ms
)
)
except Exception:
logger.exception("Failed to notify listener")
# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
self.notify_replication()
@ -582,6 +607,7 @@ class Notifier:
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
reactor=self.reactor,
user_id=user_id,
rooms=room_ids,
current_token=current_token,
@ -604,8 +630,8 @@ class Notifier:
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
listener.deferred = timeout_deferred(
listener.deferred,
listener = timeout_deferred(
listener,
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
@ -618,7 +644,7 @@ class Notifier:
)
with PreserveLoggingContext():
await listener.deferred
await listener
log_kv(
{