Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry

Conflicts:
	synapse/storage/schema/__init__.py
This commit is contained in:
Eric Eastwood 2022-09-14 15:36:57 -05:00
commit 84f91e36f3
38 changed files with 783 additions and 121 deletions

View file

@ -0,0 +1 @@
Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.

1
changelog.d/13749.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a long standing bug where device lists would remain cached when remote users left and rejoined the last room shared with the local homeserver.

1
changelog.d/13753.misc Normal file
View file

@ -0,0 +1 @@
Prepatory work for storing thread IDs for notifications and receipts.

1
changelog.d/13780.misc Normal file
View file

@ -0,0 +1 @@
Deduplicate `is_server_notices_room`.

1
changelog.d/13788.misc Normal file
View file

@ -0,0 +1 @@
Remove an old, incorrect migration file.

1
changelog.d/13789.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a long-standing spec compliance bug where Synapse would accept a trailing slash on the end of `/get_missing_events` federation requests.

1
changelog.d/13795.misc Normal file
View file

@ -0,0 +1 @@
Remove unused method in `synapse.api.auth.Auth`.

1
changelog.d/13798.misc Normal file
View file

@ -0,0 +1 @@
Fix a memory leak when running the unit tests.

1
changelog.d/13802.misc Normal file
View file

@ -0,0 +1 @@
Use partial indices on SQLite.

View file

@ -459,15 +459,6 @@ class Auth:
)
raise InvalidClientTokenError("Invalid access token passed.")
def get_appservice_by_req(self, request: SynapseRequest) -> ApplicationService:
token = self.get_access_token_from_request(request)
service = self.store.get_app_service_by_token(token)
if not service:
logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError()
request.requester = create_requester(service.sender, app_service=service)
return service
async def is_server_admin(self, requester: Requester) -> bool:
"""Check if the given user is a local server admin.

View file

@ -549,8 +549,7 @@ class FederationClientKeysClaimServlet(BaseFederationServerServlet):
class FederationGetMissingEventsServlet(BaseFederationServerServlet):
# TODO(paul): Why does this path alone end with "/?" optional?
PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
PATH = "/get_missing_events/(?P<room_id>[^/]*)"
async def on_POST(
self,

View file

@ -45,7 +45,6 @@ from synapse.types import (
JsonDict,
StreamKeyType,
StreamToken,
UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
@ -324,8 +323,6 @@ class DeviceHandler(DeviceWorkerHandler):
self.device_list_updater.incoming_device_list_update,
)
hs.get_distributor().observe("user_left_room", self.user_left_room)
# Whether `_handle_new_device_update_async` is currently processing.
self._handle_new_device_update_is_processing = False
@ -569,14 +566,6 @@ class DeviceHandler(DeviceWorkerHandler):
StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
)
async def user_left_room(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
# We no longer share rooms with this user, so we'll no longer
# receive device updates. Mark this in DB.
await self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
async def store_dehydrated_device(
self,
user_id: str,

View file

@ -175,6 +175,32 @@ class E2eKeysHandler:
user_ids_not_in_cache,
remote_results,
) = await self.store.get_user_devices_from_cache(query_list)
# Check that the homeserver still shares a room with all cached users.
# Note that this check may be slightly racy when a remote user leaves a
# room after we have fetched their cached device list. In the worst case
# we will do extra federation queries for devices that we had cached.
cached_users = set(remote_results.keys())
valid_cached_users = (
await self.store.get_users_server_still_shares_room_with(
remote_results.keys()
)
)
invalid_cached_users = cached_users - valid_cached_users
if invalid_cached_users:
# Fix up results. If we get here, there is either a bug in device
# list tracking, or we hit the race mentioned above.
user_ids_not_in_cache.update(invalid_cached_users)
for invalid_user_id in invalid_cached_users:
remote_results.pop(invalid_user_id)
# This log message may be removed if it turns out it's almost
# entirely triggered by races.
logger.error(
"Devices for %s were cached, but the server no longer shares "
"any rooms with them. The cached device lists are stale.",
invalid_cached_users,
)
for user_id, devices in remote_results.items():
user_devices = results.setdefault(user_id, {})
for device_id, device in devices.items():

View file

@ -862,6 +862,9 @@ class FederationEventHandler:
self._sanity_check_event(event)
except SynapseError as err:
logger.warning("Event %s failed sanity check: %s", event_id, err)
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(err)
)
return
try:
@ -897,6 +900,10 @@ class FederationEventHandler:
backfilled=backfilled,
)
except FederationError as e:
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(e)
)
if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:

View file

@ -752,20 +752,12 @@ class EventCreationHandler:
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
return await self._is_server_notices_room(builder.room_id)
return await self.store.is_server_notice_room(builder.room_id)
elif membership == Membership.LEAVE:
# the user is always allowed to leave (but not kick people)
return builder.state_key == requester.user.to_string()
return False
async def _is_server_notices_room(self, room_id: str) -> bool:
if self.config.servernotices.server_notices_mxid is None:
return False
is_server_notices_room = await self.store.check_local_user_in_room(
user_id=self.config.servernotices.server_notices_mxid, room_id=room_id
)
return is_server_notices_room
async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
"""Check if a user has accepted the privacy policy

View file

@ -837,7 +837,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
old_membership == Membership.INVITE
and effective_membership_state == Membership.LEAVE
):
is_blocked = await self._is_server_notice_room(room_id)
is_blocked = await self.store.is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
HTTPStatus.FORBIDDEN,
@ -1617,14 +1617,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
return False
async def _is_server_notice_room(self, room_id: str) -> bool:
if self._server_notices_mxid is None:
return False
is_server_notices_room = await self.store.check_local_user_in_room(
user_id=self._server_notices_mxid, room_id=room_id
)
return is_server_notices_room
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs: "HomeServer"):

View file

@ -198,7 +198,7 @@ class BulkPushRuleEvaluator:
return pl_event.content if pl_event else {}, sender_level
async def _get_mutual_relations(
self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
) -> Dict[str, Set[Tuple[str, str]]]:
"""
Fetch event metadata for events which related to the same event as the given event.
@ -206,7 +206,7 @@ class BulkPushRuleEvaluator:
If the given event has no relation information, returns an empty dictionary.
Args:
event_id: The event ID which is targeted by relations.
parent_id: The event ID which is targeted by relations.
rules: The push rules which will be processed for this event.
Returns:
@ -220,12 +220,6 @@ class BulkPushRuleEvaluator:
if not self._relations_match_enabled:
return {}
# If the event does not have a relation, then cannot have any mutual
# relations.
relation = relation_from_event(event)
if not relation:
return {}
# Pre-filter to figure out which relation types are interesting.
rel_types = set()
for rule, enabled in rules:
@ -246,9 +240,7 @@ class BulkPushRuleEvaluator:
return {}
# If any valid rules were found, fetch the mutual relations.
return await self.store.get_mutual_event_relations(
relation.parent_id, rel_types
)
return await self.store.get_mutual_event_relations(parent_id, rel_types)
@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
@ -281,9 +273,17 @@ class BulkPushRuleEvaluator:
sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context)
relations = await self._get_mutual_relations(
event, itertools.chain(*rules_by_user.values())
)
relation = relation_from_event(event)
# If the event does not have a relation, then cannot have any mutual
# relations or thread ID.
relations = {}
thread_id = "main"
if relation:
relations = await self._get_mutual_relations(
relation.parent_id, itertools.chain(*rules_by_user.values())
)
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id
evaluator = PushRuleEvaluatorForEvent(
event,
@ -352,6 +352,7 @@ class BulkPushRuleEvaluator:
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)

View file

@ -581,9 +581,6 @@ class BackgroundUpdater:
def create_index_sqlite(conn: Connection) -> None:
# Sqlite doesn't support concurrent creation of indexes.
#
# We don't use partial indices on SQLite as it wasn't introduced
# until 3.8, and wheezy and CentOS 7 have 3.7
#
# We assume that sqlite doesn't give us invalid indices; however
# we may still end up with the index existing but the
# background_updates not having been recorded if synapse got shut
@ -591,12 +588,13 @@ class BackgroundUpdater:
# has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
sql = (
"CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
" (%(columns)s)"
" (%(columns)s) %(where_clause)s"
) % {
"unique": "UNIQUE" if unique else "",
"name": index_name,
"table": table,
"columns": ", ".join(columns),
"where_clause": "WHERE " + where_clause if where_clause else "",
}
c = conn.cursor()

View file

@ -602,9 +602,9 @@ class EventsPersistenceStorageController:
# room
state_delta_for_room: Dict[str, DeltaState] = {}
# Set of remote users which were in rooms the server has left. We
# should check if we still share any rooms and if not we mark their
# device lists as stale.
# Set of remote users which were in rooms the server has left or who may
# have left rooms the server is in. We should check if we still share any
# rooms and if not we mark their device lists as stale.
potentially_left_users: Set[str] = set()
if not backfilled:
@ -729,6 +729,20 @@ class EventsPersistenceStorageController:
current_state = {}
delta.no_longer_in_room = True
# Add all remote users that might have left rooms.
potentially_left_users.update(
user_id
for event_type, user_id in delta.to_delete
if event_type == EventTypes.Member
and not self.is_mine_id(user_id)
)
potentially_left_users.update(
user_id
for event_type, user_id in delta.to_insert.keys()
if event_type == EventTypes.Member
and not self.is_mine_id(user_id)
)
state_delta_for_room[room_id] = delta
await self.persist_events_store._persist_events_and_state_updates(

View file

@ -1294,6 +1294,51 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return event_id_results
@trace
async def record_event_failed_pull_attempt(
self, room_id: str, event_id: str, cause: str
) -> None:
"""
Record when we fail to pull an event over federation.
This information allows us to be more intelligent when we decide to
retry (we don't need to fail over and over) and we can process that
event in the background so we don't block on it each time.
Args:
room_id: The room where the event failed to pull from
event_id: The event that failed to be fetched or processed
cause: The error message or reason that we failed to pull the event
"""
await self.db_pool.runInteraction(
"record_event_failed_pull_attempt",
self._record_event_failed_pull_attempt_upsert_txn,
room_id,
event_id,
cause,
db_autocommit=True, # Safe as it's a single upsert
)
def _record_event_failed_pull_attempt_upsert_txn(
self,
txn: LoggingTransaction,
room_id: str,
event_id: str,
cause: str,
) -> None:
sql = """
INSERT INTO event_failed_pull_attempts (
room_id, event_id, num_attempts, last_attempt_ts, last_cause
)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (room_id, event_id) DO UPDATE SET
num_attempts=event_failed_pull_attempts.num_attempts + 1,
last_attempt_ts=EXCLUDED.last_attempt_ts,
last_cause=EXCLUDED.last_cause;
"""
txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
async def get_missing_events(
self,
room_id: str,

View file

@ -98,6 +98,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@ -232,6 +233,104 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
replaces_index="event_push_summary_user_rm",
)
self.db_pool.updates.register_background_index_update(
"event_push_summary_unique_index2",
index_name="event_push_summary_unique_index2",
table="event_push_summary",
columns=["user_id", "room_id", "thread_id"],
unique=True,
)
self.db_pool.updates.register_background_update_handler(
"event_push_backfill_thread_id",
self._background_backfill_thread_id,
)
async def _background_backfill_thread_id(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Fill in the thread_id field for event_push_actions and event_push_summary.
This is preparatory so that it can be made non-nullable in the future.
Because all current (null) data is done in an unthreaded manner this
simply assumes it is on the "main" timeline. Since event_push_actions
are periodically cleared it is not possible to correctly re-calculate
the thread_id.
"""
event_push_actions_done = progress.get("event_push_actions_done", False)
def add_thread_id_txn(
txn: LoggingTransaction, table_name: str, start_stream_ordering: int
) -> int:
sql = f"""
SELECT stream_ordering
FROM {table_name}
WHERE
thread_id IS NULL
AND stream_ordering > ?
ORDER BY stream_ordering
LIMIT ?
"""
txn.execute(sql, (start_stream_ordering, batch_size))
# No more rows to process.
rows = txn.fetchall()
if not rows:
progress[f"{table_name}_done"] = True
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return 0
# Update the thread ID for any of those rows.
max_stream_ordering = rows[-1][0]
sql = f"""
UPDATE {table_name}
SET thread_id = 'main'
WHERE stream_ordering <= ? AND thread_id IS NULL
"""
txn.execute(sql, (max_stream_ordering,))
# Update progress.
processed_rows = txn.rowcount
progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return processed_rows
# First update the event_push_actions table, then the event_push_summary table.
#
# Note that the event_push_actions_staging table is ignored since it is
# assumed that items in that table will only exist for a short period of
# time.
if not event_push_actions_done:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_actions",
progress.get("max_event_push_actions_stream_ordering", 0),
)
else:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_summary",
progress.get("max_event_push_summary_stream_ordering", 0),
)
# Only done after the event_push_summary table is done.
if not result:
await self.db_pool.updates._end_background_update(
"event_push_backfill_thread_id"
)
return result
@cached(tree=True, max_entries=5000)
async def get_unread_event_push_actions_by_room_for_user(
self,
@ -670,6 +769,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
event_id: str,
user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
count_as_unread: bool,
thread_id: str,
) -> None:
"""Add the push actions for the event to the push action staging area.
@ -678,6 +778,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
count_as_unread: Whether this event should increment unread counts.
thread_id: The thread this event is parent of, if applicable.
"""
if not user_id_actions:
return
@ -686,7 +787,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(
user_id: str, actions: Collection[Union[Mapping, str]]
) -> Tuple[str, str, str, int, int, int]:
) -> Tuple[str, str, str, int, int, int, str]:
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 1 if "notify" in actions else 0
return (
@ -696,11 +797,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
notif, # notif column
is_highlight, # highlight column
int(count_as_unread), # unread column
thread_id, # thread_id column
)
await self.db_pool.simple_insert_many(
"event_push_actions_staging",
keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
keys=(
"event_id",
"user_id",
"actions",
"notif",
"highlight",
"unread",
"thread_id",
),
values=[
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.items()
@ -981,6 +1091,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
)
# Replace the previous summary with the new counts.
#
# TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
@ -990,6 +1102,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
},
)
@ -1138,17 +1251,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
logger.info("Rotating notifications, handling %d rows", len(summaries))
# TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
value_values=[
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
for summary in summaries.values()
],
@ -1255,7 +1370,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
table="event_push_actions",
columns=["highlight", "stream_ordering"],
where_clause="highlight=0",
psql_only=True,
)
async def get_push_actions_for_user(

View file

@ -2192,9 +2192,9 @@ class PersistEventsStore:
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight, unread
topological_ordering, notif, highlight, unread, thread_id
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
FROM event_push_actions_staging
WHERE event_id = ?
"""
@ -2435,17 +2435,31 @@ class PersistEventsStore:
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
backward_extremity_tuples_to_remove = [
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
]
txn.execute_batch(
query,
[
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
],
backward_extremity_tuples_to_remove,
)
# Clear out the failed backfill attempts after we successfully pulled
# the event. Since we no longer need these events as backward
# extremities, it also means that they won't be backfilled from again so
# we no longer need to store the backfill attempts around it.
query = """
DELETE FROM event_failed_pull_attempts
WHERE event_id = ? and room_id = ?
"""
txn.execute_batch(
query,
backward_extremity_tuples_to_remove,
)

View file

@ -113,6 +113,24 @@ class ReceiptsWorkerStore(SQLBaseStore):
prefilled_cache=receipts_stream_prefill,
)
self.db_pool.updates.register_background_index_update(
"receipts_linearized_unique_index",
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)
self.db_pool.updates.register_background_index_update(
"receipts_graph_unique_index",
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)
def get_max_receipt_stream_id(self) -> int:
"""Get the current max stream ID for receipts stream"""
return self._receipts_id_gen.get_current_token()
@ -677,6 +695,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"event_id": event_id,
"event_stream_ordering": stream_ordering,
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
@ -824,6 +843,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
values={
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock

View file

@ -88,6 +88,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# at a time. Keyed by room_id.
self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
if (
self.hs.config.worker.run_background_tasks
and self.hs.config.metrics.metrics_flags.known_servers
@ -504,6 +506,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return membership == Membership.JOIN
async def is_server_notice_room(self, room_id: str) -> bool:
"""
Determines whether the given room is a 'Server Notices' room, used for
sending server notices to a user.
This is determined by seeing whether the server notices user is present
in the room.
"""
if self._server_notices_mxid is None:
return False
is_server_notices_room = await self.check_local_user_in_room(
user_id=self._server_notices_mxid, room_id=room_id
)
return is_server_notices_room
async def get_local_current_membership_for_user_in_room(
self, user_id: str, room_id: str
) -> Tuple[Optional[str], Optional[str]]:

View file

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SCHEMA_VERSION = 72 # remember to update the list below when updating
SCHEMA_VERSION = 73 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@ -77,6 +77,12 @@ Changes in SCHEMA_VERSION = 72:
- Tables related to groups are dropped.
- Unused column application_services_state.last_txn is dropped
- Cache invalidation stream id sequence now begins at 2 to match code expectation.
Changes in SCHEMA_VERSION = 73;
- thread_id column is added to event_push_actions, event_push_actions_staging
event_push_summary, receipts_linearized, and receipts_graph.
- Add table `event_failed_pull_attempts` to keep track when we fail to pull
events over federation.
- Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
from `opentracing_context` to generalized `tracing_context`.
"""

View file

@ -0,0 +1,30 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a nullable column for thread ID to the event push actions tables; this
-- will be filled in with a default value for any previously existing rows.
--
-- After migration this can be made non-nullable.
ALTER TABLE event_push_actions_staging ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_actions ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT;
-- Update the unique index for `event_push_summary`.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7006, 'event_push_summary_unique_index2', '{}');
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2');

View file

@ -0,0 +1,30 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a nullable column for thread ID to the receipts table; this allows a
-- receipt per user, per room, as well as an unthreaded receipt (corresponding
-- to a null thread ID).
ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT;
ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT;
-- Rebuild the unique constraint with the thread_id.
ALTER TABLE receipts_linearized
ADD CONSTRAINT receipts_linearized_uniqueness_thread
UNIQUE (room_id, receipt_type, user_id, thread_id);
ALTER TABLE receipts_graph
ADD CONSTRAINT receipts_graph_uniqueness_thread
UNIQUE (room_id, receipt_type, user_id, thread_id);

View file

@ -0,0 +1,70 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Allow multiple receipts per user per room via a nullable thread_id column.
--
-- SQLite doesn't support modifying constraints to an existing table, so it must
-- be recreated.
-- Create the new tables.
CREATE TABLE receipts_linearized_new (
stream_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
thread_id TEXT,
event_stream_ordering BIGINT,
data TEXT NOT NULL,
CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id),
CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
);
CREATE TABLE receipts_graph_new (
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_ids TEXT NOT NULL,
thread_id TEXT,
data TEXT NOT NULL,
CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id),
CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
);
-- Drop the old indexes.
DROP INDEX IF EXISTS receipts_linearized_id;
DROP INDEX IF EXISTS receipts_linearized_room_stream;
DROP INDEX IF EXISTS receipts_linearized_user;
-- Copy the data.
INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data)
SELECT stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data
FROM receipts_linearized;
INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
SELECT room_id, receipt_type, user_id, event_ids, data
FROM receipts_graph;
-- Drop the old tables.
DROP TABLE receipts_linearized;
DROP TABLE receipts_graph;
-- Rename the tables.
ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
-- Create the indices.
CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );

View file

@ -0,0 +1,20 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7007, 'receipts_linearized_unique_index', '{}');
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7007, 'receipts_graph_unique_index', '{}');

View file

@ -0,0 +1,56 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- SQLite needs to rebuild indices which use partial indices on Postgres, but
-- previously did not use them on SQLite.
-- Drop each index that was added with register_background_index_update AND specified
-- a where_clause (that existed before this delta).
-- From events_bg_updates.py
DROP INDEX IF EXISTS event_contains_url_index;
-- There is also a redactions_censored_redacts index, but that gets dropped.
DROP INDEX IF EXISTS redactions_have_censored_ts;
-- There is also a PostgreSQL only index (event_contains_url_index2)
-- which gets renamed to event_contains_url_index.
-- From roommember.py
DROP INDEX IF EXISTS room_memberships_user_room_forgotten;
-- From presence.py
DROP INDEX IF EXISTS presence_stream_state_not_offline_idx;
-- From media_repository.py
DROP INDEX IF EXISTS local_media_repository_url_idx;
-- From event_push_actions.py
DROP INDEX IF EXISTS event_push_actions_highlights_index;
-- There's also a event_push_actions_stream_highlight_index which was previously
-- PostgreSQL-only.
-- From state.py
DROP INDEX IF EXISTS current_state_events_member_index;
-- Re-insert the background jobs to re-create the indices.
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7209, 'event_contains_url_index', '{}', NULL),
(7209, 'redactions_have_censored_ts_idx', '{}', NULL),
(7209, 'room_membership_forgotten_idx', '{}', NULL),
(7209, 'presence_stream_not_offline_index', '{}', NULL),
(7209, 'local_media_repository_url_idx', '{}', NULL),
(7209, 'event_push_actions_highlights_index', '{}', NULL),
(7209, 'event_push_actions_stream_highlight_index', '{}', NULL),
(7209, 'current_state_members_idx', '{}', NULL)
ON CONFLICT (update_name) DO NOTHING;

View file

@ -0,0 +1,29 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a table that keeps track of when we failed to pull an event over
-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows
-- us to be more intelligent when we decide to retry (we don't need to fail over
-- and over) and we can process that event in the background so we don't block
-- on it each time.
CREATE TABLE IF NOT EXISTS event_failed_pull_attempts(
room_id TEXT NOT NULL REFERENCES rooms (room_id),
event_id TEXT NOT NULL,
num_attempts INT NOT NULL,
last_attempt_ts BIGINT NOT NULL,
last_cause TEXT NOT NULL,
PRIMARY KEY (room_id, event_id)
);

View file

@ -1,37 +0,0 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* We used to create a table called current_state_resets, but this is no
* longer used and is removed in delta 54.
*/
/* The outlier events that have aquired a state group typically through
* backfill. This is tracked separately to the events table, as assigning a
* state group change the position of the existing event in the stream
* ordering.
* However since a stream_ordering is assigned in persist_event for the
* (event, state) pair, we can use that stream_ordering to identify when
* the new state was assigned for the event.
*/
/* NB: This table belongs to the `main` logical database; it should not be present
* in `state`.
*/
CREATE TABLE IF NOT EXISTS ex_outlier_stream(
event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
event_id TEXT NOT NULL,
state_group BIGINT NOT NULL
);

View file

@ -205,8 +205,9 @@ def register_cache(
add_resizable_cache(cache_name, resize_callback)
metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
metric_name = "cache_%s_%s" % (cache_type, cache_name)
caches_by_name[cache_name] = cache
CACHE_METRIC_REGISTRY.register_hook(metric.collect)
CACHE_METRIC_REGISTRY.register_hook(metric_name, metric.collect)
return metric

View file

@ -15,7 +15,7 @@
import logging
from functools import wraps
from types import TracebackType
from typing import Awaitable, Callable, Generator, List, Optional, Type, TypeVar
from typing import Awaitable, Callable, Dict, Generator, Optional, Type, TypeVar
from prometheus_client import CollectorRegistry, Counter, Metric
from typing_extensions import Concatenate, ParamSpec, Protocol
@ -220,21 +220,21 @@ class DynamicCollectorRegistry(CollectorRegistry):
def __init__(self) -> None:
super().__init__()
self._pre_update_hooks: List[Callable[[], None]] = []
self._pre_update_hooks: Dict[str, Callable[[], None]] = {}
def collect(self) -> Generator[Metric, None, None]:
"""
Collects metrics, calling pre-update hooks first.
"""
for pre_update_hook in self._pre_update_hooks:
for pre_update_hook in self._pre_update_hooks.values():
pre_update_hook()
yield from super().collect()
def register_hook(self, hook: Callable[[], None]) -> None:
def register_hook(self, metric_name: str, hook: Callable[[], None]) -> None:
"""
Registers a hook that is called before metric collection.
"""
self._pre_update_hooks.append(hook)
self._pre_update_hooks[metric_name] = hook

View file

@ -891,6 +891,12 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
new_callable=mock.MagicMock,
return_value=make_awaitable(["some_room_id"]),
)
mock_get_users = mock.patch.object(
self.store,
"get_users_server_still_shares_room_with",
new_callable=mock.MagicMock,
return_value=make_awaitable({remote_user_id}),
)
mock_request = mock.patch.object(
self.hs.get_federation_client(),
"query_user_devices",
@ -898,7 +904,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
return_value=make_awaitable(response_body),
)
with mock_get_rooms, mock_request as mocked_federation_request:
with mock_get_rooms, mock_get_users, mock_request as mocked_federation_request:
# Make the first query and sanity check it succeeds.
response_1 = self.get_success(
e2e_handler.query_devices(

View file

@ -227,3 +227,225 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
if prev_exists_as_outlier:
self.mock_federation_transport_client.get_event.assert_not_called()
def test_process_pulled_event_records_failed_backfill_attempts(
self,
) -> None:
"""
Test to make sure that failed backfill attempts for an event are
recorded in the `event_failed_pull_attempts` table.
In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event has a fake `prev_event` which our server has
obviously never seen before so it attempts to request the state at that
`prev_event` which expectedly fails because it's a fake event. Because
the server can't fetch the state at the missing `prev_event`, the
"pulled" event fails the history check and is fails to process.
We check that we correctly record the number of failed pull attempts
of the pulled event and as a sanity check, that the "pulled" event isn't
persisted.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main
# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))
# We expect an outbound request to /state_ids, so stub that out
self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
{
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
"pdu_ids": [],
"auth_chain_ids": [],
}
)
# We also expect an outbound request to /state
self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
StateRequestResponse(
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
auth_events=[],
state=[],
)
)
pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [
# The fake prev event will make the pulled event fail
# the history check (`Unable to get missing prev_event
# $fake_prev_event`)
"$fake_prev_event"
],
"auth_events": [],
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)
# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)
# Make sure our failed pull attempt was recorded
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)
# The function under test: try to process the pulled event again
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)
# Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 2)
# And as a sanity check, make sure the event was not persisted through all of this.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNone(
persisted,
"pulled event that fails the history check should not be persisted at all",
)
def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
self,
) -> None:
"""
Test to make sure that failed pull attempts
(`event_failed_pull_attempts` table) for an event are cleared after the
event is successfully persisted.
In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event succesfully processes and the backward
extremeties are updated along with clearing out any failed pull attempts
for those old extremities.
We check that we correctly cleared failed pull attempts of the
pulled event.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main
# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))
# allow the remote user to send state events
self.helper.send_state(
room_id,
"m.room.power_levels",
{"events_default": 0, "state_default": 0},
tok=tok,
)
# add the remote user to the room
member_event = self.get_success(
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
)
initial_state_map = self.get_success(
main_store.get_partial_current_state_ids(room_id)
)
auth_event_ids = [
initial_state_map[("m.room.create", "")],
initial_state_map[("m.room.power_levels", "")],
member_event.event_id,
]
pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [member_event.event_id],
"auth_events": auth_event_ids,
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)
# Fake the "pulled" event failing to backfill once so we can test
# if it's cleared out later on.
self.get_success(
main_store.record_event_failed_pull_attempt(
pulled_event.room_id, pulled_event.event_id, "fake cause"
)
)
# Make sure we have a failed pull attempt recorded for the pulled event
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)
# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)
# Make sure the failed pull attempts for the pulled event are cleared
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
allow_none=True,
)
)
self.assertIsNone(backfill_num_attempts)
# And as a sanity check, make sure the "pulled" event was persisted.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNotNone(persisted, "pulled event was not persisted at all")

View file

@ -404,6 +404,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
event.event_id,
{user_id: actions for user_id, actions in push_actions},
False,
"main",
)
)
return event, context