This commit is contained in:
Éloi Rivard 2024-06-14 10:21:43 +01:00 committed by GitHub
commit f987fb5f10
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1877 additions and 1 deletions

View file

@ -0,0 +1 @@
Add support for MSC4098 (SCIM provisioning protocol).

View file

@ -70,6 +70,7 @@
- [Users](admin_api/user_admin_api.md)
- [Server Version](admin_api/version_api.md)
- [Federation](usage/administration/admin_api/federation.md)
- [SCIM provisioning](usage/administration/admin_api/scim_api.md)
- [Manhole](manhole.md)
- [Monitoring](metrics-howto.md)
- [Reporting Homeserver Usage Statistics](usage/administration/monitoring/reporting_homeserver_usage_statistics.md)

View file

@ -0,0 +1 @@
# SCIM API

View file

@ -65,6 +65,7 @@ from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.scim import SCIMResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.rest.well_known import well_known_resource
from synapse.server import HomeServer
@ -179,6 +180,7 @@ class SynapseHomeServer(HomeServer):
CLIENT_API_PREFIX: client_resource,
"/.well-known": well_known_resource(self),
"/_synapse/admin": AdminRestResource(self),
"/_matrix/client/unstable/coop.yaal/scim/": SCIMResource(self),
**build_synapse_client_resource_tree(self),
}
)

View file

