diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index aa05b3f023..8ec8569a49 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -46,5 +46,37 @@ class WhoisRestServlet(ClientV1RestServlet): defer.returnValue((200, ret)) +class PurgeMediaCacheRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/admin/purge_media_cache") + + def __init__(self, hs): + self.media_repository = hs.get_media_repository() + super(PurgeMediaCacheRestServlet, self).__init__(hs) + + @defer.inlineCallbacks + def on_POST(self, request): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + + if not is_admin: + raise AuthError(403, "You are not a server admin") + + before_ts = request.args.get("before_ts", None) + if not before_ts: + raise SynapseError(400, "Missing 'before_ts' arg") + + logger.info("before_ts: %r", before_ts[0]) + + try: + before_ts = int(before_ts[0]) + except Exception: + raise SynapseError(400, "Invalid 'before_ts' arg") + + ret = yield self.media_repository.delete_old_remote_media(before_ts) + + defer.returnValue((200, ret)) + + def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) + PurgeMediaCacheRestServlet(hs).register(http_server) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 422ab86fb3..0137458f71 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -65,3 +65,9 @@ class MediaFilePaths(object): file_id[0:2], file_id[2:4], file_id[4:], file_name ) + + def remote_media_thumbnail_dir(self, server_name, file_id): + return os.path.join( + self.base_path, "remote_thumbnail", server_name, + file_id[0:2], file_id[2:4], file_id[4:], + ) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 1a287b6fec..844628c121 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -30,11 +30,13 @@ from synapse.api.errors import SynapseError from twisted.internet import defer, threads -from synapse.util.async import ObservableDeferred +from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii from synapse.util.logcontext import preserve_context_over_fn import os +import errno +import shutil import cgi import logging @@ -47,7 +49,7 @@ UPDATE_RECENTLY_ACCESSED_REMOTES_TS = 60 * 1000 class MediaRepository(object): - def __init__(self, hs, filepaths): + def __init__(self, hs): self.auth = hs.get_auth() self.client = MatrixFederationHttpClient(hs) self.clock = hs.get_clock() @@ -55,11 +57,12 @@ class MediaRepository(object): self.store = hs.get_datastore() self.max_upload_size = hs.config.max_upload_size self.max_image_pixels = hs.config.max_image_pixels - self.filepaths = filepaths - self.downloads = {} + self.filepaths = MediaFilePaths(hs.config.media_store_path) self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.thumbnail_requirements = hs.config.thumbnail_requirements + self.remote_media_linearizer = Linearizer() + self.recently_accessed_remotes = set() self.clock.looping_call( @@ -112,22 +115,12 @@ class MediaRepository(object): defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) + @defer.inlineCallbacks def get_remote_media(self, server_name, media_id): key = (server_name, media_id) - download = self.downloads.get(key) - if download is None: - download = self._get_remote_media_impl(server_name, media_id) - download = ObservableDeferred( - download, - consumeErrors=True - ) - self.downloads[key] = download - - @download.addBoth - def callback(media_info): - del self.downloads[key] - return media_info - return download.observe() + with (yield self.remote_media_linearizer.queue(key)): + media_info = yield self._get_remote_media_impl(server_name, media_id) + defer.returnValue(media_info) @defer.inlineCallbacks def _get_remote_media_impl(self, server_name, media_id): @@ -440,6 +433,52 @@ class MediaRepository(object): "height": m_height, }) + @defer.inlineCallbacks + def delete_old_remote_media(self, before_ts): + old_media = yield self.store.get_remote_media_before(before_ts) + + deleted = 0 + + for media in old_media: + origin = media["media_origin"] + media_id = media["media_id"] + file_id = media["filesystem_id"] + key = (origin, media_id) + + logger.info("Deleting: %r", key) + + with (yield self.remote_media_linearizer.queue(key)): + full_path = self.filepaths.remote_media_filepath(origin, file_id) + full_dir = os.path.dirname(full_path) + try: + os.remove(full_path) + except OSError as e: + logger.warn("Failed to remove file: %r", full_path) + if e.errno == errno.ENOENT: + pass + else: + continue + + try: + os.removedirs(full_dir) + except OSError: + pass + + thumbnail_dir = self.filepaths.remote_media_thumbnail_dir( + origin, file_id + ) + shutil.rmtree(thumbnail_dir, ignore_errors=True) + + yield self.store.delete_remote_media(origin, media_id) + try: + os.removedirs(thumbnail_dir) + except OSError: + pass + + deleted += 1 + + defer.returnValue({"deleted": deleted}) + class MediaRepositoryResource(Resource): """File uploading and downloading. @@ -488,9 +527,8 @@ class MediaRepositoryResource(Resource): def __init__(self, hs): Resource.__init__(self) - filepaths = MediaFilePaths(hs.config.media_store_path) - media_repo = MediaRepository(hs, filepaths) + media_repo = hs.get_media_repository() self.putChild("upload", UploadResource(hs, media_repo)) self.putChild("download", DownloadResource(hs, media_repo)) diff --git a/synapse/server.py b/synapse/server.py index dd4b81c658..d49a1a8a96 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -45,6 +45,7 @@ from synapse.crypto.keyring import Keyring from synapse.push.pusherpool import PusherPool from synapse.events.builder import EventBuilderFactory from synapse.api.filtering import Filtering +from synapse.rest.media.v1.media_repository import MediaRepository from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -113,6 +114,7 @@ class HomeServer(object): 'filtering', 'http_client_context_factory', 'simple_http_client', + 'media_repository', ] def __init__(self, hostname, **kwargs): @@ -233,6 +235,9 @@ class HomeServer(object): **self.db_config.get("args", {}) ) + def build_media_repository(self): + return MediaRepository(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 44e4d38307..4c0f82353d 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -205,3 +205,32 @@ class MediaRepositoryStore(SQLBaseStore): }, desc="store_remote_media_thumbnail", ) + + def get_remote_media_before(self, before_ts): + sql = ( + "SELECT media_origin, media_id, filesystem_id" + " FROM remote_media_cache" + " WHERE last_access_ts < ?" + ) + + return self._execute( + "get_remote_media_before", self.cursor_to_dict, sql, before_ts + ) + + def delete_remote_media(self, media_origin, media_id): + def delete_remote_media_txn(txn): + self._simple_delete_txn( + txn, + "remote_media_cache", + keyvalues={ + "media_origin": media_origin, "media_id": media_id + }, + ) + self._simple_delete_txn( + txn, + "remote_media_cache_thumbnails", + keyvalues={ + "media_origin": media_origin, "media_id": media_id + }, + ) + return self.runInteraction("delete_remote_media", delete_remote_media_txn)