Optimise notifier (#17765)

The notifier is quite inefficient when it has to wake up many user
streams all at once

From a silly benchmark this takes the time to notify 1M user streams
from ~30s to ~5s
This commit is contained in:
Erik Johnston 2024-09-30 12:58:13 +01:00 committed by GitHub
parent ece66ba61c
commit 93889eb2e7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 43 additions and 35 deletions

1
changelog.d/17765.misc Normal file
View file

@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

View file

@ -63,6 +63,7 @@ from synapse.types import (
) )
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
if TYPE_CHECKING: if TYPE_CHECKING:
@ -120,14 +121,13 @@ class _NotifierUserStream:
): ):
self.user_id = user_id self.user_id = user_id
self.rooms = set(rooms) self.rooms = set(rooms)
self.current_token = current_token
# The last token for which we should wake up any streams that have a # The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked. # token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams # We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be # that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up. # woken up or not, so lets just wake them up.
self.last_notified_token = current_token self.current_token = current_token
self.last_notified_ms = time_now_ms self.last_notified_ms = time_now_ms
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred( self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
@ -136,33 +136,19 @@ class _NotifierUserStream:
def notify( def notify(
self, self,
stream_key: StreamKeyType, current_token: StreamToken,
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
time_now_ms: int, time_now_ms: int,
) -> None: ) -> None:
"""Notify any listeners for this user of a new event from an """Notify any listeners for this user of a new event from an
event source. event source.
Args: Args:
stream_key: The stream the event came from. current_token: The new current token.
stream_id: The new id for the stream the event came from.
time_now_ms: The current time in milliseconds. time_now_ms: The current time in milliseconds.
""" """
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) self.current_token = current_token
self.last_notified_token = self.current_token
self.last_notified_ms = time_now_ms self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred notify_deferred = self.notify_deferred
log_kv(
{
"notify": self.user_id,
"stream": stream_key,
"stream_id": stream_id,
"listeners": self.count_listeners(),
}
)
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext(): with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred()) self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token) notify_deferred.callback(self.current_token)
@ -191,7 +177,7 @@ class _NotifierUserStream:
""" """
# Immediately wake up stream if something has already since happened # Immediately wake up stream if something has already since happened
# since their last token. # since their last token.
if self.last_notified_token != token: if self.current_token != token:
return _NotificationListener(defer.succeed(self.current_token)) return _NotificationListener(defer.succeed(self.current_token))
else: else:
return _NotificationListener(self.notify_deferred.observe()) return _NotificationListener(self.notify_deferred.observe())
@ -342,14 +328,17 @@ class Notifier:
# Wake up all related user stream notifiers # Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set()) user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec() time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
for user_stream in user_streams: for user_stream in user_streams:
try: try:
user_stream.notify( user_stream.notify(current_token, time_now_ms)
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
)
except Exception: except Exception:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
# Poke the replication so that other workers also see the write to # Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream. # the un-partial-stated rooms stream.
self.notify_replication() self.notify_replication()
@ -519,12 +508,16 @@ class Notifier:
rooms = rooms or [] rooms = rooms or []
with Measure(self.clock, "on_new_event"): with Measure(self.clock, "on_new_event"):
user_streams = set() user_streams: Set[_NotifierUserStream] = set()
log_kv( log_kv(
{ {
"waking_up_explicit_users": len(users), "waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms), "waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
} }
) )
@ -544,12 +537,15 @@ class Notifier:
) )
time_now_ms = self.clock.time_msec() time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
for user_stream in user_streams: for user_stream in user_streams:
try: try:
user_stream.notify(stream_key, new_token, time_now_ms) user_stream.notify(current_token, time_now_ms)
except Exception: except Exception:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
self.notify_replication() self.notify_replication()
# Notify appservices. # Notify appservices.

View file

@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase):
self.assertEqual(200, channel.code) self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"] next_batch = channel.json_body["next_batch"]
# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
channel = self.make_request("GET", sync_url % (access_token, next_batch))
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]
# Clear the typing information, so that it doesn't think everything is # Clear the typing information, so that it doesn't think everything is
# in the future. # in the future. This happens automatically when the typing stream
# resets.
typing._reset() typing._reset()
# Now it SHOULD fail as it never completes! # Nothing new, so we time out.
with self.assertRaises(TimedOutException): with self.assertRaises(TimedOutException):
self.make_request("GET", sync_url % (access_token, next_batch)) self.make_request("GET", sync_url % (access_token, next_batch))
# Sync and start typing again.
sync_channel = self.make_request(
"GET", sync_url % (access_token, next_batch), await_result=False
)
self.assertFalse(sync_channel.is_finished())
channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.assertEqual(200, channel.code)
# Sync should now return.
sync_channel.await_result()
self.assertEqual(200, sync_channel.code)
next_batch = sync_channel.json_body["next_batch"]
class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin): class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
servlets = [ servlets = [