diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index a3a17e9f9f..acf1bc5749 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -8,9 +8,9 @@ Depending on the amount of history being purged a call to the API may take several minutes or longer. During this period users will not be able to paginate further back in the room from the point being purged from. -The API is simply: +The API is: -``POST /_matrix/client/r0/admin/purge_history//`` +``POST /_matrix/client/r0/admin/purge_history/[/]`` including an ``access_token`` of a server admin. @@ -25,3 +25,10 @@ To delete local events as well, set ``delete_local_events`` in the body: { "delete_local_events": true } + +The caller must specify the point in the room to purge up to. This can be +specified by including an event_id in the URI, or by setting a +``purge_up_to_event_id`` or ``purge_up_to_ts`` in the request body. If an event +id is given, that event (and others at the same graph depth) will be retained. +If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch, +in milliseconds. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d28c2745c..dd00d8a86c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -52,16 +52,12 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() @defer.inlineCallbacks - def purge_history(self, room_id, event_id, delete_local_events=False): - event = yield self.store.get_event(event_id) - - if event.room_id != room_id: - raise SynapseError(400, "Event is for wrong room.") - - depth = event.depth - + def purge_history(self, room_id, topological_ordering, + delete_local_events=False): with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history(room_id, depth, delete_local_events) + yield self.store.purge_history( + room_id, topological_ordering, delete_local_events, + ) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 3917eee42d..dcf6215dad 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import AuthError, SynapseError +from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import UserID, create_requester from synapse.http.servlet import parse_json_object_from_request @@ -114,12 +114,18 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet): class PurgeHistoryRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns( - "/admin/purge_history/(?P[^/]*)/(?P[^/]*)" + "/admin/purge_history/(?P[^/]*)(/(?P[^/]+))?" ) def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer) + """ super(PurgeHistoryRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.store = hs.get_datastore() @defer.inlineCallbacks def on_POST(self, request, room_id, event_id): @@ -133,8 +139,54 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): delete_local_events = bool(body.get("delete_local_events", False)) + # establish the topological ordering we should keep events from. The + # user can provide an event_id in the URL or the request body, or can + # provide a timestamp in the request body. + if event_id is None: + event_id = body.get('purge_up_to_event_id') + + if event_id is not None: + event = yield self.store.get_event(event_id) + + if event.room_id != room_id: + raise SynapseError(400, "Event is for wrong room.") + + depth = event.depth + logger.info( + "[purge] purging up to depth %i (event_id %s)", + depth, event_id, + ) + elif 'purge_up_to_ts' in body: + ts = body['purge_up_to_ts'] + if not isinstance(ts, int): + raise SynapseError( + 400, "purge_up_to_ts must be an int", + errcode=Codes.BAD_JSON, + ) + + stream_ordering = ( + yield self.store.find_first_stream_ordering_after_ts(ts) + ) + + (_, depth, _) = ( + yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering, + ) + ) + logger.info( + "[purge] purging up to depth %i (received_ts %i => " + "stream_ordering %i)", + depth, ts, stream_ordering, + ) + else: + raise SynapseError( + 400, + "must specify purge_up_to_event_id or purge_up_to_ts", + errcode=Codes.BAD_JSON, + ) + yield self.handlers.message_handler.purge_history( - room_id, event_id, + room_id, depth, delete_local_events=delete_local_events, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b78151cd82..2956c3b3e0 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -415,6 +415,33 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + def get_room_event_after_stream_ordering(self, room_id, stream_ordering): + """Gets details of the first event in a room at or after a stream ordering + + Args: + room_id (str): + stream_ordering (int): + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + def _f(txn): + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering >= ?" + " AND NOT outlier" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + txn.execute(sql, (room_id, stream_ordering, )) + return txn.fetchone() + + return self.runInteraction( + "get_room_event_after_stream_ordering", _f, + ) + @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): """Returns the current token for rooms stream.