Add tests for event auth

This commit is contained in:
Mark Haines 2017-01-04 13:16:13 +00:00
parent 899a3a1268
commit b6df1dc7ec
3 changed files with 378 additions and 4 deletions

View file

@ -83,8 +83,8 @@ class Auth(object):
if not hasattr(event, "room_id"):
raise AuthError(500, "Event has no room_id: %s" % event)
if do_sig_check:
sender_domain = get_domain_from_id(event.sender)
if do_sig_check:
event_id_domain = get_domain_from_id(event.event_id)
is_invite_via_3pid = (
@ -130,9 +130,15 @@ class Auth(object):
"Room %r does not exist" % (event.room_id,)
)
if event.room_id != creation_event.room_id:
raise SynapseError(
403, "Event has the wrong room_id: %r != %r" % (
event.room_id, creation_event.room_id
)
)
creating_domain = get_domain_from_id(event.room_id)
originating_domain = get_domain_from_id(event.sender)
if creating_domain != originating_domain:
if creating_domain != sender_domain:
if not self.can_federate(event, auth_events):
raise AuthError(
403,
@ -331,6 +337,7 @@ class Auth(object):
create = auth_events.get(key)
if create and event.prev_events[0][0] == create.event_id:
if create.content["creator"] == event.state_key:
if event.state_key == event.sender:
return True
target_user_id = event.state_key

View file

@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# Copyright 2015 - 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import json
import os
import pprint
testcases = json.load(sys.stdin)
w = sys.stdout.write
w("""
# -*- coding: utf-8 -*-
# Generated by """ + os.path.basename(__file__) + """
# Copyright 2015 - 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.auth import Auth
from synapse.api.errors import SynapseError
from tests import unittest
from tests.utils import setup_test_homeserver
from twisted.internet import defer
from synapse.events import FrozenEvent
class EventAuthTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.hs = yield setup_test_homeserver(handlers=None)
self.auth = Auth(self.hs)
""")
indent = " " * 4
for name, case in sorted(testcases.items()):
w("""
def test_""" + name[4:] + """(self):
""")
auth_data = case.get("auth_events", {})
auth_events = {}
if "create" in auth_data:
auth_events[("m.room.create", "")] = auth_data["create"]
for user_id, member in auth_data.get("member", {}).items():
auth_events[("m.room.member", user_id)] = member
w(indent * 2 + "auth_events = (\n" + indent * 3)
data = pprint.pformat(auth_events, width=80 - len(indent * 3))
w(data.replace("\n", "\n" + indent * 3))
w("\n" + indent * 2 + ")\n")
w("""
auth_events = {k: FrozenEvent(v) for k, v in auth_events.items()}
"""[1:])
w("\n" + indent * 2 + "# Allowed events\n")
for allowed in case.get("allowed", ()):
reason = allowed.pop("unsigned", {}).pop("allowed", None)
if reason is not None:
w(indent * 2 + "#Allowed: " + reason)
w(indent * 2 + "self.auth.check(FrozenEvent(\n" + indent * 3)
data = pprint.pformat(allowed, width=80 - len(indent * 3))
w(data.replace("\n", "\n" + indent * 3))
w("\n" + indent * 2 + "), auth_events=auth_events, do_sig_check=False)\n")
w("\n" + indent * 2 + "# Disallowed events\n")
for not_allowed in case.get("not_allowed", ()):
reason = not_allowed.pop("unsigned", {}).pop("not_allowed", None)
if reason is not None:
w(indent * 2 + "# " + reason + "\n")
w(indent * 2 + "self.assertRaises(SynapseError,")
w(" self.auth.check, FrozenEvent(\n" + indent * 3)
data = pprint.pformat(not_allowed, width=80 - len(indent * 3))
w(data.replace("\n", "\n" + indent * 3))
w("\n" + indent * 2 + "), auth_events=auth_events, do_sig_check=False)\n")

268
tests/api/test_eventauth.py Normal file
View file

@ -0,0 +1,268 @@
# -*- coding: utf-8 -*-
# Generated by generate_test_eventauth.py
# Copyright 2015 - 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.auth import Auth
from synapse.api.errors import SynapseError
from tests import unittest
from tests.utils import setup_test_homeserver
from twisted.internet import defer
from synapse.events import FrozenEvent
class EventAuthTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.hs = yield setup_test_homeserver(handlers=None)
self.auth = Auth(self.hs)
def test_EmptyRoom(self):
auth_events = (
{}
)
auth_events = {k: FrozenEvent(v) for k, v in auth_events.items()}
# Allowed events
self.auth.check(FrozenEvent(
{u'content': {u'creator': u'@u1:a'},
u'event_id': u'$e1:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'type': u'm.room.create'}
), auth_events=auth_events, do_sig_check=False)
# Disallowed events
# Sent by a different server than the one which made the room_id
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'creator': u'@u1:b'},
u'event_id': u'$e2:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:b',
u'type': u'm.room.create'}
), auth_events=auth_events, do_sig_check=False)
# All non-create events must reference a create event.
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'membership': u'join'},
u'event_id': u'$e3:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'}
), auth_events=auth_events, do_sig_check=False)
def test_FirstJoin(self):
auth_events = (
{('m.room.create', ''): {u'content': {u'creator': u'@u1:a'},
u'event_id': u'$e1:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'type': u'm.room.create'}}
)
auth_events = {k: FrozenEvent(v) for k, v in auth_events.items()}
# Allowed events
self.auth.check(FrozenEvent(
{u'content': {u'membership': u'join'},
u'event_id': u'$e2:a',
u'prev_events': [[u'$e1:a', {}]],
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'}
), auth_events=auth_events, do_sig_check=False)
# Disallowed events
# The prev_event is not the create event.
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'membership': u'join'},
u'event_id': u'$e3:a',
u'prev_events': [[u'$e2:a', {}]],
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'}
), auth_events=auth_events, do_sig_check=False)
# The membership key is not join
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'membership': u'invite'},
u'event_id': u'$e4:a',
u'prev_events': [[u'$e1:a', {}]],
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'}
), auth_events=auth_events, do_sig_check=False)
# The room_id doesn't match the create event
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'membership': u'join'},
u'event_id': u'$e5:a',
u'prev_events': [[u'$e1:a', {}]],
u'room_id': u'!r2:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'}
), auth_events=auth_events, do_sig_check=False)
# The sender doesn't match the room creator
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'membership': u'join'},
u'event_id': u'$e6:a',
u'prev_events': [[u'$e1:a', {}]],
u'room_id': u'!r1:a',
u'sender': u'@u2:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'}
), auth_events=auth_events, do_sig_check=False)
# The sender doesn't match the state_key
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'membership': u'join'},
u'event_id': u'$e7:a',
u'prev_events': [[u'$e1:a', {}]],
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u2:a',
u'type': u'm.room.member'}
), auth_events=auth_events, do_sig_check=False)
def test_FirstPowerLevels(self):
auth_events = (
{('m.room.create', ''): {u'content': {u'creator': u'@u1:a'},
u'event_id': u'$e1:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'type': u'm.room.create'},
('m.room.member', u'@u1:a'): {u'content': {u'membership': u'join'},
u'event_id': u'$e2:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'},
('m.room.member', u'@u2:a'): {u'content': {u'membership': u'join'},
u'event_id': u'$e3:a',
u'room_id': u'!r1:a',
u'sender': u'@u2:a',
u'state_key': u'@u2:a',
u'type': u'm.room.member'}}
)
auth_events = {k: FrozenEvent(v) for k, v in auth_events.items()}
# Allowed events
self.auth.check(FrozenEvent(
{u'content': {u'ban': 50,
u'events': {u'm.room.avatar': 50,
u'm.room.canonical_alias': 50,
u'm.room.history_visibility': 100,
u'm.room.name': 50,
u'm.room.power_levels': 100},
u'events_default': 0,
u'invite': 0,
u'kick': 50,
u'redact': 50,
u'state_default': 50,
u'users': {u'@u1:a': 100,
u'@u2:a': 100,
u'@u3:a': 50},
u'users_default': 0},
u'event_id': u'$e3:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'',
u'type': u'm.room.power_levels'}
), auth_events=auth_events, do_sig_check=False)
self.auth.check(FrozenEvent(
{u'content': {u'users': {u'@u1:a': 1000}},
u'event_id': u'$e4:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'',
u'type': u'm.room.power_levels'}
), auth_events=auth_events, do_sig_check=False)
# Disallowed events
# Only the creator can send the first power level event
self.assertRaises(SynapseError, self.auth.check, FrozenEvent(
{u'content': {u'users': {u'@u1:a': 1000}},
u'event_id': u'$e4:a',
u'room_id': u'!r1:a',
u'sender': u'@u2:a',
u'state_key': u'',
u'type': u'm.room.power_levels'}
), auth_events=auth_events, do_sig_check=False)
def test_PowerLevels(self):
auth_events = (
{('m.room.create', ''): {u'content': {u'creator': u'@u1:a'},
u'event_id': u'$e1:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'type': u'm.room.create'},
('m.room.member', u'@u1:a'): {u'content': {u'membership': u'join'},
u'event_id': u'$e2:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'},
('m.room.member', u'@u3:a'): {u'content': {u'membership': u'join'},
u'event_id': u'$e2:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'@u1:a',
u'type': u'm.room.member'}}
)
auth_events = {k: FrozenEvent(v) for k, v in auth_events.items()}
# Allowed events
self.auth.check(FrozenEvent(
{u'content': {u'ban': 50,
u'events': {u'm.room.avatar': 50,
u'm.room.canonical_alias': 50,
u'm.room.history_visibility': 100,
u'm.room.name': 50,
u'm.room.power_levels': 100},
u'events_default': 0,
u'invite': 0,
u'kick': 50,
u'redact': 50,
u'state_default': 50,
u'users': {u'@u1:a': 100,
u'@u2:a': 100,
u'@u3:a': 50},
u'users_default': 0},
u'event_id': u'$e4:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'',
u'type': u'm.room.power_levels'}
), auth_events=auth_events, do_sig_check=False)
self.auth.check(FrozenEvent(
{u'content': {u'ban': 50,
u'events': {u'm.room.power_levels': 100},
u'events_default': 0,
u'invite': 0,
u'kick': 50,
u'redact': 50,
u'state_default': 50,
u'users': {u'@u1:a': 100, u'@u2:a': 100},
u'users_default': 0},
u'event_id': u'$e5:a',
u'room_id': u'!r1:a',
u'sender': u'@u1:a',
u'state_key': u'',
u'type': u'm.room.power_levels'}
), auth_events=auth_events, do_sig_check=False)
# Disallowed events