@ -21,7 +21,7 @@
from typing import TYPE_CHECKING, Callable
from synapse.http.server import HttpServer, JsonResource
from synapse.rest import admin
from synapse.rest import admin, scim
from synapse.rest.client import (
account,
account_data,
@ -145,6 +145,7 @@ class ClientRestResource(JsonResource):
password_policy.register_servlets(hs, client_resource)
knock.register_servlets(hs, client_resource)
appservice_ping.register_servlets(hs, client_resource)
scim.register_servlets(hs, client_resource)
# moving to /_synapse/admin
if is_main_process:

442
synapse/rest/scim.py Normal file
View file

@ -0,0 +1,442 @@
"""This module implements a subset of the SCIM user provisioning protocol,
as proposed in the MSC4098.
The implemented endpoints are:
- /User (GET, POST, PUT, DELETE)
- /ServiceProviderConfig (GET)
- /Schemas (GET)
- /ResourceTypes (GET)
The supported SCIM User attributes are:
- userName
- password
- emails
- phoneNumbers
- displayName
- photos
- active
References:
https://github.com/matrix-org/matrix-spec-proposals/pull/4098
https://datatracker.ietf.org/doc/html/rfc7642
https://datatracker.ietf.org/doc/html/rfc7643
https://datatracker.ietf.org/doc/html/rfc7644
"""
import datetime
import logging
import re
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple
from synapse.api.errors import SynapseError
from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import (
RestServlet,
parse_integer,
parse_json_object_from_request,
)
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import assert_requester_is_admin, assert_user_is_admin
from synapse.types import JsonDict, UserID
from .scim_constants import (
RESOURCE_TYPE_USER,
SCHEMA_RESOURCE_TYPE,
SCHEMA_SCHEMA,
SCHEMA_SERVICE_PROVIDER_CONFIG,
SCHEMA_USER,
SCIM_SERVICE_PROVIDER_CONFIG,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
SCIM_PREFIX = "_matrix/client/unstable/coop.yaal/scim"
logger = logging.getLogger(__name__)
class SCIMResource(JsonResource):
"""The REST resource which gets mounted at
/_matrix/client/unstable/coop.yaal/scim"""
def __init__(self, hs: "HomeServer"):
JsonResource.__init__(self, hs, canonical_json=False)
register_servlets(hs, self)
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SchemaListServlet(hs).register(http_server)
SchemaServlet(hs).register(http_server)
ResourceTypeListServlet(hs).register(http_server)
ResourceTypeServlet(hs).register(http_server)
ServiceProviderConfigServlet(hs).register(http_server)
UserListServlet(hs).register(http_server)
UserServlet(hs).register(http_server)
# TODO: test requests with additional/wrong attributes
# TODO: take inspiration from tests/rest/admin/test_user.py
# TODO: test user passwords after creation/update
class SCIMServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.config = hs.config
self.store = hs.get_datastores().main
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
self.is_mine = hs.is_mine
self.profile_handler = hs.get_profile_handler()
self.default_nb_items_per_page = 100
def absolute_meta_location(self, payload: JsonDict) -> JsonDict:
prefix = self.config.server.public_baseurl + SCIM_PREFIX
if not payload["meta"]["location"].startswith(prefix):
payload["meta"]["location"] = prefix + payload["meta"]["location"]
return payload
def make_list_response_payload(
self, items, start_index=1, count=None, total_results=None
):
return {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
"totalResults": total_results or len(items),
"itemsPerPage": count or len(items),
"startIndex": start_index,
"Resources": items,
}
def make_error_response(self, status, message):
return status, {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"status": status.value if isinstance(status, HTTPStatus) else status,
"detail": message,
}
def parse_pagination_params(self, request):
start_index = parse_integer(request, "startIndex", default=1, negative=True)
count = parse_integer(
request, "count", default=self.default_nb_items_per_page, negative=True
)
# RFC7644 §3.4.2.4
# A value less than 1 SHALL be interpreted as 1.
#
# https://datatracker.ietf.org/doc/html/rfc7644#section-3.4.2.4
if start_index < 1:
start_index = 1
# RFC7644 §3.4.2.4
# A negative value SHALL be interpreted as 0.
#
# https://datatracker.ietf.org/doc/html/rfc7644#section-3.4.2.4
if count < 0:
count = 0
return start_index, count
async def get_user_data(self, user_id: str):
user_id_obj = UserID.from_string(user_id)
user = await self.store.get_user_by_id(user_id)
profile = await self.store.get_profileinfo(user_id_obj)
threepids = await self.store.user_get_threepids(user_id)
if not user:
raise SynapseError(
HTTPStatus.NOT_FOUND,
"User not found",
)
if not self.is_mine(user_id_obj):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Only local users can be admins of this homeserver",
)
location = f"{self.config.server.public_baseurl}{SCIM_PREFIX}/Users/{user_id}"
creation_datetime = datetime.datetime.fromtimestamp(user.creation_ts)
payload = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"meta": {
"resourceType": "User",
"created": creation_datetime.isoformat(),
"lastModified": creation_datetime.isoformat(),
"location": location,
},
"id": user_id,
"externalId": user_id,
"userName": user_id_obj.localpart,
"active": not user.is_deactivated,
}
for threepid in threepids:
if threepid.medium == "email":
payload.setdefault("emails", []).append({"value": threepid.address})
if threepid.medium == "msisdn":
payload.setdefault("phoneNumbers", []).append(
{"value": threepid.address}
)
if profile.display_name:
payload["displayName"] = profile.display_name
if profile.avatar_url:
payload["photos"] = [{
"type": "photo",
"primary": True,
"value": profile.avatar_url,
}]
return payload
class UserServlet(SCIMServlet):
PATTERNS = [re.compile(f"^/{SCIM_PREFIX}/Users/(?P<user_id>[^/]*)")]
async def on_GET(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
try:
payload = await self.get_user_data(user_id)
return HTTPStatus.OK, payload
except SynapseError as exc:
return self.make_error_response(exc.code, exc.msg)
async def on_DELETE(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester)
deactivate_account_handler = self.hs.get_deactivate_account_handler()
is_admin = await self.auth.is_server_admin(requester)
try:
await deactivate_account_handler.deactivate_account(
user_id, erase_data=True, requester=requester, by_admin=is_admin
)
except SynapseError as exc:
return self.make_error_response(exc.code, exc.msg)
return HTTPStatus.NO_CONTENT, ""
async def on_PUT(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester)
body = parse_json_object_from_request(request)
try:
user_id_obj = UserID.from_string(user_id)
threepids = await self.store.user_get_threepids(user_id)
default_display_name = body.get("displayName", "")
await self.profile_handler.set_displayname(
user_id_obj, requester, default_display_name, True
)
avatar_url = body["photos"][0]["value"] if body.get("photos") else ""
await self.profile_handler.set_avatar_url(
user_id_obj, requester, avatar_url, True
)
if threepids is not None:
new_threepids = {
("email", email["value"]) for email in body["emails"]
} | {
("msisdn", phone_number["value"])
for phone_number in body["phoneNumbers"]
}
# get changed threepids (added and removed)
cur_threepids = {
(threepid.medium, threepid.address)
for threepid in await self.store.user_get_threepids(user_id)
}
add_threepids = new_threepids - cur_threepids
del_threepids = cur_threepids - new_threepids
# remove old threepids
for medium, address in del_threepids:
try:
# Attempt to remove any known bindings of this third-party ID
# and user ID from identity servers.
await self.hs.get_identity_handler().try_unbind_threepid(
user_id, medium, address, id_server=None
)
except Exception:
logger.exception("Failed to remove threepids")
raise SynapseError(500, "Failed to remove threepids")
# Delete the local association of this user ID and third-party ID.
await self.auth_handler.delete_local_threepid(
user_id, medium, address
)
# add new threepids
current_time = self.hs.get_clock().time_msec()
for medium, address in add_threepids:
await self.auth_handler.add_threepid(
user_id, medium, address, current_time
)
payload = await self.get_user_data(user_id)
return HTTPStatus.OK, payload
except SynapseError as exc:
return self.make_error_response(exc.code, exc.msg)
class UserListServlet(SCIMServlet):
PATTERNS = [re.compile(f"^/{SCIM_PREFIX}/Users/?$")]
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
try:
await assert_requester_is_admin(self.auth, request)
start_index, count = self.parse_pagination_params(request)
items, total = await self.store.get_users_paginate(
start=start_index - 1,
limit=count,
)
users = [await self.get_user_data(item.name) for item in items]
payload = self.make_list_response_payload(
users, start_index=start_index, count=count, total_results=total
)
return HTTPStatus.OK, payload
except SynapseError as exc:
return self.make_error_response(exc.code, exc.msg)
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
try:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester)
body = parse_json_object_from_request(request)
from synapse.rest.client.register import RegisterRestServlet
register = RegisterRestServlet(self.hs)
registration_arguments = {
"by_admin": True,
"approved": True,
"localpart": body["userName"],
}
if password := body.get("password"):
registration_arguments["password_hash"] = await self.auth_handler.hash(
password
)
if display_name := body.get("displayName"):
registration_arguments["default_display_name"] = display_name
user_id = await register.registration_handler.register_user(
**registration_arguments
)
await register._create_registration_details(
user_id,
body,
should_issue_refresh_token=True,
)
now_ts = self.hs.get_clock().time_msec()
for email in body.get("emails", []):
await self.store.user_add_threepid(
user_id, "email", email["value"], now_ts, now_ts
)
for phone_number in body.get("phoneNumbers", []):
await self.store.user_add_threepid(
user_id, "msisdn", phone_number["value"], now_ts, now_ts
)
avatar_url = body["photos"][0]["value"] if body.get("photos") else None
if avatar_url:
await self.profile_handler.set_avatar_url(
UserID.from_string(user_id), requester, avatar_url, True
)
payload = await self.get_user_data(user_id)
return HTTPStatus.CREATED, payload
except SynapseError as exc:
return self.make_error_response(exc.code, exc.msg)
class ServiceProviderConfigServlet(SCIMServlet):
PATTERNS = [re.compile(f"^/{SCIM_PREFIX}/ServiceProviderConfig$")]
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
return HTTPStatus.OK, SCIM_SERVICE_PROVIDER_CONFIG
class SchemaListServlet(SCIMServlet):
PATTERNS = [re.compile(f"^/{SCIM_PREFIX}/Schemas$")]
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
start_index, count = self.parse_pagination_params(request)
resources = [
self.absolute_meta_location(SCHEMA_SERVICE_PROVIDER_CONFIG),
self.absolute_meta_location(SCHEMA_RESOURCE_TYPE),
self.absolute_meta_location(SCHEMA_SCHEMA),
self.absolute_meta_location(SCHEMA_USER),
]
return HTTPStatus.OK, self.make_list_response_payload(
resources, start_index=start_index, count=count
)
class SchemaServlet(SCIMServlet):
PATTERNS = [re.compile(f"^/{SCIM_PREFIX}/Schemas/(?P<schema_id>[^/]*)$")]
async def on_GET(
self, request: SynapseRequest, schema_id: str
) -> Tuple[int, JsonDict]:
schemas = {
"urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig": SCHEMA_SERVICE_PROVIDER_CONFIG,
"urn:ietf:params:scim:schemas:core:2.0:ResourceType": SCHEMA_RESOURCE_TYPE,
"urn:ietf:params:scim:schemas:core:2.0:Schema": SCHEMA_SCHEMA,
"urn:ietf:params:scim:schemas:core:2.0:User": SCHEMA_USER,
}
try:
return HTTPStatus.OK, self.absolute_meta_location(schemas[schema_id])
except KeyError:
return self.make_error_response(HTTPStatus.NOT_FOUND, "Object not found")
class ResourceTypeListServlet(SCIMServlet):
PATTERNS = [re.compile(f"^/{SCIM_PREFIX}/ResourceTypes$")]
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
start_index, count = self.parse_pagination_params(request)
resources = [self.absolute_meta_location(RESOURCE_TYPE_USER)]
return HTTPStatus.OK, self.make_list_response_payload(
resources, start_index=start_index, count=count
)
class ResourceTypeServlet(SCIMServlet):
PATTERNS = [re.compile(f"^/{SCIM_PREFIX}/ResourceTypes/(?P<resource_type>[^/]*)$")]
async def on_GET(
self, request: SynapseRequest, resource_type: str
) -> Tuple[int, JsonDict]:
resource_types = {
"User": RESOURCE_TYPE_USER,
}
try:
return HTTPStatus.OK, self.absolute_meta_location(
resource_types[resource_type]
)
except KeyError:
return self.make_error_response(HTTPStatus.NOT_FOUND, "Object not found")

