This commit is contained in:
Erik Johnston 2023-01-20 14:17:18 +00:00
parent 71472bac91
commit b72b698701
2 changed files with 8 additions and 17 deletions

View file

@ -759,7 +759,7 @@ class Notifier:
self._federation_client.wake_destination(server)
@attr.s
@attr.s(auto_attribs=True)
class ReplicationNotifier:
"""Tracks callbacks for things that need to know about stream changes.

View file

@ -14,7 +14,7 @@
from twisted.internet import defer
from synapse.replication.tcp.commands import PositionCommand, RdataCommand
from synapse.replication.tcp.commands import PositionCommand
from tests.replication._base import BaseMultiWorkerStreamTestCase
@ -111,20 +111,14 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
next_token = self.get_success(ctx.__aenter__())
self.get_success(ctx.__aexit__(None, None, None))
cmd_handler.send_command(
RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
)
self.replicate()
self.get_success(
data_handler.wait_for_stream_position("worker1", "caches", next_token)
)
# `wait_for_stream_position` should only return once master receives an
# RDATA from the worker
ctx = cache_id_gen.get_next()
next_token = self.get_success(ctx.__aenter__())
self.get_success(ctx.__aexit__(None, None, None))
# `wait_for_stream_position` should only return once master receives a
# notification that `next_token` has persisted.
ctx_worker1 = cache_id_gen.get_next()
next_token = self.get_success(ctx_worker1.__aenter__())
d = defer.ensureDeferred(
data_handler.wait_for_stream_position("worker1", "caches", next_token)
@ -142,10 +136,7 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
)
self.assertFalse(d.called)
# ... but receiving the RDATA should
cmd_handler.send_command(
RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
)
self.replicate()
# ... but worker1 finishing (and so sending an update) should.
self.get_success(ctx_worker1.__aexit__(None, None, None))
self.assertTrue(d.called)