Merge branch 'develop' into squah/fix_remote_user_leave_device_list_tracking

Sean Quah 2022-09-09 17:08:29 +01:00 committed by GitHub
commit b234f4d9f8
41 changed files with 442 additions and 318 deletions


@ -84,6 +84,12 @@ jobs:
if: ${{ matrix.postgres-version }}
timeout-minutes: 2
run: until pg_isready -h localhost; do sleep 1; done
# We nuke the local copy, as we've installed synapse into the virtualenv
# (rather than use an editable install, which we no longer support). If we
# don't do this then python can't find the native lib.
- run: rm -rf synapse/
- run: python -m twisted.trial --jobs=2 tests
env:
SYNAPSE_POSTGRES: ${{ matrix.database == 'postgres' || '' }}
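# Evaluates to 'true' for the postgres matrix entries and to an empty string otherwise.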
@ -128,6 +134,14 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- uses: Swatinem/rust-cache@v2
- name: Ensure sytest runs `pip install`
# Delete the lockfile so sytest will `pip install` rather than `poetry install`
run: rm /src/poetry.lock


@ -10,6 +10,23 @@ concurrency:
cancel-in-progress: true
jobs:
# Job to detect what has changed so we don't run e.g. Rust checks on PRs that
# don't modify Rust code.
changes:
runs-on: ubuntu-latest
outputs:
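# On pushes (non-PR builds) the Rust checks always run; on pull requests they only run when the paths filter below reports Rust changes.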
rust: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.rust }}
steps:
- uses: dorny/paths-filter@v2
id: filter
# We only check on PRs
if: startsWith(github.ref, 'refs/pull/')
with:
filters: |
rust:
- 'rust/**'
- 'Cargo.toml'
check-sampleconfig:
runs-on: ubuntu-latest
steps:
@ -65,10 +82,54 @@ jobs:
extras: "all"
- run: poetry run scripts-dev/check_pydantic_models.py
lint-clippy:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.rust == 'true' }}
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
override: true
components: clippy
- uses: Swatinem/rust-cache@v2
- run: cargo clippy
lint-rustfmt:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.rust == 'true' }}
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
override: true
components: rustfmt
- uses: Swatinem/rust-cache@v2
- run: cargo fmt --check
# Dummy step to gate other tests on without repeating the whole list
linting-done:
if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
needs: [lint, lint-crlf, lint-newsfile, lint-pydantic, check-sampleconfig, check-schema-delta]
needs:
- lint
- lint-crlf
- lint-newsfile
- lint-pydantic
- check-sampleconfig
- check-schema-delta
- lint-clippy
- lint-rustfmt
runs-on: ubuntu-latest
steps:
- run: "true"
@ -384,6 +445,25 @@ jobs:
shell: bash
name: Run Complement Tests
cargo-test:
if: ${{ needs.changes.outputs.rust == 'true' }}
runs-on: ubuntu-latest
needs:
- linting-done
- changes
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
override: true
- uses: Swatinem/rust-cache@v2
- run: cargo test
# a job which marks all the other jobs as complete, thus allowing PRs to be merged.
tests-done:
if: ${{ always() }}
@ -398,6 +478,7 @@ jobs:
- export-data
- portdb
- complement
- cargo-test
runs-on: ubuntu-latest
steps:
- uses: matrix-org/done-action@v2

changelog.d/13717.misc (new file)

@ -0,0 +1 @@
Add experimental configuration option to allow disabling legacy Prometheus metric names.

changelog.d/13718.misc (new file)

@ -0,0 +1 @@
Add experimental configuration option to allow disabling legacy Prometheus metric names.


@ -0,0 +1 @@
Document the timestamp when a user accepts the consent, if [consent tracking](https://matrix-org.github.io/synapse/latest/consent_tracking.html) is used.

changelog.d/13743.misc (new file)

@ -0,0 +1 @@
Add a stub Rust crate.

changelog.d/13746.bugfix (new file)

@ -0,0 +1 @@
Fix a long-standing bug where Synapse would fail to handle malformed user IDs or room aliases gracefully in certain cases.

changelog.d/13748.misc (new file)

@ -0,0 +1 @@
Avoid raising an error due to malformed user IDs in `get_current_hosts_in_room`. Users with malformed IDs cannot currently join a room, so this error would not be hit in practice.

changelog.d/13750.misc (new file)

@ -0,0 +1 @@
Update the docstrings for `get_users_in_room` and `get_current_hosts_in_room` to explain the impact of partial state.

changelog.d/13752.misc (new file)

@ -0,0 +1 @@
Use an additional database query when persisting receipts.

changelog.d/13754.misc (new file)

@ -0,0 +1 @@
Re-type hint some collections as read-only.

changelog.d/13756.misc (new file)

@ -0,0 +1 @@
Remove unused Prometheus recording rules from `synapse-v2.rules` and add comments describing where the rest are used.


@ -0,0 +1 @@
Synapse will now refuse to start if configured to use SQLite < 3.27.

changelog.d/13763.misc (new file)

@ -0,0 +1 @@
Add a stub Rust crate.


@ -1,21 +0,0 @@
synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])
synapse_federation_client_sent{type="EDU"} = synapse_federation_client_sent_edus + 0
synapse_federation_client_sent{type="PDU"} = synapse_federation_client_sent_pdu_destinations:count + 0
synapse_federation_client_sent{type="Query"} = sum(synapse_federation_client_sent_queries) by (job)
synapse_federation_server_received{type="EDU"} = synapse_federation_server_received_edus + 0
synapse_federation_server_received{type="PDU"} = synapse_federation_server_received_pdus + 0
synapse_federation_server_received{type="Query"} = sum(synapse_federation_server_received_queries) by (job)
synapse_federation_transaction_queue_pending{type="EDU"} = synapse_federation_transaction_queue_pending_edus + 0
synapse_federation_transaction_queue_pending{type="PDU"} = synapse_federation_transaction_queue_pending_pdus + 0


@ -1,55 +1,35 @@
groups:
- name: synapse
rules:
- record: "synapse_federation_transaction_queue_pendingEdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
- record: "synapse_federation_transaction_queue_pendingPdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
- record: 'synapse_http_server_request_count:method'
labels:
servlet: ""
expr: "sum(synapse_http_server_request_count) by (method)"
- record: 'synapse_http_server_request_count:servlet'
labels:
method: ""
expr: 'sum(synapse_http_server_request_count) by (servlet)'
- record: 'synapse_http_server_request_count:total'
labels:
servlet: ""
expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
- record: 'synapse_cache:hit_ratio_5m'
expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'
- record: 'synapse_cache:hit_ratio_30s'
expr: 'rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])'
# These 3 rules are used in the included Prometheus console
- record: 'synapse_federation_client_sent'
labels:
type: "EDU"
expr: 'synapse_federation_client_sent_edus + 0'
expr: 'synapse_federation_client_sent_edus_total + 0'
- record: 'synapse_federation_client_sent'
labels:
type: "PDU"
expr: 'synapse_federation_client_sent_pdu_destinations:count + 0'
expr: 'synapse_federation_client_sent_pdu_destinations_count_total + 0'
- record: 'synapse_federation_client_sent'
labels:
type: "Query"
expr: 'sum(synapse_federation_client_sent_queries) by (job)'
# These 3 rules are used in the included Prometheus console
- record: 'synapse_federation_server_received'
labels:
type: "EDU"
expr: 'synapse_federation_server_received_edus + 0'
expr: 'synapse_federation_server_received_edus_total + 0'
- record: 'synapse_federation_server_received'
labels:
type: "PDU"
expr: 'synapse_federation_server_received_pdus + 0'
expr: 'synapse_federation_server_received_pdus_total + 0'
- record: 'synapse_federation_server_received'
labels:
type: "Query"
expr: 'sum(synapse_federation_server_received_queries) by (job)'
# These 2 rules are used in the included Prometheus console
- record: 'synapse_federation_transaction_queue_pending'
labels:
type: "EDU"
@ -59,20 +39,25 @@ groups:
type: "PDU"
expr: 'synapse_federation_transaction_queue_pending_pdus + 0'
# These 3 rules are used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_source_type
expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_type="remote"})
expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_type="remote"})
labels:
type: remote
- record: synapse_storage_events_persisted_by_source_type
expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_entity="*client*",origin_type="local"})
expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_entity="*client*",origin_type="local"})
labels:
type: local
- record: synapse_storage_events_persisted_by_source_type
expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_entity!="*client*",origin_type="local"})
expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_entity!="*client*",origin_type="local"})
labels:
type: bridges
- record: synapse_storage_events_persisted_by_event_type
expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep)
- record: synapse_storage_events_persisted_by_origin
expr: sum without(type) (synapse_storage_events_persisted_events_sep)
# This rule is used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_event_type
expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep_total)
# This rule is used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_origin
expr: sum without(type) (synapse_storage_events_persisted_events_sep_total)


@ -42,6 +42,7 @@ It returns a JSON body like the following:
"appservice_id": null,
"consent_server_notice_sent": null,
"consent_version": null,
"consent_ts": null,
"external_ids": [
{
"auth_provider": "<provider1>",
@ -364,6 +365,7 @@ The following actions are **NOT** performed. The list may be incomplete.
- Remove the user's creation (registration) timestamp
- [Remove rate limit overrides](#override-ratelimiting-for-users)
- Remove from monthly active users
- Remove user's consent information (consent version and timestamp)
## Reset password


@ -11,5 +11,6 @@ fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
#[pymodule]
fn synapse_rust(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
Ok(())
}
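For reference, a minimal sketch of calling the stub crate from Python. The import path and the return value are assumptions based on the `#[pymodule]` name above and the usual pyo3 template, not something this diff shows:

# Hypothetical usage of the stub Rust extension; the import path may differ
# depending on how the built module is packaged.
from synapse_rust import sum_as_string

# sum_as_string is assumed to return the sum formatted as a string.
assert sum_as_string(20, 22) == "42"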


@ -32,15 +32,15 @@ logger = logging.getLogger("synapse.app.homeserver")
_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
current_mau_gauge = Gauge("synapse_admin_mau_current", "Current MAU")
current_mau_by_service_gauge = Gauge(
"synapse_admin_mau_current_mau_by_service",
"Current MAU by service",
["app_service"],
)
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
max_mau_gauge = Gauge("synapse_admin_mau_max", "MAU Limit")
registered_reserved_users_mau_gauge = Gauge(
"synapse_admin_mau:registered_reserved_users",
"synapse_admin_mau_registered_reserved_users",
"Registered users with reserved threepids",
)


@ -62,12 +62,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count",
"synapse_federation_client_sent_pdu_destinations_count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total",
"synapse_federation_client_sent_pdu_destinations",
"Total number of PDUs queued for sending across all destinations",
)


@ -70,6 +70,7 @@ class AdminHandler:
"appservice_id",
"consent_server_notice_sent",
"consent_version",
"consent_ts",
"user_type",
"is_guest",
}


@ -15,6 +15,7 @@ import itertools
import logging
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Collection,
Dict,
@ -1413,10 +1414,10 @@ class SyncHandler:
async def _generate_sync_entry_for_device_list(
self,
sync_result_builder: "SyncResultBuilder",
newly_joined_rooms: Set[str],
newly_joined_or_invited_or_knocked_users: Set[str],
newly_left_rooms: Set[str],
newly_left_users: Set[str],
newly_joined_rooms: AbstractSet[str],
newly_joined_or_invited_or_knocked_users: AbstractSet[str],
newly_left_rooms: AbstractSet[str],
newly_left_users: AbstractSet[str],
) -> DeviceListUpdates:
"""Generate the DeviceListUpdates section of sync
@ -1434,8 +1435,7 @@ class SyncHandler:
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
# We're going to mutate these fields, so lets copy them rather than
# assume they won't get used later.
# Take a copy since these fields will be mutated later.
newly_joined_or_invited_or_knocked_users = set(
newly_joined_or_invited_or_knocked_users
)
@ -1635,8 +1635,8 @@ class SyncHandler:
async def _generate_sync_entry_for_presence(
self,
sync_result_builder: "SyncResultBuilder",
newly_joined_rooms: Set[str],
newly_joined_or_invited_users: Set[str],
newly_joined_rooms: AbstractSet[str],
newly_joined_or_invited_users: AbstractSet[str],
) -> None:
"""Generates the presence portion of the sync response. Populates the
`sync_result_builder` with the result.
@ -1694,7 +1694,7 @@ class SyncHandler:
self,
sync_result_builder: "SyncResultBuilder",
account_data_by_room: Dict[str, Dict[str, JsonDict]],
) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@ -2534,7 +2534,7 @@ class SyncResultBuilder:
archived: List[ArchivedSyncResult] = attr.Factory(list)
to_device: List[JsonDict] = attr.Factory(list)
def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
def calculate_user_changes(self) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Work out which other users have joined or left rooms we are joined to.
This data is only useful for an incremental sync.


@ -34,8 +34,6 @@ from prometheus_client.core import Sample
from twisted.web.resource import Resource
from twisted.web.server import Request
from synapse.util import caches
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
@ -93,6 +91,11 @@ LEGACY_METRIC_NAMES = {
"synapse_util_caches_response_cache_hits": "synapse_util_caches_response_cache:hits",
"synapse_util_caches_response_cache_evicted_size": "synapse_util_caches_response_cache:evicted_size",
"synapse_util_caches_response_cache": "synapse_util_caches_response_cache:total",
"synapse_federation_client_sent_pdu_destinations": "synapse_federation_client_sent_pdu_destinations:total",
"synapse_federation_client_sent_pdu_destinations_count": "synapse_federation_client_sent_pdu_destinations:count",
"synapse_admin_mau_current": "synapse_admin_mau:current",
"synapse_admin_mau_max": "synapse_admin_mau:max",
"synapse_admin_mau_registered_reserved_users": "synapse_admin_mau:registered_reserved_users",
}
@ -102,11 +105,6 @@ def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> byt
by prometheus-client.
"""
# Trigger the cache metrics to be rescraped, which updates the common
# metrics but do not produce metrics themselves
for collector in caches.collectors_by_name.values():
collector.collect()
output = []
for metric in registry.collect():


@ -341,7 +341,17 @@ class HomeServer(metaclass=abc.ABCMeta):
return domain_specific_string.domain == self.hostname
def is_mine_id(self, string: str) -> bool:
return string.split(":", 1)[1] == self.hostname
"""Determines whether a user ID or room alias originates from this homeserver.
Returns:
`True` if the hostname part of the user ID or room alias matches this
homeserver.
`False` otherwise, or if the user ID or room alias is malformed.
"""
localpart_hostname = string.split(":", 1)
if len(localpart_hostname) < 2:
return False
return localpart_hostname[1] == self.hostname
@cache_in_self
def get_clock(self) -> Clock:
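As a quick illustration of the reworked `is_mine_id` above (mirroring the new tests in `tests/test_types.py` further down), assuming `hs` is a `HomeServer` whose server name is `test`:

hs.is_mine_id("@user:test")    # True
hs.is_mine_id("#room:test")    # True
hs.is_mine_id("@user")         # False: no colon, so the ID is malformed; no longer raises
hs.is_mine_id("@user:test\0")  # False: the hostname part does not match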


@ -533,15 +533,14 @@ class DatabasePool:
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")
if self.engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(
0.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,
)
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(
0.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,
)
def name(self) -> str:
"Return the name of this database"
@ -1160,11 +1159,8 @@ class DatabasePool:
attempts = 0
while True:
try:
# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert
and table not in self._unsafe_to_upsert_tables
)
# We can autocommit if it is safe to upsert
autocommit = table not in self._unsafe_to_upsert_tables
return await self.runInteraction(
desc,
@ -1199,7 +1195,7 @@ class DatabasePool:
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
Args:
txn: The transaction to use.
@ -1207,14 +1203,15 @@ class DatabasePool:
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
@ -1365,14 +1362,12 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""
# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
)
# We can autocommit if it is safe to upsert
autocommit = table not in self._unsafe_to_upsert_tables
await self.runInteraction(
desc,
@ -1406,10 +1401,10 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)


@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
now = self._clock.time_msec()
token = random_string(6)
if self.db_pool.engine.can_native_upsert:
def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)
# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query, this
# will avoid serialization errors.
db_autocommit=True,
)
if not did_lock:
return None
else:
# If we're on an old SQLite we emulate the above logic by first
# clearing out any existing stale locks and then upserting.
def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
sql = """
DELETE FROM worker_locks
WHERE
lock_name = ?
AND lock_key = ?
AND (last_renewed_ts < ? OR instance_name = ?)
"""
txn.execute(
sql,
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
)
inserted = self.db_pool.simple_upsert_txn_emulated(
txn,
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
},
values={},
insertion_values={
"token": token,
"last_renewed_ts": self._clock.time_msec(),
"instance_name": self._instance_name,
},
)
return inserted
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)
if not did_lock:
return None
# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query; this
# avoids serialization errors.
db_autocommit=True,
)
if not did_lock:
return None
lock = Lock(
self._reactor,


@ -812,7 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
self.db_pool.simple_delete_txn(
self.db_pool.simple_upsert_txn(
txn,
table="receipts_graph",
keyvalues={
@ -820,17 +820,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
"receipt_type": receipt_type,
"user_id": user_id,
},
)
self.db_pool.simple_insert_txn(
txn,
table="receipts_graph",
values={
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
},
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,
)


@ -175,6 +175,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"is_guest",
"admin",
"consent_version",
"consent_ts",
"consent_server_notice_sent",
"appservice_id",
"creation_ts",
@ -2227,7 +2228,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"consent_version": consent_version},
updatevalues={
"consent_version": consent_version,
"consent_ts": self._clock.time_msec(),
},
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))


@ -192,8 +192,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
(aka. with the lowest depth). This is done to match the sort in
`get_current_hosts_in_room()` and so we can re-use the cache but it's
not horrible to have here either.
"""
Uses `m.room.member`s in the room state at the current forward extremities to
determine which users are in the room.
Will return inaccurate results for rooms with partial state, since the state for
the forward extremities of those rooms will exclude most members. We may also
calculate room state incorrectly for such rooms and believe that a member is or
is not in the room when the opposite is true.
"""
return await self.db_pool.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id
)
@ -1022,6 +1029,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
longest is good because they're most likely to have anything we ask
about.
Uses `m.room.member`s in the room state at the current forward extremities to
determine which hosts are in the room.
Will return inaccurate results for rooms with partial state, since the state for
the forward extremities of those rooms will exclude most members. We may also
calculate room state incorrectly for such rooms and believe that a host is or
is not in the room when the opposite is true.
Returns:
A list of servers sorted by how long they have been in the room, longest
first (i.e. sorted by the join with the lowest depth first).
@ -1044,6 +1059,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# We use a `Set` just for fast lookups
domain_set: Set[str] = set()
for u in users:
if ":" not in u:
continue
domain = get_domain_from_id(u)
if domain not in domain_set:
domain_set.add(domain)
@ -1077,7 +1094,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
ORDER BY min(e.depth) ASC;
"""
txn.execute(sql, (room_id,))
return [d for d, in txn]
# `server_domain` will be `NULL` for malformed MXIDs with no colons.
return [d for d, in txn if d is not None]
return await self.db_pool.runInteraction(
"get_current_hosts_in_room", get_current_hosts_in_room_txn


@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
absolutes: Absolute (set) fields
additive_relatives: Fields that will be added onto if existing row present.
"""
if self.database_engine.can_native_upsert:
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]
relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]
relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]
insert_cols = []
qargs = []
insert_cols = []
qargs = []
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, table)
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
current_row = self.db_pool.simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
if current_row is None:
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
self.db_pool.simple_insert_txn(txn, table, merged_dict)
else:
for (key, val) in additive_relatives.items():
if current_row[key] is None:
current_row[key] = val
else:
current_row[key] += val
current_row.update(absolutes)
self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
txn.execute(sql, qargs)
async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.


@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_interval: how long until next retry in ms
"""
if self.database_engine.can_native_upsert:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as its a single upsert
)
else:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_emulated,
destination,
failure_ts,
retry_last_ts,
retry_interval,
)
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as it's a single upsert
)
def _set_destination_retry_timings_native(
self,
@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_last_ts: int,
retry_interval: int,
) -> None:
assert self.database_engine.can_native_upsert
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#


@ -43,14 +43,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
def single_threaded(self) -> bool:
...
@property
@abc.abstractmethod
def can_native_upsert(self) -> bool:
"""
Do we support native UPSERTs?
"""
...
@property
@abc.abstractmethod
def supports_using_any_list(self) -> bool:


@ -158,13 +158,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
cursor.close()
db_conn.commit()
@property
def can_native_upsert(self) -> bool:
"""
Can we use native UPSERTs?
"""
return True
@property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""


@ -48,14 +48,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
def single_threaded(self) -> bool:
return True
@property
def can_native_upsert(self) -> bool:
"""
Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
more work we haven't done yet to tell what was inserted vs updated.
"""
return sqlite3.sqlite_version_info >= (3, 24, 0)
@property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
) -> None:
if not allow_outdated_version:
version = sqlite3.sqlite_version_info
# Synapse is untested against older SQLite versions, and we don't want
# to let users upgrade to a version of Synapse with broken support for their
# sqlite version, because it risks leaving them with a half-upgraded db.
if version < (3, 22, 0):
raise RuntimeError("Synapse requires sqlite 3.22 or above.")
if sqlite3.sqlite_version_info < (3, 27, 0):
raise RuntimeError("Synapse requires sqlite 3.27 or above.")
def check_new_database(self, txn: Cursor) -> None:
"""Gets called when setting up a brand new database. This allows us to


@ -0,0 +1,16 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE users ADD consent_ts bigint;
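-- Stored as milliseconds since the epoch; populated by `user_set_consent_version`
-- via `self._clock.time_msec()` (see the registration store change above).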


@ -20,9 +20,11 @@ from sys import intern
from typing import Any, Callable, Dict, List, Optional, Sized, TypeVar
import attr
from prometheus_client import REGISTRY
from prometheus_client.core import Gauge
from synapse.config.cache import add_resizable_cache
from synapse.util.metrics import DynamicCollectorRegistry
logger = logging.getLogger(__name__)
@ -30,27 +32,62 @@ logger = logging.getLogger(__name__)
# Whether to track estimated memory usage of the LruCaches.
TRACK_MEMORY_USAGE = False
# We track cache metrics in a special registry that lets us update the metrics
# just before they are returned from the scrape endpoint.
CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()
caches_by_name: Dict[str, Sized] = {}
collectors_by_name: Dict[str, "CacheMetric"] = {}
cache_size = Gauge("synapse_util_caches_cache_size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache_hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache_evicted_size", "", ["name", "reason"])
cache_total = Gauge("synapse_util_caches_cache", "", ["name"])
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
cache_size = Gauge(
"synapse_util_caches_cache_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_hits = Gauge(
"synapse_util_caches_cache_hits", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_evicted = Gauge(
"synapse_util_caches_cache_evicted_size",
"",
["name", "reason"],
registry=CACHE_METRIC_REGISTRY,
)
cache_total = Gauge(
"synapse_util_caches_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_max_size = Gauge(
"synapse_util_caches_cache_max_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_memory_usage = Gauge(
"synapse_util_caches_cache_size_bytes",
"Estimated memory usage of the caches",
["name"],
registry=CACHE_METRIC_REGISTRY,
)
response_cache_size = Gauge("synapse_util_caches_response_cache_size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache_hits", "", ["name"])
response_cache_evicted = Gauge(
"synapse_util_caches_response_cache_evicted_size", "", ["name", "reason"]
response_cache_size = Gauge(
"synapse_util_caches_response_cache_size",
"",
["name"],
registry=CACHE_METRIC_REGISTRY,
)
response_cache_total = Gauge("synapse_util_caches_response_cache", "", ["name"])
response_cache_hits = Gauge(
"synapse_util_caches_response_cache_hits",
"",
["name"],
registry=CACHE_METRIC_REGISTRY,
)
response_cache_evicted = Gauge(
"synapse_util_caches_response_cache_evicted_size",
"",
["name", "reason"],
registry=CACHE_METRIC_REGISTRY,
)
response_cache_total = Gauge(
"synapse_util_caches_response_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
# Register our custom cache metrics registry with the global registry
REGISTRY.register(CACHE_METRIC_REGISTRY)
class EvictionReason(Enum):
@ -168,9 +205,8 @@ def register_cache(
add_resizable_cache(cache_name, resize_callback)
metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
metric_name = "cache_%s_%s" % (cache_type, cache_name)
caches_by_name[cache_name] = cache
collectors_by_name[metric_name] = metric
CACHE_METRIC_REGISTRY.register_hook(metric.collect)
return metric


@ -15,9 +15,9 @@
import logging
from functools import wraps
from types import TracebackType
from typing import Awaitable, Callable, Optional, Type, TypeVar
from typing import Awaitable, Callable, Generator, List, Optional, Type, TypeVar
from prometheus_client import Counter
from prometheus_client import CollectorRegistry, Counter, Metric
from typing_extensions import Concatenate, ParamSpec, Protocol
from synapse.logging.context import (
@ -208,3 +208,33 @@ class Measure:
metrics.real_time_sum += duration
# TODO: Add other in flight metrics.
class DynamicCollectorRegistry(CollectorRegistry):
"""
Custom Prometheus Collector registry that calls a hook first, allowing you
to update metrics on-demand.
Don't forget to register this registry with the main registry!
"""
def __init__(self) -> None:
super().__init__()
self._pre_update_hooks: List[Callable[[], None]] = []
def collect(self) -> Generator[Metric, None, None]:
"""
Collects metrics, calling pre-update hooks first.
"""
for pre_update_hook in self._pre_update_hooks:
pre_update_hook()
yield from super().collect()
def register_hook(self, hook: Callable[[], None]) -> None:
"""
Registers a hook that is called before metric collection.
"""
self._pre_update_hooks.append(hook)
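A minimal sketch of how this registry is wired up, mirroring the changes to `synapse/util/caches/__init__.py` above; the gauge name and hook body here are made-up examples:

from prometheus_client import REGISTRY, Gauge

from synapse.util.metrics import DynamicCollectorRegistry

registry = DynamicCollectorRegistry()
# Gauges are registered against the dynamic registry rather than the default one.
example_size = Gauge("example_cache_size", "", ["name"], registry=registry)

def _refresh() -> None:
    # Recompute gauge values from live state just before each scrape.
    example_size.labels("my_cache").set(123)

registry.register_hook(_refresh)
# Expose the dynamic registry's metrics via the global Prometheus registry.
REGISTRY.register(registry)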


@ -2580,6 +2580,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertIn("appservice_id", content)
self.assertIn("consent_server_notice_sent", content)
self.assertIn("consent_version", content)
self.assertIn("consent_ts", content)
self.assertIn("external_ids", content)
# This key was removed intentionally. Ensure it is not accidentally re-included.


@ -54,7 +54,6 @@ class SQLBaseStoreTestCase(unittest.TestCase):
sqlite_config = {"name": "sqlite3"}
engine = create_engine(sqlite_config)
fake_engine = Mock(wraps=engine)
fake_engine.can_native_upsert = False
fake_engine.in_transaction.return_value = False
db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)


@ -11,15 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import UserTypes
from synapse.api.errors import ThreepidValidationError
from synapse.server import HomeServer
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
class RegistrationStoreTestCase(HomeserverTestCase):
def prepare(self, reactor, clock, hs):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.user_id = "@my-user:test"
@ -27,7 +30,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
self.pwhash = "{xx1}123456789"
self.device_id = "akgjhdjklgshg"
def test_register(self):
def test_register(self) -> None:
self.get_success(self.store.register_user(self.user_id, self.pwhash))
self.assertEqual(
@ -38,6 +41,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
"admin": 0,
"is_guest": 0,
"consent_version": None,
"consent_ts": None,
"consent_server_notice_sent": None,
"appservice_id": None,
"creation_ts": 0,
@ -48,7 +52,20 @@ class RegistrationStoreTestCase(HomeserverTestCase):
(self.get_success(self.store.get_user_by_id(self.user_id))),
)
def test_add_tokens(self):
def test_consent(self) -> None:
self.get_success(self.store.register_user(self.user_id, self.pwhash))
before_consent = self.clock.time_msec()
self.reactor.advance(5)
self.get_success(self.store.user_set_consent_version(self.user_id, "1"))
self.reactor.advance(5)
user = self.get_success(self.store.get_user_by_id(self.user_id))
assert user
self.assertEqual(user["consent_version"], "1")
self.assertGreater(user["consent_ts"], before_consent)
self.assertLess(user["consent_ts"], self.clock.time_msec())
def test_add_tokens(self) -> None:
self.get_success(self.store.register_user(self.user_id, self.pwhash))
self.get_success(
self.store.add_access_token_to_user(
@ -58,11 +75,12 @@ class RegistrationStoreTestCase(HomeserverTestCase):
result = self.get_success(self.store.get_user_by_access_token(self.tokens[1]))
assert result
self.assertEqual(result.user_id, self.user_id)
self.assertEqual(result.device_id, self.device_id)
self.assertIsNotNone(result.token_id)
def test_user_delete_access_tokens(self):
def test_user_delete_access_tokens(self) -> None:
# add some tokens
self.get_success(self.store.register_user(self.user_id, self.pwhash))
self.get_success(
@ -87,6 +105,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
# check the one not associated with the device was not deleted
user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
assert user
self.assertEqual(self.user_id, user.user_id)
# now delete the rest
@ -95,11 +114,11 @@ class RegistrationStoreTestCase(HomeserverTestCase):
user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
self.assertIsNone(user, "access token was not deleted without device_id")
def test_is_support_user(self):
def test_is_support_user(self) -> None:
TEST_USER = "@test:test"
SUPPORT_USER = "@support:test"
res = self.get_success(self.store.is_support_user(None))
res = self.get_success(self.store.is_support_user(None)) # type: ignore[arg-type]
self.assertFalse(res)
self.get_success(
self.store.register_user(user_id=TEST_USER, password_hash=None)
@ -115,7 +134,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
res = self.get_success(self.store.is_support_user(SUPPORT_USER))
self.assertTrue(res)
def test_3pid_inhibit_invalid_validation_session_error(self):
def test_3pid_inhibit_invalid_validation_session_error(self) -> None:
"""Tests that enabling the configuration option to inhibit 3PID errors on
/requestToken also inhibits validation errors caused by an unknown session ID.
"""


@ -13,11 +13,35 @@
# limitations under the License.
from synapse.api.errors import SynapseError
from synapse.types import RoomAlias, UserID, map_username_to_mxid_localpart
from synapse.types import (
RoomAlias,
UserID,
get_domain_from_id,
get_localpart_from_id,
map_username_to_mxid_localpart,
)
from tests import unittest
class IsMineIDTests(unittest.HomeserverTestCase):
def test_is_mine_id(self) -> None:
self.assertTrue(self.hs.is_mine_id("@user:test"))
self.assertTrue(self.hs.is_mine_id("#room:test"))
self.assertTrue(self.hs.is_mine_id("invalid:test"))
self.assertFalse(self.hs.is_mine_id("@user:test\0"))
self.assertFalse(self.hs.is_mine_id("@user"))
def test_two_colons(self) -> None:
"""Test handling of IDs containing more than one colon."""
# The domain starts after the first colon.
# These functions must interpret things consistently.
self.assertFalse(self.hs.is_mine_id("@user:test:test"))
self.assertEqual("user", get_localpart_from_id("@user:test:test"))
self.assertEqual("test:test", get_domain_from_id("@user:test:test"))
class UserIDTestCase(unittest.HomeserverTestCase):
def test_parse(self):
user = UserID.from_string("@1234abcd:test")