Mirror of https://github.com/element-hq/synapse, synced 2024-09-28 18:52:40 +00:00

commit 2e4627bf90
Merge branch 'madlittlemods/msc3575-sliding-sync-filter-dms' into madlittlemods/msc3575-sliding-sync-filter-encrypted2

Conflicts:
	synapse/handlers/sliding_sync.py
	tests/handlers/test_sliding_sync.py

32 changed files with 993 additions and 298 deletions
changelog.d/17172.feature (new file, +2)
@@ -0,0 +1,2 @@
+Support [MSC3916](https://github.com/matrix-org/matrix-spec-proposals/blob/rav/authentication-for-media/proposals/3916-authentication-for-media.md)
+by adding a federation /download endpoint (#17172).

changelog.d/17266.misc (new file, +1)
@@ -0,0 +1 @@
+Add debug logging for when room keys are uploaded, including whether they are replacing other room keys.

changelog.d/17272.bugfix (new file, +1)
@@ -0,0 +1 @@
+Fix wrong retention policy being used when filtering events.

changelog.d/17279.misc (new file, +1)
@@ -0,0 +1 @@
+Re-organize Pydantic models and types used in handlers.

@@ -47,9 +47,9 @@ from synapse.events.utils import (
     validate_canonicaljson,
 )
 from synapse.http.servlet import validate_json_object
-from synapse.rest.models import RequestBodyModel
 from synapse.storage.controllers.state import server_acl_evaluator_from_event
 from synapse.types import EventID, JsonDict, RoomID, StrCollection, UserID
+from synapse.types.rest import RequestBodyModel


 class EventValidator:
@@ -19,6 +19,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+import inspect
 import logging
 from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type

@@ -33,6 +34,7 @@ from synapse.federation.transport.server.federation import (
     FEDERATION_SERVLET_CLASSES,
     FederationAccountStatusServlet,
     FederationUnstableClientKeysClaimServlet,
+    FederationUnstableMediaDownloadServlet,
 )
 from synapse.http.server import HttpServer, JsonResource
 from synapse.http.servlet import (
@@ -315,6 +317,28 @@ def register_servlets(
             ):
                 continue

+            if servletclass == FederationUnstableMediaDownloadServlet:
+                if (
+                    not hs.config.server.enable_media_repo
+                    or not hs.config.experimental.msc3916_authenticated_media_enabled
+                ):
+                    continue
+
+                # don't load the endpoint if the storage provider is incompatible
+                media_repo = hs.get_media_repository()
+                load_download_endpoint = True
+                for provider in media_repo.media_storage.storage_providers:
+                    signature = inspect.signature(provider.backend.fetch)
+                    if "federation" not in signature.parameters:
+                        logger.warning(
+                            f"Federation media `/download` endpoint will not be enabled as storage provider {provider.backend} is not compatible with this endpoint."
+                        )
+                        load_download_endpoint = False
+                        break
+
+                if not load_download_endpoint:
+                    continue
+
             servletclass(
                 hs=hs,
                 authenticator=authenticator,
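Note: the `inspect.signature` gate above enables the endpoint only when every configured storage provider's `fetch` accepts the new `federation` parameter. A minimal sketch of the same capability probe (the backend classes here are hypothetical):

    import inspect

    class LegacyBackend:
        async def fetch(self, path, file_info):  # no `federation` parameter
            ...

    class UpdatedBackend:
        async def fetch(self, path, file_info, media_info=None, federation=False):
            ...

    def supports_federation(backend) -> bool:
        # Same idea as the register_servlets check: only expose the endpoint
        # when fetch() can be told that a request came over federation.
        return "federation" in inspect.signature(backend.fetch).parameters

    assert not supports_federation(LegacyBackend())
    assert supports_federation(UpdatedBackend())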
@@ -360,9 +360,25 @@ class BaseFederationServlet:
                             "request"
                         )
                         return None
+                    if (
+                        func.__self__.__class__.__name__  # type: ignore
+                        == "FederationUnstableMediaDownloadServlet"
+                    ):
+                        response = await func(
+                            origin, content, request, *args, **kwargs
+                        )
+                    else:
                         response = await func(
                             origin, content, request.args, *args, **kwargs
                         )
+            else:
+                if (
+                    func.__self__.__class__.__name__  # type: ignore
+                    == "FederationUnstableMediaDownloadServlet"
+                ):
+                    response = await func(
+                        origin, content, request, *args, **kwargs
+                    )
                 else:
                     response = await func(
                         origin, content, request.args, *args, **kwargs
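Note: dispatching on `func.__self__.__class__.__name__` means the media servlet's `on_GET` receives the raw `SynapseRequest` rather than the pre-parsed `request.args`, since it must write the streamed multipart body to the request object itself.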
@@ -44,10 +44,13 @@ from synapse.federation.transport.server._base import (
 )
 from synapse.http.servlet import (
     parse_boolean_from_args,
+    parse_integer,
     parse_integer_from_args,
     parse_string_from_args,
     parse_strings_from_args,
 )
+from synapse.http.site import SynapseRequest
+from synapse.media._base import DEFAULT_MAX_TIMEOUT_MS, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS
 from synapse.types import JsonDict
 from synapse.util import SYNAPSE_VERSION
 from synapse.util.ratelimitutils import FederationRateLimiter
@@ -787,6 +790,43 @@ class FederationAccountStatusServlet(BaseFederationServerServlet):
         return 200, {"account_statuses": statuses, "failures": failures}


+class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet):
+    """
+    Implementation of new federation media `/download` endpoint outlined in MSC3916. Returns
+    a multipart/form-data response consisting of a JSON object and the requested media
+    item. This endpoint only returns local media.
+    """
+
+    PATH = "/media/download/(?P<media_id>[^/]*)"
+    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3916"
+    RATELIMIT = True
+
+    def __init__(
+        self,
+        hs: "HomeServer",
+        ratelimiter: FederationRateLimiter,
+        authenticator: Authenticator,
+        server_name: str,
+    ):
+        super().__init__(hs, authenticator, ratelimiter, server_name)
+        self.media_repo = self.hs.get_media_repository()
+
+    async def on_GET(
+        self,
+        origin: Optional[str],
+        content: Literal[None],
+        request: SynapseRequest,
+        media_id: str,
+    ) -> None:
+        max_timeout_ms = parse_integer(
+            request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
+        )
+        max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
+        await self.media_repo.get_local_media(
+            request, media_id, None, max_timeout_ms, federation=True
+        )
+
+
 FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationSendServlet,
     FederationEventServlet,
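Assuming the usual `/_matrix/federation/unstable` value of `FEDERATION_UNSTABLE_PREFIX`, the servlet's `PREFIX` and `PATH` compose into a route of this shape (values below are illustrative):

    FEDERATION_UNSTABLE_PREFIX = "/_matrix/federation/unstable"  # assumed value
    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3916"
    PATH = "/media/download/(?P<media_id>[^/]*)"
    # -> requests are routed against:
    #    /_matrix/federation/unstable/org.matrix.msc3916/media/download/<media_id>
    # with an optional ?timeout_ms= query parameter, capped by on_GET above at
    # MAXIMUM_ALLOWED_MAX_TIMEOUT_MS.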
@@ -818,4 +858,5 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationV1SendKnockServlet,
     FederationMakeKnockServlet,
     FederationAccountStatusServlet,
+    FederationUnstableMediaDownloadServlet,
 )
@@ -247,6 +247,12 @@ class E2eRoomKeysHandler:
                 if current_room_key:
                     if self._should_replace_room_key(current_room_key, room_key):
                         log_kv({"message": "Replacing room key."})
+                        logger.debug(
+                            "Replacing room key. room=%s session=%s user=%s",
+                            room_id,
+                            session_id,
+                            user_id,
+                        )
                         # updates are done one at a time in the DB, so send
                         # updates right away rather than batching them up,
                         # like we do with the inserts
@@ -256,6 +262,12 @@ class E2eRoomKeysHandler:
                         changed = True
                     else:
                         log_kv({"message": "Not replacing room_key."})
+                        logger.debug(
+                            "Not replacing room key. room=%s session=%s user=%s",
+                            room_id,
+                            session_id,
+                            user_id,
+                        )
                 else:
                     log_kv(
                         {
@@ -265,6 +277,12 @@ class E2eRoomKeysHandler:
                         }
                     )
                     log_kv({"message": "Replacing room key."})
+                    logger.debug(
+                        "Inserting new room key. room=%s session=%s user=%s",
+                        room_id,
+                        session_id,
+                        user_id,
+                    )
                     to_insert.append((room_id, session_id, room_key))
                     changed = True

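With debug logging enabled, the three branches above emit lines of this shape (room, session, and user identifiers are illustrative):

    Replacing room key. room=!abc:example.org session=SESSIONID user=@alice:example.org
    Not replacing room key. room=!abc:example.org session=SESSIONID user=@alice:example.org
    Inserting new room key. room=!abc:example.org session=SESSIONID user=@alice:example.org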
@@ -37,11 +37,10 @@ from synapse.types import (
     JsonMapping,
     Requester,
     ScheduledTask,
-    ShutdownRoomParams,
-    ShutdownRoomResponse,
     StreamKeyType,
     TaskStatus,
 )
+from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.visibility import filter_events_for_client
@@ -80,8 +80,6 @@ from synapse.types import (
     RoomAlias,
     RoomID,
     RoomStreamToken,
-    ShutdownRoomParams,
-    ShutdownRoomResponse,
     StateMap,
     StrCollection,
     StreamKeyType,
@@ -89,6 +87,7 @@ from synapse.types import (
     UserID,
     create_requester,
 )
+from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.types.state import StateFilter
 from synapse.util import stringutils
 from synapse.util.caches.response_cache import ResponseCache
@@ -18,23 +18,14 @@
 #
 #
 import logging
-from enum import Enum
-from typing import TYPE_CHECKING, AbstractSet, Dict, Final, List, Optional, Tuple
+from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional

-import attr
 from immutabledict import immutabledict

-from synapse._pydantic_compat import HAS_PYDANTIC_V2
-
-if TYPE_CHECKING or HAS_PYDANTIC_V2:
-    from pydantic.v1 import Extra
-else:
-    from pydantic import Extra
-
 from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.events import EventBase
-from synapse.rest.client.models import SlidingSyncBody
-from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 from synapse.types.state import StateFilter

 if TYPE_CHECKING:
@@ -63,166 +54,6 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
     return membership != Membership.LEAVE or sender != user_id


-class SlidingSyncConfig(SlidingSyncBody):
-    """
-    Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
-    extra fields that we need in the handler
-    """
-
-    user: UserID
-    device_id: Optional[str]
-
-    # Pydantic config
-    class Config:
-        # By default, ignore fields that we don't recognise.
-        extra = Extra.ignore
-        # By default, don't allow fields to be reassigned after parsing.
-        allow_mutation = False
-        # Allow custom types like `UserID` to be used in the model
-        arbitrary_types_allowed = True
-
-
-class OperationType(Enum):
-    """
-    Represents the operation types in a Sliding Sync window.
-
-    Attributes:
-        SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about
-            entries in this range.
-        INSERT: Sets a single entry. If the position is not empty then clients MUST move
-            entries to the left or the right depending on where the closest empty space is.
-        DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
-            places.
-        INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
-            offline support, but they should be treated as empty when additional operations
-            which concern indexes in the range arrive from the server.
-    """
-
-    SYNC: Final = "SYNC"
-    INSERT: Final = "INSERT"
-    DELETE: Final = "DELETE"
-    INVALIDATE: Final = "INVALIDATE"
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class SlidingSyncResult:
-    """
-    The Sliding Sync result to be serialized to JSON for a response.
-
-    Attributes:
-        next_pos: The next position token in the sliding window to request (next_batch).
-        lists: Sliding window API. A map of list key to list results.
-        rooms: Room subscription API. A map of room ID to room subscription to room results.
-        extensions: Extensions API. A map of extension key to extension results.
-    """
-
-    @attr.s(slots=True, frozen=True, auto_attribs=True)
-    class RoomResult:
-        """
-        Attributes:
-            name: Room name or calculated room name.
-            avatar: Room avatar
-            heroes: List of stripped membership events (containing `user_id` and optionally
-                `avatar_url` and `displayname`) for the users used to calculate the room name.
-            initial: Flag which is set when this is the first time the server is sending this
-                data on this connection. Clients can use this flag to replace or update
-                their local state. When there is an update, servers MUST omit this flag
-                entirely and NOT send "initial":false as this is wasteful on bandwidth. The
-                absence of this flag means 'false'.
-            required_state: The current state of the room
-            timeline: Latest events in the room. The last event is the most recent
-            is_dm: Flag to specify whether the room is a direct-message room (most likely
-                between two people).
-            invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
-                in sync v2, absent on joined/left rooms
-            prev_batch: A token that can be passed as a start parameter to the
-                `/rooms/<room_id>/messages` API to retrieve earlier messages.
-            limited: True if their are more events than fit between the given position and now.
-                Sync again to get more.
-            joined_count: The number of users with membership of join, including the client's
-                own user ID. (same as sync `v2 m.joined_member_count`)
-            invited_count: The number of users with membership of invite. (same as sync v2
-                `m.invited_member_count`)
-            notification_count: The total number of unread notifications for this room. (same
-                as sync v2)
-            highlight_count: The number of unread notifications for this room with the highlight
-                flag set. (same as sync v2)
-            num_live: The number of timeline events which have just occurred and are not historical.
-                The last N events are 'live' and should be treated as such. This is mostly
-                useful to determine whether a given @mention event should make a noise or not.
-                Clients cannot rely solely on the absence of `initial: true` to determine live
-                events because if a room not in the sliding window bumps into the window because
-                of an @mention it will have `initial: true` yet contain a single live event
-                (with potentially other old events in the timeline).
-        """
-
-        name: str
-        avatar: Optional[str]
-        heroes: Optional[List[EventBase]]
-        initial: bool
-        required_state: List[EventBase]
-        timeline: List[EventBase]
-        is_dm: bool
-        invite_state: List[EventBase]
-        prev_batch: StreamToken
-        limited: bool
-        joined_count: int
-        invited_count: int
-        notification_count: int
-        highlight_count: int
-        num_live: int
-
-    @attr.s(slots=True, frozen=True, auto_attribs=True)
-    class SlidingWindowList:
-        """
-        Attributes:
-            count: The total number of entries in the list. Always present if this list
-                is.
-            ops: The sliding list operations to perform.
-        """
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class Operation:
-            """
-            Attributes:
-                op: The operation type to perform.
-                range: Which index positions are affected by this operation. These are
-                    both inclusive.
-                room_ids: Which room IDs are affected by this operation. These IDs match
-                    up to the positions in the `range`, so the last room ID in this list
-                    matches the 9th index. The room data is held in a separate object.
-            """
-
-            op: OperationType
-            range: Tuple[int, int]
-            room_ids: List[str]
-
-        count: int
-        ops: List[Operation]
-
-    next_pos: StreamToken
-    lists: Dict[str, SlidingWindowList]
-    rooms: Dict[str, RoomResult]
-    extensions: JsonMapping
-
-    def __bool__(self) -> bool:
-        """Make the result appear empty if there are no updates. This is used
-        to tell if the notifier needs to wait for more events when polling for
-        events.
-        """
-        return bool(self.lists or self.rooms or self.extensions)
-
-    @staticmethod
-    def empty(next_pos: StreamToken) -> "SlidingSyncResult":
-        "Return a new empty result"
-        return SlidingSyncResult(
-            next_pos=next_pos,
-            lists={},
-            rooms={},
-            extensions={},
-        )
-
-
 class SlidingSyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
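The removed classes are relocated rather than deleted: per the import hunk earlier, callers now pull them from `synapse.types.handlers`. A minimal sketch of the post-move usage (the token variable is hypothetical):

    from synapse.types.handlers import (
        OperationType,
        SlidingSyncConfig,
        SlidingSyncResult,
    )

    # e.g. the handler returns an empty result when a poll times out, via the
    # relocated helper shown in the removed block above:
    # result = SlidingSyncResult.empty(next_pos=current_stream_token)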
@@ -240,9 +71,19 @@ class SlidingSyncHandler:
         from_token: Optional[StreamToken] = None,
         timeout_ms: int = 0,
     ) -> SlidingSyncResult:
-        """Get the sync for a client if we have new data for it now. Otherwise
+        """
+        Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
+
+        Args:
+            requester: The user making the request
+            sync_config: Sync configuration
+            from_token: The point in the stream to sync from. Token of the end of the
+                previous batch. May be `None` if this is the initial sync request.
+            timeout_ms: The time in milliseconds to wait for new data to arrive. If 0,
+                we will immediately but there might not be any new data so we just return an
+                empty response.
         """
         # If the user is not part of the mau group, then check that limits have
         # not been exceeded (if not part of the group by this point, almost certain
@@ -314,6 +155,14 @@ class SlidingSyncHandler:
         """
         Generates the response body of a Sliding Sync result, represented as a
         `SlidingSyncResult`.
+
+        We fetch data according to the token range (> `from_token` and <= `to_token`).
+
+        Args:
+            sync_config: Sync configuration
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from. Token of the end of the
+                previous batch. May be `None` if this is the initial sync request.
         """
         user_id = sync_config.user.to_string()
         app_service = self.store.get_app_service_by_user_id(user_id)
@@ -337,9 +186,6 @@ class SlidingSyncHandler:
             # Apply filters
             filtered_room_ids = room_id_set
             if list_config.filters is not None:
-                # TODO: To be absolutely correct, this could also take into account
-                # from/to tokens but some of the streams don't support looking back
-                # in time (like global account_data).
                 filtered_room_ids = await self.filter_rooms(
                     sync_config.user, room_id_set, list_config.filters, to_token
                 )
@@ -392,6 +238,12 @@ class SlidingSyncHandler:
         `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
         to tell when a room was forgotten at the moment so we can't factor it into the
         from/to range.
+
+
+        Args:
+            user: User to fetch rooms for
+            to_token: The token to fetch rooms up to.
+            from_token: The point in the stream to sync from.
         """
         user_id = user.to_string()

@@ -646,6 +498,9 @@ class SlidingSyncHandler:
         # `is_direct` on membership events because that property only appears for
         # the invitee membership event (doesn't show up for the inviter). Account
         # data is set by the client so it needs to be scrutinized.
+        #
+        # We're unable to take `to_token` into account for global account data since
+        # we only keep track of the latest account data for the user.
         dm_map = await self.store.get_global_account_data_by_type_for_user(
             user_id, AccountDataTypes.DIRECT
         )
@@ -2540,13 +2540,15 @@ class SyncHandler:
                     newly_left_rooms.append(room_id)
             else:
                 if not old_state_ids:
-                    old_state_ids = await self._state_storage_controller.get_state_at(
-                        room_id,
-                        since_token,
-                        state_filter=StateFilter.from_types(
-                            [(EventTypes.Member, user_id)]
-                        ),
+                    old_state_ids = (
+                        await self._state_storage_controller.get_state_at(
+                            room_id,
+                            since_token,
+                            state_filter=StateFilter.from_types(
+                                [(EventTypes.Member, user_id)]
+                            ),
+                        )
                     )
                     old_mem_ev_id = old_state_ids.get(
                         (EventTypes.Member, user_id), None
                     )
@@ -25,7 +25,16 @@ import os
 import urllib
 from abc import ABC, abstractmethod
 from types import TracebackType
-from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type
+from typing import (
+    TYPE_CHECKING,
+    Awaitable,
+    Dict,
+    Generator,
+    List,
+    Optional,
+    Tuple,
+    Type,
+)

 import attr

@@ -39,6 +48,11 @@ from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable
 from synapse.util.stringutils import is_ascii

+if TYPE_CHECKING:
+    from synapse.media.media_storage import MultipartResponder
+    from synapse.storage.databases.main.media_repository import LocalMedia
+
+
 logger = logging.getLogger(__name__)

 # list all text content types that will have the charset default to UTF-8 when
@@ -260,6 +274,53 @@ def _can_encode_filename_as_token(x: str) -> bool:
         return True


+async def respond_with_multipart_responder(
+    request: SynapseRequest,
+    responder: "Optional[MultipartResponder]",
+    media_info: "LocalMedia",
+) -> None:
+    """
+    Responds via a Multipart responder for the federation media `/download` requests
+
+    Args:
+        request: the federation request to respond to
+        responder: the Multipart responder which will send the response
+        media_info: metadata about the media item
+    """
+    if not responder:
+        respond_404(request)
+        return
+
+    # If we have a responder we *must* use it as a context manager.
+    with responder:
+        if request._disconnected:
+            logger.warning(
+                "Not sending response to request %s, already disconnected.", request
+            )
+            return
+
+        logger.debug("Responding to media request with responder %s", responder)
+        if media_info.media_length is not None:
+            request.setHeader(b"Content-Length", b"%d" % (media_info.media_length,))
+        request.setHeader(
+            b"Content-Type", b"multipart/mixed; boundary=%s" % responder.boundary
+        )
+
+        try:
+            await responder.write_to_consumer(request)
+        except Exception as e:
+            # The majority of the time this will be due to the client having gone
+            # away. Unfortunately, Twisted simply throws a generic exception at us
+            # in that case.
+            logger.warning("Failed to write to consumer: %s %s", type(e), e)
+
+            # Unregister the producer, if it has one, so Twisted doesn't complain
+            if request.producer:
+                request.unregisterProducer()
+
+        finish_request(request)
+
+
 async def respond_with_responder(
     request: SynapseRequest,
     responder: "Optional[Responder]",
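Note: the header written here is `multipart/mixed`, while the servlet and producer docstrings describe the payload as multipart/form-data; either way the framing is the same JSON-field-then-file layout produced by `MultipartFileSender` further below.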
@@ -54,10 +54,11 @@ from synapse.media._base import (
     ThumbnailInfo,
     get_filename_from_headers,
     respond_404,
+    respond_with_multipart_responder,
     respond_with_responder,
 )
 from synapse.media.filepath import MediaFilePaths
-from synapse.media.media_storage import MediaStorage
+from synapse.media.media_storage import MediaStorage, MultipartResponder
 from synapse.media.storage_provider import StorageProviderWrapper
 from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
 from synapse.media.url_previewer import UrlPreviewer
@@ -429,6 +430,7 @@ class MediaRepository:
         media_id: str,
         name: Optional[str],
         max_timeout_ms: int,
+        federation: bool = False,
     ) -> None:
         """Responds to requests for local media, if exists, or returns 404.
@@ -440,6 +442,7 @@ class MediaRepository:
                 the filename in the Content-Disposition header of the response.
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            federation: whether the local media being fetched is for a federation request

         Returns:
             Resolves once a response has successfully been written to request
@@ -459,7 +462,14 @@ class MediaRepository:

         file_info = FileInfo(None, media_id, url_cache=bool(url_cache))

-        responder = await self.media_storage.fetch_media(file_info)
+        responder = await self.media_storage.fetch_media(
+            file_info, media_info, federation
+        )
+        if federation:
+            # this really should be a Multipart responder but just in case
+            assert isinstance(responder, MultipartResponder)
+            await respond_with_multipart_responder(request, responder, media_info)
+        else:
             await respond_with_responder(
                 request, responder, media_type, media_length, upload_name
             )
@@ -19,9 +19,12 @@
 #
 #
 import contextlib
+import json
 import logging
 import os
 import shutil
+from contextlib import closing
+from io import BytesIO
 from types import TracebackType
 from typing import (
     IO,
@@ -30,14 +33,19 @@ from typing import (
     AsyncIterator,
     BinaryIO,
     Callable,
+    List,
     Optional,
     Sequence,
     Tuple,
     Type,
+    Union,
 )
+from uuid import uuid4

 import attr
+from zope.interface import implementer

+from twisted.internet import defer, interfaces
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IConsumer
 from twisted.protocols.basic import FileSender
@@ -48,15 +56,19 @@ from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
 from synapse.util import Clock
 from synapse.util.file_consumer import BackgroundFileConsumer

+from ..storage.databases.main.media_repository import LocalMedia
+from ..types import JsonDict
 from ._base import FileInfo, Responder
 from .filepath import MediaFilePaths

 if TYPE_CHECKING:
-    from synapse.media.storage_provider import StorageProvider
+    from synapse.media.storage_provider import StorageProviderWrapper
     from synapse.server import HomeServer

 logger = logging.getLogger(__name__)

+CRLF = b"\r\n"
+

 class MediaStorage:
     """Responsible for storing/fetching files from local sources.
@@ -73,7 +85,7 @@ class MediaStorage:
         hs: "HomeServer",
         local_media_directory: str,
         filepaths: MediaFilePaths,
-        storage_providers: Sequence["StorageProvider"],
+        storage_providers: Sequence["StorageProviderWrapper"],
     ):
         self.hs = hs
         self.reactor = hs.get_reactor()
|
||||||
|
|
||||||
raise e from None
|
raise e from None
|
||||||
|
|
||||||
async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
|
async def fetch_media(
|
||||||
|
self,
|
||||||
|
file_info: FileInfo,
|
||||||
|
media_info: Optional[LocalMedia] = None,
|
||||||
|
federation: bool = False,
|
||||||
|
) -> Optional[Responder]:
|
||||||
"""Attempts to fetch media described by file_info from the local cache
|
"""Attempts to fetch media described by file_info from the local cache
|
||||||
and configured storage providers.
|
and configured storage providers.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file_info
|
file_info: Metadata about the media file
|
||||||
|
media_info: Metadata about the media item
|
||||||
|
federation: Whether this file is being fetched for a federation request
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Returns a Responder if the file was found, otherwise None.
|
If the file was found returns a Responder (a Multipart Responder if the requested
|
||||||
|
file is for the federation /download endpoint), otherwise None.
|
||||||
"""
|
"""
|
||||||
paths = [self._file_info_to_path(file_info)]
|
paths = [self._file_info_to_path(file_info)]
|
||||||
|
|
||||||
|
@@ -197,12 +217,19 @@ class MediaStorage:
             local_path = os.path.join(self.local_media_directory, path)
             if os.path.exists(local_path):
                 logger.debug("responding with local file %s", local_path)
+                if federation:
+                    assert media_info is not None
+                    boundary = uuid4().hex.encode("ascii")
+                    return MultipartResponder(
+                        open(local_path, "rb"), media_info, boundary
+                    )
+                else:
                     return FileResponder(open(local_path, "rb"))
             logger.debug("local file %s did not exist", local_path)

         for provider in self.storage_providers:
             for path in paths:
-                res: Any = await provider.fetch(path, file_info)
+                res: Any = await provider.fetch(path, file_info, media_info, federation)
                 if res:
                     logger.debug("Streaming %s from %s", path, provider)
                     return res
@@ -316,7 +343,7 @@ class FileResponder(Responder):
     """Wraps an open file that can be sent to a request.

     Args:
-        open_file: A file like object to be streamed ot the client,
+        open_file: A file like object to be streamed to the client,
             is closed when finished streaming.
     """

@@ -337,6 +364,38 @@ class FileResponder(Responder):
         self.open_file.close()


+class MultipartResponder(Responder):
+    """Wraps an open file, formats the response according to MSC3916 and sends it to a
+    federation request.
+
+    Args:
+        open_file: A file like object to be streamed to the client,
+            is closed when finished streaming.
+        media_info: metadata about the media item
+        boundary: bytes to use for the multipart response boundary
+    """
+
+    def __init__(self, open_file: IO, media_info: LocalMedia, boundary: bytes) -> None:
+        self.open_file = open_file
+        self.media_info = media_info
+        self.boundary = boundary
+
+    def write_to_consumer(self, consumer: IConsumer) -> Deferred:
+        return make_deferred_yieldable(
+            MultipartFileSender().beginFileTransfer(
+                self.open_file, consumer, self.media_info.media_type, {}, self.boundary
+            )
+        )
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> None:
+        self.open_file.close()
+
+
 class SpamMediaException(NotFoundError):
     """The media was blocked by a spam checker, so we simply 404 the request (in
     the same way as if it was quarantined).
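Note: `write_to_consumer` passes an empty dict as the JSON field, so the first part of the multipart body is currently just `{}`; `media_info.media_type` supplies the content type of the second (file) part.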
@@ -370,3 +429,151 @@ class ReadableFileWrapper:

             # We yield to the reactor by sleeping for 0 seconds.
             await self.clock.sleep(0)
+
+
+@implementer(interfaces.IProducer)
+class MultipartFileSender:
+    """
+    A producer that sends the contents of a file to a federation request in the format
+    outlined in MSC3916 - a multipart/format-data response where the first field is a
+    JSON object and the second is the requested file.
+
+    This is a slight re-writing of twisted.protocols.basic.FileSender to achieve the format
+    outlined above.
+    """
+
+    CHUNK_SIZE = 2**14
+
+    lastSent = ""
+    deferred: Optional[defer.Deferred] = None
+
+    def beginFileTransfer(
+        self,
+        file: IO,
+        consumer: IConsumer,
+        file_content_type: str,
+        json_object: JsonDict,
+        boundary: bytes,
+    ) -> Deferred:
+        """
+        Begin transferring a file
+
+        Args:
+            file: The file object to read data from
+            consumer: The synapse request to write the data to
+            file_content_type: The content-type of the file
+            json_object: The JSON object to write to the first field of the response
+            boundary: bytes to be used as the multipart/form-data boundary
+
+        Returns: A deferred whose callback will be invoked when the file has
+        been completely written to the consumer. The last byte written to the
+        consumer is passed to the callback.
+        """
+        self.file: Optional[IO] = file
+        self.consumer = consumer
+        self.json_field = json_object
+        self.json_field_written = False
+        self.content_type_written = False
+        self.file_content_type = file_content_type
+        self.boundary = boundary
+        self.deferred: Deferred = defer.Deferred()
+        self.consumer.registerProducer(self, False)
+        # while it's not entirely clear why this assignment is necessary, it mirrors
+        # the behavior in FileSender.beginFileTransfer and thus is preserved here
+        deferred = self.deferred
+        return deferred
+
+    def resumeProducing(self) -> None:
+        # write the first field, which will always be a json field
+        if not self.json_field_written:
+            self.consumer.write(CRLF + b"--" + self.boundary + CRLF)
+
+            content_type = Header(b"Content-Type", b"application/json")
+            self.consumer.write(bytes(content_type) + CRLF)
+
+            json_field = json.dumps(self.json_field)
+            json_bytes = json_field.encode("utf-8")
+            self.consumer.write(json_bytes)
+            self.consumer.write(CRLF + b"--" + self.boundary + CRLF)
+
+            self.json_field_written = True
+
+        chunk: Any = ""
+        if self.file:
+            # if we haven't written the content type yet, do so
+            if not self.content_type_written:
+                type = self.file_content_type.encode("utf-8")
+                content_type = Header(b"Content-Type", type)
+                self.consumer.write(bytes(content_type) + CRLF)
+                self.content_type_written = True
+
+            chunk = self.file.read(self.CHUNK_SIZE)
+
+        if not chunk:
+            # we've reached the end of the file
+            self.consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF)
+            self.file = None
+            self.consumer.unregisterProducer()
+
+            if self.deferred:
+                self.deferred.callback(self.lastSent)
+                self.deferred = None
+            return
+
+        self.consumer.write(chunk)
+        self.lastSent = chunk[-1:]
+
+    def pauseProducing(self) -> None:
+        pass
+
+    def stopProducing(self) -> None:
+        if self.deferred:
+            self.deferred.errback(Exception("Consumer asked us to stop producing"))
+            self.deferred = None
+
+
+class Header:
+    """
+    `Header` This class is a tiny wrapper that produces
+    request headers. We can't use standard python header
+    class because it encodes unicode fields using =? bla bla ?=
+    encoding, which is correct, but no one in HTTP world expects
+    that, everyone wants utf-8 raw bytes. (stolen from treq.multipart)
+
+    """
+
+    def __init__(
+        self,
+        name: bytes,
+        value: Any,
+        params: Optional[List[Tuple[Any, Any]]] = None,
+    ):
+        self.name = name
+        self.value = value
+        self.params = params or []
+
+    def add_param(self, name: Any, value: Any) -> None:
+        self.params.append((name, value))
+
+    def __bytes__(self) -> bytes:
+        with closing(BytesIO()) as h:
+            h.write(self.name + b": " + escape(self.value).encode("us-ascii"))
+            if self.params:
+                for name, val in self.params:
+                    h.write(b"; ")
+                    h.write(escape(name).encode("us-ascii"))
+                    h.write(b"=")
+                    h.write(b'"' + escape(val).encode("utf-8") + b'"')
+            h.seek(0)
+            return h.read()
+
+
+def escape(value: Union[str, bytes]) -> str:
+    """
+    This function prevents header values from corrupting the request,
+    a newline in the file name parameter makes form-data request unreadable
+    for a majority of parsers. (stolen from treq.multipart)
+    """
+    if isinstance(value, bytes):
+        value = value.decode("utf-8")
+    return value.replace("\r", "").replace("\n", "").replace('"', '\\"')
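Tracing `resumeProducing` above, the bytes handed to the consumer take roughly this shape (boundary and file content type are illustrative; the JSON part is `{}` for the current caller, and the file is streamed in 16 KiB chunks per `CHUNK_SIZE = 2**14`):

    \r\n--<boundary>\r\n
    Content-Type: application/json\r\n
    {}
    \r\n--<boundary>\r\n
    Content-Type: image/png\r\n
    <file bytes...>
    \r\n--<boundary>--\r\n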
@@ -24,14 +24,16 @@ import logging
 import os
 import shutil
 from typing import TYPE_CHECKING, Callable, Optional
+from uuid import uuid4

 from synapse.config._base import Config
 from synapse.logging.context import defer_to_thread, run_in_background
 from synapse.logging.opentracing import start_active_span, trace_with_opname
 from synapse.util.async_helpers import maybe_awaitable

+from ..storage.databases.main.media_repository import LocalMedia
 from ._base import FileInfo, Responder
-from .media_storage import FileResponder
+from .media_storage import FileResponder, MultipartResponder

 logger = logging.getLogger(__name__)

@@ -55,13 +57,21 @@ class StorageProvider(metaclass=abc.ABCMeta):
     """

     @abc.abstractmethod
-    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
+    async def fetch(
+        self,
+        path: str,
+        file_info: FileInfo,
+        media_info: Optional[LocalMedia] = None,
+        federation: bool = False,
+    ) -> Optional[Responder]:
         """Attempt to fetch the file described by file_info and stream it
         into writer.

         Args:
             path: Relative path of file in local cache
             file_info: The metadata of the file.
+            media_info: metadata of the media item
+            federation: Whether the requested media is for a federation request

         Returns:
             Returns a Responder if the provider has the file, otherwise returns None.
@@ -124,7 +134,13 @@ class StorageProviderWrapper(StorageProvider):
             run_in_background(store)

     @trace_with_opname("StorageProviderWrapper.fetch")
-    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
+    async def fetch(
+        self,
+        path: str,
+        file_info: FileInfo,
+        media_info: Optional[LocalMedia] = None,
+        federation: bool = False,
+    ) -> Optional[Responder]:
         if file_info.url_cache:
             # Files in the URL preview cache definitely aren't stored here,
             # so avoid any potentially slow I/O or network access.
|
||||||
|
|
||||||
# store_file is supposed to return an Awaitable, but guard
|
# store_file is supposed to return an Awaitable, but guard
|
||||||
# against improper implementations.
|
# against improper implementations.
|
||||||
return await maybe_awaitable(self.backend.fetch(path, file_info))
|
return await maybe_awaitable(
|
||||||
|
self.backend.fetch(path, file_info, media_info, federation)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class FileStorageProviderBackend(StorageProvider):
|
class FileStorageProviderBackend(StorageProvider):
|
||||||
|
@@ -172,11 +190,23 @@ class FileStorageProviderBackend(StorageProvider):
         )

     @trace_with_opname("FileStorageProviderBackend.fetch")
-    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
+    async def fetch(
+        self,
+        path: str,
+        file_info: FileInfo,
+        media_info: Optional[LocalMedia] = None,
+        federation: bool = False,
+    ) -> Optional[Responder]:
         """See StorageProvider.fetch"""

         backup_fname = os.path.join(self.base_directory, path)
         if os.path.isfile(backup_fname):
+            if federation:
+                assert media_info is not None
+                boundary = uuid4().hex.encode("ascii")
+                return MultipartResponder(
+                    open(backup_fname, "rb"), media_info, boundary
+                )
             return FileResponder(open(backup_fname, "rb"))

         return None
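Third-party storage providers must adopt the widened signature as well, or the `inspect.signature` check during servlet registration will disable the federation endpoint. A minimal sketch of a compatible backend (class name and body are hypothetical):

    from typing import Optional

    from synapse.media._base import FileInfo, Responder
    from synapse.media.storage_provider import StorageProvider


    class MyProviderBackend(StorageProvider):
        async def store_file(self, path: str, file_info: FileInfo) -> None:
            ...  # copy the file to external storage

        async def fetch(
            self,
            path: str,
            file_info: FileInfo,
            media_info=None,  # metadata used to build a MultipartResponder
            federation: bool = False,  # this parameter is what the check looks for
        ) -> Optional[Responder]:
            ...  # return a Responder, or None if the file is not held here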
@@ -56,14 +56,14 @@ from synapse.http.servlet import (
 from synapse.http.site import SynapseRequest
 from synapse.metrics import threepid_send_requests
 from synapse.push.mailer import Mailer
-from synapse.rest.client.models import (
+from synapse.types import JsonDict
+from synapse.types.rest import RequestBodyModel
+from synapse.types.rest.client import (
     AuthenticationData,
     ClientSecretStr,
     EmailRequestTokenBody,
     MsisdnRequestTokenBody,
 )
-from synapse.rest.models import RequestBodyModel
-from synapse.types import JsonDict
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.stringutils import assert_valid_client_secret, random_string
 from synapse.util.threepids import check_3pid_allowed, validate_email
@@ -42,9 +42,9 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.rest.client._base import client_patterns, interactive_auth_handler
-from synapse.rest.client.models import AuthenticationData
-from synapse.rest.models import RequestBodyModel
 from synapse.types import JsonDict
+from synapse.types.rest import RequestBodyModel
+from synapse.types.rest.client import AuthenticationData

 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -41,8 +41,8 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.rest.client._base import client_patterns
-from synapse.rest.models import RequestBodyModel
 from synapse.types import JsonDict, RoomAlias
+from synapse.types.rest import RequestBodyModel

 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -53,8 +53,8 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import trace_with_opname
-from synapse.rest.client.models import SlidingSyncBody
 from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types.rest.client import SlidingSyncBody
 from synapse.util import json_decoder
 from synapse.util.caches.lrucache import LruCache
@@ -41,9 +41,9 @@ from synapse.http.servlet import (
     parse_and_validate_json_object_from_request,
     parse_integer,
 )
-from synapse.rest.models import RequestBodyModel
 from synapse.storage.keys import FetchKeyResultForRemote
 from synapse.types import JsonDict
+from synapse.types.rest import RequestBodyModel
 from synapse.util import json_decoder
 from synapse.util.async_helpers import yieldable_gather_results

@@ -45,7 +45,7 @@ from synapse.storage.util.partial_state_events_tracker import (
     PartialStateEventsTracker,
 )
 from synapse.synapse_rust.acl import ServerAclEvaluator
-from synapse.types import MutableStateMap, StreamToken, StateMap, get_domain_from_id
+from synapse.types import MutableStateMap, StateMap, StreamToken, get_domain_from_id
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
@@ -1279,60 +1279,3 @@ class ScheduledTask:
     result: Optional[JsonMapping]
     # Optional error that should be assigned a value when the status is FAILED
     error: Optional[str]
-
-
-class ShutdownRoomParams(TypedDict):
-    """
-    Attributes:
-        requester_user_id:
-            User who requested the action. Will be recorded as putting the room on the
-            blocking list.
-        new_room_user_id:
-            If set, a new room will be created with this user ID
-            as the creator and admin, and all users in the old room will be
-            moved into that room. If not set, no new room will be created
-            and the users will just be removed from the old room.
-        new_room_name:
-            A string representing the name of the room that new users will
-            be invited to. Defaults to `Content Violation Notification`.
-        message:
-            A string containing the first message that will be sent as
-            `new_room_user_id` in the new room. Ideally this will clearly
-            convey why the original room was shut down.
-            Defaults to `Sharing illegal content on this server is not
-            permitted and rooms in violation will be blocked.`
-        block:
-            If set to `true`, this room will be added to a blocking list,
-            preventing future attempts to join the room. Defaults to `false`.
-        purge:
-            If set to `true`, purge the given room from the database.
-        force_purge:
-            If set to `true`, the room will be purged from the database
-            even if there are still users joined to the room.
-    """
-
-    requester_user_id: Optional[str]
-    new_room_user_id: Optional[str]
-    new_room_name: Optional[str]
-    message: Optional[str]
-    block: bool
-    purge: bool
-    force_purge: bool
-
-
-class ShutdownRoomResponse(TypedDict):
-    """
-    Attributes:
-        kicked_users: An array of users (`user_id`) that were kicked.
-        failed_to_kick_users:
-            An array of users (`user_id`) that were not kicked.
-        local_aliases:
-            An array of strings representing the local aliases that were
-            migrated from the old room to the new.
-        new_room_id: A string representing the room ID of the new room.
-    """
-
-    kicked_users: List[str]
-    failed_to_kick_users: List[str]
-    local_aliases: List[str]
-    new_room_id: Optional[str]
252
synapse/types/handlers/__init__.py
Normal file
@@ -0,0 +1,252 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
from enum import Enum
from typing import TYPE_CHECKING, Dict, Final, List, Optional, Tuple

import attr
from typing_extensions import TypedDict

from synapse._pydantic_compat import HAS_PYDANTIC_V2

if TYPE_CHECKING or HAS_PYDANTIC_V2:
    from pydantic.v1 import Extra
else:
    from pydantic import Extra

from synapse.events import EventBase
from synapse.types import JsonMapping, StreamToken, UserID
from synapse.types.rest.client import SlidingSyncBody


class ShutdownRoomParams(TypedDict):
    """
    Attributes:
        requester_user_id:
            User who requested the action. Will be recorded as putting the room on the
            blocking list.
        new_room_user_id:
            If set, a new room will be created with this user ID
            as the creator and admin, and all users in the old room will be
            moved into that room. If not set, no new room will be created
            and the users will just be removed from the old room.
        new_room_name:
            A string representing the name of the room that new users will
            be invited to. Defaults to `Content Violation Notification`.
        message:
            A string containing the first message that will be sent as
            `new_room_user_id` in the new room. Ideally this will clearly
            convey why the original room was shut down.
            Defaults to `Sharing illegal content on this server is not
            permitted and rooms in violation will be blocked.`
        block:
            If set to `true`, this room will be added to a blocking list,
            preventing future attempts to join the room. Defaults to `false`.
        purge:
            If set to `true`, purge the given room from the database.
        force_purge:
            If set to `true`, the room will be purged from the database
            even if there are still users joined to the room.
    """

    requester_user_id: Optional[str]
    new_room_user_id: Optional[str]
    new_room_name: Optional[str]
    message: Optional[str]
    block: bool
    purge: bool
    force_purge: bool


class ShutdownRoomResponse(TypedDict):
    """
    Attributes:
        kicked_users: An array of users (`user_id`) that were kicked.
        failed_to_kick_users:
            An array of users (`user_id`) that were not kicked.
        local_aliases:
            An array of strings representing the local aliases that were
            migrated from the old room to the new.
        new_room_id: A string representing the room ID of the new room.
    """

    kicked_users: List[str]
    failed_to_kick_users: List[str]
    local_aliases: List[str]
    new_room_id: Optional[str]


class SlidingSyncConfig(SlidingSyncBody):
    """
    Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
    extra fields that we need in the handler
    """

    user: UserID
    device_id: Optional[str]

    # Pydantic config
    class Config:
        # By default, ignore fields that we don't recognise.
        extra = Extra.ignore
        # By default, don't allow fields to be reassigned after parsing.
        allow_mutation = False
        # Allow custom types like `UserID` to be used in the model
        arbitrary_types_allowed = True


class OperationType(Enum):
    """
    Represents the operation types in a Sliding Sync window.

    Attributes:
        SYNC: Sets a range of entries. Clients SHOULD discard what they previously knew about
            entries in this range.
        INSERT: Sets a single entry. If the position is not empty then clients MUST move
            entries to the left or the right depending on where the closest empty space is.
        DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
            places.
        INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
            offline support, but they should be treated as empty when additional operations
            which concern indexes in the range arrive from the server.
    """

    SYNC: Final = "SYNC"
    INSERT: Final = "INSERT"
    DELETE: Final = "DELETE"
    INVALIDATE: Final = "INVALIDATE"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncResult:
    """
    The Sliding Sync result to be serialized to JSON for a response.

    Attributes:
        next_pos: The next position token in the sliding window to request (next_batch).
        lists: Sliding window API. A map of list key to list results.
        rooms: Room subscription API. A map of room ID to room subscription to room results.
        extensions: Extensions API. A map of extension key to extension results.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class RoomResult:
        """
        Attributes:
            name: Room name or calculated room name.
            avatar: Room avatar
            heroes: List of stripped membership events (containing `user_id` and optionally
                `avatar_url` and `displayname`) for the users used to calculate the room name.
            initial: Flag which is set when this is the first time the server is sending this
                data on this connection. Clients can use this flag to replace or update
                their local state. When there is an update, servers MUST omit this flag
                entirely and NOT send "initial":false as this is wasteful on bandwidth. The
                absence of this flag means 'false'.
            required_state: The current state of the room
            timeline: Latest events in the room. The last event is the most recent
            is_dm: Flag to specify whether the room is a direct-message room (most likely
                between two people).
            invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
                in sync v2, absent on joined/left rooms
            prev_batch: A token that can be passed as a start parameter to the
                `/rooms/<room_id>/messages` API to retrieve earlier messages.
            limited: True if there are more events than fit between the given position and now.
                Sync again to get more.
            joined_count: The number of users with membership of join, including the client's
                own user ID. (same as sync v2 `m.joined_member_count`)
            invited_count: The number of users with membership of invite. (same as sync v2
                `m.invited_member_count`)
            notification_count: The total number of unread notifications for this room. (same
                as sync v2)
            highlight_count: The number of unread notifications for this room with the highlight
                flag set. (same as sync v2)
            num_live: The number of timeline events which have just occurred and are not historical.
                The last N events are 'live' and should be treated as such. This is mostly
                useful to determine whether a given @mention event should make a noise or not.
                Clients cannot rely solely on the absence of `initial: true` to determine live
                events because if a room not in the sliding window bumps into the window because
                of an @mention it will have `initial: true` yet contain a single live event
                (with potentially other old events in the timeline).
        """

        name: str
        avatar: Optional[str]
        heroes: Optional[List[EventBase]]
        initial: bool
        required_state: List[EventBase]
        timeline: List[EventBase]
        is_dm: bool
        invite_state: List[EventBase]
        prev_batch: StreamToken
        limited: bool
        joined_count: int
        invited_count: int
        notification_count: int
        highlight_count: int
        num_live: int

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class SlidingWindowList:
        """
        Attributes:
            count: The total number of entries in the list. Always present if this list
                is.
            ops: The sliding list operations to perform.
        """

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class Operation:
            """
            Attributes:
                op: The operation type to perform.
                range: Which index positions are affected by this operation. These are
                    both inclusive.
                room_ids: Which room IDs are affected by this operation. These IDs match
                    up to the positions in the `range`, so the last room ID in this list
                    matches the 9th index. The room data is held in a separate object.
            """

            op: OperationType
            range: Tuple[int, int]
            room_ids: List[str]

        count: int
        ops: List[Operation]

    next_pos: StreamToken
    lists: Dict[str, SlidingWindowList]
    rooms: Dict[str, RoomResult]
    extensions: JsonMapping

    def __bool__(self) -> bool:
        """Make the result appear empty if there are no updates. This is used
        to tell if the notifier needs to wait for more events when polling for
        events.
        """
        return bool(self.lists or self.rooms or self.extensions)

    @staticmethod
    def empty(next_pos: StreamToken) -> "SlidingSyncResult":
        "Return a new empty result"
        return SlidingSyncResult(
            next_pos=next_pos,
            lists={},
            rooms={},
            extensions={},
        )
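A minimal sketch (not part of the diff) of how the nested types in the new module compose, assuming it is importable as synapse.types.handlers per the file header; the caller supplies a real StreamToken, and the list key "foo-list" is made up:

    from typing import List

    from synapse.types import StreamToken
    from synapse.types.handlers import OperationType, SlidingSyncResult

    def single_sync_op(next_pos: StreamToken, room_ids: List[str]) -> SlidingSyncResult:
        # One SYNC operation that (re)sets window positions 0..N-1 to `room_ids`.
        op = SlidingSyncResult.SlidingWindowList.Operation(
            op=OperationType.SYNC,
            range=(0, len(room_ids) - 1),
            room_ids=room_ids,
        )
        window = SlidingSyncResult.SlidingWindowList(count=len(room_ids), ops=[op])
        return SlidingSyncResult(
            next_pos=next_pos, lists={"foo-list": window}, rooms={}, extensions={}
        )

    # SlidingSyncResult.empty(next_pos) is falsy (see __bool__ above), so the
    # notifier keeps waiting; a result with any list/room/extension data is
    # truthy and gets serialized to the client.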
@@ -43,7 +43,7 @@ else:
     validator,
 )

-from synapse.rest.models import RequestBodyModel
+from synapse.types.rest import RequestBodyModel
 from synapse.util.threepids import validate_email

@@ -151,7 +151,7 @@ async def filter_events_for_client(
         filter_send_to_client=filter_send_to_client,
         sender_ignored=event.sender in ignore_list,
         always_include_ids=always_include_ids,
-        retention_policy=retention_policies[room_id],
+        retention_policy=retention_policies[event.room_id],
         state=state_after_event,
         is_peeking=is_peeking,
         sender_erased=erased_senders.get(event.sender, False),
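The one-line change above is the fix for the wrong-retention-policy bug (changelog 17272): the events being filtered can span several rooms, so the policy lookup must key on each event's own room ID rather than on a `room_id` variable left over from an earlier loop. A hypothetical illustration of the failure mode (not Synapse code):

    retention_policies = {
        "!a:hs": {"max_lifetime": 86400000},
        "!b:hs": {"max_lifetime": None},
    }
    for room_id in retention_policies:
        pass  # per-room setup; `room_id` stays bound to the last key

    class Event:
        def __init__(self, room_id: str) -> None:
            self.room_id = room_id

    for event in [Event("!a:hs"), Event("!b:hs")]:
        buggy = retention_policies[room_id]        # always the last room's policy
        fixed = retention_policies[event.room_id]  # the event's own room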
234
tests/federation/test_federation_media.py
Normal file
@@ -0,0 +1,234 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import io
import os
import shutil
import tempfile
from typing import Optional

from twisted.test.proto_helpers import MemoryReactor

from synapse.media._base import FileInfo, Responder
from synapse.media.filepath import MediaFilePaths
from synapse.media.media_storage import MediaStorage
from synapse.media.storage_provider import (
    FileStorageProviderBackend,
    StorageProviderWrapper,
)
from synapse.server import HomeServer
from synapse.storage.databases.main.media_repository import LocalMedia
from synapse.types import JsonDict, UserID
from synapse.util import Clock

from tests import unittest
from tests.test_utils import SMALL_PNG
from tests.unittest import override_config


class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase):

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        super().prepare(reactor, clock, hs)
        self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-")
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.primary_base_path = os.path.join(self.test_dir, "primary")
        self.secondary_base_path = os.path.join(self.test_dir, "secondary")

        hs.config.media.media_store_path = self.primary_base_path

        storage_providers = [
            StorageProviderWrapper(
                FileStorageProviderBackend(hs, self.secondary_base_path),
                store_local=True,
                store_remote=False,
                store_synchronous=True,
            )
        ]

        self.filepaths = MediaFilePaths(self.primary_base_path)
        self.media_storage = MediaStorage(
            hs, self.primary_base_path, self.filepaths, storage_providers
        )
        self.media_repo = hs.get_media_repository()

    @override_config(
        {"experimental_features": {"msc3916_authenticated_media_enabled": True}}
    )
    def test_file_download(self) -> None:
        content = io.BytesIO(b"file_to_stream")
        content_uri = self.get_success(
            self.media_repo.create_content(
                "text/plain",
                "test_upload",
                content,
                46,
                UserID.from_string("@user_id:whatever.org"),
            )
        )
        # test with a text file
        channel = self.make_signed_federation_request(
            "GET",
            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
        )
        self.pump()
        self.assertEqual(200, channel.code)

        content_type = channel.headers.getRawHeaders("content-type")
        assert content_type is not None
        assert "multipart/mixed" in content_type[0]
        assert "boundary" in content_type[0]

        # extract boundary
        boundary = content_type[0].split("boundary=")[1]
        # split on boundary and check that json field and expected value exist
        stripped = channel.text_body.split("\r\n" + "--" + boundary)
        # TODO: the json object expected will change once MSC3911 is implemented, currently
        # {} is returned for all requests as a placeholder (per MSC3916)
        found_json = any(
            "\r\nContent-Type: application/json\r\n{}" in field for field in stripped
        )
        self.assertTrue(found_json)

        # check that text file and expected value exist
        found_file = any(
            "\r\nContent-Type: text/plain\r\nfile_to_stream" in field
            for field in stripped
        )
        self.assertTrue(found_file)

        content = io.BytesIO(SMALL_PNG)
        content_uri = self.get_success(
            self.media_repo.create_content(
                "image/png",
                "test_png_upload",
                content,
                67,
                UserID.from_string("@user_id:whatever.org"),
            )
        )
        # test with an image file
        channel = self.make_signed_federation_request(
            "GET",
            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
        )
        self.pump()
        self.assertEqual(200, channel.code)

        content_type = channel.headers.getRawHeaders("content-type")
        assert content_type is not None
        assert "multipart/mixed" in content_type[0]
        assert "boundary" in content_type[0]

        # extract boundary
        boundary = content_type[0].split("boundary=")[1]
        # split on boundary and check that json field and expected value exist
        body = channel.result.get("body")
        assert body is not None
        stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8"))
        found_json = any(
            b"\r\nContent-Type: application/json\r\n{}" in field
            for field in stripped_bytes
        )
        self.assertTrue(found_json)

        # check that png file exists and matches what was uploaded
        found_file = any(SMALL_PNG in field for field in stripped_bytes)
        self.assertTrue(found_file)

    @override_config(
        {"experimental_features": {"msc3916_authenticated_media_enabled": False}}
    )
    def test_disable_config(self) -> None:
        content = io.BytesIO(b"file_to_stream")
        content_uri = self.get_success(
            self.media_repo.create_content(
                "text/plain",
                "test_upload",
                content,
                46,
                UserID.from_string("@user_id:whatever.org"),
            )
        )
        channel = self.make_signed_federation_request(
            "GET",
            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
        )
        self.pump()
        self.assertEqual(404, channel.code)
        self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED")


class FakeFileStorageProviderBackend:
    """
    Fake storage provider stub with incompatible `fetch` signature for testing
    """

    def __init__(self, hs: "HomeServer", config: str):
        self.hs = hs
        self.cache_directory = hs.config.media.media_store_path
        self.base_directory = config

    def __str__(self) -> str:
        return "FakeFileStorageProviderBackend[%s]" % (self.base_directory,)

    async def fetch(
        self, path: str, file_info: FileInfo, media_info: Optional[LocalMedia] = None
    ) -> Optional[Responder]:
        pass


TEST_DIR = tempfile.mkdtemp(prefix="synapse-tests-")


class FederationUnstableMediaEndpointCompatibilityTest(
    unittest.FederatingHomeserverTestCase
):

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        super().prepare(reactor, clock, hs)
        self.test_dir = TEST_DIR
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.media_repo = hs.get_media_repository()

    def default_config(self) -> JsonDict:
        config = super().default_config()
        primary_base_path = os.path.join(TEST_DIR, "primary")
        config["media_storage_providers"] = [
            {
                "module": "tests.federation.test_federation_media.FakeFileStorageProviderBackend",
                "store_local": "True",
                "store_remote": "False",
                "store_synchronous": "False",
                "config": {"directory": primary_base_path},
            }
        ]
        return config

    @override_config(
        {"experimental_features": {"msc3916_authenticated_media_enabled": True}}
    )
    def test_incompatible_storage_provider_fails_to_load_endpoint(self) -> None:
        channel = self.make_signed_federation_request(
            "GET",
            "/_matrix/federation/unstable/org.matrix.msc3916/media/download/xyz",
        )
        self.pump()
        self.assertEqual(404, channel.code)
        self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED")
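The assertions in test_file_download above work by hand-splitting the multipart/mixed response: pull the boundary out of the Content-Type header, then split the body on "\r\n--<boundary>". A self-contained sketch of that technique (the boundary and body below are made up):

    from typing import List

    def split_multipart(content_type: str, body: bytes) -> List[bytes]:
        # e.g. content_type = "multipart/mixed; boundary=abc123"
        boundary = content_type.split("boundary=")[1]
        return body.split(b"\r\n--" + boundary.encode("utf-8"))

    parts = split_multipart(
        "multipart/mixed; boundary=abc123",
        b"preamble\r\n--abc123\r\nContent-Type: application/json\r\n{}\r\n--abc123--",
    )
    assert any(b"Content-Type: application/json" in p for p in parts)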
@@ -24,6 +24,7 @@ from twisted.test.proto_helpers import MemoryReactor

 from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
 from synapse.api.room_versions import RoomVersions
+from synapse.handlers.sliding_sync import SlidingSyncConfig
 from synapse.rest import admin
 from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
@@ -1216,22 +1217,14 @@ class FilterRoomsTestCase(HomeserverTestCase):

         after_rooms_token = self.event_sources.get_current_token()

-        # TODO: Better way to avoid the circular import? (see
-        # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779)
-        from synapse.handlers.sliding_sync import SlidingSyncConfig
-
         # Try with `is_dm=True`
-        # -----------------------------
-        truthy_filters = SlidingSyncConfig.SlidingSyncList.Filters(
-            is_dm=True,
-        )
-
-        # Filter the rooms
         truthy_filtered_room_ids = self.get_success(
             self.sliding_sync_handler.filter_rooms(
                 UserID.from_string(user1_id),
                 {room_id, dm_room_id},
-                truthy_filters,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_dm=True,
+                ),
                 after_rooms_token,
             )
         )
@@ -1239,17 +1232,13 @@ class FilterRoomsTestCase(HomeserverTestCase):
         self.assertEqual(truthy_filtered_room_ids, {dm_room_id})

         # Try with `is_dm=False`
-        # -----------------------------
-        falsy_filters = SlidingSyncConfig.SlidingSyncList.Filters(
-            is_dm=False,
-        )
-
-        # Filter the rooms
         falsy_filtered_room_ids = self.get_success(
             self.sliding_sync_handler.filter_rooms(
                 UserID.from_string(user1_id),
                 {room_id, dm_room_id},
-                falsy_filters,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_dm=False,
+                ),
                 after_rooms_token,
             )
         )
@@ -49,7 +49,10 @@ from synapse.logging.context import make_deferred_yieldable
 from synapse.media._base import FileInfo, ThumbnailInfo
 from synapse.media.filepath import MediaFilePaths
 from synapse.media.media_storage import MediaStorage, ReadableFileWrapper
-from synapse.media.storage_provider import FileStorageProviderBackend
+from synapse.media.storage_provider import (
+    FileStorageProviderBackend,
+    StorageProviderWrapper,
+)
 from synapse.media.thumbnailer import ThumbnailProvider
 from synapse.module_api import ModuleApi
 from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers

@@ -78,7 +81,14 @@ class MediaStorageTests(unittest.HomeserverTestCase):

         hs.config.media.media_store_path = self.primary_base_path

-        storage_providers = [FileStorageProviderBackend(hs, self.secondary_base_path)]
+        storage_providers = [
+            StorageProviderWrapper(
+                FileStorageProviderBackend(hs, self.secondary_base_path),
+                store_local=True,
+                store_remote=False,
+                store_synchronous=True,
+            )
+        ]

         self.filepaths = MediaFilePaths(self.primary_base_path)
         self.media_storage = MediaStorage(
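Wrapping the backend in StorageProviderWrapper here mirrors how the homeserver itself loads providers from config. A hedged sketch of the construction; the flag semantics are inferred from the storage-provider config options of the same names, so treat the comments as assumptions rather than authoritative documentation:

    from synapse.media.storage_provider import (
        FileStorageProviderBackend,
        StorageProviderWrapper,
    )

    def wrap_provider(hs, secondary_path: str) -> StorageProviderWrapper:
        return StorageProviderWrapper(
            FileStorageProviderBackend(hs, secondary_path),
            store_local=True,        # mirror media uploaded by local users
            store_remote=False,      # don't mirror cached remote media
            store_synchronous=True,  # wait for the copy before returning
        )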
@@ -24,7 +24,7 @@ from typing import TYPE_CHECKING
 from typing_extensions import Literal

 from synapse._pydantic_compat import HAS_PYDANTIC_V2
-from synapse.rest.client.models import EmailRequestTokenBody
+from synapse.types.rest.client import EmailRequestTokenBody

 if TYPE_CHECKING or HAS_PYDANTIC_V2:
     from pydantic.v1 import BaseModel, ValidationError
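The hunk above also shows the Pydantic v1/v2 compatibility pattern used throughout these changes: when Pydantic v2 is installed, its bundled pydantic.v1 namespace supplies the v1 API, so modules import the same way regardless of the installed version. A minimal sketch of the pattern (the Example model is made up):

    from typing import TYPE_CHECKING

    from synapse._pydantic_compat import HAS_PYDANTIC_V2

    if TYPE_CHECKING or HAS_PYDANTIC_V2:
        from pydantic.v1 import BaseModel
    else:
        from pydantic import BaseModel

    class Example(BaseModel):
        user_id: str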