Discard notifications in ranges.

This commit is contained in:
Patrick Cloke 2022-05-26 16:07:39 -04:00
parent 7c320b79bf
commit 580fbb740f
2 changed files with 40 additions and 15 deletions

View file

@ -927,8 +927,13 @@ class EventPushActionsWorkerStore(SQLBaseStore):
(rotate_to_stream_ordering,), (rotate_to_stream_ordering,),
) )
def _remove_old_push_actions_before_txn( def _remove_old_push_actions_txn(
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int self,
txn: LoggingTransaction,
room_id: str,
user_id: str,
end_stream_ordering: int,
start_stream_ordering: Optional[int],
) -> None: ) -> None:
""" """
Purges old push actions for a user and room before a given Purges old push actions for a user and room before a given
@ -957,20 +962,33 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# Instead, we look up the stream ordering for the last event in that # Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions # room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that. # in the room with a stream_odering before that.
txn.execute( if start_stream_ordering is None:
"DELETE FROM event_push_actions " stream_ordering_clause = "stream_ordering <= ?"
" WHERE user_id = ? AND room_id = ? AND " stream_ordering_args: Tuple[int, ...] = (end_stream_ordering,)
" stream_ordering <= ?" else:
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", stream_ordering_clause = "stream_ordering >= ? AND stream_ordering <= ?"
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago), stream_ordering_args = (start_stream_ordering, end_stream_ordering)
)
txn.execute( txn.execute(
""" f"""
DELETE FROM event_push_actions
WHERE user_id = ? AND room_id = ?
AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)
AND {stream_ordering_clause}
""",
(user_id, room_id, self.stream_ordering_month_ago) + stream_ordering_args,
)
# XXX What to do about these summaries? They're currently updated daily.
# Deleting a chunk of them if any region overlaps seems suspect.
# Maybe we can do a daily update to limit the damage? That would not
# give true unread status per event, however.
txn.execute(
f"""
DELETE FROM event_push_summary DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? WHERE room_id = ? AND user_id = ? AND {stream_ordering_clause}
""", """,
(room_id, user_id, stream_ordering), (room_id, user_id) + stream_ordering_args,
) )

View file

@ -672,13 +672,20 @@ class ReceiptsWorkerStore(SQLBaseStore):
# When updating a local users read receipt, remove any push actions # When updating a local users read receipt, remove any push actions
# which resulted from the receipt's event and all earlier events. # which resulted from the receipt's event and all earlier events.
#
# XXX Can the stream orderings from local users not be known? Maybe if
# events are purged (retention?)
#
# XXX Do we need to differentiate between an unbounded start
# (start_event_id == None) vs. being unable to find the event
# (start_stream_ordering == None)?
if ( if (
self.hs.is_mine_id(user_id) self.hs.is_mine_id(user_id)
and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE) and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
and end_stream_ordering is not None and (start_stream_ordering is not None or end_stream_ordering is not None)
): ):
self._remove_old_push_actions_before_txn( # type: ignore[attr-defined] self._remove_old_push_actions_txn( # type: ignore[attr-defined]
txn, room_id=room_id, user_id=user_id, stream_ordering=end_stream_ordering txn, room_id, user_id, end_stream_ordering, start_stream_ordering
) )
return rx_ts return rx_ts