synapse/tests/util/test_linearizer.py
2023-11-21 15:29:58 -05:00

262 lines
8.5 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Hashable, Tuple
from typing_extensions import Protocol
from twisted.internet import defer, reactor
from twisted.internet.base import ReactorBase
from twisted.internet.defer import CancelledError, Deferred
from synapse.logging.context import LoggingContext, current_context
from synapse.util.async_helpers import Linearizer
from tests import unittest
class UnblockFunction(Protocol):
    """Structural type of the "unblock" callback returned by `_start_task`.

    Calling it lets a blocked task proceed; `pump_reactor` selects whether the
    reactor is pumped afterwards.
    """

    def __call__(self, pump_reactor: bool = True) -> None:
        """Unblock the task, optionally pumping the reactor afterwards."""
class LinearizerTestCase(unittest.TestCase):
    """Tests for `Linearizer`: queueing, concurrency limits and cancellation."""

    def _start_task(
        self, linearizer: Linearizer, key: Hashable
    ) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
        """Starts a task which acquires the linearizer lock, blocks, then completes.

        Args:
            linearizer: The `Linearizer`.
            key: The `Linearizer` key.

        Returns:
            A tuple containing:
             * A cancellable `Deferred` for the entire task.
             * A `Deferred` that resolves once the task acquires the lock.
             * A function that unblocks the task. Must be called by the caller
               to allow the task to release the lock and complete.
        """
        acquired_d: "Deferred[None]" = Deferred()
        unblock_d: "Deferred[None]" = Deferred()

        async def task() -> None:
            async with linearizer.queue(key):
                # Signal to the test that the lock is now held, then hold it
                # until the test calls `unblock`.
                acquired_d.callback(None)
                await unblock_d

        d = defer.ensureDeferred(task())

        def unblock(pump_reactor: bool = True) -> None:
            unblock_d.callback(None)
            # The next task, if it exists, will acquire the lock and require a kick of
            # the reactor to advance.
            if pump_reactor:
                self._pump()

        return d, acquired_d, unblock

    def _pump(self) -> None:
        """Pump the reactor to advance `Linearizer`s.

        Keeps running the reactor's due calls until no delayed calls remain.
        """
        assert isinstance(reactor, ReactorBase)
        while reactor.getDelayedCalls():
            reactor.runUntilCurrent()

    def test_linearizer(self) -> None:
        """Tests that a task is queued up behind an earlier task."""
        linearizer = Linearizer()

        key = object()

        _, acquired_d1, unblock1 = self._start_task(linearizer, key)
        self.assertTrue(acquired_d1.called)

        _, acquired_d2, unblock2 = self._start_task(linearizer, key)
        self.assertFalse(acquired_d2.called)

        # Once the first task is done, the second task can continue.
        unblock1()
        self.assertTrue(acquired_d2.called)

        unblock2()

    def test_linearizer_is_queued(self) -> None:
        """Tests `Linearizer.is_queued`.

        Runs through the same scenario as `test_linearizer`.
        """
        linearizer = Linearizer()

        key = object()

        _, acquired_d1, unblock1 = self._start_task(linearizer, key)
        self.assertTrue(acquired_d1.called)

        # Since the first task acquires the lock immediately, "is_queued" should return
        # false.
        self.assertFalse(linearizer.is_queued(key))

        _, acquired_d2, unblock2 = self._start_task(linearizer, key)
        self.assertFalse(acquired_d2.called)

        # Now the second task is queued up behind the first.
        self.assertTrue(linearizer.is_queued(key))

        unblock1()

        # And now the second task acquires the lock and nothing is in the queue again.
        self.assertTrue(acquired_d2.called)
        self.assertFalse(linearizer.is_queued(key))

        unblock2()
        self.assertFalse(linearizer.is_queued(key))

    def test_lots_of_queued_things(self) -> None:
        """Tests lots of fast things queued up behind a slow thing.

        The stack should *not* explode when the slow thing completes.
        """
        linearizer = Linearizer()
        key = ""

        async def func(i: int) -> None:
            with LoggingContext("func(%s)" % i) as lc:
                async with linearizer.queue(key):
                    self.assertEqual(current_context(), lc)

                # The logging context must still be ours after the lock has
                # been released.
                self.assertEqual(current_context(), lc)

        # The "slow thing": holds the lock until `unblock` is called.
        _, _, unblock = self._start_task(linearizer, key)
        for i in range(1, 100):
            defer.ensureDeferred(func(i))

        d = defer.ensureDeferred(func(1000))
        unblock()
        self.successResultOf(d)

    def test_multiple_entries(self) -> None:
        """Tests a `Linearizer` with a concurrency above 1."""
        limiter = Linearizer(max_count=3)

        key = object()

        _, acquired_d1, unblock1 = self._start_task(limiter, key)
        self.assertTrue(acquired_d1.called)

        _, acquired_d2, unblock2 = self._start_task(limiter, key)
        self.assertTrue(acquired_d2.called)

        _, acquired_d3, unblock3 = self._start_task(limiter, key)
        self.assertTrue(acquired_d3.called)

        # These next two tasks have to wait.
        _, acquired_d4, unblock4 = self._start_task(limiter, key)
        self.assertFalse(acquired_d4.called)

        _, acquired_d5, unblock5 = self._start_task(limiter, key)
        self.assertFalse(acquired_d5.called)

        # Once the first task completes, the fourth task can continue.
        unblock1()
        self.assertTrue(acquired_d4.called)
        self.assertFalse(acquired_d5.called)

        # Once the third task completes, the fifth task can continue.
        unblock3()
        self.assertTrue(acquired_d5.called)

        # Make all tasks finish.
        unblock2()
        unblock4()
        unblock5()

        # The next task shouldn't have to wait.
        _, acquired_d6, unblock6 = self._start_task(limiter, key)
        # Check `.called`: a `Deferred` object is always truthy, so asserting on
        # the object itself would pass even if the lock had not been acquired.
        self.assertTrue(acquired_d6.called)
        unblock6()

    def test_cancellation(self) -> None:
        """Tests cancellation while waiting for a `Linearizer`."""
        linearizer = Linearizer()

        key = object()

        d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
        self.assertTrue(acquired_d1.called)

        # Create a second task, waiting for the first task.
        d2, acquired_d2, _ = self._start_task(linearizer, key)
        self.assertFalse(acquired_d2.called)

        # Create a third task, waiting for the second task.
        d3, acquired_d3, unblock3 = self._start_task(linearizer, key)
        self.assertFalse(acquired_d3.called)

        # Cancel the waiting second task.
        d2.cancel()

        unblock1()
        self.successResultOf(d1)

        self.assertTrue(d2.called)
        self.failureResultOf(d2, CancelledError)

        # The third task should continue running.
        self.assertTrue(
            acquired_d3.called,
            "Third task did not get the lock after the second task was cancelled",
        )
        unblock3()
        self.successResultOf(d3)

    def test_cancellation_during_sleep(self) -> None:
        """Tests cancellation during the sleep just after waiting for a `Linearizer`."""
        linearizer = Linearizer()

        key = object()

        d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
        self.assertTrue(acquired_d1.called)

        # Create a second task, waiting for the first task.
        d2, acquired_d2, _ = self._start_task(linearizer, key)
        self.assertFalse(acquired_d2.called)

        # Create a third task, waiting for the second task.
        d3, acquired_d3, unblock3 = self._start_task(linearizer, key)
        self.assertFalse(acquired_d3.called)

        # Once the first task completes, cancel the waiting second task while it is
        # sleeping just after acquiring the lock.
        # The reactor is deliberately not pumped here, so the cancellation is
        # issued before the second task gets to advance.
        unblock1(pump_reactor=False)
        self.successResultOf(d1)
        d2.cancel()
        self._pump()

        self.assertTrue(d2.called)
        self.failureResultOf(d2, CancelledError)

        # The third task should continue running.
        self.assertTrue(
            acquired_d3.called,
            "Third task did not get the lock after the second task was cancelled",
        )
        unblock3()
        self.successResultOf(d3)