Clarifications for event push action processing. (#13485)

* Clarifies comments.
* Fixes an erroneous comment (about return type) added in #13455
  (ec24813220).
* Clarifies the name of a variable.
* Simplifies logic of pulling out the latest join for the requesting user.
This commit is contained in:
Patrick Cloke 2022-08-15 09:33:17 -04:00 committed by GitHub
parent f383b9b3ec
commit 46bd7f4ed9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 21 deletions

1
changelog.d/13485.misc Normal file
View file

@ -0,0 +1 @@
Add comments about how event push actions are rotated.

View file

@ -227,7 +227,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
user_id: str, user_id: str,
) -> NotifCounts: ) -> NotifCounts:
"""Get the notification count, the highlight count and the unread message count """Get the notification count, the highlight count and the unread message count
for a given user in a given room after the given read receipt. for a given user in a given room after their latest read receipt.
Note that this function assumes the user to be a current member of the room, Note that this function assumes the user to be a current member of the room,
since it's either called by the sync handler to handle joined room entries, or by since it's either called by the sync handler to handle joined room entries, or by
@ -238,9 +238,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
user_id: The user to retrieve the counts for. user_id: The user to retrieve the counts for.
Returns Returns
A dict containing the counts mentioned earlier in this docstring, A NotifCounts object containing the notification count, the highlight count
respectively under the keys "notify_count", "highlight_count" and and the unread message count.
"unread_count".
""" """
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"get_unread_event_push_actions_by_room", "get_unread_event_push_actions_by_room",
@ -255,6 +254,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
room_id: str, room_id: str,
user_id: str, user_id: str,
) -> NotifCounts: ) -> NotifCounts:
# Get the stream ordering of the user's latest receipt in the room.
result = self.get_last_receipt_for_user_txn( result = self.get_last_receipt_for_user_txn(
txn, txn,
user_id, user_id,
@ -266,13 +266,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
), ),
) )
stream_ordering = None
if result: if result:
_, stream_ordering = result _, stream_ordering = result
if stream_ordering is None: else:
# Either last_read_event_id is None, or it's an event we don't have (e.g. # If the user has no receipts in the room, retrieve the stream ordering for
# because it's been purged), in which case retrieve the stream ordering for
# the latest membership event from this user in this room (which we assume is # the latest membership event from this user in this room (which we assume is
# a join). # a join).
event_id = self.db_pool.simple_select_one_onecol_txn( event_id = self.db_pool.simple_select_one_onecol_txn(
@ -289,10 +287,26 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
) )
def _get_unread_counts_by_pos_txn( def _get_unread_counts_by_pos_txn(
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int self,
txn: LoggingTransaction,
room_id: str,
user_id: str,
receipt_stream_ordering: int,
) -> NotifCounts: ) -> NotifCounts:
"""Get the number of unread messages for a user/room that have happened """Get the number of unread messages for a user/room that have happened
since the given stream ordering. since the given stream ordering.
Args:
txn: The database transaction.
room_id: The room ID to get unread counts for.
user_id: The user ID to get unread counts for.
receipt_stream_ordering: The stream ordering of the user's latest
receipt in the room. If there are no receipts, the stream ordering
of the user's join event.
Returns
A NotifCounts object containing the notification count, the highlight count
and the unread message count.
""" """
counts = NotifCounts() counts = NotifCounts()
@ -320,7 +334,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
OR last_receipt_stream_ordering = ? OR last_receipt_stream_ordering = ?
) )
""", """,
(room_id, user_id, stream_ordering, stream_ordering), (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
) )
row = txn.fetchone() row = txn.fetchone()
@ -338,17 +352,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND stream_ordering > ? AND stream_ordering > ?
AND highlight = 1 AND highlight = 1
""" """
txn.execute(sql, (user_id, room_id, stream_ordering)) txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
row = txn.fetchone() row = txn.fetchone()
if row: if row:
counts.highlight_count += row[0] counts.highlight_count += row[0]
# Finally we need to count push actions that aren't included in the # Finally we need to count push actions that aren't included in the
# summary returned above, e.g. recent events that haven't been # summary returned above. This might be due to recent events that haven't
# summarised yet, or the summary is empty due to a recent read receipt. # been summarised yet or the summary is out of date due to a recent read
stream_ordering = max(stream_ordering, summary_stream_ordering) # receipt.
start_unread_stream_ordering = max(
receipt_stream_ordering, summary_stream_ordering
)
notify_count, unread_count = self._get_notif_unread_count_for_user_room( notify_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering txn, room_id, user_id, start_unread_stream_ordering
) )
counts.notify_count += notify_count counts.notify_count += notify_count
@ -1151,8 +1168,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn: The database transaction. txn: The database transaction.
old_rotate_stream_ordering: The previous maximum event stream ordering. old_rotate_stream_ordering: The previous maximum event stream ordering.
rotate_to_stream_ordering: The new maximum event stream ordering to summarise. rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
Returns whether the archiving process has caught up or not.
""" """
# Calculate the new counts that should be upserted into event_push_summary # Calculate the new counts that should be upserted into event_push_summary
@ -1238,9 +1253,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(rotate_to_stream_ordering,), (rotate_to_stream_ordering,),
) )
async def _remove_old_push_actions_that_have_rotated( async def _remove_old_push_actions_that_have_rotated(self) -> None:
self,
) -> None:
"""Clear out old push actions that have been summarised.""" """Clear out old push actions that have been summarised."""
# We want to clear out anything that is older than a day that *has* already # We want to clear out anything that is older than a day that *has* already

View file

@ -161,7 +161,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type: The receipt types to fetch. receipt_type: The receipt types to fetch.
Returns: Returns:
The latest receipt, if one exists. The event ID and stream ordering of the latest receipt, if one exists.
""" """
clause, args = make_in_list_sql_clause( clause, args = make_in_list_sql_clause(