Merge pull request #4792 from matrix-org/anoa/replication_tokens

Support batch updates in the worker sender
This commit is contained in:
Andrew Morgan 2019-03-06 15:48:29 +00:00 committed by GitHub
commit 7b8a157b79
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 3 deletions

1
changelog.d/4792.bugfix Normal file
View file

@ -0,0 +1 @@
Handle batch updates in worker replication protocol.

View file

@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
@defer.inlineCallbacks @defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token): def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a streams. """Subscribe the remote to a stream.
This invloves checking if they've missed anything and sending those This invloves checking if they've missed anything and sending those
updates down if they have. During that time new updates for the stream updates down if they have. During that time new updates for the stream
@ -478,11 +478,36 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Now we can send any updates that came in while we were subscribing # Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, []) pending_rdata = self.pending_rdata.pop(stream_name, [])
updates = []
for token, update in pending_rdata: for token, update in pending_rdata:
# Only send updates newer than the current token # If the token is null, it is part of a batch update. Batches
if token > current_token: # are multiple updates that share a single token. To denote
# this, the token is set to None for all tokens in the batch
# except for the last. If we find a None token, we keep looking
# through tokens until we find one that is not None and then
# process all previous updates in the batch as if they had the
# final token.
if token is None:
# Store this update as part of a batch
updates.append(update)
continue
if token <= current_token:
# This update or batch of updates is older than
# current_token, dismiss it
updates = []
continue
updates.append(update)
# Send all updates that are part of this batch with the
# found token
for update in updates:
self.send_command(RdataCommand(stream_name, token, update)) self.send_command(RdataCommand(stream_name, token, update))
# Clear stored updates
updates = []
# They're now fully subscribed # They're now fully subscribed
self.replication_streams.add(stream_name) self.replication_streams.add(stream_name)
except Exception as e: except Exception as e: