Various clean ups to room stream tokens. (#8423)

This commit is contained in:
Erik Johnston 2020-09-29 21:48:33 +01:00 committed by GitHub
parent 8238b55e08
commit ea70f1c362
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 96 additions and 76 deletions

1
changelog.d/8423.misc Normal file
View file

@ -0,0 +1 @@
Various refactors to simplify stream token handling.

View file

@ -23,7 +23,7 @@ from typing import Dict, Optional, Tuple, Type
from unpaddedbase64 import encode_base64 from unpaddedbase64 import encode_base64
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict from synapse.types import JsonDict, RoomStreamToken
from synapse.util.caches import intern_dict from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze from synapse.util.frozenutils import freeze
@ -118,8 +118,8 @@ class _EventInternalMetadata:
# XXX: These are set by StreamWorkerStore._set_before_and_after. # XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't # I'm pretty sure that these are never persisted to the database, so shouldn't
# be here # be here
before = DictProperty("before") # type: str before = DictProperty("before") # type: RoomStreamToken
after = DictProperty("after") # type: str after = DictProperty("after") # type: RoomStreamToken
order = DictProperty("order") # type: Tuple[int, int] order = DictProperty("order") # type: Tuple[int, int]
def get_dict(self) -> JsonDict: def get_dict(self) -> JsonDict:

View file

@ -153,7 +153,7 @@ class AdminHandler(BaseHandler):
if not events: if not events:
break break
from_key = RoomStreamToken.parse(events[-1].internal_metadata.after) from_key = events[-1].internal_metadata.after
events = await filter_events_for_client(self.storage, user_id, events) events = await filter_events_for_client(self.storage, user_id, events)

View file

@ -29,7 +29,6 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ( from synapse.types import (
RoomStreamToken,
StreamToken, StreamToken,
get_domain_from_id, get_domain_from_id,
get_verify_key_from_cross_signing_key, get_verify_key_from_cross_signing_key,
@ -113,8 +112,7 @@ class DeviceWorkerHandler(BaseHandler):
set_tag("user_id", user_id) set_tag("user_id", user_id)
set_tag("from_token", from_token) set_tag("from_token", from_token)
now_room_id = self.store.get_room_max_stream_ordering() now_room_key = self.store.get_room_max_token()
now_room_key = RoomStreamToken(None, now_room_id)
room_ids = await self.store.get_rooms_for_user(user_id) room_ids = await self.store.get_rooms_for_user(user_id)

View file

@ -325,7 +325,8 @@ class InitialSyncHandler(BaseHandler):
if limit is None: if limit is None:
limit = 10 limit = 10
stream_token = await self.store.get_stream_token_for_event(member_event_id) leave_position = await self.store.get_position_for_event(member_event_id)
stream_token = leave_position.to_room_stream_token()
messages, token = await self.store.get_recent_events_for_room( messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token room_id, limit=limit, end_token=stream_token

View file

@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.types import Requester, RoomStreamToken from synapse.types import Requester
from synapse.util.async_helpers import ReadWriteLock from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
@ -373,10 +373,9 @@ class PaginationHandler:
# case "JOIN" would have been returned. # case "JOIN" would have been returned.
assert member_event_id assert member_event_id
leave_token_str = await self.store.get_topological_token_for_event( leave_token = await self.store.get_topological_token_for_event(
member_event_id member_event_id
) )
leave_token = RoomStreamToken.parse(leave_token_str)
assert leave_token.topological is not None assert leave_token.topological is not None
if leave_token.topological < curr_topo: if leave_token.topological < curr_topo:

View file

@ -1134,14 +1134,14 @@ class RoomEventSource:
events[:] = events[:limit] events[:] = events[:limit]
if events: if events:
end_key = RoomStreamToken.parse(events[-1].internal_metadata.after) end_key = events[-1].internal_metadata.after
else: else:
end_key = to_key end_key = to_key
return (events, end_key) return (events, end_key)
def get_current_key(self) -> RoomStreamToken: def get_current_key(self) -> RoomStreamToken:
return RoomStreamToken(None, self.store.get_room_max_stream_ordering()) return self.store.get_room_max_token()
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]: def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id) return self.store.get_room_events_max_id(room_id)

View file

@ -519,7 +519,7 @@ class SyncHandler:
if len(recents) > timeline_limit: if len(recents) > timeline_limit:
limited = True limited = True
recents = recents[-timeline_limit:] recents = recents[-timeline_limit:]
room_key = RoomStreamToken.parse(recents[0].internal_metadata.before) room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace("room_key", room_key) prev_batch_token = now_token.copy_and_replace("room_key", room_key)
@ -1595,16 +1595,24 @@ class SyncHandler:
if leave_events: if leave_events:
leave_event = leave_events[-1] leave_event = leave_events[-1]
leave_stream_token = await self.store.get_stream_token_for_event( leave_position = await self.store.get_position_for_event(
leave_event.event_id leave_event.event_id
) )
leave_token = since_token.copy_and_replace(
"room_key", leave_stream_token
)
if since_token and since_token.is_after(leave_token): # If the leave event happened before the since token then we
# bail.
if since_token and not leave_position.persisted_after(
since_token.room_key
):
continue continue
# We can safely convert the position of the leave event into a
# stream token as it'll only be used in the context of this
# room. (c.f. the docstring of `to_room_stream_token`).
leave_token = since_token.copy_and_replace(
"room_key", leave_position.to_room_stream_token()
)
# If this is an out of band message, like a remote invite # If this is an out of band message, like a remote invite
# rejection, we include it in the recents batch. Otherwise, we # rejection, we include it in the recents batch. Otherwise, we
# let _load_filtered_recents handle fetching the correct # let _load_filtered_recents handle fetching the correct

View file

@ -163,7 +163,7 @@ class _NotifierUserStream:
""" """
# Immediately wake up stream if something has already since happened # Immediately wake up stream if something has already since happened
# since their last token. # since their last token.
if self.last_notified_token.is_after(token): if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token)) return _NotificationListener(defer.succeed(self.current_token))
else: else:
return _NotificationListener(self.notify_deferred.observe()) return _NotificationListener(self.notify_deferred.observe())
@ -470,7 +470,7 @@ class Notifier:
async def check_for_updates( async def check_for_updates(
before_token: StreamToken, after_token: StreamToken before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult: ) -> EventStreamResult:
if not after_token.is_after(before_token): if after_token == before_token:
return EventStreamResult([], (from_token, from_token)) return EventStreamResult([], (from_token, from_token))
events = [] # type: List[EventBase] events = [] # type: List[EventBase]

View file

@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow, EventsStreamEventRow,
EventsStreamRow, EventsStreamRow,
) )
from synapse.types import PersistedEventPosition, RoomStreamToken, UserID from synapse.types import PersistedEventPosition, UserID
from synapse.util.async_helpers import timeout_deferred from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -152,9 +152,7 @@ class ReplicationDataHandler:
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
extra_users = (UserID.from_string(event.state_key),) extra_users = (UserID.from_string(event.state_key),)
max_token = RoomStreamToken( max_token = self.store.get_room_max_token()
None, self.store.get_room_max_stream_ordering()
)
event_pos = PersistedEventPosition(instance_name, token) event_pos = PersistedEventPosition(instance_name, token)
self.notifier.on_new_room_event( self.notifier.on_new_room_event(
event, event_pos, max_token, extra_users event, event_pos, max_token, extra_users

View file

@ -109,7 +109,8 @@ class PurgeHistoryRestServlet(RestServlet):
if event.room_id != room_id: if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.") raise SynapseError(400, "Event is for wrong room.")
token = await self.store.get_topological_token_for_event(event_id) room_token = await self.store.get_topological_token_for_event(event_id)
token = str(room_token)
logger.info("[purge] purging up to token %s (event_id %s)", token, event_id) logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
elif "purge_up_to_ts" in body: elif "purge_up_to_ts" in body:

View file

@ -35,7 +35,6 @@ what sort order was used:
- topological tokens: "t%d-%d", where the integers map to the topological - topological tokens: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively. and stream ordering columns respectively.
""" """
import abc import abc
import logging import logging
from collections import namedtuple from collections import namedtuple
@ -54,7 +53,7 @@ from synapse.storage.database import (
) )
from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.types import Collection, RoomStreamToken from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
if TYPE_CHECKING: if TYPE_CHECKING:
@ -305,6 +304,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
def get_room_min_stream_ordering(self) -> int: def get_room_min_stream_ordering(self) -> int:
raise NotImplementedError() raise NotImplementedError()
def get_room_max_token(self) -> RoomStreamToken:
return RoomStreamToken(None, self.get_room_max_stream_ordering())
async def get_room_events_stream_for_rooms( async def get_room_events_stream_for_rooms(
self, self,
room_ids: Collection[str], room_ids: Collection[str],
@ -611,26 +613,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
allow_none=allow_none, allow_none=allow_none,
) )
async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken: async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
"""The stream token for an event """Get the persisted position for an event
Args:
event_id: The id of the event to look up a stream token for.
Raises:
StoreError if the event wasn't in the database.
Returns:
A stream token.
""" """
stream_id = await self.get_stream_id_for_event(event_id) row = await self.db_pool.simple_select_one(
return RoomStreamToken(None, stream_id) table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "instance_name"),
desc="get_position_for_event",
)
async def get_topological_token_for_event(self, event_id: str) -> str: return PersistedEventPosition(
row["instance_name"] or "master", row["stream_ordering"]
)
async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event """The stream token for an event
Args: Args:
event_id: The id of the event to look up a stream token for. event_id: The id of the event to look up a stream token for.
Raises: Raises:
StoreError if the event wasn't in the database. StoreError if the event wasn't in the database.
Returns: Returns:
A "t%d-%d" topological token. A `RoomStreamToken` topological token.
""" """
row = await self.db_pool.simple_select_one( row = await self.db_pool.simple_select_one(
table="events", table="events",
@ -638,7 +642,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
retcols=("stream_ordering", "topological_ordering"), retcols=("stream_ordering", "topological_ordering"),
desc="get_topological_token_for_event", desc="get_topological_token_for_event",
) )
return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"]) return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
"""Gets the topological token in a room after or at the given stream """Gets the topological token in a room after or at the given stream
@ -687,8 +691,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
else: else:
topo = None topo = None
internal = event.internal_metadata internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1)) internal.before = RoomStreamToken(topo, stream - 1)
internal.after = str(RoomStreamToken(topo, stream)) internal.after = RoomStreamToken(topo, stream)
internal.order = (int(topo) if topo else 0, int(stream)) internal.order = (int(topo) if topo else 0, int(stream))
async def get_events_around( async def get_events_around(

View file

@ -229,7 +229,7 @@ class EventsPersistenceStorage:
defer.gatherResults(deferreds, consumeErrors=True) defer.gatherResults(deferreds, consumeErrors=True)
) )
return RoomStreamToken(None, self.main_store.get_current_events_token()) return self.main_store.get_room_max_token()
async def persist_event( async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False self, event: EventBase, context: EventContext, backfilled: bool = False
@ -247,11 +247,10 @@ class EventsPersistenceStorage:
await make_deferred_yieldable(deferred) await make_deferred_yieldable(deferred)
max_persisted_id = self.main_store.get_current_events_token()
event_stream_id = event.internal_metadata.stream_ordering event_stream_id = event.internal_metadata.stream_ordering
pos = PersistedEventPosition(self._instance_name, event_stream_id) pos = PersistedEventPosition(self._instance_name, event_stream_id)
return pos, RoomStreamToken(None, max_persisted_id) return pos, self.main_store.get_room_max_token()
def _maybe_start_persisting(self, room_id: str): def _maybe_start_persisting(self, room_id: str):
async def persisting_queue(item): async def persisting_queue(item):

View file

@ -413,6 +413,18 @@ class RoomStreamToken:
pass pass
raise SynapseError(400, "Invalid token %r" % (string,)) raise SynapseError(400, "Invalid token %r" % (string,))
def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken":
"""Return a new token such that if an event is after both this token and
the other token, then it's after the returned token too.
"""
if self.topological or other.topological:
raise Exception("Can't advance topological tokens")
max_stream = max(self.stream, other.stream)
return RoomStreamToken(None, max_stream)
def as_tuple(self) -> Tuple[Optional[int], int]: def as_tuple(self) -> Tuple[Optional[int], int]:
return (self.topological, self.stream) return (self.topological, self.stream)
@ -458,31 +470,20 @@ class StreamToken:
def room_stream_id(self): def room_stream_id(self):
return self.room_key.stream return self.room_key.stream
def is_after(self, other):
"""Does this token contain events that the other doesn't?"""
return (
(other.room_stream_id < self.room_stream_id)
or (int(other.presence_key) < int(self.presence_key))
or (int(other.typing_key) < int(self.typing_key))
or (int(other.receipt_key) < int(self.receipt_key))
or (int(other.account_data_key) < int(self.account_data_key))
or (int(other.push_rules_key) < int(self.push_rules_key))
or (int(other.to_device_key) < int(self.to_device_key))
or (int(other.device_list_key) < int(self.device_list_key))
or (int(other.groups_key) < int(self.groups_key))
)
def copy_and_advance(self, key, new_value) -> "StreamToken": def copy_and_advance(self, key, new_value) -> "StreamToken":
"""Advance the given key in the token to a new value if and only if the """Advance the given key in the token to a new value if and only if the
new value is after the old value. new value is after the old value.
""" """
new_token = self.copy_and_replace(key, new_value)
if key == "room_key": if key == "room_key":
new_id = new_token.room_stream_id new_token = self.copy_and_replace(
old_id = self.room_stream_id "room_key", self.room_key.copy_and_advance(new_value)
else: )
return new_token
new_token = self.copy_and_replace(key, new_value)
new_id = int(getattr(new_token, key)) new_id = int(getattr(new_token, key))
old_id = int(getattr(self, key)) old_id = int(getattr(self, key))
if old_id < new_id: if old_id < new_id:
return new_token return new_token
else: else:
@ -509,6 +510,18 @@ class PersistedEventPosition:
def persisted_after(self, token: RoomStreamToken) -> bool: def persisted_after(self, token: RoomStreamToken) -> bool:
return token.stream < self.stream return token.stream < self.stream
def to_room_stream_token(self) -> RoomStreamToken:
"""Converts the position to a room stream token such that events
persisted in the same room after this position will be after the
returned `RoomStreamToken`.
Note: no guarantees are made about ordering w.r.t. events in other
rooms.
"""
# Doing the naive thing satisfies the desired properties described in
# the docstring.
return RoomStreamToken(None, self.stream)
class ThirdPartyInstanceID( class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id")) namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))

View file

@ -902,15 +902,15 @@ class RoomMessageListTestCase(RoomBase):
# Send a first message in the room, which will be removed by the purge. # Send a first message in the room, which will be removed by the purge.
first_event_id = self.helper.send(self.room_id, "message 1")["event_id"] first_event_id = self.helper.send(self.room_id, "message 1")["event_id"]
first_token = self.get_success( first_token = str(
store.get_topological_token_for_event(first_event_id) self.get_success(store.get_topological_token_for_event(first_event_id))
) )
# Send a second message in the room, which won't be removed, and which we'll # Send a second message in the room, which won't be removed, and which we'll
# use as the marker to purge events before. # use as the marker to purge events before.
second_event_id = self.helper.send(self.room_id, "message 2")["event_id"] second_event_id = self.helper.send(self.room_id, "message 2")["event_id"]
second_token = self.get_success( second_token = str(
store.get_topological_token_for_event(second_event_id) self.get_success(store.get_topological_token_for_event(second_event_id))
) )
# Send a third event in the room to ensure we don't fall under any edge case # Send a third event in the room to ensure we don't fall under any edge case

View file

@ -47,8 +47,8 @@ class PurgeTests(HomeserverTestCase):
storage = self.hs.get_storage() storage = self.hs.get_storage()
# Get the topological token # Get the topological token
event = self.get_success( event = str(
store.get_topological_token_for_event(last["event_id"]) self.get_success(store.get_topological_token_for_event(last["event_id"]))
) )
# Purge everything before this topological token # Purge everything before this topological token
@ -74,12 +74,10 @@ class PurgeTests(HomeserverTestCase):
storage = self.hs.get_datastore() storage = self.hs.get_datastore()
# Set the topological token higher than it should be # Set the topological token higher than it should be
event = self.get_success( token = self.get_success(
storage.get_topological_token_for_event(last["event_id"]) storage.get_topological_token_for_event(last["event_id"])
) )
event = "t{}-{}".format( event = "t{}-{}".format(token.topological + 1, token.stream + 1)
*list(map(lambda x: x + 1, map(int, event[1:].split("-"))))
)
# Purge everything before this topological token # Purge everything before this topological token
purge = defer.ensureDeferred(storage.purge_history(self.room_id, event, True)) purge = defer.ensureDeferred(storage.purge_history(self.room_id, event, True))