Fix stream token ordering

This commit is contained in:
Mark Haines 2015-01-13 14:14:21 +00:00
parent 38e3241eb7
commit 895fcb377e

View file

@ -39,6 +39,8 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from collections import namedtuple
import logging import logging
@ -52,58 +54,76 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological" _TOPOLOGICAL_TOKEN = "topological"
def _parse_stream_token(string): class _StreamToken(namedtuple("_StreamToken", "topological stream")):
try: """Tokens are positions between events. The token "s1" comes after event 1.
if string[0] != 's':
raise
return int(string[1:])
except:
raise SynapseError(400, "Invalid token")
s0 s1
| |
[0] V [1] V [2]
def _parse_topological_token(string): Tokens can either be a point in the live event stream or a cursor going
through historic events.
When traversing the live event stream events are ordered by when they
arrived at the homeserver.
When traversing historic events the events are ordered by their depth in
the event graph "topological_ordering" and then by when they arrived at the
homeserver "stream_ordering".
Live tokens start with an "s" followed by the "stream_ordering" id of the
event it comes after. Historic tokens start with a "t" followed by the
"topological_ordering" id of the event it comes after, follewed by "-",
followed by the "stream_ordering" id of the event it comes after.
"""
__slots__ = []
@classmethod
def parse(cls, string):
try: try:
if string[0] != 't': if string[0] == 's':
raise return cls(None, int(string[1:]))
if string[0] == 't':
parts = string[1:].split('-', 1) parts = string[1:].split('-', 1)
return (int(parts[0]), int(parts[1])) return cls(int(parts[1]), int(parts[0]))
except:
raise SynapseError(400, "Invalid token")
def is_stream_token(string):
try:
_parse_stream_token(string)
return True
except:
return False
def is_topological_token(string):
try:
_parse_topological_token(string)
return True
except:
return False
def _get_token_bound(token, comparison):
try:
s = _parse_stream_token(token)
return "%s %s %d" % ("stream_ordering", comparison, s)
except: except:
pass pass
raise SynapseError(400, "Invalid token %r" % (string,))
@classmethod
def parse_stream_token(cls, string):
try: try:
top, stream = _parse_topological_token(token) if string[0] == 's':
return "%s %s %d AND %s %s %d" % ( return cls(None, int(string[1:]))
"topological_ordering", comparison, top, except:
"stream_ordering", comparison, stream, pass
raise SynapseError(400, "Invalid token %r" % (string,))
def __str__(self):
if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream)
else:
return "s%d" % (self.stream,)
def lower_bound(self):
if self.topological is None:
return "(%d < %s)" % (self.stream, "stream_ordering")
else:
return "(%d < %s OR (%d == %s AND %d < %s))" % (
self.topological, "topological_ordering",
self.topological, "topological_ordering",
self.stream, "stream_ordering",
) )
except:
pass
raise SynapseError(400, "Invalid token") def upper_bound(self):
if self.topological is None:
return "(%d >= %s)" % (self.stream, "stream_ordering")
else:
return "(%d > %s OR (%d == %s AND %d >= %s))" % (
self.topological, "topological_ordering",
self.topological, "topological_ordering",
self.stream, "stream_ordering",
)
class StreamStore(SQLBaseStore): class StreamStore(SQLBaseStore):
@ -162,8 +182,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering. # From and to keys should be integers from ordering.
from_id = _parse_stream_token(from_key) from_id = _StreamToken.parse_stream_token(from_key)
to_id = _parse_stream_token(to_key) to_id = _StreamToken.parse_stream_token(to_key)
if from_key == to_key: if from_key == to_key:
return defer.succeed(([], to_key)) return defer.succeed(([], to_key))
@ -181,7 +201,7 @@ class StreamStore(SQLBaseStore):
} }
def f(txn): def f(txn):
txn.execute(sql, (user_id, user_id, from_id, to_id,)) txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn) rows = self.cursor_to_dict(txn)
@ -211,16 +231,20 @@ class StreamStore(SQLBaseStore):
# Tokens really represent positions between elements, but we use # Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence # the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities. # we have a bit of asymmetry when it comes to equalities.
from_comp = '<=' if direction == 'b' else '>'
to_comp = '>' if direction == 'b' else '<='
order = "DESC" if direction == 'b' else "ASC"
args = [room_id] args = [room_id]
if direction == 'b':
bounds = _get_token_bound(from_key, from_comp) order = "DESC"
bounds = _StreamToken.parse(from_key).upper_bound()
if to_key: if to_key:
bounds = "%s AND %s" % ( bounds = "%s AND %s" % (
bounds, _get_token_bound(to_key, to_comp) bounds, _StreamToken.parse(to_key).lower_bound()
)
else:
order = "ASC"
bounds = _StreamToken.parse(from_key).lower_bound()
if to_key:
bounds = "%s AND %s" % (
bounds, _StreamToken.parse(to_key).upper_bound()
) )
if int(limit) > 0: if int(limit) > 0:
@ -249,9 +273,13 @@ class StreamStore(SQLBaseStore):
topo = rows[-1]["topological_ordering"] topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"] toke = rows[-1]["stream_ordering"]
if direction == 'b': if direction == 'b':
topo -= 1 # Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1 toke -= 1
next_token = "t%s-%s" % (topo, toke) next_token = str(_StreamToken(topo, toke))
else: else:
# TODO (erikj): We should work out what to do here instead. # TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key next_token = to_key if to_key else from_key
@ -284,13 +312,14 @@ class StreamStore(SQLBaseStore):
rows.reverse() # As we selected with reverse ordering rows.reverse() # As we selected with reverse ordering
if rows: if rows:
# XXX: Always subtract 1 since the start token always goes # Tokens are positions between events.
# backwards (parity with paginate_room_events). It isn't # This token points *after* the last event in the chunk.
# obvious that this is correct; we should clarify the algorithm # We need it to point to the event before it in the chunk
# used here. # since we are going backwards so we subtract one from the
topo = rows[0]["topological_ordering"] - 1 # stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1 toke = rows[0]["stream_ordering"] - 1
start_token = "t%s-%s" % (topo, toke) start_token = str(_StreamToken(topo, toke))
token = (start_token, end_token) token = (start_token, end_token)
else: else: