# # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2021 The Matrix.org Foundation C.I.C. # Copyright (C) 2023 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # See the GNU Affero General Public License for more details: # . # # Originally licensed under the Apache License, Version 2.0: # . # # [This file includes modifications made by New Vector Limited] # # from typing import Collection from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.errors import Codes from synapse.rest.client import login from synapse.server import HomeServer from synapse.storage.background_updates import BackgroundUpdater from synapse.types import JsonDict from synapse.util import Clock from tests import unittest class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, login.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") self.updater = BackgroundUpdater(hs, self.store.db_pool) @parameterized.expand( [ ("GET", "/_synapse/admin/v1/background_updates/enabled"), ("POST", "/_synapse/admin/v1/background_updates/enabled"), ("GET", "/_synapse/admin/v1/background_updates/status"), ("POST", "/_synapse/admin/v1/background_updates/start_job"), ] ) def test_requester_is_no_admin(self, method: str, url: str) -> None: """ If the user is not a server admin, an error 403 is returned. """ self.register_user("user", "pass", admin=False) other_user_tok = self.login("user", "pass") channel = self.make_request( method, url, content={}, access_token=other_user_tok, ) self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) def test_invalid_parameter(self) -> None: """ If parameters are invalid, an error is returned. """ url = "/_synapse/admin/v1/background_updates/start_job" # empty content channel = self.make_request( "POST", url, content={}, access_token=self.admin_user_tok, ) self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"]) # job_name invalid channel = self.make_request( "POST", url, content={"job_name": "unknown"}, access_token=self.admin_user_tok, ) self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"]) def _register_bg_update(self) -> None: "Adds a bg update but doesn't start it" async def _fake_update(progress: JsonDict, batch_size: int) -> int: await self.clock.sleep(0.2) return batch_size self.store.db_pool.updates.register_background_update_handler( "test_update", _fake_update, ) self.get_success( self.store.db_pool.simple_insert( table="background_updates", values={ "update_name": "test_update", "progress_json": "{}", }, ) ) def test_status_empty(self) -> None: """Test the status API works.""" channel = self.make_request( "GET", "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) # Background updates should be enabled, but none should be running. self.assertDictEqual( channel.json_body, {"current_updates": {}, "enabled": True} ) def test_status_bg_update(self) -> None: """Test the status API works with a background update.""" # Create a new background update self._register_bg_update() self.store.db_pool.updates.start_doing_background_updates() self.reactor.pump([1.0, 1.0, 1.0]) channel = self.make_request( "GET", "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) # Background updates should be enabled, and one should be running. self.assertDictEqual( channel.json_body, { "current_updates": { "master": { "name": "test_update", "average_items_per_ms": 0.1, "total_duration_ms": 1000.0, "total_item_count": ( self.updater.default_background_batch_size ), } }, "enabled": True, }, ) def test_enabled(self) -> None: """Test the enabled API works.""" # Create a new background update self._register_bg_update() self.store.db_pool.updates.start_doing_background_updates() # Test that GET works and returns enabled is True. channel = self.make_request( "GET", "/_synapse/admin/v1/background_updates/enabled", access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertDictEqual(channel.json_body, {"enabled": True}) # Disable the BG updates channel = self.make_request( "POST", "/_synapse/admin/v1/background_updates/enabled", content={"enabled": False}, access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertDictEqual(channel.json_body, {"enabled": False}) # Advance a bit and get the current status, note this will finish the in # flight background update so we call it the status API twice and check # there was no change. self.reactor.pump([1.0, 1.0]) channel = self.make_request( "GET", "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertDictEqual( channel.json_body, { "current_updates": { "master": { "name": "test_update", "average_items_per_ms": 0.1, "total_duration_ms": 1000.0, "total_item_count": ( self.updater.default_background_batch_size ), } }, "enabled": False, }, ) # Run the reactor for a bit so the BG updates would have a chance to run # if they were to. self.reactor.pump([1.0, 1.0]) channel = self.make_request( "GET", "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) # There should be no change from the previous /status response. self.assertDictEqual( channel.json_body, { "current_updates": { "master": { "name": "test_update", "average_items_per_ms": 0.1, "total_duration_ms": 1000.0, "total_item_count": ( self.updater.default_background_batch_size ), } }, "enabled": False, }, ) # Re-enable the background updates. channel = self.make_request( "POST", "/_synapse/admin/v1/background_updates/enabled", content={"enabled": True}, access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertDictEqual(channel.json_body, {"enabled": True}) self.reactor.pump([1.0, 1.0]) channel = self.make_request( "GET", "/_synapse/admin/v1/background_updates/status", access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) # Background updates should be enabled and making progress. self.assertDictEqual( channel.json_body, { "current_updates": { "master": { "name": "test_update", "average_items_per_ms": 0.05263157894736842, "total_duration_ms": 2000.0, "total_item_count": (110), } }, "enabled": True, }, ) @parameterized.expand( [ ("populate_stats_process_rooms", ["populate_stats_process_rooms"]), ( "regenerate_directory", [ "populate_user_directory_createtables", "populate_user_directory_process_rooms", "populate_user_directory_process_users", "populate_user_directory_cleanup", ], ), ] ) def test_start_backround_job(self, job_name: str, updates: Collection[str]) -> None: """ Test that background updates add to database and be processed. Args: job_name: name of the job to call with API updates: collection of background updates to be started """ # no background update is waiting self.assertTrue( self.get_success( self.store.db_pool.updates.has_completed_background_updates() ) ) channel = self.make_request( "POST", "/_synapse/admin/v1/background_updates/start_job", content={"job_name": job_name}, access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) # test that each background update is waiting now for update in updates: self.assertFalse( self.get_success( self.store.db_pool.updates.has_completed_background_update(update) ) ) self.wait_for_background_updates() # background updates are done self.assertTrue( self.get_success( self.store.db_pool.updates.has_completed_background_updates() ) ) def test_start_backround_job_twice(self) -> None: """Test that add a background update twice return an error.""" # add job to database self.get_success( self.store.db_pool.simple_insert( table="background_updates", values={ "update_name": "populate_stats_process_rooms", "progress_json": "{}", }, ) ) channel = self.make_request( "POST", "/_synapse/admin/v1/background_updates/start_job", content={"job_name": "populate_stats_process_rooms"}, access_token=self.admin_user_tok, ) self.assertEqual(400, channel.code, msg=channel.json_body)