From b72b698701f1f6ed89c2aef1dffbebb1648a6d52 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 14:17:18 +0000 Subject: [PATCH] Fix test --- synapse/notifier.py | 2 +- tests/replication/tcp/test_handler.py | 23 +++++++---------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e7b011125b..28f0d4a25a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -759,7 +759,7 @@ class Notifier: self._federation_client.wake_destination(server) -@attr.s +@attr.s(auto_attribs=True) class ReplicationNotifier: """Tracks callbacks for things that need to know about stream changes. diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index 555922409d..6e4055cc21 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -14,7 +14,7 @@ from twisted.internet import defer -from synapse.replication.tcp.commands import PositionCommand, RdataCommand +from synapse.replication.tcp.commands import PositionCommand from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -111,20 +111,14 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase): next_token = self.get_success(ctx.__aenter__()) self.get_success(ctx.__aexit__(None, None, None)) - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() - self.get_success( data_handler.wait_for_stream_position("worker1", "caches", next_token) ) - # `wait_for_stream_position` should only return once master receives an - # RDATA from the worker - ctx = cache_id_gen.get_next() - next_token = self.get_success(ctx.__aenter__()) - self.get_success(ctx.__aexit__(None, None, None)) + # `wait_for_stream_position` should only return once master receives a + # notification that `next_token` has persisted. + ctx_worker1 = cache_id_gen.get_next() + next_token = self.get_success(ctx_worker1.__aenter__()) d = defer.ensureDeferred( data_handler.wait_for_stream_position("worker1", "caches", next_token) @@ -142,10 +136,7 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase): ) self.assertFalse(d.called) - # ... but receiving the RDATA should - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() + # ... but worker1 finishing (and so sending an update) should. + self.get_success(ctx_worker1.__aexit__(None, None, None)) self.assertTrue(d.called)