Calculate new current token once

Turns out doing `.copy_and_advance` can be expensive
This commit is contained in:
Erik Johnston 2024-09-27 09:06:50 +01:00
parent a3f8ec284a
commit 9d3e8d7fcd

View file

@ -121,14 +121,13 @@ class _NotifierUserStream:
):
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token
# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up.
self.last_notified_token = current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
@ -137,8 +136,7 @@ class _NotifierUserStream:
def notify(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
@ -148,8 +146,7 @@ class _NotifierUserStream:
stream_id: The new id for the stream the event came from.
time_now_ms: The current time in milliseconds.
"""
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred
@ -181,7 +178,7 @@ class _NotifierUserStream:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.last_notified_token != token:
if self.current_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
@ -332,11 +329,10 @@ class Notifier:
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
)
user_stream.notify(current_token, time_now_ms)
except Exception:
logger.exception("Failed to notify listener")
@ -542,9 +538,10 @@ class Notifier:
)
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
user_stream.notify(current_token, time_now_ms)
except Exception:
logger.exception("Failed to notify listener")