View file

@ -0,0 +1,824 @@
SCIM_SERVICE_PROVIDER_CONFIG = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig"],
"meta": {
"location": "/ServiceProviderConfig",
"resourceType": "ServiceProviderConfig",
},
"documentationUri": "https://element-hq.github.io/synapse/latest/admin_api/scim_api.html",
"patch": {"supported": False},
"bulk": {"supported": False, "maxOperations": 0, "maxPayloadSize": 0},
"changePassword": {"supported": True},
"filter": {"supported": False, "maxResults": 0},
"sort": {"supported": False},
"etag": {"supported": False},
"authenticationSchemes": [
{
"name": "OAuth Bearer Token",
"description": "Authentication scheme using the OAuth Bearer Token Standard",
"specUri": "http://www.rfc-editor.org/info/rfc6750",
"documentationUri": "https://element-hq.github.io/synapse/latest/openid.html",
"type": "oauthbearertoken",
"primary": True, # TODO
},
{
"name": "HTTP Basic",
"description": "Authentication scheme using the HTTP Basic Standard",
"specUri": "http://www.rfc-editor.org/info/rfc2617",
"documentationUri": "https://element-hq.github.io/synapse/latest/modules/password_auth_provider_callbacks.html",
"type": "httpbasic",
},
],
}
SCHEMA_SERVICE_PROVIDER_CONFIG = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Schema"],
"id": "urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig",
"meta": {
"resourceType": "Schema",
"location": "/Schemas/urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig",
},
"name": "Service Provider Configuration",
"description": """Schema for representing the service provider's configuration""",
"attributes": [
{
"name": "documentationUri",
"type": "reference",
"referenceTypes": ["external"],
"multiValued": False,
"description": """An HTTP-addressable URL pointing to the service provider's human-consumable help documentation.""",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "patch",
"type": "complex",
"multiValued": False,
"description": """A complex type that specifies PATCH configuration options.""",
"required": True,
"returned": "default",
"mutability": "readOnly",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value specifying whether or not the operation is supported.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
}
],
},
{
"name": "bulk",
"type": "complex",
"multiValued": False,
"description": """A complex type that specifies bulk configuration options.""",
"required": True,
"returned": "default",
"mutability": "readOnly",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value specifying whether or not the operation is supported.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "maxOperations",
"type": "integer",
"multiValued": False,
"description": """An integer value specifying the maximum number of operations.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "maxPayloadSize",
"type": "integer",
"multiValued": False,
"description": """An integer value specifying the maximum payload size in bytes.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
],
},
{
"name": "filter",
"type": "complex",
"multiValued": False,
"description": """A complex type that specifies FILTER options.""",
"required": True,
"returned": "default",
"mutability": "readOnly",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value specifying whether or not the operation is supported.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "maxResults",
"type": "integer",
"multiValued": False,
"description": """An integer value specifying the maximum number of resources returned in a response.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
],
},
{
"name": "changePassword",
"type": "complex",
"multiValued": False,
"description": """A complex type that specifies configuration options related to changing a password.""",
"required": True,
"returned": "default",
"mutability": "readOnly",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value specifying whether or not the operation is supported.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
}
],
},
{
"name": "sort",
"type": "complex",
"multiValued": False,
"description": """A complex type that specifies sort result options.""",
"required": True,
"returned": "default",
"mutability": "readOnly",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value specifying whether or not the operation is supported.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
}
],
},
{
"name": "authenticationSchemes",
"type": "complex",
"multiValued": True,
"description": """A complex type that specifies supported authentication scheme properties.""",
"required": True,
"returned": "default",
"mutability": "readOnly",
"subAttributes": [
{
"name": "name",
"type": "string",
"multiValued": False,
"description": """The common authentication scheme name, e.g., HTTP Basic.""",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": """A description of the authentication scheme.""",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "specUri",
"type": "reference",
"referenceTypes": ["external"],
"multiValued": False,
"description": """An HTTP-addressable URL pointing to the authentication scheme's specification.""",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "documentationUri",
"type": "reference",
"referenceTypes": ["external"],
"multiValued": False,
"description": """An HTTP-addressable URL pointing to the authentication scheme's usage documentation.""",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
],
},
],
}
SCHEMA_RESOURCE_TYPE = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Schema"],
"id": "urn:ietf:params:scim:schemas:core:2.0:ResourceType",
"meta": {
"resourceType": "Schema",
"location": "/Schemas/urn:ietf:params:scim:schemas:core:2.0:ResourceType",
},
"name": "ResourceType",
"description": """Specifies the schema that describes a SCIM resource type""",
"attributes": [
{
"name": "id",
"type": "string",
"multiValued": False,
"description": """The resource type's server unique id. May be the same as the 'name' attribute.""",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "name",
"type": "string",
"multiValued": False,
"description": """The resource type name. When applicable, service providers MUST specify the name, e.g., 'User'.""",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": """The resource type's human-readable description. When applicable, service providers MUST specify the description.""",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "endpoint",
"type": "reference",
"referenceTypes": ["uri"],
"multiValued": False,
"description": """The resource type's HTTP-addressable endpoint relative to the Base URL, e.g., '/Users'.""",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "schema",
"type": "reference",
"referenceTypes": ["uri"],
"multiValued": False,
"description": """The resource type's primary/base schema URI.""",
"required": True,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "schemaExtensions",
"type": "complex",
"multiValued": False,
"description": """A list of URIs of the resource type's schema extensions.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
"subAttributes": [
{
"name": "schema",
"type": "reference",
"referenceTypes": ["uri"],
"multiValued": False,
"description": "The URI of a schema extension.",
"required": True,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "required",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value that specifies whether or not the schema extension is required for the resource type. If True, a resource of this type MUST include this schema extension and also include any attributes declared as required in this schema extension. If False, a resource of this type MAY omit this schema extension.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
},
],
},
],
}
SCHEMA_SCHEMA = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Schema"],
"id": "urn:ietf:params:scim:schemas:core:2.0:Schema",
"meta": {
"resourceType": "Schema",
"location": "/Schemas/urn:ietf:params:scim:schemas:core:2.0:Schema",
},
"name": "Schema",
"description": """Specifies the schema that describes a SCIM schema""",
"attributes": [
{
"name": "id",
"type": "string",
"multiValued": False,
"description": """The unique URI of the schema. When applicable, service providers MUST specify the URI.""",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "name",
"type": "string",
"multiValued": False,
"description": """The schema's human-readable name. When applicable, service providers MUST specify the name, e.g., 'User'.""",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": """The schema's human-readable name. When applicable, service providers MUST specify the name, e.g., 'User'.""",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "attributes",
"type": "complex",
"multiValued": True,
"description": """A complex attribute that includes the attributes of a schema.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
"subAttributes": [
{
"name": "name",
"type": "string",
"multiValued": False,
"description": "The attribute's name.",
"required": True,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "type",
"type": "string",
"multiValued": False,
"description": """The attribute's data type. Valid values include 'string', 'complex', 'boolean', 'decimal', 'integer', 'dateTime', 'reference'.""",
"required": True,
"canonicalValues": [
"string",
"complex",
"boolean",
"decimal",
"integer",
"dateTime",
"reference",
],
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "multiValued",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value indicating an attribute's plurality.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": """A human-readable description of the attribute.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "required",
"type": "boolean",
"multiValued": False,
"description": """A boolean value indicating whether or not the attribute is required.""",
"required": False,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "canonicalValues",
"type": "string",
"multiValued": True,
"description": """A collection of canonical values. When applicable, service providers MUST specify the canonical types, e.g., 'work', 'home'.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "caseExact",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value indicating whether or not a string attribute is case sensitive.""",
"required": False,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "mutability",
"type": "string",
"multiValued": False,
"description": """Indicates whether or not an attribute is modifiable.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"canonicalValues": [
"readOnly",
"readWrite",
"immutable",
"writeOnly",
],
},
{
"name": "returned",
"type": "string",
"multiValued": False,
"description": """Indicates when an attribute is returned in a response (e.g., to a query).""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"canonicalValues": [
"always",
"never",
"default",
"request",
],
},
{
"name": "uniqueness",
"type": "string",
"multiValued": False,
"description": "Indicates how unique a value must be.",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"canonicalValues": ["none", "server", "global"],
},
{
"name": "referenceTypes",
"type": "string",
"multiValued": True,
"description": """Used only with an attribute of type 'reference'. Specifies a SCIM resourceType that a reference attribute MAY refer to, e.g., 'User'.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "subAttributes",
"type": "complex",
"multiValued": True,
"description": """Used to define the sub-attributes of a complex attribute.""",
"required": False,
"mutability": "readOnly",
"returned": "default",
"subAttributes": [
{
"name": "name",
"type": "string",
"multiValued": False,
"description": "The attribute's name.",
"required": True,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "type",
"type": "string",
"multiValued": False,
"description": """The attribute's data type. Valid values include 'string', 'complex', 'boolean', 'decimal', 'integer', 'dateTime', 'reference'.""",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"canonicalValues": [
"string",
"complex",
"boolean",
"decimal",
"integer",
"dateTime",
"reference",
],
},
{
"name": "multiValued",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value indicating an attribute's plurality.""",
"required": True,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": """A human-readable description of the attribute.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "required",
"type": "boolean",
"multiValued": False,
"description": """A boolean value indicating whether or not the attribute is required.""",
"required": False,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "canonicalValues",
"type": "string",
"multiValued": True,
"description": """A collection of canonical values. When applicable, service providers MUST specify the canonical types, e.g., 'work', 'home'.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
{
"name": "caseExact",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value indicating whether or not a string attribute is case sensitive.""",
"required": False,
"mutability": "readOnly",
"returned": "default",
},
{
"name": "mutability",
"type": "string",
"multiValued": False,
"description": """Indicates whether or not an attribute is modifiable.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"canonicalValues": [
"readOnly",
"readWrite",
"immutable",
"writeOnly",
],
},
{
"name": "returned",
"type": "string",
"multiValued": False,
"description": """Indicates when an attribute is returned in a response (e.g., to a query).""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"canonicalValues": [
"always",
"never",
"default",
"request",
],
},
{
"name": "uniqueness",
"type": "string",
"multiValued": False,
"description": "Indicates how unique a value must be.",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"canonicalValues": ["none", "server", "global"],
},
{
"name": "referenceTypes",
"type": "string",
"multiValued": False,
"description": """Used only with an attribute of type 'reference'. Specifies a SCIM resourceType that a reference attribute MAY refer to, e.g., 'User'.""",
"required": False,
"caseExact": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
},
],
},
],
},
],
}
SCHEMA_USER = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Schema"],
"id": "urn:ietf:params:scim:schemas:core:2.0:User",
"meta": {
"resourceType": "Schema",
"location": "/Schemas/urn:ietf:params:scim:schemas:core:2.0:User",
},
"name": "User",
"description": "User Account",
"attributes": [
{
"name": "userName",
"type": "string",
"multiValued": False,
"description": """Unique identifier for the User, typically used by the user to directly authenticate to the service provider. Each User MUST include a non-empty userName value. This identifier MUST be unique across the service provider's entire set of Users. REQUIRED.""",
"required": True,
"caseExact": False,
"mutability": "readWrite",
"returned": "default",
"uniqueness": "server",
},
{
"name": "displayName",
"type": "string",
"multiValued": False,
"description": """The name of the User, suitable for display to end-users. The name SHOULD be the full name of the User being described, if known.""",
"required": False,
"caseExact": False,
"mutability": "readWrite",
"returned": "default",
"uniqueness": "none",
},
{
"name": "active",
"type": "boolean",
"multiValued": False,
"description": """A Boolean value indicating the User's administrative status.""",
"required": False,
"mutability": "readWrite",
"returned": "default",
},
{
"name": "password",
"type": "string",
"multiValued": False,
"description": """The User's cleartext password. This attribute is intended to be used as a means to specify an initial password when creating a new User or to reset an existing User's password.""",
"required": False,
"caseExact": False,
"mutability": "writeOnly",
"returned": "never",
"uniqueness": "none",
},
{
"name": "emails",
"type": "complex",
"multiValued": True,
"description": """Email addresses for the user. The value SHOULD be canonicalized by the service provider, e.g., 'bjensen@example.com' instead of 'bjensen@EXAMPLE.COM'. Canonical type values of 'work', 'home', and 'other'.""",
"required": False,
"subAttributes": [
{
"name": "value",
"type": "string",
"multiValued": False,
"description": """Email addresses for the user. The value SHOULD be canonicalized by the service provider, e.g., 'bjensen@example.com' instead of 'bjensen@EXAMPLE.COM'. Canonical type values of 'work', 'home', and 'other'.""",
"required": False,
"caseExact": False,
"mutability": "readWrite",
"returned": "default",
"uniqueness": "none",
},
],
"mutability": "readWrite",
"returned": "default",
"uniqueness": "none",
},
{
"name": "phoneNumbers",
"type": "complex",
"multiValued": True,
"description": """Phone numbers for the User. The value SHOULD be canonicalized by the service provider according to the format specified in RFC 3966, e.g., 'tel:+1-201-555-0123'. Canonical type values of 'work', 'home', 'mobile', 'fax', 'pager', and 'other'.""",
"required": False,
"subAttributes": [
{
"name": "value",
"type": "string",
"multiValued": False,
"description": "Phone number of the User.",
"required": False,
"caseExact": False,
"mutability": "readWrite",
"returned": "default",
"uniqueness": "none",
},
],
"mutability": "readWrite",
"returned": "default",
},
{
"name": "photos",
"type": "complex",
"multiValued": True,
"description": "URLs of photos of the User.",
"required": False,
"subAttributes": [
{
"name": "value",
"type": "reference",
"referenceTypes": ["external"],
"multiValued": False,
"description": "URL of a photo of the User.",
"required": False,
"caseExact": False,
"mutability": "readWrite",
"returned": "default",
"uniqueness": "none",
},
],
"mutability": "readWrite",
"returned": "default",
},
],
}
RESOURCE_TYPE_USER = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:ResourceType"],
"meta": {
"location": "/ResourceTypes/User",
"resourceType": "ResourceType",
},
"id": "User",
"name": "User",
"endpoint": "/Users",
"description": "User Account",
"schema": "urn:ietf:params:scim:schemas:core:2.0:User",
"schemaExtensions": [],
}

