diff --git a/.github/workflows/docs-pr-netlify.yaml b/.github/workflows/docs-pr-netlify.yaml index 277083ded3..a724816392 100644 --- a/.github/workflows/docs-pr-netlify.yaml +++ b/.github/workflows/docs-pr-netlify.yaml @@ -14,7 +14,7 @@ jobs: # There's a 'download artifact' action, but it hasn't been updated for the workflow_run action # (https://github.com/actions/download-artifact/issues/60) so instead we get this mess: - name: 📥 Download artifact - uses: dawidd6/action-download-artifact@09f2f74827fd3a8607589e5ad7f9398816f540fe # v3.1.4 + uses: dawidd6/action-download-artifact@deb3bb83256a78589fef6a7b942e5f2573ad7c13 # v5 with: workflow: docs-pr.yaml run_id: ${{ github.event.workflow_run.id }} diff --git a/CHANGES.md b/CHANGES.md index 092dbdbf2d..61c6170c62 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,13 @@ +# Synapse 1.109.0rc2 (2024-06-11) + +### Bugfixes + +- Fix bug where one-time-keys were not always included in `/sync` response when using workers. Introduced in v1.109.0rc1. ([\#17275](https://github.com/element-hq/synapse/issues/17275)) +- Fix bug where `/sync` could get stuck due to edge case in device lists handling. Introduced in v1.109.0rc1. ([\#17292](https://github.com/element-hq/synapse/issues/17292)) + + + + # Synapse 1.109.0rc1 (2024-06-04) ### Features diff --git a/Cargo.lock b/Cargo.lock index e3e63fc205..7472e16291 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,9 +444,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.4" +version = "1.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" dependencies = [ "aho-corasick", "memchr", diff --git a/changelog.d/17172.feature b/changelog.d/17172.feature new file mode 100644 index 0000000000..245dea815c --- /dev/null +++ b/changelog.d/17172.feature @@ -0,0 +1,2 @@ +Support [MSC3916](https://github.com/matrix-org/matrix-spec-proposals/blob/rav/authentication-for-media/proposals/3916-authentication-for-media.md) +by adding a federation /download endpoint (#17172). \ No newline at end of file diff --git a/changelog.d/17266.misc b/changelog.d/17266.misc new file mode 100644 index 0000000000..ce8c4ab086 --- /dev/null +++ b/changelog.d/17266.misc @@ -0,0 +1 @@ +Add debug logging for when room keys are uploaded, including whether they are replacing other room keys. \ No newline at end of file diff --git a/changelog.d/17270.feature b/changelog.d/17270.feature new file mode 100644 index 0000000000..4ea5e7be85 --- /dev/null +++ b/changelog.d/17270.feature @@ -0,0 +1 @@ +Add support for the unstable [MSC4151](https://github.com/matrix-org/matrix-spec-proposals/pull/4151) report room API. diff --git a/changelog.d/17272.bugfix b/changelog.d/17272.bugfix new file mode 100644 index 0000000000..83e7ca426a --- /dev/null +++ b/changelog.d/17272.bugfix @@ -0,0 +1 @@ +Fix wrong retention policy being used when filtering events. diff --git a/changelog.d/17279.misc b/changelog.d/17279.misc new file mode 100644 index 0000000000..2090b11d7f --- /dev/null +++ b/changelog.d/17279.misc @@ -0,0 +1 @@ +Re-organize Pydantic models and types used in handlers. diff --git a/changelog.d/17295.bugfix b/changelog.d/17295.bugfix new file mode 100644 index 0000000000..4484253bb8 --- /dev/null +++ b/changelog.d/17295.bugfix @@ -0,0 +1 @@ +Fix edge case in `/sync` returning the wrong the state when using sharded event persisters. diff --git a/changelog.d/17296.feature b/changelog.d/17296.feature new file mode 100644 index 0000000000..4ea5e7be85 --- /dev/null +++ b/changelog.d/17296.feature @@ -0,0 +1 @@ +Add support for the unstable [MSC4151](https://github.com/matrix-org/matrix-spec-proposals/pull/4151) report room API. diff --git a/changelog.d/17297.misc b/changelog.d/17297.misc new file mode 100644 index 0000000000..7ec351d2c1 --- /dev/null +++ b/changelog.d/17297.misc @@ -0,0 +1 @@ +Bump `mypy` from 1.8.0 to 1.9.0. \ No newline at end of file diff --git a/changelog.d/17300.misc b/changelog.d/17300.misc new file mode 100644 index 0000000000..cdc40bb2e5 --- /dev/null +++ b/changelog.d/17300.misc @@ -0,0 +1 @@ +Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`. diff --git a/changelog.d/17301.bugfix b/changelog.d/17301.bugfix new file mode 100644 index 0000000000..50383cb4a4 --- /dev/null +++ b/changelog.d/17301.bugfix @@ -0,0 +1 @@ +Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/debian/changelog b/debian/changelog index 927248bdab..ac2536749d 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.109.0~rc2) stable; urgency=medium + + * New synapse release 1.109.0rc2. + + -- Synapse Packaging team Tue, 11 Jun 2024 13:20:17 +0000 + matrix-synapse-py3 (1.109.0~rc1) stable; urgency=medium * New Synapse release 1.109.0rc1. diff --git a/poetry.lock b/poetry.lock index 942e26701d..7b169ceb6e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -912,13 +912,13 @@ trio = ["async_generator", "trio"] [[package]] name = "jinja2" -version = "3.1.3" +version = "3.1.4" description = "A very fast and expressive template engine." optional = false python-versions = ">=3.7" files = [ - {file = "Jinja2-3.1.3-py3-none-any.whl", hash = "sha256:7d6d50dd97d52cbc355597bd845fabfbac3f551e1f99619e39a35ce8c370b5fa"}, - {file = "Jinja2-3.1.3.tar.gz", hash = "sha256:ac8bd6544d4bb2c9792bf3a159e80bba8fda7f07e81bc3aed565432d5925ba90"}, + {file = "jinja2-3.1.4-py3-none-any.whl", hash = "sha256:bc5dd2abb727a5319567b7a813e6a2e7318c39f4f487cfe6c89c6f9c7d25197d"}, + {file = "jinja2-3.1.4.tar.gz", hash = "sha256:4a3aee7acbbe7303aede8e9648d13b8bf88a429282aa6122a993f0ac800cb369"}, ] [package.dependencies] @@ -1384,38 +1384,38 @@ files = [ [[package]] name = "mypy" -version = "1.8.0" +version = "1.9.0" description = "Optional static typing for Python" optional = false python-versions = ">=3.8" files = [ - {file = "mypy-1.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:485a8942f671120f76afffff70f259e1cd0f0cfe08f81c05d8816d958d4577d3"}, - {file = "mypy-1.8.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:df9824ac11deaf007443e7ed2a4a26bebff98d2bc43c6da21b2b64185da011c4"}, - {file = "mypy-1.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2afecd6354bbfb6e0160f4e4ad9ba6e4e003b767dd80d85516e71f2e955ab50d"}, - {file = "mypy-1.8.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8963b83d53ee733a6e4196954502b33567ad07dfd74851f32be18eb932fb1cb9"}, - {file = "mypy-1.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:e46f44b54ebddbeedbd3d5b289a893219065ef805d95094d16a0af6630f5d410"}, - {file = "mypy-1.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:855fe27b80375e5c5878492f0729540db47b186509c98dae341254c8f45f42ae"}, - {file = "mypy-1.8.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4c886c6cce2d070bd7df4ec4a05a13ee20c0aa60cb587e8d1265b6c03cf91da3"}, - {file = "mypy-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d19c413b3c07cbecf1f991e2221746b0d2a9410b59cb3f4fb9557f0365a1a817"}, - {file = "mypy-1.8.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9261ed810972061388918c83c3f5cd46079d875026ba97380f3e3978a72f503d"}, - {file = "mypy-1.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:51720c776d148bad2372ca21ca29256ed483aa9a4cdefefcef49006dff2a6835"}, - {file = "mypy-1.8.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:52825b01f5c4c1c4eb0db253ec09c7aa17e1a7304d247c48b6f3599ef40db8bd"}, - {file = "mypy-1.8.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f5ac9a4eeb1ec0f1ccdc6f326bcdb464de5f80eb07fb38b5ddd7b0de6bc61e55"}, - {file = "mypy-1.8.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afe3fe972c645b4632c563d3f3eff1cdca2fa058f730df2b93a35e3b0c538218"}, - {file = "mypy-1.8.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:42c6680d256ab35637ef88891c6bd02514ccb7e1122133ac96055ff458f93fc3"}, - {file = "mypy-1.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:720a5ca70e136b675af3af63db533c1c8c9181314d207568bbe79051f122669e"}, - {file = "mypy-1.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:028cf9f2cae89e202d7b6593cd98db6759379f17a319b5faf4f9978d7084cdc6"}, - {file = "mypy-1.8.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4e6d97288757e1ddba10dd9549ac27982e3e74a49d8d0179fc14d4365c7add66"}, - {file = "mypy-1.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f1478736fcebb90f97e40aff11a5f253af890c845ee0c850fe80aa060a267c6"}, - {file = "mypy-1.8.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:42419861b43e6962a649068a61f4a4839205a3ef525b858377a960b9e2de6e0d"}, - {file = "mypy-1.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:2b5b6c721bd4aabaadead3a5e6fa85c11c6c795e0c81a7215776ef8afc66de02"}, - {file = "mypy-1.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5c1538c38584029352878a0466f03a8ee7547d7bd9f641f57a0f3017a7c905b8"}, - {file = "mypy-1.8.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4ef4be7baf08a203170f29e89d79064463b7fc7a0908b9d0d5114e8009c3a259"}, - {file = "mypy-1.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7178def594014aa6c35a8ff411cf37d682f428b3b5617ca79029d8ae72f5402b"}, - {file = "mypy-1.8.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:ab3c84fa13c04aeeeabb2a7f67a25ef5d77ac9d6486ff33ded762ef353aa5592"}, - {file = "mypy-1.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:99b00bc72855812a60d253420d8a2eae839b0afa4938f09f4d2aa9bb4654263a"}, - {file = "mypy-1.8.0-py3-none-any.whl", hash = "sha256:538fd81bb5e430cc1381a443971c0475582ff9f434c16cd46d2c66763ce85d9d"}, - {file = "mypy-1.8.0.tar.gz", hash = "sha256:6ff8b244d7085a0b425b56d327b480c3b29cafbd2eff27316a004f9a7391ae07"}, + {file = "mypy-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f8a67616990062232ee4c3952f41c779afac41405806042a8126fe96e098419f"}, + {file = "mypy-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d357423fa57a489e8c47b7c85dfb96698caba13d66e086b412298a1a0ea3b0ed"}, + {file = "mypy-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49c87c15aed320de9b438ae7b00c1ac91cd393c1b854c2ce538e2a72d55df150"}, + {file = "mypy-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:48533cdd345c3c2e5ef48ba3b0d3880b257b423e7995dada04248725c6f77374"}, + {file = "mypy-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:4d3dbd346cfec7cb98e6cbb6e0f3c23618af826316188d587d1c1bc34f0ede03"}, + {file = "mypy-1.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:653265f9a2784db65bfca694d1edd23093ce49740b2244cde583aeb134c008f3"}, + {file = "mypy-1.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3a3c007ff3ee90f69cf0a15cbcdf0995749569b86b6d2f327af01fd1b8aee9dc"}, + {file = "mypy-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2418488264eb41f69cc64a69a745fad4a8f86649af4b1041a4c64ee61fc61129"}, + {file = "mypy-1.9.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:68edad3dc7d70f2f17ae4c6c1b9471a56138ca22722487eebacfd1eb5321d612"}, + {file = "mypy-1.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:85ca5fcc24f0b4aeedc1d02f93707bccc04733f21d41c88334c5482219b1ccb3"}, + {file = "mypy-1.9.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aceb1db093b04db5cd390821464504111b8ec3e351eb85afd1433490163d60cd"}, + {file = "mypy-1.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0235391f1c6f6ce487b23b9dbd1327b4ec33bb93934aa986efe8a9563d9349e6"}, + {file = "mypy-1.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d4d5ddc13421ba3e2e082a6c2d74c2ddb3979c39b582dacd53dd5d9431237185"}, + {file = "mypy-1.9.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:190da1ee69b427d7efa8aa0d5e5ccd67a4fb04038c380237a0d96829cb157913"}, + {file = "mypy-1.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:fe28657de3bfec596bbeef01cb219833ad9d38dd5393fc649f4b366840baefe6"}, + {file = "mypy-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e54396d70be04b34f31d2edf3362c1edd023246c82f1730bbf8768c28db5361b"}, + {file = "mypy-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5e6061f44f2313b94f920e91b204ec600982961e07a17e0f6cd83371cb23f5c2"}, + {file = "mypy-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:81a10926e5473c5fc3da8abb04119a1f5811a236dc3a38d92015cb1e6ba4cb9e"}, + {file = "mypy-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b685154e22e4e9199fc95f298661deea28aaede5ae16ccc8cbb1045e716b3e04"}, + {file = "mypy-1.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:5d741d3fc7c4da608764073089e5f58ef6352bedc223ff58f2f038c2c4698a89"}, + {file = "mypy-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:587ce887f75dd9700252a3abbc9c97bbe165a4a630597845c61279cf32dfbf02"}, + {file = "mypy-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f88566144752999351725ac623471661c9d1cd8caa0134ff98cceeea181789f4"}, + {file = "mypy-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:61758fabd58ce4b0720ae1e2fea5cfd4431591d6d590b197775329264f86311d"}, + {file = "mypy-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e49499be624dead83927e70c756970a0bc8240e9f769389cdf5714b0784ca6bf"}, + {file = "mypy-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:571741dc4194b4f82d344b15e8837e8c5fcc462d66d076748142327626a1b6e9"}, + {file = "mypy-1.9.0-py3-none-any.whl", hash = "sha256:a260627a570559181a9ea5de61ac6297aa5af202f06fd7ab093ce74e7181e43e"}, + {file = "mypy-1.9.0.tar.gz", hash = "sha256:3cc5da0127e6a478cddd906068496a97a7618a21ce9b54bde5bf7e539c7af974"}, ] [package.dependencies] @@ -2808,13 +2808,13 @@ files = [ [[package]] name = "types-jsonschema" -version = "4.21.0.20240311" +version = "4.22.0.20240610" description = "Typing stubs for jsonschema" optional = false python-versions = ">=3.8" files = [ - {file = "types-jsonschema-4.21.0.20240311.tar.gz", hash = "sha256:f7165ce70abd91df490c73b089873afd2899c5e56430ee495b64f851ad01f287"}, - {file = "types_jsonschema-4.21.0.20240311-py3-none-any.whl", hash = "sha256:e872f5661513824edf9698f73a66c9c114713d93eab58699bd0532e7e6db5750"}, + {file = "types-jsonschema-4.22.0.20240610.tar.gz", hash = "sha256:f82ab9fe756e3a2642ea9712c46b403ce61eb380b939b696cff3252af42f65b0"}, + {file = "types_jsonschema-4.22.0.20240610-py3-none-any.whl", hash = "sha256:89996b9bd1928f820a0e252b2844be21cd2e55d062b6fa1048d88453006ad89e"}, ] [package.dependencies] @@ -2844,13 +2844,13 @@ files = [ [[package]] name = "types-pillow" -version = "10.2.0.20240423" +version = "10.2.0.20240520" description = "Typing stubs for Pillow" optional = false python-versions = ">=3.8" files = [ - {file = "types-Pillow-10.2.0.20240423.tar.gz", hash = "sha256:696e68b9b6a58548fc307a8669830469237c5b11809ddf978ac77fafa79251cd"}, - {file = "types_Pillow-10.2.0.20240423-py3-none-any.whl", hash = "sha256:bd12923093b96c91d523efcdb66967a307f1a843bcfaf2d5a529146c10a9ced3"}, + {file = "types-Pillow-10.2.0.20240520.tar.gz", hash = "sha256:130b979195465fa1e1676d8e81c9c7c30319e8e95b12fae945e8f0d525213107"}, + {file = "types_Pillow-10.2.0.20240520-py3-none-any.whl", hash = "sha256:33c36494b380e2a269bb742181bea5d9b00820367822dbd3760f07210a1da23d"}, ] [[package]] diff --git a/pyproject.toml b/pyproject.toml index 8cc99b8cba..f4f7f70603 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust" [tool.poetry] name = "matrix-synapse" -version = "1.109.0rc1" +version = "1.109.0rc2" description = "Homeserver for the Matrix decentralised comms protocol" authors = ["Matrix.org Team and Contributors "] license = "AGPL-3.0-or-later" diff --git a/rust/src/events/internal_metadata.rs b/rust/src/events/internal_metadata.rs index 63774fbd54..ad87825f16 100644 --- a/rust/src/events/internal_metadata.rs +++ b/rust/src/events/internal_metadata.rs @@ -204,6 +204,8 @@ pub struct EventInternalMetadata { /// The stream ordering of this event. None, until it has been persisted. #[pyo3(get, set)] stream_ordering: Option, + #[pyo3(get, set)] + instance_name: Option, /// whether this event is an outlier (ie, whether we have the state at that /// point in the DAG) @@ -232,6 +234,7 @@ impl EventInternalMetadata { Ok(EventInternalMetadata { data, stream_ordering: None, + instance_name: None, outlier: false, }) } diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 75fe6d7b24..5fe5b951dd 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -443,3 +443,6 @@ class ExperimentalConfig(Config): self.msc3916_authenticated_media_enabled = experimental.get( "msc3916_authenticated_media_enabled", False ) + + # MSC4151: Report room API (Client-Server API) + self.msc4151_enabled: bool = experimental.get("msc4151_enabled", False) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 0772472312..b997d82d71 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase: pruned_event.internal_metadata.stream_ordering = ( event.internal_metadata.stream_ordering ) + pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name pruned_event.internal_metadata.outlier = event.internal_metadata.outlier # Mark the event as redacted @@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase: new_event.internal_metadata.stream_ordering = ( event.internal_metadata.stream_ordering ) + new_event.internal_metadata.instance_name = event.internal_metadata.instance_name new_event.internal_metadata.outlier = event.internal_metadata.outlier return new_event diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 62f0b67dbd..73b63b77f2 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -47,9 +47,9 @@ from synapse.events.utils import ( validate_canonicaljson, ) from synapse.http.servlet import validate_json_object -from synapse.rest.models import RequestBodyModel from synapse.storage.controllers.state import server_acl_evaluator_from_event from synapse.types import EventID, JsonDict, RoomID, StrCollection, UserID +from synapse.types.rest import RequestBodyModel class EventValidator: diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index bac569e977..266675c9b8 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -19,6 +19,7 @@ # [This file includes modifications made by New Vector Limited] # # +import inspect import logging from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type @@ -33,6 +34,7 @@ from synapse.federation.transport.server.federation import ( FEDERATION_SERVLET_CLASSES, FederationAccountStatusServlet, FederationUnstableClientKeysClaimServlet, + FederationUnstableMediaDownloadServlet, ) from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import ( @@ -315,6 +317,28 @@ def register_servlets( ): continue + if servletclass == FederationUnstableMediaDownloadServlet: + if ( + not hs.config.server.enable_media_repo + or not hs.config.experimental.msc3916_authenticated_media_enabled + ): + continue + + # don't load the endpoint if the storage provider is incompatible + media_repo = hs.get_media_repository() + load_download_endpoint = True + for provider in media_repo.media_storage.storage_providers: + signature = inspect.signature(provider.backend.fetch) + if "federation" not in signature.parameters: + logger.warning( + f"Federation media `/download` endpoint will not be enabled as storage provider {provider.backend} is not compatible with this endpoint." + ) + load_download_endpoint = False + break + + if not load_download_endpoint: + continue + servletclass( hs=hs, authenticator=authenticator, diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index db0f5076a9..4e2717b565 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -360,13 +360,29 @@ class BaseFederationServlet: "request" ) return None + if ( + func.__self__.__class__.__name__ # type: ignore + == "FederationUnstableMediaDownloadServlet" + ): + response = await func( + origin, content, request, *args, **kwargs + ) + else: + response = await func( + origin, content, request.args, *args, **kwargs + ) + else: + if ( + func.__self__.__class__.__name__ # type: ignore + == "FederationUnstableMediaDownloadServlet" + ): + response = await func( + origin, content, request, *args, **kwargs + ) + else: response = await func( origin, content, request.args, *args, **kwargs ) - else: - response = await func( - origin, content, request.args, *args, **kwargs - ) finally: # if we used the origin's context as the parent, add a new span using # the servlet span as a parent, so that we have a link diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index a59734785f..1f02451efa 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -44,10 +44,13 @@ from synapse.federation.transport.server._base import ( ) from synapse.http.servlet import ( parse_boolean_from_args, + parse_integer, parse_integer_from_args, parse_string_from_args, parse_strings_from_args, ) +from synapse.http.site import SynapseRequest +from synapse.media._base import DEFAULT_MAX_TIMEOUT_MS, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS from synapse.types import JsonDict from synapse.util import SYNAPSE_VERSION from synapse.util.ratelimitutils import FederationRateLimiter @@ -787,6 +790,43 @@ class FederationAccountStatusServlet(BaseFederationServerServlet): return 200, {"account_statuses": statuses, "failures": failures} +class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet): + """ + Implementation of new federation media `/download` endpoint outlined in MSC3916. Returns + a multipart/form-data response consisting of a JSON object and the requested media + item. This endpoint only returns local media. + """ + + PATH = "/media/download/(?P[^/]*)" + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3916" + RATELIMIT = True + + def __init__( + self, + hs: "HomeServer", + ratelimiter: FederationRateLimiter, + authenticator: Authenticator, + server_name: str, + ): + super().__init__(hs, authenticator, ratelimiter, server_name) + self.media_repo = self.hs.get_media_repository() + + async def on_GET( + self, + origin: Optional[str], + content: Literal[None], + request: SynapseRequest, + media_id: str, + ) -> None: + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS + ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) + await self.media_repo.get_local_media( + request, media_id, None, max_timeout_ms, federation=True + ) + + FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationSendServlet, FederationEventServlet, @@ -818,4 +858,5 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationV1SendKnockServlet, FederationMakeKnockServlet, FederationAccountStatusServlet, + FederationUnstableMediaDownloadServlet, ) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index e76a51ba30..99f9f6e64a 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -247,6 +247,12 @@ class E2eRoomKeysHandler: if current_room_key: if self._should_replace_room_key(current_room_key, room_key): log_kv({"message": "Replacing room key."}) + logger.debug( + "Replacing room key. room=%s session=%s user=%s", + room_id, + session_id, + user_id, + ) # updates are done one at a time in the DB, so send # updates right away rather than batching them up, # like we do with the inserts @@ -256,6 +262,12 @@ class E2eRoomKeysHandler: changed = True else: log_kv({"message": "Not replacing room_key."}) + logger.debug( + "Not replacing room key. room=%s session=%s user=%s", + room_id, + session_id, + user_id, + ) else: log_kv( { @@ -265,6 +277,12 @@ class E2eRoomKeysHandler: } ) log_kv({"message": "Replacing room key."}) + logger.debug( + "Inserting new room key. room=%s session=%s user=%s", + room_id, + session_id, + user_id, + ) to_insert.append((room_id, session_id, room_key)) changed = True diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index de5bd44a5f..721ef04f41 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1551,6 +1551,7 @@ class EventCreationHandler: # stream_ordering entry manually (as it was persisted on # another worker). event.internal_metadata.stream_ordering = stream_id + event.internal_metadata.instance_name = writer_instance return event diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index f7447b8ba5..dab3f90e74 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -37,11 +37,10 @@ from synapse.types import ( JsonMapping, Requester, ScheduledTask, - ShutdownRoomParams, - ShutdownRoomResponse, StreamKeyType, TaskStatus, ) +from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7f1b674d10..203209427b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -80,8 +80,6 @@ from synapse.types import ( RoomAlias, RoomID, RoomStreamToken, - ShutdownRoomParams, - ShutdownRoomResponse, StateMap, StrCollection, StreamKeyType, @@ -89,6 +87,7 @@ from synapse.types import ( UserID, create_requester, ) +from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse from synapse.types.state import StateFilter from synapse.util import stringutils from synapse.util.caches.response_cache import ResponseCache diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index a4a1dfc9b2..51692cf697 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,23 +18,14 @@ # # import logging -from enum import Enum -from typing import TYPE_CHECKING, AbstractSet, Dict, Final, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional, Set -import attr from immutabledict import immutabledict -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Extra -else: - from pydantic import Extra - from synapse.api.constants import AccountDataTypes, Membership from synapse.events import EventBase -from synapse.rest.client.models import SlidingSyncBody -from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID +from synapse.types import Requester, RoomStreamToken, StreamToken, UserID +from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult if TYPE_CHECKING: from synapse.server import HomeServer @@ -62,166 +53,6 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> return membership != Membership.LEAVE or sender != user_id -class SlidingSyncConfig(SlidingSyncBody): - """ - Inherit from `SlidingSyncBody` since we need all of the same fields and add a few - extra fields that we need in the handler - """ - - user: UserID - device_id: Optional[str] - - # Pydantic config - class Config: - # By default, ignore fields that we don't recognise. - extra = Extra.ignore - # By default, don't allow fields to be reassigned after parsing. - allow_mutation = False - # Allow custom types like `UserID` to be used in the model - arbitrary_types_allowed = True - - -class OperationType(Enum): - """ - Represents the operation types in a Sliding Sync window. - - Attributes: - SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about - entries in this range. - INSERT: Sets a single entry. If the position is not empty then clients MUST move - entries to the left or the right depending on where the closest empty space is. - DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move - places. - INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for - offline support, but they should be treated as empty when additional operations - which concern indexes in the range arrive from the server. - """ - - SYNC: Final = "SYNC" - INSERT: Final = "INSERT" - DELETE: Final = "DELETE" - INVALIDATE: Final = "INVALIDATE" - - -@attr.s(slots=True, frozen=True, auto_attribs=True) -class SlidingSyncResult: - """ - The Sliding Sync result to be serialized to JSON for a response. - - Attributes: - next_pos: The next position token in the sliding window to request (next_batch). - lists: Sliding window API. A map of list key to list results. - rooms: Room subscription API. A map of room ID to room subscription to room results. - extensions: Extensions API. A map of extension key to extension results. - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class RoomResult: - """ - Attributes: - name: Room name or calculated room name. - avatar: Room avatar - heroes: List of stripped membership events (containing `user_id` and optionally - `avatar_url` and `displayname`) for the users used to calculate the room name. - initial: Flag which is set when this is the first time the server is sending this - data on this connection. Clients can use this flag to replace or update - their local state. When there is an update, servers MUST omit this flag - entirely and NOT send "initial":false as this is wasteful on bandwidth. The - absence of this flag means 'false'. - required_state: The current state of the room - timeline: Latest events in the room. The last event is the most recent - is_dm: Flag to specify whether the room is a direct-message room (most likely - between two people). - invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state` - in sync v2, absent on joined/left rooms - prev_batch: A token that can be passed as a start parameter to the - `/rooms//messages` API to retrieve earlier messages. - limited: True if their are more events than fit between the given position and now. - Sync again to get more. - joined_count: The number of users with membership of join, including the client's - own user ID. (same as sync `v2 m.joined_member_count`) - invited_count: The number of users with membership of invite. (same as sync v2 - `m.invited_member_count`) - notification_count: The total number of unread notifications for this room. (same - as sync v2) - highlight_count: The number of unread notifications for this room with the highlight - flag set. (same as sync v2) - num_live: The number of timeline events which have just occurred and are not historical. - The last N events are 'live' and should be treated as such. This is mostly - useful to determine whether a given @mention event should make a noise or not. - Clients cannot rely solely on the absence of `initial: true` to determine live - events because if a room not in the sliding window bumps into the window because - of an @mention it will have `initial: true` yet contain a single live event - (with potentially other old events in the timeline). - """ - - name: str - avatar: Optional[str] - heroes: Optional[List[EventBase]] - initial: bool - required_state: List[EventBase] - timeline: List[EventBase] - is_dm: bool - invite_state: List[EventBase] - prev_batch: StreamToken - limited: bool - joined_count: int - invited_count: int - notification_count: int - highlight_count: int - num_live: int - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class SlidingWindowList: - """ - Attributes: - count: The total number of entries in the list. Always present if this list - is. - ops: The sliding list operations to perform. - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class Operation: - """ - Attributes: - op: The operation type to perform. - range: Which index positions are affected by this operation. These are - both inclusive. - room_ids: Which room IDs are affected by this operation. These IDs match - up to the positions in the `range`, so the last room ID in this list - matches the 9th index. The room data is held in a separate object. - """ - - op: OperationType - range: Tuple[int, int] - room_ids: List[str] - - count: int - ops: List[Operation] - - next_pos: StreamToken - lists: Dict[str, SlidingWindowList] - rooms: Dict[str, RoomResult] - extensions: JsonMapping - - def __bool__(self) -> bool: - """Make the result appear empty if there are no updates. This is used - to tell if the notifier needs to wait for more events when polling for - events. - """ - return bool(self.lists or self.rooms or self.extensions) - - @staticmethod - def empty(next_pos: StreamToken) -> "SlidingSyncResult": - "Return a new empty result" - return SlidingSyncResult( - next_pos=next_pos, - lists={}, - rooms={}, - extensions={}, - ) - - class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() @@ -241,9 +72,19 @@ class SlidingSyncHandler: from_token: Optional[StreamToken] = None, timeout_ms: int = 0, ) -> SlidingSyncResult: - """Get the sync for a client if we have new data for it now. Otherwise + """ + Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. + + Args: + requester: The user making the request + sync_config: Sync configuration + from_token: The point in the stream to sync from. Token of the end of the + previous batch. May be `None` if this is the initial sync request. + timeout_ms: The time in milliseconds to wait for new data to arrive. If 0, + we will immediately but there might not be any new data so we just return an + empty response. """ # If the user is not part of the mau group, then check that limits have # not been exceeded (if not part of the group by this point, almost certain @@ -315,6 +156,14 @@ class SlidingSyncHandler: """ Generates the response body of a Sliding Sync result, represented as a `SlidingSyncResult`. + + We fetch data according to the token range (> `from_token` and <= `to_token`). + + Args: + sync_config: Sync configuration + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. Token of the end of the + previous batch. May be `None` if this is the initial sync request. """ user_id = sync_config.user.to_string() app_service = self.store.get_app_service_by_user_id(user_id) @@ -338,11 +187,8 @@ class SlidingSyncHandler: # Apply filters filtered_room_ids = room_id_set if list_config.filters is not None: - # TODO: To be absolutely correct, this could also take into account - # from/to tokens but some of the streams don't support looking back - # in time (like global account_data). filtered_room_ids = await self.filter_rooms( - sync_config.user, room_id_set, list_config.filters + sync_config.user, room_id_set, list_config.filters, to_token ) # TODO: Apply sorts sorted_room_ids = sorted(filtered_room_ids) @@ -393,6 +239,12 @@ class SlidingSyncHandler: `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way to tell when a room was forgotten at the moment so we can't factor it into the from/to range. + + + Args: + user: User to fetch rooms for + to_token: The token to fetch rooms up to. + from_token: The point in the stream to sync from. """ user_id = user.to_string() @@ -451,12 +303,6 @@ class SlidingSyncHandler: instance_map=immutabledict(instance_to_max_stream_ordering_map), ) - # If our `to_token` is already the same or ahead of the latest room membership - # for the user, we can just straight-up return the room list (nothing has - # changed) - if membership_snapshot_token.is_before_or_eq(to_token.room_key): - return sync_room_id_set - # Since we fetched the users room list at some point in time after the from/to # tokens, we need to revert/rewind some membership changes to match the point in # time of the `to_token`. In particular, we need to make these fixups: @@ -476,14 +322,20 @@ class SlidingSyncHandler: # 1) Fetch membership changes that fall in the range from `to_token` up to # `membership_snapshot_token` - membership_change_events_after_to_token = ( - await self.store.get_membership_changes_for_user( - user_id, - from_key=to_token.room_key, - to_key=membership_snapshot_token, - excluded_rooms=self.rooms_to_exclude_globally, + # + # If our `to_token` is already the same or ahead of the latest room membership + # for the user, we don't need to do any "2)" fix-ups and can just straight-up + # use the room list from the snapshot as a base (nothing has changed) + membership_change_events_after_to_token = [] + if not membership_snapshot_token.is_before_or_eq(to_token.room_key): + membership_change_events_after_to_token = ( + await self.store.get_membership_changes_for_user( + user_id, + from_key=to_token.room_key, + to_key=membership_snapshot_token, + excluded_rooms=self.rooms_to_exclude_globally, + ) ) - ) # 1) Assemble a list of the last membership events in some given ranges. Someone # could have left and joined multiple times during the given range but we only @@ -621,9 +473,16 @@ class SlidingSyncHandler: user: UserID, room_id_set: AbstractSet[str], filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, ) -> AbstractSet[str]: """ Filter rooms based on the sync request. + + Args: + user: User to filter rooms for + room_id_set: Set of room IDs to filter down + filters: Filters to apply + to_token: We filter based on the state of the room at this token """ user_id = user.to_string() @@ -642,10 +501,13 @@ class SlidingSyncHandler: # `is_direct` on membership events because that property only appears for # the invitee membership event (doesn't show up for the inviter). Account # data is set by the client so it needs to be scrutinized. + # + # We're unable to take `to_token` into account for global account data since + # we only keep track of the latest account data for the user. dm_map = await self.store.get_global_account_data_by_type_for_user( user_id, AccountDataTypes.DIRECT ) - logger.warn("dm_map: %s", dm_map) + # Flatten out the map dm_room_id_set = set() if dm_map: diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 3fbed6062f..19bca94170 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -25,7 +25,16 @@ import os import urllib from abc import ABC, abstractmethod from types import TracebackType -from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type +from typing import ( + TYPE_CHECKING, + Awaitable, + Dict, + Generator, + List, + Optional, + Tuple, + Type, +) import attr @@ -39,6 +48,11 @@ from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable from synapse.util.stringutils import is_ascii +if TYPE_CHECKING: + from synapse.media.media_storage import MultipartResponder + from synapse.storage.databases.main.media_repository import LocalMedia + + logger = logging.getLogger(__name__) # list all text content types that will have the charset default to UTF-8 when @@ -260,6 +274,53 @@ def _can_encode_filename_as_token(x: str) -> bool: return True +async def respond_with_multipart_responder( + request: SynapseRequest, + responder: "Optional[MultipartResponder]", + media_info: "LocalMedia", +) -> None: + """ + Responds via a Multipart responder for the federation media `/download` requests + + Args: + request: the federation request to respond to + responder: the Multipart responder which will send the response + media_info: metadata about the media item + """ + if not responder: + respond_404(request) + return + + # If we have a responder we *must* use it as a context manager. + with responder: + if request._disconnected: + logger.warning( + "Not sending response to request %s, already disconnected.", request + ) + return + + logger.debug("Responding to media request with responder %s", responder) + if media_info.media_length is not None: + request.setHeader(b"Content-Length", b"%d" % (media_info.media_length,)) + request.setHeader( + b"Content-Type", b"multipart/mixed; boundary=%s" % responder.boundary + ) + + try: + await responder.write_to_consumer(request) + except Exception as e: + # The majority of the time this will be due to the client having gone + # away. Unfortunately, Twisted simply throws a generic exception at us + # in that case. + logger.warning("Failed to write to consumer: %s %s", type(e), e) + + # Unregister the producer, if it has one, so Twisted doesn't complain + if request.producer: + request.unregisterProducer() + + finish_request(request) + + async def respond_with_responder( request: SynapseRequest, responder: "Optional[Responder]", diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 6ed56099ca..c335e518a0 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -54,10 +54,11 @@ from synapse.media._base import ( ThumbnailInfo, get_filename_from_headers, respond_404, + respond_with_multipart_responder, respond_with_responder, ) from synapse.media.filepath import MediaFilePaths -from synapse.media.media_storage import MediaStorage +from synapse.media.media_storage import MediaStorage, MultipartResponder from synapse.media.storage_provider import StorageProviderWrapper from synapse.media.thumbnailer import Thumbnailer, ThumbnailError from synapse.media.url_previewer import UrlPreviewer @@ -429,6 +430,7 @@ class MediaRepository: media_id: str, name: Optional[str], max_timeout_ms: int, + federation: bool = False, ) -> None: """Responds to requests for local media, if exists, or returns 404. @@ -440,6 +442,7 @@ class MediaRepository: the filename in the Content-Disposition header of the response. max_timeout_ms: the maximum number of milliseconds to wait for the media to be uploaded. + federation: whether the local media being fetched is for a federation request Returns: Resolves once a response has successfully been written to request @@ -459,10 +462,17 @@ class MediaRepository: file_info = FileInfo(None, media_id, url_cache=bool(url_cache)) - responder = await self.media_storage.fetch_media(file_info) - await respond_with_responder( - request, responder, media_type, media_length, upload_name + responder = await self.media_storage.fetch_media( + file_info, media_info, federation ) + if federation: + # this really should be a Multipart responder but just in case + assert isinstance(responder, MultipartResponder) + await respond_with_multipart_responder(request, responder, media_info) + else: + await respond_with_responder( + request, responder, media_type, media_length, upload_name + ) async def get_remote_media( self, diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index b3cd3fd8f4..2f55d12b6b 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -19,9 +19,12 @@ # # import contextlib +import json import logging import os import shutil +from contextlib import closing +from io import BytesIO from types import TracebackType from typing import ( IO, @@ -30,14 +33,19 @@ from typing import ( AsyncIterator, BinaryIO, Callable, + List, Optional, Sequence, Tuple, Type, + Union, ) +from uuid import uuid4 import attr +from zope.interface import implementer +from twisted.internet import defer, interfaces from twisted.internet.defer import Deferred from twisted.internet.interfaces import IConsumer from twisted.protocols.basic import FileSender @@ -48,15 +56,19 @@ from synapse.logging.opentracing import start_active_span, trace, trace_with_opn from synapse.util import Clock from synapse.util.file_consumer import BackgroundFileConsumer +from ..storage.databases.main.media_repository import LocalMedia +from ..types import JsonDict from ._base import FileInfo, Responder from .filepath import MediaFilePaths if TYPE_CHECKING: - from synapse.media.storage_provider import StorageProvider + from synapse.media.storage_provider import StorageProviderWrapper from synapse.server import HomeServer logger = logging.getLogger(__name__) +CRLF = b"\r\n" + class MediaStorage: """Responsible for storing/fetching files from local sources. @@ -73,7 +85,7 @@ class MediaStorage: hs: "HomeServer", local_media_directory: str, filepaths: MediaFilePaths, - storage_providers: Sequence["StorageProvider"], + storage_providers: Sequence["StorageProviderWrapper"], ): self.hs = hs self.reactor = hs.get_reactor() @@ -169,15 +181,23 @@ class MediaStorage: raise e from None - async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]: + async def fetch_media( + self, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: """Attempts to fetch media described by file_info from the local cache and configured storage providers. Args: - file_info + file_info: Metadata about the media file + media_info: Metadata about the media item + federation: Whether this file is being fetched for a federation request Returns: - Returns a Responder if the file was found, otherwise None. + If the file was found returns a Responder (a Multipart Responder if the requested + file is for the federation /download endpoint), otherwise None. """ paths = [self._file_info_to_path(file_info)] @@ -197,12 +217,19 @@ class MediaStorage: local_path = os.path.join(self.local_media_directory, path) if os.path.exists(local_path): logger.debug("responding with local file %s", local_path) - return FileResponder(open(local_path, "rb")) + if federation: + assert media_info is not None + boundary = uuid4().hex.encode("ascii") + return MultipartResponder( + open(local_path, "rb"), media_info, boundary + ) + else: + return FileResponder(open(local_path, "rb")) logger.debug("local file %s did not exist", local_path) for provider in self.storage_providers: for path in paths: - res: Any = await provider.fetch(path, file_info) + res: Any = await provider.fetch(path, file_info, media_info, federation) if res: logger.debug("Streaming %s from %s", path, provider) return res @@ -316,7 +343,7 @@ class FileResponder(Responder): """Wraps an open file that can be sent to a request. Args: - open_file: A file like object to be streamed ot the client, + open_file: A file like object to be streamed to the client, is closed when finished streaming. """ @@ -337,6 +364,38 @@ class FileResponder(Responder): self.open_file.close() +class MultipartResponder(Responder): + """Wraps an open file, formats the response according to MSC3916 and sends it to a + federation request. + + Args: + open_file: A file like object to be streamed to the client, + is closed when finished streaming. + media_info: metadata about the media item + boundary: bytes to use for the multipart response boundary + """ + + def __init__(self, open_file: IO, media_info: LocalMedia, boundary: bytes) -> None: + self.open_file = open_file + self.media_info = media_info + self.boundary = boundary + + def write_to_consumer(self, consumer: IConsumer) -> Deferred: + return make_deferred_yieldable( + MultipartFileSender().beginFileTransfer( + self.open_file, consumer, self.media_info.media_type, {}, self.boundary + ) + ) + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.open_file.close() + + class SpamMediaException(NotFoundError): """The media was blocked by a spam checker, so we simply 404 the request (in the same way as if it was quarantined). @@ -370,3 +429,151 @@ class ReadableFileWrapper: # We yield to the reactor by sleeping for 0 seconds. await self.clock.sleep(0) + + +@implementer(interfaces.IProducer) +class MultipartFileSender: + """ + A producer that sends the contents of a file to a federation request in the format + outlined in MSC3916 - a multipart/format-data response where the first field is a + JSON object and the second is the requested file. + + This is a slight re-writing of twisted.protocols.basic.FileSender to achieve the format + outlined above. + """ + + CHUNK_SIZE = 2**14 + + lastSent = "" + deferred: Optional[defer.Deferred] = None + + def beginFileTransfer( + self, + file: IO, + consumer: IConsumer, + file_content_type: str, + json_object: JsonDict, + boundary: bytes, + ) -> Deferred: + """ + Begin transferring a file + + Args: + file: The file object to read data from + consumer: The synapse request to write the data to + file_content_type: The content-type of the file + json_object: The JSON object to write to the first field of the response + boundary: bytes to be used as the multipart/form-data boundary + + Returns: A deferred whose callback will be invoked when the file has + been completely written to the consumer. The last byte written to the + consumer is passed to the callback. + """ + self.file: Optional[IO] = file + self.consumer = consumer + self.json_field = json_object + self.json_field_written = False + self.content_type_written = False + self.file_content_type = file_content_type + self.boundary = boundary + self.deferred: Deferred = defer.Deferred() + self.consumer.registerProducer(self, False) + # while it's not entirely clear why this assignment is necessary, it mirrors + # the behavior in FileSender.beginFileTransfer and thus is preserved here + deferred = self.deferred + return deferred + + def resumeProducing(self) -> None: + # write the first field, which will always be a json field + if not self.json_field_written: + self.consumer.write(CRLF + b"--" + self.boundary + CRLF) + + content_type = Header(b"Content-Type", b"application/json") + self.consumer.write(bytes(content_type) + CRLF) + + json_field = json.dumps(self.json_field) + json_bytes = json_field.encode("utf-8") + self.consumer.write(json_bytes) + self.consumer.write(CRLF + b"--" + self.boundary + CRLF) + + self.json_field_written = True + + chunk: Any = "" + if self.file: + # if we haven't written the content type yet, do so + if not self.content_type_written: + type = self.file_content_type.encode("utf-8") + content_type = Header(b"Content-Type", type) + self.consumer.write(bytes(content_type) + CRLF) + self.content_type_written = True + + chunk = self.file.read(self.CHUNK_SIZE) + + if not chunk: + # we've reached the end of the file + self.consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF) + self.file = None + self.consumer.unregisterProducer() + + if self.deferred: + self.deferred.callback(self.lastSent) + self.deferred = None + return + + self.consumer.write(chunk) + self.lastSent = chunk[-1:] + + def pauseProducing(self) -> None: + pass + + def stopProducing(self) -> None: + if self.deferred: + self.deferred.errback(Exception("Consumer asked us to stop producing")) + self.deferred = None + + +class Header: + """ + `Header` This class is a tiny wrapper that produces + request headers. We can't use standard python header + class because it encodes unicode fields using =? bla bla ?= + encoding, which is correct, but no one in HTTP world expects + that, everyone wants utf-8 raw bytes. (stolen from treq.multipart) + + """ + + def __init__( + self, + name: bytes, + value: Any, + params: Optional[List[Tuple[Any, Any]]] = None, + ): + self.name = name + self.value = value + self.params = params or [] + + def add_param(self, name: Any, value: Any) -> None: + self.params.append((name, value)) + + def __bytes__(self) -> bytes: + with closing(BytesIO()) as h: + h.write(self.name + b": " + escape(self.value).encode("us-ascii")) + if self.params: + for name, val in self.params: + h.write(b"; ") + h.write(escape(name).encode("us-ascii")) + h.write(b"=") + h.write(b'"' + escape(val).encode("utf-8") + b'"') + h.seek(0) + return h.read() + + +def escape(value: Union[str, bytes]) -> str: + """ + This function prevents header values from corrupting the request, + a newline in the file name parameter makes form-data request unreadable + for a majority of parsers. (stolen from treq.multipart) + """ + if isinstance(value, bytes): + value = value.decode("utf-8") + return value.replace("\r", "").replace("\n", "").replace('"', '\\"') diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py index 06e5d27a53..a2d50adf65 100644 --- a/synapse/media/storage_provider.py +++ b/synapse/media/storage_provider.py @@ -24,14 +24,16 @@ import logging import os import shutil from typing import TYPE_CHECKING, Callable, Optional +from uuid import uuid4 from synapse.config._base import Config from synapse.logging.context import defer_to_thread, run_in_background from synapse.logging.opentracing import start_active_span, trace_with_opname from synapse.util.async_helpers import maybe_awaitable +from ..storage.databases.main.media_repository import LocalMedia from ._base import FileInfo, Responder -from .media_storage import FileResponder +from .media_storage import FileResponder, MultipartResponder logger = logging.getLogger(__name__) @@ -55,13 +57,21 @@ class StorageProvider(metaclass=abc.ABCMeta): """ @abc.abstractmethod - async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: + async def fetch( + self, + path: str, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: """Attempt to fetch the file described by file_info and stream it into writer. Args: path: Relative path of file in local cache file_info: The metadata of the file. + media_info: metadata of the media item + federation: Whether the requested media is for a federation request Returns: Returns a Responder if the provider has the file, otherwise returns None. @@ -124,7 +134,13 @@ class StorageProviderWrapper(StorageProvider): run_in_background(store) @trace_with_opname("StorageProviderWrapper.fetch") - async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: + async def fetch( + self, + path: str, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: if file_info.url_cache: # Files in the URL preview cache definitely aren't stored here, # so avoid any potentially slow I/O or network access. @@ -132,7 +148,9 @@ class StorageProviderWrapper(StorageProvider): # store_file is supposed to return an Awaitable, but guard # against improper implementations. - return await maybe_awaitable(self.backend.fetch(path, file_info)) + return await maybe_awaitable( + self.backend.fetch(path, file_info, media_info, federation) + ) class FileStorageProviderBackend(StorageProvider): @@ -172,11 +190,23 @@ class FileStorageProviderBackend(StorageProvider): ) @trace_with_opname("FileStorageProviderBackend.fetch") - async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: + async def fetch( + self, + path: str, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: """See StorageProvider.fetch""" backup_fname = os.path.join(self.base_directory, path) if os.path.isfile(backup_fname): + if federation: + assert media_info is not None + boundary = uuid4().hex.encode("ascii") + return MultipartResponder( + open(backup_fname, "rb"), media_info, boundary + ) return FileResponder(open(backup_fname, "rb")) return None diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 534dc0e276..0024ccf708 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -53,7 +53,7 @@ from synapse.rest.client import ( register, relations, rendezvous, - report_event, + reporting, room, room_keys, room_upgrade_rest_servlet, @@ -128,7 +128,7 @@ class ClientRestResource(JsonResource): tags.register_servlets(hs, client_resource) account_data.register_servlets(hs, client_resource) if is_main_process: - report_event.register_servlets(hs, client_resource) + reporting.register_servlets(hs, client_resource) openid.register_servlets(hs, client_resource) notifications.register_servlets(hs, client_resource) devices.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index 6ac07d354c..8daa449f9e 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -56,14 +56,14 @@ from synapse.http.servlet import ( from synapse.http.site import SynapseRequest from synapse.metrics import threepid_send_requests from synapse.push.mailer import Mailer -from synapse.rest.client.models import ( +from synapse.types import JsonDict +from synapse.types.rest import RequestBodyModel +from synapse.types.rest.client import ( AuthenticationData, ClientSecretStr, EmailRequestTokenBody, MsisdnRequestTokenBody, ) -from synapse.rest.models import RequestBodyModel -from synapse.types import JsonDict from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.stringutils import assert_valid_client_secret, random_string from synapse.util.threepids import check_3pid_allowed, validate_email diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py index b1b803549e..8313d687b7 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py @@ -42,9 +42,9 @@ from synapse.http.servlet import ( ) from synapse.http.site import SynapseRequest from synapse.rest.client._base import client_patterns, interactive_auth_handler -from synapse.rest.client.models import AuthenticationData -from synapse.rest.models import RequestBodyModel from synapse.types import JsonDict +from synapse.types.rest import RequestBodyModel +from synapse.types.rest.client import AuthenticationData if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/rest/client/directory.py b/synapse/rest/client/directory.py index 8099fdf3e4..11fdd0f7c6 100644 --- a/synapse/rest/client/directory.py +++ b/synapse/rest/client/directory.py @@ -41,8 +41,8 @@ from synapse.http.servlet import ( ) from synapse.http.site import SynapseRequest from synapse.rest.client._base import client_patterns -from synapse.rest.models import RequestBodyModel from synapse.types import JsonDict, RoomAlias +from synapse.types.rest import RequestBodyModel if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/rest/client/report_event.py b/synapse/rest/client/reporting.py similarity index 61% rename from synapse/rest/client/report_event.py rename to synapse/rest/client/reporting.py index 447281931e..4eee53e5a8 100644 --- a/synapse/rest/client/report_event.py +++ b/synapse/rest/client/reporting.py @@ -23,17 +23,28 @@ import logging from http import HTTPStatus from typing import TYPE_CHECKING, Tuple +from synapse._pydantic_compat import HAS_PYDANTIC_V2 from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.http.server import HttpServer -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import ( + RestServlet, + parse_and_validate_json_object_from_request, + parse_json_object_from_request, +) from synapse.http.site import SynapseRequest from synapse.types import JsonDict +from synapse.types.rest import RequestBodyModel from ._base import client_patterns if TYPE_CHECKING: from synapse.server import HomeServer +if TYPE_CHECKING or HAS_PYDANTIC_V2: + from pydantic.v1 import StrictStr +else: + from pydantic import StrictStr + logger = logging.getLogger(__name__) @@ -95,5 +106,57 @@ class ReportEventRestServlet(RestServlet): return 200, {} +class ReportRoomRestServlet(RestServlet): + """This endpoint lets clients report a room for abuse. + + Whilst MSC4151 is not yet merged, this unstable endpoint is enabled on matrix.org + for content moderation purposes, and therefore backwards compatibility should be + carefully considered when changing anything on this endpoint. + + More details on the MSC: https://github.com/matrix-org/matrix-spec-proposals/pull/4151 + """ + + PATTERNS = client_patterns( + "/org.matrix.msc4151/rooms/(?P[^/]*)/report$", + releases=[], + v1=False, + unstable=True, + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.hs = hs + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastores().main + + class PostBody(RequestBodyModel): + reason: StrictStr + + async def on_POST( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + body = parse_and_validate_json_object_from_request(request, self.PostBody) + + room = await self.store.get_room(room_id) + if room is None: + raise NotFoundError("Room does not exist") + + await self.store.add_room_report( + room_id=room_id, + user_id=user_id, + reason=body.reason, + received_ts=self.clock.time_msec(), + ) + + return 200, {} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ReportEventRestServlet(hs).register(http_server) + + if hs.config.experimental.msc4151_enabled: + ReportRoomRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 385b102b3d..1b0ac20d94 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -53,8 +53,8 @@ from synapse.http.servlet import ( ) from synapse.http.site import SynapseRequest from synapse.logging.opentracing import trace_with_opname -from synapse.rest.client.models import SlidingSyncBody from synapse.types import JsonDict, Requester, StreamToken +from synapse.types.rest.client import SlidingSyncBody from synapse.util import json_decoder from synapse.util.caches.lrucache import LruCache diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 56de6906d0..f428158139 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -149,6 +149,8 @@ class VersionsRestServlet(RestServlet): is not None ) ), + # MSC4151: Report room API (Client-Server API) + "org.matrix.msc4151": self.config.experimental.msc4151_enabled, }, }, ) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index dc7325fc57..a411ed614e 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -41,9 +41,9 @@ from synapse.http.servlet import ( parse_and_validate_json_object_from_request, parse_integer, ) -from synapse.rest.models import RequestBodyModel from synapse.storage.keys import FetchKeyResultForRemote from synapse.types import JsonDict +from synapse.types.rest import RequestBodyModel from synapse.util import json_decoder from synapse.util.async_helpers import yieldable_gather_results diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 1c771e48f7..40187496e2 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -108,6 +108,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ("device_lists_outbound_pokes", "instance_name", "stream_id"), ("device_lists_changes_in_room", "instance_name", "stream_id"), ("device_lists_remote_pending", "instance_name", "stream_id"), + ( + "device_lists_changes_converted_stream_position", + "instance_name", + "stream_id", + ), ], sequence_name="device_lists_sequence", writers=["master"], @@ -2394,15 +2399,16 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): `FALSE` have not been converted. """ - return cast( - Tuple[int, str], - await self.db_pool.simple_select_one( - table="device_lists_changes_converted_stream_position", - keyvalues={}, - retcols=["stream_id", "room_id"], - desc="get_device_change_last_converted_pos", - ), + # There should be only one row in this table, though we want to + # future-proof ourselves for when we have multiple rows (one for each + # instance). So to handle that case we take the minimum of all rows. + rows = await self.db_pool.simple_select_list( + table="device_lists_changes_converted_stream_position", + keyvalues={}, + retcols=["stream_id", "room_id"], + desc="get_device_change_last_converted_pos", ) + return cast(Tuple[int, str], min(rows)) async def set_device_change_last_converted_pos( self, @@ -2417,6 +2423,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): await self.db_pool.simple_update_one( table="device_lists_changes_converted_stream_position", keyvalues={}, - updatevalues={"stream_id": stream_id, "room_id": room_id}, + updatevalues={ + "stream_id": stream_id, + "instance_name": self._instance_name, + "room_id": room_id, + }, desc="set_device_change_last_converted_pos", ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index f1bd85aa27..66428e6c8e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -207,6 +207,7 @@ class PersistEventsStore: async with stream_ordering_manager as stream_orderings: for (event, _), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream + event.internal_metadata.instance_name = self._instance_name await self.db_pool.runInteraction( "persist_events", diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c06c44deb1..e264d36f02 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -156,6 +156,7 @@ class _EventRow: event_id: str stream_ordering: int + instance_name: str json: str internal_metadata: str format_version: Optional[int] @@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore): rejected_reason=rejected_reason, ) original_ev.internal_metadata.stream_ordering = row.stream_ordering + original_ev.internal_metadata.instance_name = row.instance_name original_ev.internal_metadata.outlier = row.outlier # Consistency check: if the content of the event has been modified in the @@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore): SELECT e.event_id, e.stream_ordering, + e.instance_name, ej.internal_metadata, ej.json, ej.format_version, @@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore): event_dict[event_id] = _EventRow( event_id=event_id, stream_ordering=row[1], - internal_metadata=row[2], - json=row[3], - format_version=row[4], - room_version_id=row[5], - rejected_reason=row[6], + instance_name=row[2], + internal_metadata=row[3], + json=row[4], + format_version=row[5], + room_version_id=row[6], + rejected_reason=row[7], redactions=[], - outlier=bool(row[7]), # This is an int in SQLite3 + outlier=bool(row[8]), # This is an int in SQLite3 ) # check for redactions diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 616c941687..b8a71c803e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2207,6 +2207,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): super().__init__(database, db_conn, hs) self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") + self._room_reports_id_gen = IdGenerator(db_conn, "room_reports", "id") self._instance_name = hs.get_instance_name() @@ -2416,6 +2417,37 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): ) return next_id + async def add_room_report( + self, + room_id: str, + user_id: str, + reason: str, + received_ts: int, + ) -> int: + """Add a room report + + Args: + room_id: The room ID being reported. + user_id: User who reports the room. + reason: Description that the user specifies. + received_ts: Time when the user submitted the report (milliseconds). + Returns: + Id of the room report. + """ + next_id = self._room_reports_id_gen.get_next() + await self.db_pool.simple_insert( + table="room_reports", + values={ + "id": next_id, + "received_ts": received_ts, + "room_id": room_id, + "user_id": user_id, + "reason": reason, + }, + desc="add_room_report", + ) + return next_id + async def block_room(self, room_id: str, user_id: str) -> None: """Marks the room as blocked. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 7ab6003f61..61373f0bfb 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -914,12 +914,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def get_last_event_in_room_before_stream_ordering_txn( txn: LoggingTransaction, ) -> Optional[str]: - # We need to handle the fact that the stream tokens can be vector - # clocks. We do this by getting all rows between the minimum and - # maximum stream ordering in the token, plus one row less than the - # minimum stream ordering. We then filter the results against the - # token and return the first row that matches. + # We're looking for the closest event at or before the token. We need to + # handle the fact that the stream token can be a vector clock (with an + # `instance_map`) and events can be persisted on different instances + # (sharded event persisters). The first subquery handles the events that + # would be within the vector clock and gets all rows between the minimum and + # maximum stream ordering in the token which need to be filtered against the + # `instance_map`. The second subquery handles the "before" case and finds + # the first row before the token. We then filter out any results past the + # token's vector clock and return the first row that matches. + min_stream = end_token.stream + max_stream = end_token.get_max_stream_pos() + # We use `union all` because we don't need any of the deduplication logic + # (`union` is really a union + distinct). `UNION ALL` does preserve the + # ordering of the operand queries but there is no actual gurantee that it + # has this behavior in all scenarios so we need the extra `ORDER BY` at the + # bottom. sql = """ SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id @@ -931,7 +942,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): AND rejections.event_id IS NULL ORDER BY stream_ordering DESC ) AS a - UNION + UNION ALL SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id FROM events @@ -943,15 +954,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ORDER BY stream_ordering DESC LIMIT 1 ) AS b + ORDER BY stream_ordering DESC """ txn.execute( sql, ( room_id, - end_token.stream, - end_token.get_max_stream_pos(), + min_stream, + max_stream, room_id, - end_token.stream, + min_stream, ), ) diff --git a/synapse/storage/schema/main/delta/85/05_add_instance_names_converted_pos.sql b/synapse/storage/schema/main/delta/85/05_add_instance_names_converted_pos.sql new file mode 100644 index 0000000000..c3f2b6a1dd --- /dev/null +++ b/synapse/storage/schema/main/delta/85/05_add_instance_names_converted_pos.sql @@ -0,0 +1,16 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +-- Add `instance_name` columns to stream tables to allow them to be used with +-- `MultiWriterIdGenerator` +ALTER TABLE device_lists_changes_converted_stream_position ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/schema/main/delta/85/06_add_room_reports.sql b/synapse/storage/schema/main/delta/85/06_add_room_reports.sql new file mode 100644 index 0000000000..f7b45276cf --- /dev/null +++ b/synapse/storage/schema/main/delta/85/06_add_room_reports.sql @@ -0,0 +1,20 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +CREATE TABLE room_reports ( + id BIGINT NOT NULL PRIMARY KEY, + received_ts BIGINT NOT NULL, + room_id TEXT NOT NULL, + user_id TEXT NOT NULL, + reason TEXT NOT NULL +); diff --git a/synapse/synapse_rust/events.pyi b/synapse/synapse_rust/events.pyi index 69837617f5..1682d0d151 100644 --- a/synapse/synapse_rust/events.pyi +++ b/synapse/synapse_rust/events.pyi @@ -19,6 +19,8 @@ class EventInternalMetadata: stream_ordering: Optional[int] """the stream ordering of this event. None, until it has been persisted.""" + instance_name: Optional[str] + """the instance name of the server that persisted this event. None, until it has been persisted.""" outlier: bool """whether this event is an outlier (ie, whether we have the state at that diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 3a89787cab..151658df53 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1279,60 +1279,3 @@ class ScheduledTask: result: Optional[JsonMapping] # Optional error that should be assigned a value when the status is FAILED error: Optional[str] - - -class ShutdownRoomParams(TypedDict): - """ - Attributes: - requester_user_id: - User who requested the action. Will be recorded as putting the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - even if there are still users joined to the room. - """ - - requester_user_id: Optional[str] - new_room_user_id: Optional[str] - new_room_name: Optional[str] - message: Optional[str] - block: bool - purge: bool - force_purge: bool - - -class ShutdownRoomResponse(TypedDict): - """ - Attributes: - kicked_users: An array of users (`user_id`) that were kicked. - failed_to_kick_users: - An array of users (`user_id`) that that were not kicked. - local_aliases: - An array of strings representing the local aliases that were - migrated from the old room to the new. - new_room_id: A string representing the room ID of the new room. - """ - - kicked_users: List[str] - failed_to_kick_users: List[str] - local_aliases: List[str] - new_room_id: Optional[str] diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py new file mode 100644 index 0000000000..1d65551d5b --- /dev/null +++ b/synapse/types/handlers/__init__.py @@ -0,0 +1,252 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# +from enum import Enum +from typing import TYPE_CHECKING, Dict, Final, List, Optional, Tuple + +import attr +from typing_extensions import TypedDict + +from synapse._pydantic_compat import HAS_PYDANTIC_V2 + +if TYPE_CHECKING or HAS_PYDANTIC_V2: + from pydantic.v1 import Extra +else: + from pydantic import Extra + +from synapse.events import EventBase +from synapse.types import JsonMapping, StreamToken, UserID +from synapse.types.rest.client import SlidingSyncBody + + +class ShutdownRoomParams(TypedDict): + """ + Attributes: + requester_user_id: + User who requested the action. Will be recorded as putting the room on the + blocking list. + new_room_user_id: + If set, a new room will be created with this user ID + as the creator and admin, and all users in the old room will be + moved into that room. If not set, no new room will be created + and the users will just be removed from the old room. + new_room_name: + A string representing the name of the room that new users will + be invited to. Defaults to `Content Violation Notification` + message: + A string containing the first message that will be sent as + `new_room_user_id` in the new room. Ideally this will clearly + convey why the original room was shut down. + Defaults to `Sharing illegal content on this server is not + permitted and rooms in violation will be blocked.` + block: + If set to `true`, this room will be added to a blocking list, + preventing future attempts to join the room. Defaults to `false`. + purge: + If set to `true`, purge the given room from the database. + force_purge: + If set to `true`, the room will be purged from database + even if there are still users joined to the room. + """ + + requester_user_id: Optional[str] + new_room_user_id: Optional[str] + new_room_name: Optional[str] + message: Optional[str] + block: bool + purge: bool + force_purge: bool + + +class ShutdownRoomResponse(TypedDict): + """ + Attributes: + kicked_users: An array of users (`user_id`) that were kicked. + failed_to_kick_users: + An array of users (`user_id`) that that were not kicked. + local_aliases: + An array of strings representing the local aliases that were + migrated from the old room to the new. + new_room_id: A string representing the room ID of the new room. + """ + + kicked_users: List[str] + failed_to_kick_users: List[str] + local_aliases: List[str] + new_room_id: Optional[str] + + +class SlidingSyncConfig(SlidingSyncBody): + """ + Inherit from `SlidingSyncBody` since we need all of the same fields and add a few + extra fields that we need in the handler + """ + + user: UserID + device_id: Optional[str] + + # Pydantic config + class Config: + # By default, ignore fields that we don't recognise. + extra = Extra.ignore + # By default, don't allow fields to be reassigned after parsing. + allow_mutation = False + # Allow custom types like `UserID` to be used in the model + arbitrary_types_allowed = True + + +class OperationType(Enum): + """ + Represents the operation types in a Sliding Sync window. + + Attributes: + SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about + entries in this range. + INSERT: Sets a single entry. If the position is not empty then clients MUST move + entries to the left or the right depending on where the closest empty space is. + DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move + places. + INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for + offline support, but they should be treated as empty when additional operations + which concern indexes in the range arrive from the server. + """ + + SYNC: Final = "SYNC" + INSERT: Final = "INSERT" + DELETE: Final = "DELETE" + INVALIDATE: Final = "INVALIDATE" + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SlidingSyncResult: + """ + The Sliding Sync result to be serialized to JSON for a response. + + Attributes: + next_pos: The next position token in the sliding window to request (next_batch). + lists: Sliding window API. A map of list key to list results. + rooms: Room subscription API. A map of room ID to room subscription to room results. + extensions: Extensions API. A map of extension key to extension results. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class RoomResult: + """ + Attributes: + name: Room name or calculated room name. + avatar: Room avatar + heroes: List of stripped membership events (containing `user_id` and optionally + `avatar_url` and `displayname`) for the users used to calculate the room name. + initial: Flag which is set when this is the first time the server is sending this + data on this connection. Clients can use this flag to replace or update + their local state. When there is an update, servers MUST omit this flag + entirely and NOT send "initial":false as this is wasteful on bandwidth. The + absence of this flag means 'false'. + required_state: The current state of the room + timeline: Latest events in the room. The last event is the most recent + is_dm: Flag to specify whether the room is a direct-message room (most likely + between two people). + invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state` + in sync v2, absent on joined/left rooms + prev_batch: A token that can be passed as a start parameter to the + `/rooms//messages` API to retrieve earlier messages. + limited: True if their are more events than fit between the given position and now. + Sync again to get more. + joined_count: The number of users with membership of join, including the client's + own user ID. (same as sync `v2 m.joined_member_count`) + invited_count: The number of users with membership of invite. (same as sync v2 + `m.invited_member_count`) + notification_count: The total number of unread notifications for this room. (same + as sync v2) + highlight_count: The number of unread notifications for this room with the highlight + flag set. (same as sync v2) + num_live: The number of timeline events which have just occurred and are not historical. + The last N events are 'live' and should be treated as such. This is mostly + useful to determine whether a given @mention event should make a noise or not. + Clients cannot rely solely on the absence of `initial: true` to determine live + events because if a room not in the sliding window bumps into the window because + of an @mention it will have `initial: true` yet contain a single live event + (with potentially other old events in the timeline). + """ + + name: str + avatar: Optional[str] + heroes: Optional[List[EventBase]] + initial: bool + required_state: List[EventBase] + timeline: List[EventBase] + is_dm: bool + invite_state: List[EventBase] + prev_batch: StreamToken + limited: bool + joined_count: int + invited_count: int + notification_count: int + highlight_count: int + num_live: int + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class SlidingWindowList: + """ + Attributes: + count: The total number of entries in the list. Always present if this list + is. + ops: The sliding list operations to perform. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class Operation: + """ + Attributes: + op: The operation type to perform. + range: Which index positions are affected by this operation. These are + both inclusive. + room_ids: Which room IDs are affected by this operation. These IDs match + up to the positions in the `range`, so the last room ID in this list + matches the 9th index. The room data is held in a separate object. + """ + + op: OperationType + range: Tuple[int, int] + room_ids: List[str] + + count: int + ops: List[Operation] + + next_pos: StreamToken + lists: Dict[str, SlidingWindowList] + rooms: Dict[str, RoomResult] + extensions: JsonMapping + + def __bool__(self) -> bool: + """Make the result appear empty if there are no updates. This is used + to tell if the notifier needs to wait for more events when polling for + events. + """ + return bool(self.lists or self.rooms or self.extensions) + + @staticmethod + def empty(next_pos: StreamToken) -> "SlidingSyncResult": + "Return a new empty result" + return SlidingSyncResult( + next_pos=next_pos, + lists={}, + rooms={}, + extensions={}, + ) diff --git a/synapse/rest/models.py b/synapse/types/rest/__init__.py similarity index 100% rename from synapse/rest/models.py rename to synapse/types/rest/__init__.py diff --git a/synapse/rest/client/models.py b/synapse/types/rest/client/__init__.py similarity index 99% rename from synapse/rest/client/models.py rename to synapse/types/rest/client/__init__.py index 129fca2203..ec83d0daa6 100644 --- a/synapse/rest/client/models.py +++ b/synapse/types/rest/client/__init__.py @@ -43,7 +43,7 @@ else: validator, ) -from synapse.rest.models import RequestBodyModel +from synapse.types.rest import RequestBodyModel from synapse.util.threepids import validate_email diff --git a/synapse/visibility.py b/synapse/visibility.py index 09a947ef15..c891bd845b 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -151,7 +151,7 @@ async def filter_events_for_client( filter_send_to_client=filter_send_to_client, sender_ignored=event.sender in ignore_list, always_include_ids=always_include_ids, - retention_policy=retention_policies[room_id], + retention_policy=retention_policies[event.room_id], state=state_after_event, is_peeking=is_peeking, sender_erased=erased_senders.get(event.sender, False), diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py index d5ac66a6ed..30f8787758 100644 --- a/tests/events/test_utils.py +++ b/tests/events/test_utils.py @@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase): ) original.internal_metadata.stream_ordering = 1234 self.assertEqual(original.internal_metadata.stream_ordering, 1234) + original.internal_metadata.instance_name = "worker1" + self.assertEqual(original.internal_metadata.instance_name, "worker1") cloned = clone_event(original) cloned.unsigned["b"] = 3 @@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase): self.assertEqual(original.unsigned, {"a": 1, "b": 2}) self.assertEqual(cloned.unsigned, {"a": 1, "b": 3}) self.assertEqual(cloned.internal_metadata.stream_ordering, 1234) + self.assertEqual(cloned.internal_metadata.instance_name, "worker1") self.assertEqual(cloned.internal_metadata.txn_id, "txn") diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py new file mode 100644 index 0000000000..1c89d19e99 --- /dev/null +++ b/tests/federation/test_federation_media.py @@ -0,0 +1,234 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# +import io +import os +import shutil +import tempfile +from typing import Optional + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.media._base import FileInfo, Responder +from synapse.media.filepath import MediaFilePaths +from synapse.media.media_storage import MediaStorage +from synapse.media.storage_provider import ( + FileStorageProviderBackend, + StorageProviderWrapper, +) +from synapse.server import HomeServer +from synapse.storage.databases.main.media_repository import LocalMedia +from synapse.types import JsonDict, UserID +from synapse.util import Clock + +from tests import unittest +from tests.test_utils import SMALL_PNG +from tests.unittest import override_config + + +class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase): + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + super().prepare(reactor, clock, hs) + self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-") + self.addCleanup(shutil.rmtree, self.test_dir) + self.primary_base_path = os.path.join(self.test_dir, "primary") + self.secondary_base_path = os.path.join(self.test_dir, "secondary") + + hs.config.media.media_store_path = self.primary_base_path + + storage_providers = [ + StorageProviderWrapper( + FileStorageProviderBackend(hs, self.secondary_base_path), + store_local=True, + store_remote=False, + store_synchronous=True, + ) + ] + + self.filepaths = MediaFilePaths(self.primary_base_path) + self.media_storage = MediaStorage( + hs, self.primary_base_path, self.filepaths, storage_providers + ) + self.media_repo = hs.get_media_repository() + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": True}} + ) + def test_file_download(self) -> None: + content = io.BytesIO(b"file_to_stream") + content_uri = self.get_success( + self.media_repo.create_content( + "text/plain", + "test_upload", + content, + 46, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with a text file + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + ) + self.pump() + self.assertEqual(200, channel.code) + + content_type = channel.headers.getRawHeaders("content-type") + assert content_type is not None + assert "multipart/mixed" in content_type[0] + assert "boundary" in content_type[0] + + # extract boundary + boundary = content_type[0].split("boundary=")[1] + # split on boundary and check that json field and expected value exist + stripped = channel.text_body.split("\r\n" + "--" + boundary) + # TODO: the json object expected will change once MSC3911 is implemented, currently + # {} is returned for all requests as a placeholder (per MSC3196) + found_json = any( + "\r\nContent-Type: application/json\r\n{}" in field for field in stripped + ) + self.assertTrue(found_json) + + # check that text file and expected value exist + found_file = any( + "\r\nContent-Type: text/plain\r\nfile_to_stream" in field + for field in stripped + ) + self.assertTrue(found_file) + + content = io.BytesIO(SMALL_PNG) + content_uri = self.get_success( + self.media_repo.create_content( + "image/png", + "test_png_upload", + content, + 67, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with an image file + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + ) + self.pump() + self.assertEqual(200, channel.code) + + content_type = channel.headers.getRawHeaders("content-type") + assert content_type is not None + assert "multipart/mixed" in content_type[0] + assert "boundary" in content_type[0] + + # extract boundary + boundary = content_type[0].split("boundary=")[1] + # split on boundary and check that json field and expected value exist + body = channel.result.get("body") + assert body is not None + stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8")) + found_json = any( + b"\r\nContent-Type: application/json\r\n{}" in field + for field in stripped_bytes + ) + self.assertTrue(found_json) + + # check that png file exists and matches what was uploaded + found_file = any(SMALL_PNG in field for field in stripped_bytes) + self.assertTrue(found_file) + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": False}} + ) + def test_disable_config(self) -> None: + content = io.BytesIO(b"file_to_stream") + content_uri = self.get_success( + self.media_repo.create_content( + "text/plain", + "test_upload", + content, + 46, + UserID.from_string("@user_id:whatever.org"), + ) + ) + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + ) + self.pump() + self.assertEqual(404, channel.code) + self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED") + + +class FakeFileStorageProviderBackend: + """ + Fake storage provider stub with incompatible `fetch` signature for testing + """ + + def __init__(self, hs: "HomeServer", config: str): + self.hs = hs + self.cache_directory = hs.config.media.media_store_path + self.base_directory = config + + def __str__(self) -> str: + return "FakeFileStorageProviderBackend[%s]" % (self.base_directory,) + + async def fetch( + self, path: str, file_info: FileInfo, media_info: Optional[LocalMedia] = None + ) -> Optional[Responder]: + pass + + +TEST_DIR = tempfile.mkdtemp(prefix="synapse-tests-") + + +class FederationUnstableMediaEndpointCompatibilityTest( + unittest.FederatingHomeserverTestCase +): + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + super().prepare(reactor, clock, hs) + self.test_dir = TEST_DIR + self.addCleanup(shutil.rmtree, self.test_dir) + self.media_repo = hs.get_media_repository() + + def default_config(self) -> JsonDict: + config = super().default_config() + primary_base_path = os.path.join(TEST_DIR, "primary") + config["media_storage_providers"] = [ + { + "module": "tests.federation.test_federation_media.FakeFileStorageProviderBackend", + "store_local": "True", + "store_remote": "False", + "store_synchronous": "False", + "config": {"directory": primary_base_path}, + } + ] + return config + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": True}} + ) + def test_incompatible_storage_provider_fails_to_load_endpoint(self) -> None: + channel = self.make_signed_federation_request( + "GET", + "/_matrix/federation/unstable/org.matrix.msc3916/media/download/xyz", + ) + self.pump() + self.assertEqual(404, channel.code) + self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED") diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 6caf72b1e5..ffb2681cc2 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -32,6 +32,7 @@ from synapse.api.constants import ( RoomTypes, ) from synapse.api.room_versions import RoomVersions +from synapse.handlers.sliding_sync import SlidingSyncConfig from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer @@ -334,7 +335,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Leave during the from_token/to_token range (newly_left) room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id2, user1_id, tok=user1_tok) after_room2_token = self.event_sources.get_current_token() @@ -1148,6 +1149,7 @@ class FilterRoomsTestCase(HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.sliding_sync_handler = self.hs.get_sliding_sync_handler() self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() def _create_dm_room( self, @@ -1247,35 +1249,31 @@ class FilterRoomsTestCase(HomeserverTestCase): invitee_tok=user2_tok, ) - # TODO: Better way to avoid the circular import? (see - # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) - from synapse.handlers.sliding_sync import SlidingSyncConfig + after_rooms_token = self.event_sources.get_current_token() # Try with `is_dm=True` - # ----------------------------- - truthy_filters = SlidingSyncConfig.SlidingSyncList.Filters( - is_dm=True, - ) - - # Filter the rooms truthy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, truthy_filters + UserID.from_string(user1_id), + {room_id, dm_room_id}, + SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=True, + ), + after_rooms_token, ) ) self.assertEqual(truthy_filtered_room_ids, {dm_room_id}) # Try with `is_dm=False` - # ----------------------------- - falsy_filters = SlidingSyncConfig.SlidingSyncList.Filters( - is_dm=False, - ) - - # Filter the rooms falsy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, falsy_filters + UserID.from_string(user1_id), + {room_id, dm_room_id}, + SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=False, + ), + after_rooms_token, ) ) @@ -1351,16 +1349,7 @@ class FilterRoomsTestCase(HomeserverTestCase): tok=user1_tok, ) - # TODO: Better way to avoid the circular import? (see - # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) - from synapse.handlers.sliding_sync import SlidingSyncConfig - - filters = SlidingSyncConfig.SlidingSyncList.Filters( - spaces=[ - space_a, - space_b, - ], - ) + after_rooms_token = self.event_sources.get_current_token() # Try filtering the rooms filtered_room_ids = self.get_success( @@ -1378,7 +1367,13 @@ class FilterRoomsTestCase(HomeserverTestCase): # not in any space room_not_in_space1, }, - filters, + SlidingSyncConfig.SlidingSyncList.Filters( + spaces=[ + space_a, + space_b, + ], + ), + after_rooms_token, ) ) @@ -1427,16 +1422,7 @@ class FilterRoomsTestCase(HomeserverTestCase): # Add to space_b self._add_space_child(space_b, room_id2, user2_tok) - # TODO: Better way to avoid the circular import? (see - # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) - from synapse.handlers.sliding_sync import SlidingSyncConfig - - filters = SlidingSyncConfig.SlidingSyncList.Filters( - spaces=[ - space_a, - space_b, - ], - ) + after_rooms_token = self.event_sources.get_current_token() # Try filtering the rooms filtered_room_ids = self.get_success( @@ -1448,7 +1434,13 @@ class FilterRoomsTestCase(HomeserverTestCase): # b (but user1 isn't in space_b) room_id2, }, - filters, + SlidingSyncConfig.SlidingSyncList.Filters( + spaces=[ + space_a, + space_b, + ], + ), + after_rooms_token, ) ) diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index 46d20ce775..47a89e9c66 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -49,7 +49,10 @@ from synapse.logging.context import make_deferred_yieldable from synapse.media._base import FileInfo, ThumbnailInfo from synapse.media.filepath import MediaFilePaths from synapse.media.media_storage import MediaStorage, ReadableFileWrapper -from synapse.media.storage_provider import FileStorageProviderBackend +from synapse.media.storage_provider import ( + FileStorageProviderBackend, + StorageProviderWrapper, +) from synapse.media.thumbnailer import ThumbnailProvider from synapse.module_api import ModuleApi from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers @@ -78,7 +81,14 @@ class MediaStorageTests(unittest.HomeserverTestCase): hs.config.media.media_store_path = self.primary_base_path - storage_providers = [FileStorageProviderBackend(hs, self.secondary_base_path)] + storage_providers = [ + StorageProviderWrapper( + FileStorageProviderBackend(hs, self.secondary_base_path), + store_local=True, + store_remote=False, + store_synchronous=True, + ) + ] self.filepaths = MediaFilePaths(self.primary_base_path) self.media_storage = MediaStorage( diff --git a/tests/push/test_email.py b/tests/push/test_email.py index c927a73fa6..e0aab1c046 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -205,8 +205,24 @@ class EmailPusherTests(HomeserverTestCase): # Multipart: plain text, base 64 encoded; html, base 64 encoded multipart_msg = email.message_from_bytes(msg) - txt = multipart_msg.get_payload()[0].get_payload(decode=True).decode() - html = multipart_msg.get_payload()[1].get_payload(decode=True).decode() + + # Extract the text (non-HTML) portion of the multipart Message, + # as a Message. + txt_message = multipart_msg.get_payload(i=0) + assert isinstance(txt_message, email.message.Message) + + # Extract the actual bytes from the Message object, and decode them to a `str`. + txt_bytes = txt_message.get_payload(decode=True) + assert isinstance(txt_bytes, bytes) + txt = txt_bytes.decode() + + # Do the same for the HTML portion of the multipart Message. + html_message = multipart_msg.get_payload(i=1) + assert isinstance(html_message, email.message.Message) + html_bytes = html_message.get_payload(decode=True) + assert isinstance(html_bytes, bytes) + html = html_bytes.decode() + self.assertIn("/_synapse/client/unsubscribe", txt) self.assertIn("/_synapse/client/unsubscribe", html) @@ -347,12 +363,17 @@ class EmailPusherTests(HomeserverTestCase): # That email should contain the room's avatar msg: bytes = args[5] # Multipart: plain text, base 64 encoded; html, base 64 encoded - html = ( - email.message_from_bytes(msg) - .get_payload()[1] - .get_payload(decode=True) - .decode() - ) + + # Extract the html Message object from the Multipart Message. + # We need the asserts to convince mypy that this is OK. + html_message = email.message_from_bytes(msg).get_payload(i=1) + assert isinstance(html_message, email.message.Message) + + # Extract the `bytes` from the html Message object, and decode to a `str`. + html = html_message.get_payload(decode=True) + assert isinstance(html, bytes) + html = html.decode() + self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html) def test_empty_room(self) -> None: diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 4e41a1c912..a56f1e2d5d 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): self.persist(type="m.room.create", key="", creator=USER_ID) self.check("get_invited_rooms_for_local_user", [USER_ID_2], []) event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite") + assert event.internal_metadata.instance_name is not None assert event.internal_metadata.stream_ordering is not None self.replicate() @@ -155,7 +156,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): "invite", event.event_id, PersistedEventPosition( - self.hs.get_instance_name(), + event.internal_metadata.instance_name, event.internal_metadata.stream_ordering, ), RoomVersions.V1.identifier, @@ -232,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): j2 = self.persist( type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" ) + assert j2.internal_metadata.instance_name is not None assert j2.internal_metadata.stream_ordering is not None self.replicate() expected_pos = PersistedEventPosition( - "master", j2.internal_metadata.stream_ordering + j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering ) self.check( "get_rooms_for_user_with_stream_ordering", @@ -288,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): msg, msgctx = self.build_event() self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)])) self.replicate() + assert j2.internal_metadata.instance_name is not None assert j2.internal_metadata.stream_ordering is not None event_source = RoomEventSource(self.hs) @@ -329,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): # joined_rooms list. if membership_changes: expected_pos = PersistedEventPosition( - "master", j2.internal_metadata.stream_ordering + j2.internal_metadata.instance_name, + j2.internal_metadata.stream_ordering, ) self.assertEqual( joined_rooms, diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py index a0f978911a..feb410a11d 100644 --- a/tests/rest/admin/test_event_reports.py +++ b/tests/rest/admin/test_event_reports.py @@ -24,7 +24,7 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.errors import Codes -from synapse.rest.client import login, report_event, room +from synapse.rest.client import login, reporting, room from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -37,7 +37,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase): synapse.rest.admin.register_servlets, login.register_servlets, room.register_servlets, - report_event.register_servlets, + reporting.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -453,7 +453,7 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase): synapse.rest.admin.register_servlets, login.register_servlets, room.register_servlets, - report_event.register_servlets, + reporting.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py index 992421ffe2..a85ea994de 100644 --- a/tests/rest/client/test_account.py +++ b/tests/rest/client/test_account.py @@ -427,13 +427,23 @@ class PasswordResetTestCase(unittest.HomeserverTestCase): text = None for part in mail.walk(): if part.get_content_type() == "text/plain": - text = part.get_payload(decode=True).decode("UTF-8") + text = part.get_payload(decode=True) + if text is not None: + # According to the logic table in `get_payload`, we know that + # the result of `get_payload` will be `bytes`, but mypy doesn't + # know this and complains. Thus, we assert the type. + assert isinstance(text, bytes) + text = text.decode("UTF-8") + break if not text: self.fail("Could not find text portion of email to parse") - assert text is not None + # `text` must be a `str`, after being decoded and determined just above + # to not be `None` or an empty `str`. + assert isinstance(text, str) + match = re.search(r"https://example.com\S+", text) assert match, "Could not find link in email" @@ -1209,13 +1219,23 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase): text = None for part in mail.walk(): if part.get_content_type() == "text/plain": - text = part.get_payload(decode=True).decode("UTF-8") + text = part.get_payload(decode=True) + if text is not None: + # According to the logic table in `get_payload`, we know that + # the result of `get_payload` will be `bytes`, but mypy doesn't + # know this and complains. Thus, we assert the type. + assert isinstance(text, bytes) + text = text.decode("UTF-8") + break if not text: self.fail("Could not find text portion of email to parse") - assert text is not None + # `text` must be a `str`, after being decoded and determined just above + # to not be `None` or an empty `str`. + assert isinstance(text, str) + match = re.search(r"https://example.com\S+", text) assert match, "Could not find link in email" diff --git a/tests/rest/client/test_models.py b/tests/rest/client/test_models.py index 534dd7bcf4..f8a56c80ca 100644 --- a/tests/rest/client/test_models.py +++ b/tests/rest/client/test_models.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING from typing_extensions import Literal from synapse._pydantic_compat import HAS_PYDANTIC_V2 -from synapse.rest.client.models import EmailRequestTokenBody +from synapse.types.rest.client import EmailRequestTokenBody if TYPE_CHECKING or HAS_PYDANTIC_V2: from pydantic.v1 import BaseModel, ValidationError diff --git a/tests/rest/client/test_report_event.py b/tests/rest/client/test_reporting.py similarity index 64% rename from tests/rest/client/test_report_event.py rename to tests/rest/client/test_reporting.py index 5903771e52..009deb9cb0 100644 --- a/tests/rest/client/test_report_event.py +++ b/tests/rest/client/test_reporting.py @@ -22,7 +22,7 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin -from synapse.rest.client import login, report_event, room +from synapse.rest.client import login, reporting, room from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -35,7 +35,7 @@ class ReportEventTestCase(unittest.HomeserverTestCase): synapse.rest.admin.register_servlets, login.register_servlets, room.register_servlets, - report_event.register_servlets, + reporting.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -139,3 +139,92 @@ class ReportEventTestCase(unittest.HomeserverTestCase): "POST", self.report_path, data, access_token=self.other_user_tok ) self.assertEqual(response_status, channel.code, msg=channel.result["body"]) + + +class ReportRoomTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + reporting.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.other_user = self.register_user("user", "pass") + self.other_user_tok = self.login("user", "pass") + + self.room_id = self.helper.create_room_as( + self.other_user, tok=self.other_user_tok, is_public=True + ) + self.report_path = ( + f"/_matrix/client/unstable/org.matrix.msc4151/rooms/{self.room_id}/report" + ) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_reason_str(self) -> None: + data = {"reason": "this makes me sad"} + self._assert_status(200, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_no_reason(self) -> None: + data = {"not_reason": "for typechecking"} + self._assert_status(400, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_reason_nonstring(self) -> None: + data = {"reason": 42} + self._assert_status(400, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_reason_null(self) -> None: + data = {"reason": None} + self._assert_status(400, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_cannot_report_nonexistent_room(self) -> None: + """ + Tests that we don't accept event reports for rooms which do not exist. + """ + channel = self.make_request( + "POST", + "/_matrix/client/unstable/org.matrix.msc4151/rooms/!bloop:example.org/report", + {"reason": "i am very sad"}, + access_token=self.other_user_tok, + shorthand=False, + ) + self.assertEqual(404, channel.code, msg=channel.result["body"]) + self.assertEqual( + "Room does not exist", + channel.json_body["error"], + msg=channel.result["body"], + ) + + def _assert_status(self, response_status: int, data: JsonDict) -> None: + channel = self.make_request( + "POST", + self.report_path, + data, + access_token=self.other_user_tok, + shorthand=False, + ) + self.assertEqual(response_status, channel.code, msg=channel.result["body"]) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 27d5b0125f..81feb3ec29 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase): for e in events: e.internal_metadata.stream_ordering = self._next_stream_ordering + e.internal_metadata.instance_name = self.hs.get_instance_name() self._next_stream_ordering += 1 def _persist(txn: LoggingTransaction) -> None: diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 2029cd9c68..ee34baf46f 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -19,7 +19,10 @@ # # -from typing import List +import logging +from typing import List, Tuple + +from immutabledict import immutabledict from twisted.test.proto_helpers import MemoryReactor @@ -28,11 +31,13 @@ from synapse.api.filtering import Filter from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken from synapse.util import Clock from tests.unittest import HomeserverTestCase +logger = logging.getLogger(__name__) + class PaginationTestCase(HomeserverTestCase): """ @@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase): } chunk = self._filter_messages(filter) self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none]) + + +class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): + """ + Test `get_last_event_in_room_before_stream_ordering(...)` + """ + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def _update_persisted_instance_name_for_event( + self, event_id: str, instance_name: str + ) -> None: + """ + Update the `instance_name` that persisted the the event in the database. + """ + return self.get_success( + self.store.db_pool.simple_update_one( + "events", + keyvalues={"event_id": event_id}, + updatevalues={"instance_name": instance_name}, + ) + ) + + def _send_event_on_instance( + self, instance_name: str, room_id: str, access_token: str + ) -> Tuple[JsonDict, PersistedEventPosition]: + """ + Send an event in a room and mimic that it was persisted by a specific + instance/worker. + """ + event_response = self.helper.send( + room_id, f"{instance_name} message", tok=access_token + ) + + self._update_persisted_instance_name_for_event( + event_response["event_id"], instance_name + ) + + event_pos = self.get_success( + self.store.get_position_for_event(event_response["event_id"]) + ) + + return event_response, event_pos + + def test_before_room_created(self) -> None: + """ + Test that no event is returned if we are using a token before the room was even created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=before_room_token.room_key, + ) + ) + + self.assertIsNone(last_event) + + def test_after_room_created(self) -> None: + """ + Test that an event is returned if we are using a token after the room was created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=after_room_token.room_key, + ) + ) + + self.assertIsNotNone(last_event) + + def test_activity_in_other_rooms(self) -> None: + """ + Test to make sure that the last event in the room is returned even if the + `stream_ordering` has advanced from activity in other rooms. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + # Create another room to advance the stream_ordering + self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the event we expect (which also means we know it's from the + # correct room) + self.assertEqual(last_event, event_response["event_id"]) + + def test_activity_after_token_has_no_effect(self) -> None: + """ + Test to make sure we return the last event before the token even if there is + activity after it. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + + after_room_token = self.event_sources.get_current_token() + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the last event before the token + self.assertEqual(last_event, event_response["event_id"]) + + def test_last_event_within_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that that is *within* the sharded + token (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing + that we can find an event within the tokens minimum and instance + `stream_ordering`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + # so we can sandwich event3 in the middle of the token + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event1 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos2.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event at/before the token in room1 + self.assertEqual( + last_event, + event_response3["event_id"], + f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to " + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + "event3": event_response3["event_id"], + } + ), + ) + + def test_last_event_before_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that is *before* the sharded token + (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event3 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos3.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event at/before the token in room1 + self.assertEqual( + last_event, + event_response2["event_id"], + f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to " + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + } + ), + )