mirror of
https://github.com/element-hq/synapse
synced 2024-10-06 10:02:39 +00:00
Prefer Sync v2 vs Sliding Sync distinction
We were using the enum just to distinguish /sync v2 vs Sliding Sync /sync/e2ee so we should just make an enum for that instead of trying to glom onto the existing `sync_type` (overloading it).
This commit is contained in:
parent
69f91436cf
commit
d4ff933748
2 changed files with 41 additions and 35 deletions
|
@ -113,13 +113,22 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
|
|||
SyncRequestKey = Tuple[Any, ...]
|
||||
|
||||
|
||||
class SyncType(Enum):
|
||||
"""Enum for specifying the type of sync request."""
|
||||
class SyncVersion(Enum):
|
||||
"""
|
||||
Enum for specifying the version of sync request. This is used to key which type of
|
||||
sync response that we are generating.
|
||||
|
||||
# These string values are semantically significant and are used in the the metrics
|
||||
INITIAL_SYNC = "initial_sync"
|
||||
FULL_STATE_SYNC = "full_state_sync"
|
||||
INCREMENTAL_SYNC = "incremental_sync"
|
||||
This is different than the `sync_type` you might see used in other code below; which
|
||||
specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
|
||||
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
|
||||
"""
|
||||
|
||||
# These string values are semantically significant because they are used in the the
|
||||
# metrics
|
||||
|
||||
# Traditional `/sync` endpoint
|
||||
SYNC_V2 = "sync_v2"
|
||||
# Part of MSC3575 Sliding Sync
|
||||
E2EE_SYNC = "e2ee_sync"
|
||||
|
||||
|
||||
|
@ -328,7 +337,7 @@ class SyncHandler:
|
|||
self,
|
||||
requester: Requester,
|
||||
sync_config: SyncConfig,
|
||||
sync_type: SyncType,
|
||||
sync_version: SyncVersion,
|
||||
request_key: SyncRequestKey,
|
||||
since_token: Optional[StreamToken] = None,
|
||||
timeout: int = 0,
|
||||
|
@ -351,7 +360,7 @@ class SyncHandler:
|
|||
request_key,
|
||||
self._wait_for_sync_for_user,
|
||||
sync_config,
|
||||
sync_type,
|
||||
sync_version,
|
||||
since_token,
|
||||
timeout,
|
||||
full_state,
|
||||
|
@ -363,7 +372,7 @@ class SyncHandler:
|
|||
async def _wait_for_sync_for_user(
|
||||
self,
|
||||
sync_config: SyncConfig,
|
||||
sync_type: SyncType,
|
||||
sync_version: SyncVersion,
|
||||
since_token: Optional[StreamToken],
|
||||
timeout: int,
|
||||
full_state: bool,
|
||||
|
@ -382,9 +391,18 @@ class SyncHandler:
|
|||
Computing the body of the response begins in the next method,
|
||||
`current_sync_for_user`.
|
||||
"""
|
||||
if since_token is None:
|
||||
sync_type = "initial_sync"
|
||||
elif full_state:
|
||||
sync_type = "full_state_sync"
|
||||
else:
|
||||
sync_type = "incremental_sync"
|
||||
|
||||
sync_label = f"{sync_version}:{sync_type}"
|
||||
|
||||
context = current_context()
|
||||
if context:
|
||||
context.tag = sync_type
|
||||
context.tag = sync_label
|
||||
|
||||
# if we have a since token, delete any to-device messages before that token
|
||||
# (since we now know that the device has received them)
|
||||
|
@ -403,7 +421,7 @@ class SyncHandler:
|
|||
# we are going to return immediately, so don't bother calling
|
||||
# notifier.wait_for_events.
|
||||
result: SyncResult = await self.current_sync_for_user(
|
||||
sync_config, sync_type, since_token, full_state=full_state
|
||||
sync_config, sync_version, since_token, full_state=full_state
|
||||
)
|
||||
else:
|
||||
# Otherwise, we wait for something to happen and report it to the user.
|
||||
|
@ -411,7 +429,7 @@ class SyncHandler:
|
|||
before_token: StreamToken, after_token: StreamToken
|
||||
) -> SyncResult:
|
||||
return await self.current_sync_for_user(
|
||||
sync_config, sync_type, since_token
|
||||
sync_config, sync_version, since_token
|
||||
)
|
||||
|
||||
result = await self.notifier.wait_for_events(
|
||||
|
@ -437,14 +455,14 @@ class SyncHandler:
|
|||
lazy_loaded = "true"
|
||||
else:
|
||||
lazy_loaded = "false"
|
||||
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
|
||||
non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()
|
||||
|
||||
return result
|
||||
|
||||
async def current_sync_for_user(
|
||||
self,
|
||||
sync_config: SyncConfig,
|
||||
sync_type: SyncType,
|
||||
sync_version: SyncVersion,
|
||||
since_token: Optional[StreamToken] = None,
|
||||
full_state: bool = False,
|
||||
) -> SyncResult:
|
||||
|
@ -458,22 +476,18 @@ class SyncHandler:
|
|||
log_kv({"since_token": since_token})
|
||||
|
||||
# Go through the `/sync` v2 path
|
||||
if sync_type in {
|
||||
SyncType.INITIAL_SYNC,
|
||||
SyncType.FULL_STATE_SYNC,
|
||||
SyncType.INCREMENTAL_SYNC,
|
||||
}:
|
||||
if sync_version == SyncVersion.SYNC_V2:
|
||||
sync_result = await self.generate_sync_result(
|
||||
sync_config, since_token, full_state
|
||||
)
|
||||
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
|
||||
elif sync_type == SyncType.E2EE_SYNC:
|
||||
elif sync_version == SyncVersion.E2EE_SYNC:
|
||||
sync_result = await self.generate_e2ee_sync_result(
|
||||
sync_config, since_token
|
||||
)
|
||||
else:
|
||||
raise Exception(
|
||||
f"Unknown sync_type (this is a Synapse problem): {sync_type}"
|
||||
f"Unknown sync_version (this is a Synapse problem): {sync_version}"
|
||||
)
|
||||
|
||||
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
|
||||
|
|
|
@ -41,7 +41,7 @@ from synapse.handlers.sync import (
|
|||
KnockedSyncResult,
|
||||
SyncConfig,
|
||||
SyncResult,
|
||||
SyncType,
|
||||
SyncVersion,
|
||||
)
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
||||
|
@ -206,13 +206,6 @@ class SyncRestServlet(RestServlet):
|
|||
if since is not None:
|
||||
since_token = await StreamToken.from_string(self.store, since)
|
||||
|
||||
if since_token is None:
|
||||
sync_type = SyncType.INITIAL_SYNC
|
||||
elif full_state:
|
||||
sync_type = SyncType.FULL_STATE_SYNC
|
||||
else:
|
||||
sync_type = SyncType.INCREMENTAL_SYNC
|
||||
|
||||
# send any outstanding server notices to the user.
|
||||
await self._server_notices_sender.on_user_syncing(user.to_string())
|
||||
|
||||
|
@ -228,7 +221,7 @@ class SyncRestServlet(RestServlet):
|
|||
sync_result = await self.sync_handler.wait_for_sync_for_user(
|
||||
requester,
|
||||
sync_config,
|
||||
sync_type,
|
||||
SyncVersion.SYNC_V2,
|
||||
request_key,
|
||||
since_token=since_token,
|
||||
timeout=timeout,
|
||||
|
@ -567,8 +560,8 @@ class SlidingSyncE2eeRestServlet(RestServlet):
|
|||
"""
|
||||
API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as part
|
||||
of Sliding Sync but doesn't have any sliding window component. It's just a way to
|
||||
get E2EE events without having to sit through a initial sync. And not have
|
||||
encryption events backed up by the main sync response.
|
||||
get E2EE events without having to sit through a big initial sync (`/sync` v2). And
|
||||
we can avoid encryption events being backed up by the main sync response.
|
||||
|
||||
GET parameters::
|
||||
timeout(int): How long to wait for new events in milliseconds.
|
||||
|
@ -626,7 +619,6 @@ class SlidingSyncE2eeRestServlet(RestServlet):
|
|||
is_guest=requester.is_guest,
|
||||
device_id=device_id,
|
||||
)
|
||||
sync_type = SyncType.E2EE_SYNC
|
||||
|
||||
since_token = None
|
||||
if since is not None:
|
||||
|
@ -634,7 +626,7 @@ class SlidingSyncE2eeRestServlet(RestServlet):
|
|||
|
||||
# Request cache key
|
||||
request_key = (
|
||||
sync_type,
|
||||
SyncVersion.SYNC_V2,
|
||||
user,
|
||||
timeout,
|
||||
since,
|
||||
|
@ -644,7 +636,7 @@ class SlidingSyncE2eeRestServlet(RestServlet):
|
|||
sync_result = await self.sync_handler.wait_for_sync_for_user(
|
||||
requester,
|
||||
sync_config,
|
||||
sync_type,
|
||||
SyncVersion.SYNC_V2,
|
||||
request_key,
|
||||
since_token=since_token,
|
||||
timeout=timeout,
|
||||
|
|
Loading…
Reference in a new issue