diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 7755fbf00a..2fd988615a 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -136,6 +136,16 @@ class PusherServer(HomeServer): min_stream_id, max_stream_id ) + stream = results.get("receipts") + if stream: + rows = stream["rows"] + affected_room_ids = set(row[1] for row in rows) + min_stream_id = rows[0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_receipts)( + min_stream_id, max_stream_id, affected_room_ids + ) + while True: try: args = store.stream_positions() diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index aaf9015ebf..fd6b6492dd 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -40,6 +40,7 @@ class SlavedReceiptsStore(BaseSlavedStore): get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ + get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ def stream_positions(self): result = super(SlavedReceiptsStore, self).stream_positions()