604
tests/rest/test_scim.py Normal file
View file

@ -0,0 +1,604 @@
from unittest import mock
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
import synapse.rest.scim
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.types import JsonDict, UserID
from synapse.util import Clock
from tests import unittest
class UserProvisioningTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets_for_client_rest_resource,
synapse.rest.scim.register_servlets,
login.register_servlets,
]
url = "/_matrix/client/unstable/coop.yaal/scim"
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.admin_user_id = self.register_user(
"admin", "pass", admin=True, displayname="admin display name"
)
self.admin_user_tok = self.login("admin", "pass")
self.user_user_id = self.register_user(
"user", "pass", admin=False, displayname="user display name"
)
self.other_user_ids = [
self.register_user(f"user{i:02d}", "pass", displayname=f"user{i}")
for i in range(15)
]
self.get_success(
self.store.user_add_threepid(
self.user_user_id, "email", "user@mydomain.tld", 0, 0
)
)
self.get_success(
self.store.user_add_threepid(
self.user_user_id, "msisdn", "+1-12345678", 1, 1
)
)
self.get_success(
self.store.set_profile_avatar_url(
UserID.from_string(self.user_user_id),
"https://mydomain.tld/photo.webp",
)
)
def test_get_user(self) -> None:
"""
Nominal test of the /Users/<user_id> endpoint.
"""
channel = self.make_request(
"GET",
f"{self.url}/Users/{self.user_user_id}",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
{
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"meta": {
"resourceType": "User",
"created": mock.ANY,
"lastModified": mock.ANY,
"location": "https://test/_matrix/client/unstable/coop.yaal/scim/Users/@user:test",
},
"id": "@user:test",
"userName": "user",
"externalId": "@user:test",
"phoneNumbers": [{"value": "+1-12345678"}],
"emails": [{"value": "user@mydomain.tld"}],
"active": True,
"displayName": "user display name",
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
},
channel.json_body,
)
def test_get_users(self) -> None:
"""
Nominal test of the /Users endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/Users",
access_token=self.admin_user_tok,
)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
self.assertEqual(len(channel.json_body["Resources"]), 17)
self.assertTrue(
{
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"meta": {
"resourceType": "User",
"created": mock.ANY,
"lastModified": mock.ANY,
"location": "https://test/_matrix/client/unstable/coop.yaal/scim/Users/@user:test",
},
"id": "@user:test",
"userName": "user",
"externalId": "@user:test",
"phoneNumbers": [{"value": "+1-12345678"}],
"emails": [{"value": "user@mydomain.tld"}],
"active": True,
"displayName": "user display name",
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
}
in channel.json_body["Resources"],
)
def test_get_users_pagination_count(self) -> None:
"""
Test the 'count' parameter of the /Users endpoint.
"""
channel = self.make_request(
"GET",
f"{self.url}/Users?count=2",
access_token=self.admin_user_tok,
)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
self.assertEqual(len(channel.json_body["Resources"]), 2)
def test_get_users_pagination_start_index(self) -> None:
"""
Test the 'startIndex' parameter of the /Users endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/Users?startIndex=2&count=1",
access_token=self.admin_user_tok,
)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
self.assertEqual(len(channel.json_body["Resources"]), 1)
self.assertEqual(channel.json_body["Resources"][0]["id"], "@user00:test")
def test_get_users_pagination_negative_count(self) -> None:
"""
RFC7644 §3.4.2.4
A negative value SHALL be interpreted as 0.
A value of "0" indicates that no resource results are
to be returned except for "totalResults".
https://datatracker.ietf.org/doc/html/rfc7644#section-3.4.2.4
"""
channel = self.make_request(
"GET",
f"{self.url}/Users?count=-1",
access_token=self.admin_user_tok,
)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
self.assertEqual(
0,
len(channel.json_body["Resources"]),
)
self.assertEqual(
17,
channel.json_body["totalResults"],
)
def test_get_users_pagination_negative_start_index(self) -> None:
"""
RFC7644 §3.4.2.4
A value less than 1 SHALL be interpreted as 1.
https://datatracker.ietf.org/doc/html/rfc7644#section-3.4.2.4
"""
channel = self.make_request(
"GET",
f"{self.url}/Users?startIndex=-1",
access_token=self.admin_user_tok,
)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
self.assertEqual(len(channel.json_body["Resources"]), 17)
self.assertEqual(channel.json_body["Resources"][0]["id"], "@admin:test")
def test_get_users_pagination_big_start_index(self) -> None:
"""
Test the 'startIndex' parameter of the /Users endpoint
is not greater than the number of users.
"""
channel = self.make_request(
"GET",
f"{self.url}/Users?startIndex=1234",
access_token=self.admin_user_tok,
)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
self.assertEqual(
0,
len(channel.json_body["Resources"]),
)
self.assertEqual(
17,
channel.json_body["totalResults"],
)
def test_get_invalid_user(self) -> None:
"""
Attempt to retrieve user information with a wrong username.
"""
channel = self.make_request(
"GET",
f"{self.url}/Users/@bjensen:test",
access_token=self.admin_user_tok,
)
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(
["urn:ietf:params:scim:api:messages:2.0:Error"],
channel.json_body["schemas"],
)
def test_post_user(self) -> None:
"""
Create a new user.
"""
request_data: JsonDict = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"userName": "bjensen",
"externalId": "bjensen@test",
"phoneNumbers": [{"value": "+1-12345678"}],
"emails": [{"value": "bjensen@mydomain.tld"}],
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
"active": True,
"displayName": "bjensen display name",
"password": "correct horse battery staple",
}
channel = self.make_request(
"POST",
f"{self.url}/Users/",
request_data,
access_token=self.admin_user_tok,
)
self.assertEqual(201, channel.code, msg=channel.json_body)
expected = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"meta": {
"resourceType": "User",
"created": mock.ANY,
"lastModified": mock.ANY,
"location": "https://test/_matrix/client/unstable/coop.yaal/scim/Users/@bjensen:test",
},
"id": "@bjensen:test",
"externalId": "@bjensen:test",
"phoneNumbers": [{"value": "+1-12345678"}],
"userName": "bjensen",
"emails": [{"value": "bjensen@mydomain.tld"}],
"active": True,
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
"displayName": "bjensen display name",
}
self.assertEqual(expected, channel.json_body)
channel = self.make_request(
"GET",
f"{self.url}/Users/@bjensen:test",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(expected, channel.json_body)
def test_delete_user(self) -> None:
"""
Delete an existing user.
"""
channel = self.make_request(
"DELETE",
f"{self.url}/Users/@user:test",
access_token=self.admin_user_tok,
)
self.assertEqual(204, channel.code)
def test_delete_invalid_user(self) -> None:
"""
Attempt to delete a user with a non-existing username.
"""
channel = self.make_request(
"GET",
f"{self.url}/Users/@bjensen:test",
access_token=self.admin_user_tok,
)
self.assertEqual(404, channel.code)
self.assertEqual(
["urn:ietf:params:scim:api:messages:2.0:Error"],
channel.json_body["schemas"],
)
def test_replace_user(self) -> None:
"""
Replace user information.
"""
channel = self.make_request(
"GET",
f"{self.url}/Users/@user:test",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
{
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"meta": {
"resourceType": "User",
"created": mock.ANY,
"lastModified": mock.ANY,
"location": "https://test/_matrix/client/unstable/coop.yaal/scim/Users/@user:test",
},
"id": "@user:test",
"userName": "user",
"externalId": "@user:test",
"phoneNumbers": [{"value": "+1-12345678"}],
"emails": [{"value": "user@mydomain.tld"}],
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
"active": True,
"displayName": "user display name",
},
channel.json_body,
)
request_data: JsonDict = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"phoneNumbers": [{"value": "+1-11112222"}],
"emails": [{"value": "newmail@mydomain.tld"}],
"displayName": "new display name",
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
}
channel = self.make_request(
"PUT",
f"{self.url}/Users/@user:test",
request_data,
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code)
expected = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"meta": {
"resourceType": "User",
"created": mock.ANY,
"lastModified": mock.ANY,
"location": "https://test/_matrix/client/unstable/coop.yaal/scim/Users/@user:test",
},
"id": "@user:test",
"externalId": "@user:test",
"phoneNumbers": [{"value": "+1-11112222"}],
"userName": "user",
"emails": [{"value": "newmail@mydomain.tld"}],
"active": True,
"displayName": "new display name",
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
}
self.assertEqual(expected, channel.json_body)
channel = self.make_request(
"GET",
f"{self.url}/Users/@user:test",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(expected, channel.json_body)
def test_replace_invalid_user(self) -> None:
"""
Attempt to replace user information based on a wrong username.
"""
request_data: JsonDict = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"phoneNumbers": [{"value": "+1-11112222"}],
"emails": [{"value": "newmail@mydomain.tld"}],
"displayName": "new display name",
"photos": [
{
"type": "photo",
"primary": True,
"value": "https://mydomain.tld/photo.webp",
}
],
}
channel = self.make_request(
"PUT",
f"{self.url}/Users/@bjensen:test",
request_data,
access_token=self.admin_user_tok,
)
self.assertEqual(404, channel.code)
self.assertEqual(
["urn:ietf:params:scim:api:messages:2.0:Error"],
channel.json_body["schemas"],
)
class SCIMMetadataTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets_for_client_rest_resource,
synapse.rest.scim.register_servlets,
login.register_servlets,
]
url = "/_matrix/client/unstable/coop.yaal/scim"
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.admin_user_id = self.register_user(
"admin", "pass", admin=True, displayname="admin display name"
)
self.admin_user_tok = self.login("admin", "pass")
self.schemas = [
"urn:ietf:params:scim:schemas:core:2.0:User",
"urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig",
"urn:ietf:params:scim:schemas:core:2.0:Schema",
"urn:ietf:params:scim:schemas:core:2.0:ResourceType",
]
def test_get_schemas(self) -> None:
"""
Read the /Schemas endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/Schemas",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
for schema in self.schemas:
self.assertTrue(
any(item["id"] == schema for item in channel.json_body["Resources"])
)
def test_get_schema(self) -> None:
"""
Read the /Schemas endpoint
"""
for schema in self.schemas:
channel = self.make_request(
"GET",
f"{self.url}/Schemas/{schema}",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["id"], schema)
def test_get_invalid_schema(self) -> None:
"""
Read the /Schemas endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/Schemas/urn:ietf:params:scim:schemas:core:2.0:Group",
access_token=self.admin_user_tok,
)
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(
["urn:ietf:params:scim:api:messages:2.0:Error"],
channel.json_body["schemas"],
)
def test_get_service_provider_config(self) -> None:
"""
Read the /ServiceProviderConfig endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/ServiceProviderConfig",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig"],
)
def test_get_resource_types(self) -> None:
"""
Read the /ResourceTypes endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/ResourceTypes",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
)
def test_get_resource_type_user(self) -> None:
"""
Read the /ResourceTypes/User endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/ResourceTypes/User",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
channel.json_body["schemas"],
["urn:ietf:params:scim:schemas:core:2.0:ResourceType"],
)
def test_get_invalid_resource_type(self) -> None:
"""
Read an invalid /ResourceTypes/ endpoint
"""
channel = self.make_request(
"GET",
f"{self.url}/ResourceTypes/Group",
access_token=self.admin_user_tok,
)
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(
["urn:ietf:params:scim:api:messages:2.0:Error"],
channel.json_body["schemas"],
)