diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 24c2b2fad6..6d193a10c4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -52,6 +52,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "state", # dict[(str, str), FrozenEvent] "ephemeral", "account_data", + "unread_notification_count", ])): __slots__ = [] @@ -64,6 +65,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ or self.state or self.ephemeral or self.account_data + or self.unread_notification_count > 0 ) @@ -161,6 +163,18 @@ class SyncHandler(BaseHandler): else: return self.incremental_sync_with_gap(sync_config, since_token) + def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room): + if room_id not in ephemeral_by_room: + return None + for e in ephemeral_by_room[room_id]: + if e['type'] != 'm.receipt': + continue + for receipt_event_id,val in e['content'].items(): + if 'm.read' in val: + if user_id in val['m.read']: + return receipt_event_id + return None + @defer.inlineCallbacks def full_state_sync(self, sync_config, timeline_since_token): """Get a sync for a client which is starting without any state. @@ -265,6 +279,16 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) + last_unread_event_id = self.last_read_event_id_for_room_and_user( + room_id, sync_config.user.to_string(), ephemeral_by_room + ) + + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_actions_by_room( + room_id, last_unread_event_id + ) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( @@ -275,6 +299,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=len(notifs) )) def account_data_for_user(self, account_data): @@ -509,6 +534,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=0 ) logger.debug("Result for room %s: %r", room_id, room_sync) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index a72a7d703c..1c7cd31666 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -45,5 +45,5 @@ class ActionGenerator: logger.info("actions for user %s: %s", uid, actions) if len(actions): self.store.set_actions_for_event( - event['event_id'], uid, None, actions + event, uid, None, actions ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f0a637a6da..4ca10732c1 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -304,6 +304,7 @@ class SyncRestServlet(RestServlet): }, "state": {"events": serialized_state}, "account_data": {"events": account_data}, + "unread_notification_count": room.unread_notification_count } if joined: diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 593b1714c7..40ac8e2d27 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -24,18 +24,67 @@ logger = logging.getLogger(__name__) class EventActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event(self, event_id, user_id, profile_tag, actions): + def set_actions_for_event(self, event, user_id, profile_tag, actions): actionsJson = json.dumps(actions) ret = yield self.runInteraction( "_set_actions_for_event", self._simple_upsert_txn, EventActionsTable.table_name, - {'event_id': event_id, 'user_id': user_id, 'profile_tag': profile_tag}, + { + 'room_id': event['room_id'], + 'event_id': event['event_id'], + 'user_id': user_id, + 'profile_tag': profile_tag + }, {'actions': actionsJson} ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_unread_event_actions_by_room(self, room_id, last_read_event_id): + #events = yield self._get_events( + # [last_read_event_id], + # check_redacted=False + #) + + def _get_unread_event_actions_by_room(txn): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return [] + + stream_ordering = results[0][0] + topological_ordering = results[0][1] + + sql = ( + "SELECT ea.actions" + " FROM event_actions ea, events e" + " WHERE ea.room_id = e.room_id" + " AND ea.event_id = e.event_id" + " AND ea.room_id = ?" + " AND (" + " e.topological_ordering > ?" + " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + ")" + ) + txn.execute(sql, + (room_id, topological_ordering, topological_ordering, stream_ordering) + ) + return txn.fetchall() + + ret = yield self.runInteraction( + "get_unread_event_actions_by_room", + _get_unread_event_actions_by_room + ) + defer.returnValue(ret) class EventActionsTable(object): table_name = "event_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql index 1246823a00..bbdaee990e 100644 --- a/synapse/storage/schema/delta/27/event_actions.sql +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -14,12 +14,13 @@ */ CREATE TABLE IF NOT EXISTS event_actions( + room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, profile_tag VARCHAR(32), actions TEXT NOT NULL, - CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (event_id, user_id, profile_tag) + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) ); -CREATE INDEX event_actions_event_id_user_id_profile_tag on event_actions(event_id, user_id, profile_tag); +CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag);