Reduce spurious replication catchup (#16555)

This commit is contained in:
Erik Johnston 2023-10-27 14:27:20 +01:00 committed by GitHub
parent 928e964857
commit 89dbbd68e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 5 deletions

1
changelog.d/16555.misc Normal file
View file

@ -0,0 +1 @@
Reduce some spurious logging in worker mode.

View file

@ -611,10 +611,14 @@ class ReplicationCommandHandler:
# Find where we previously streamed up to. # Find where we previously streamed up to.
current_token = stream.current_token(cmd.instance_name) current_token = stream.current_token(cmd.instance_name)
# If the position token matches our current token then we're up to # If the incoming previous position is less than our current position
# date and there's nothing to do. Otherwise, fetch all updates # then we're up to date and there's nothing to do. Otherwise, fetch
# between then and now. # all updates between then and now.
missing_updates = cmd.prev_token != current_token #
# Note: We also have to check that `current_token` is at most the
# new position, to handle the case where the stream gets "reset"
# (e.g. for `caches` and `typing` after the writer's restart).
missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token)
while missing_updates: while missing_updates:
# Note: There may very well not be any new updates, but we check to # Note: There may very well not be any new updates, but we check to
# make sure. This can particularly happen for the event stream where # make sure. This can particularly happen for the event stream where
@ -644,7 +648,7 @@ class ReplicationCommandHandler:
[stream.parse_row(row) for row in rows], [stream.parse_row(row) for row in rows],
) )
logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token) logger.info("Caught up with stream '%s' to %i", stream_name, current_token)
# We've now caught up to position sent to us, notify handler. # We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position( await self._replication_data_handler.on_position(