Respect the @cancellable flag for ReplicationEndpoints (#12700)

While `ReplicationEndpoint`s register themselves via `JsonResource`,
they pass a method that calls the handler, instead of the handler itself,
to `register_paths`. As a result, `JsonResource` will not correctly pick
up the `@cancellable` flag and we have to apply it ourselves.

Signed-off-by: Sean Quah <seanq@element.io>
This commit is contained in:
Sean Quah 2022-05-11 12:25:39 +01:00 committed by GitHub
parent 9d8e380d2e
commit a559c8b0d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 139 additions and 2 deletions

1
changelog.d/12700.misc Normal file
View file

@ -0,0 +1 @@
Respect the `@cancellable` flag for `ReplicationEndpoint`s.

View file

@ -26,7 +26,8 @@ from twisted.web.server import Request
from synapse.api.errors import HttpResponseException, SynapseError from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer from synapse.http.server import HttpServer, is_method_cancellable
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing from synapse.logging import opentracing
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.types import JsonDict from synapse.types import JsonDict
@ -310,6 +311,12 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
url_args = list(self.PATH_ARGS) url_args = list(self.PATH_ARGS)
method = self.METHOD method = self.METHOD
if self.CACHE and is_method_cancellable(self._handle_request):
raise Exception(
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
"is set. The cancellable flag would have no effect."
)
if self.CACHE: if self.CACHE:
url_args.append("txn_id") url_args.append("txn_id")
@ -324,7 +331,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
) )
async def _check_auth_and_handle( async def _check_auth_and_handle(
self, request: Request, **kwargs: Any self, request: SynapseRequest, **kwargs: Any
) -> Tuple[int, JsonDict]: ) -> Tuple[int, JsonDict]:
"""Called on new incoming requests when caching is enabled. Checks """Called on new incoming requests when caching is enabled. Checks
if there is a cached response for the request and returns that, if there is a cached response for the request and returns that,
@ -340,8 +347,18 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
if self.CACHE: if self.CACHE:
txn_id = kwargs.pop("txn_id") txn_id = kwargs.pop("txn_id")
# We ignore the `@cancellable` flag, since cancellation wouldn't interupt
# `_handle_request` and `ResponseCache` does not handle cancellation
# correctly yet. In particular, there may be issues to do with logging
# context lifetimes.
return await self.response_cache.wrap( return await self.response_cache.wrap(
txn_id, self._handle_request, request, **kwargs txn_id, self._handle_request, request, **kwargs
) )
# The `@cancellable` decorator may be applied to `_handle_request`. But we
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
# so we have to set up the cancellable flag ourselves.
request.is_render_cancellable = is_method_cancellable(self._handle_request)
return await self._handle_request(request, **kwargs) return await self._handle_request(request, **kwargs)

View file

@ -0,0 +1,13 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,106 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http import HTTPStatus
from typing import Tuple
from twisted.web.server import Request
from synapse.api.errors import Codes
from synapse.http.server import JsonResource, cancellable
from synapse.replication.http import REPLICATION_PREFIX
from synapse.replication.http._base import ReplicationEndpoint
from synapse.server import HomeServer
from synapse.types import JsonDict
from tests import unittest
from tests.http.server._base import EndpointCancellationTestHelperMixin
class CancellableReplicationEndpoint(ReplicationEndpoint):
NAME = "cancellable_sleep"
PATH_ARGS = ()
CACHE = False
def __init__(self, hs: HomeServer):
super().__init__(hs)
self.clock = hs.get_clock()
@staticmethod
async def _serialize_payload() -> JsonDict:
return {}
@cancellable
async def _handle_request( # type: ignore[override]
self, request: Request
) -> Tuple[int, JsonDict]:
await self.clock.sleep(1.0)
return HTTPStatus.OK, {"result": True}
class UncancellableReplicationEndpoint(ReplicationEndpoint):
NAME = "uncancellable_sleep"
PATH_ARGS = ()
CACHE = False
def __init__(self, hs: HomeServer):
super().__init__(hs)
self.clock = hs.get_clock()
@staticmethod
async def _serialize_payload() -> JsonDict:
return {}
async def _handle_request( # type: ignore[override]
self, request: Request
) -> Tuple[int, JsonDict]:
await self.clock.sleep(1.0)
return HTTPStatus.OK, {"result": True}
class ReplicationEndpointCancellationTestCase(
unittest.HomeserverTestCase, EndpointCancellationTestHelperMixin
):
"""Tests for `ReplicationEndpoint` cancellation."""
def create_test_resource(self):
"""Overrides `HomeserverTestCase.create_test_resource`."""
resource = JsonResource(self.hs)
CancellableReplicationEndpoint(self.hs).register(resource)
UncancellableReplicationEndpoint(self.hs).register(resource)
return resource
def test_cancellable_disconnect(self) -> None:
"""Test that handlers with the `@cancellable` flag can be cancelled."""
path = f"{REPLICATION_PREFIX}/{CancellableReplicationEndpoint.NAME}/"
channel = self.make_request("POST", path, await_result=False)
self._test_disconnect(
self.reactor,
channel,
expect_cancellation=True,
expected_body={"error": "Request cancelled", "errcode": Codes.UNKNOWN},
)
def test_uncancellable_disconnect(self) -> None:
"""Test that handlers without the `@cancellable` flag cannot be cancelled."""
path = f"{REPLICATION_PREFIX}/{UncancellableReplicationEndpoint.NAME}/"
channel = self.make_request("POST", path, await_result=False)
self._test_disconnect(
self.reactor,
channel,
expect_cancellation=False,
expected_body={"result": True},
)