From dd3092c3a357be2453e25293b5590d627f3cfc48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jan 2018 17:45:11 +0000 Subject: [PATCH] Use MediaStorage for local files --- synapse/rest/media/v1/download_resource.py | 24 +-- synapse/rest/media/v1/media_repository.py | 168 +++++++++------------ 2 files changed, 73 insertions(+), 119 deletions(-) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 6879249c8a..2a5afa9ff4 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -57,34 +57,12 @@ class DownloadResource(Resource): ) server_name, media_id, name = parse_media_id(request) if server_name == self.server_name: - yield self._respond_local_file(request, media_id, name) + yield self.media_repo.get_local_media(request, media_id, name) else: yield self._respond_remote_file( request, server_name, media_id, name ) - @defer.inlineCallbacks - def _respond_local_file(self, request, media_id, name): - media_info = yield self.store.get_local_media(media_id) - if not media_info or media_info["quarantined_by"]: - respond_404(request) - return - - media_type = media_info["media_type"] - media_length = media_info["media_length"] - upload_name = name if name else media_info["upload_name"] - if media_info["url_cache"]: - # TODO: Check the file still exists, if it doesn't we can redownload - # it from the url `media_info["url_cache"]` - file_path = self.filepaths.url_cache_filepath(media_id) - else: - file_path = self.filepaths.local_media_filepath(media_id) - - yield respond_with_file( - request, media_type, file_path, media_length, - upload_name=upload_name, - ) - @defer.inlineCallbacks def _respond_remote_file(self, request, server_name, media_id, name): # don't forward requests for remote media if allow_remote is false diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index eed9056a2f..6ad9320b69 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vecotr Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +19,7 @@ import twisted.internet.error import twisted.web.http from twisted.web.resource import Resource +from ._base import respond_404, RequestWriter, FileInfo, respond_with_responder from .upload_resource import UploadResource from .download_resource import DownloadResource from .thumbnail_resource import ThumbnailResource @@ -25,6 +27,7 @@ from .identicon_resource import IdenticonResource from .preview_url_resource import PreviewUrlResource from .filepath import MediaFilePaths from .thumbnailer import Thumbnailer +from .media_storage import MediaStorage from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.util.stringutils import random_string @@ -33,7 +36,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \ from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination import os @@ -74,6 +77,8 @@ class MediaRepository(object): self.recently_accessed_remotes = set() + self.media_storage = MediaStorage(self.primary_base_path, self.filepaths) + self.clock.looping_call( self._update_recently_accessed_remotes, UPDATE_RECENTLY_ACCESSED_REMOTES_TS @@ -88,72 +93,6 @@ class MediaRepository(object): media, self.clock.time_msec() ) - @staticmethod - def _makedirs(filepath): - dirname = os.path.dirname(filepath) - if not os.path.exists(dirname): - os.makedirs(dirname) - - @staticmethod - def _write_file_synchronously(source, fname): - """Write `source` to the path `fname` synchronously. Should be called - from a thread. - - Args: - source: A file like object to be written - fname (str): Path to write to - """ - MediaRepository._makedirs(fname) - source.seek(0) # Ensure we read from the start of the file - with open(fname, "wb") as f: - shutil.copyfileobj(source, f) - - @defer.inlineCallbacks - def write_to_file_and_backup(self, source, path): - """Write `source` to the on disk media store, and also the backup store - if configured. - - Args: - source: A file like object that should be written - path (str): Relative path to write file to - - Returns: - Deferred[str]: the file path written to in the primary media store - """ - fname = os.path.join(self.primary_base_path, path) - - # Write to the main repository - yield make_deferred_yieldable(threads.deferToThread( - self._write_file_synchronously, source, fname, - )) - - # Write to backup repository - yield self.copy_to_backup(path) - - defer.returnValue(fname) - - @defer.inlineCallbacks - def copy_to_backup(self, path): - """Copy a file from the primary to backup media store, if configured. - - Args: - path(str): Relative path to write file to - """ - if self.backup_base_path: - primary_fname = os.path.join(self.primary_base_path, path) - backup_fname = os.path.join(self.backup_base_path, path) - - # We can either wait for successful writing to the backup repository - # or write in the background and immediately return - if self.synchronous_backup_media_store: - yield make_deferred_yieldable(threads.deferToThread( - shutil.copyfile, primary_fname, backup_fname, - )) - else: - preserve_fn(threads.deferToThread)( - shutil.copyfile, primary_fname, backup_fname, - ) - @defer.inlineCallbacks def create_content(self, media_type, upload_name, content, content_length, auth_user): @@ -171,10 +110,13 @@ class MediaRepository(object): """ media_id = random_string(24) - fname = yield self.write_to_file_and_backup( - content, self.filepaths.local_media_filepath_rel(media_id) + file_info = FileInfo( + server_name=None, + file_id=media_id, ) + fname = yield self.media_storage.store_file(content, file_info) + logger.info("Stored local media in file %r", fname) yield self.store.store_local_media( @@ -194,6 +136,30 @@ class MediaRepository(object): defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) + @defer.inlineCallbacks + def get_local_media(self, request, media_id, name): + """Responds to reqests for local media, if exists, or returns 404. + """ + media_info = yield self.store.get_local_media(media_id) + if not media_info or media_info["quarantined_by"]: + respond_404(request) + return + + media_type = media_info["media_type"] + media_length = media_info["media_length"] + upload_name = name if name else media_info["upload_name"] + url_cache = media_info["url_cache"] + + file_info = FileInfo( + None, media_id, + url_cache=url_cache, + ) + + responder = yield self.media_storage.fetch_media(file_info) + yield respond_with_responder( + request, responder, media_type, media_length, upload_name, + ) + @defer.inlineCallbacks def get_remote_media(self, server_name, media_id): key = (server_name, media_id) @@ -368,11 +334,18 @@ class MediaRepository(object): if t_byte_source: try: - output_path = yield self.write_to_file_and_backup( - t_byte_source, - self.filepaths.local_media_thumbnail_rel( - media_id, t_width, t_height, t_type, t_method - ) + file_info = FileInfo( + server_name=None, + file_id=media_id, + thumbnail=True, + thumbnail_width=t_width, + thumbnail_height=t_height, + thumbnail_method=t_method, + thumbnail_type=t_type, + ) + + output_path = yield self.media_storage.store_file( + t_byte_source, file_info, ) finally: t_byte_source.close() @@ -400,11 +373,18 @@ class MediaRepository(object): if t_byte_source: try: - output_path = yield self.write_to_file_and_backup( - t_byte_source, - self.filepaths.remote_media_thumbnail_rel( - server_name, file_id, t_width, t_height, t_type, t_method - ) + file_info = FileInfo( + server_name=server_name, + file_id=media_id, + thumbnail=True, + thumbnail_width=t_width, + thumbnail_height=t_height, + thumbnail_method=t_method, + thumbnail_type=t_type, + ) + + output_path = yield self.media_storage.store_file( + t_byte_source, file_info, ) finally: t_byte_source.close() @@ -472,20 +452,6 @@ class MediaRepository(object): # Now we generate the thumbnails for each dimension, store it for (t_width, t_height, t_type), t_method in thumbnails.iteritems(): - # Work out the correct file name for thumbnail - if server_name: - file_path = self.filepaths.remote_media_thumbnail_rel( - server_name, file_id, t_width, t_height, t_type, t_method - ) - elif url_cache: - file_path = self.filepaths.url_cache_thumbnail_rel( - media_id, t_width, t_height, t_type, t_method - ) - else: - file_path = self.filepaths.local_media_thumbnail_rel( - media_id, t_width, t_height, t_type, t_method - ) - # Generate the thumbnail if t_method == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( @@ -505,9 +471,19 @@ class MediaRepository(object): continue try: - # Write to disk - output_path = yield self.write_to_file_and_backup( - t_byte_source, file_path, + file_info = FileInfo( + server_name=server_name, + file_id=media_id, + thumbnail=True, + thumbnail_width=t_width, + thumbnail_height=t_height, + thumbnail_method=t_method, + thumbnail_type=t_type, + url_cache=url_cache, + ) + + output_path = yield self.media_storage.store_file( + t_byte_source, file_info, ) finally: t_byte_source.close()