Merge pull request #583 from matrix-org/daniel/roomcleanupincremental

Merge all of the room membership codepaths into one
This commit is contained in:
Daniel Wagner-Hall 2016-02-18 11:48:28 +00:00
commit 35cda2e692
9 changed files with 393 additions and 364 deletions

View file

@ -18,7 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import LimitExceededError, SynapseError, AuthError from synapse.api.errors import LimitExceededError, SynapseError, AuthError
from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias from synapse.types import UserID, RoomAlias, Requester
from synapse.push.action_generator import ActionGenerator from synapse.push.action_generator import ActionGenerator
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
@ -147,7 +147,7 @@ class BaseHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_new_client_event(self, builder): def _create_new_client_event(self, builder):
latest_ret = yield self.store.get_latest_events_in_room( latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
builder.room_id, builder.room_id,
) )
@ -156,7 +156,10 @@ class BaseHandler(object):
else: else:
depth = 1 depth = 1
prev_events = [(e, h) for e, h, _ in latest_ret] prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in latest_ret
]
builder.prev_events = prev_events builder.prev_events = prev_events
builder.depth = depth builder.depth = depth
@ -165,6 +168,31 @@ class BaseHandler(object):
context = yield state_handler.compute_event_context(builder) context = yield state_handler.compute_event_context(builder)
# If we've received an invite over federation, there are no latest
# events in the room, because we don't know enough about the graph
# fragment we received to treat it like a graph, so the above returned
# no relevant events. It may have returned some events (if we have
# joined and left the room), but not useful ones, like the invite. So we
# forcibly set our context to the invite we received over federation.
if (
not self.is_host_in_room(context.current_state) and
builder.type == EventTypes.Member
):
prev_member_event = yield self.store.get_room_member(
builder.sender, builder.room_id
)
if prev_member_event:
builder.prev_events = (
prev_member_event.event_id,
prev_member_event.prev_events
)
context = yield state_handler.compute_event_context(
builder,
old_state=(prev_member_event,),
outlier=True
)
if builder.is_state(): if builder.is_state():
builder.prev_state = yield self.store.add_event_hashes( builder.prev_state = yield self.store.add_event_hashes(
context.prev_state_events context.prev_state_events
@ -187,10 +215,33 @@ class BaseHandler(object):
(event, context,) (event, context,)
) )
def is_host_in_room(self, current_state):
room_members = [
(state_key, event.membership)
for ((event_type, state_key), event) in current_state.items()
if event_type == EventTypes.Member
]
if len(room_members) == 0:
# Have we just created the room, and is this about to be the very
# first member event?
create_event = current_state.get(("m.room.create", ""))
if create_event:
return True
for (state_key, membership) in room_members:
if (
UserID.from_string(state_key).domain == self.hs.hostname
and membership == Membership.JOIN
):
return True
return False
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_new_client_event(self, event, context, extra_users=[]): def handle_new_client_event(self, event, context, ratelimit=True, extra_users=[]):
# We now need to go and hit out to wherever we need to hit out to. # We now need to go and hit out to wherever we need to hit out to.
if ratelimit:
self.ratelimit(event.sender)
self.auth.check(event, auth_events=context.current_state) self.auth.check(event, auth_events=context.current_state)
yield self.maybe_kick_guest_users(event, context.current_state.values()) yield self.maybe_kick_guest_users(event, context.current_state.values())
@ -316,7 +367,8 @@ class BaseHandler(object):
if member_event.type != EventTypes.Member: if member_event.type != EventTypes.Member:
continue continue
if not self.hs.is_mine(UserID.from_string(member_event.state_key)): target_user = UserID.from_string(member_event.state_key)
if not self.hs.is_mine(target_user):
continue continue
if member_event.content["membership"] not in { if member_event.content["membership"] not in {
@ -338,18 +390,13 @@ class BaseHandler(object):
# and having homeservers have their own users leave keeps more # and having homeservers have their own users leave keeps more
# of that decision-making and control local to the guest-having # of that decision-making and control local to the guest-having
# homeserver. # homeserver.
message_handler = self.hs.get_handlers().message_handler requester = Requester(target_user, "", True)
yield message_handler.create_and_send_event( handler = self.hs.get_handlers().room_member_handler
{ yield handler.update_membership(
"type": EventTypes.Member, requester,
"state_key": member_event.state_key, target_user,
"content": { member_event.room_id,
"membership": Membership.LEAVE, "leave",
"kind": "guest"
},
"room_id": member_event.room_id,
"sender": member_event.state_key
},
ratelimit=False, ratelimit=False,
) )
except Exception as e: except Exception as e:

View file

@ -216,7 +216,7 @@ class DirectoryHandler(BaseHandler):
aliases = yield self.store.get_aliases_for_room(room_id) aliases = yield self.store.get_aliases_for_room(room_id)
msg_handler = self.hs.get_handlers().message_handler msg_handler = self.hs.get_handlers().message_handler
yield msg_handler.create_and_send_event({ yield msg_handler.create_and_send_nonmember_event({
"type": EventTypes.Aliases, "type": EventTypes.Aliases,
"state_key": self.hs.hostname, "state_key": self.hs.hostname,
"room_id": room_id, "room_id": room_id,

View file

@ -1658,7 +1658,7 @@ class FederationHandler(BaseHandler):
self.auth.check(event, context.current_state) self.auth.check(event, context.current_state)
yield self._validate_keyserver(event, auth_events=context.current_state) yield self._validate_keyserver(event, auth_events=context.current_state)
member_handler = self.hs.get_handlers().room_member_handler member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.send_membership_event(event, context) yield member_handler.send_membership_event(event, context, from_client=False)
else: else:
destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)]) destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
yield self.replication_layer.forward_third_party_invite( yield self.replication_layer.forward_third_party_invite(
@ -1687,7 +1687,7 @@ class FederationHandler(BaseHandler):
# TODO: Make sure the signatures actually are correct. # TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures) event.signatures.update(returned_invite.signatures)
member_handler = self.hs.get_handlers().room_member_handler member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.send_membership_event(event, context) yield member_handler.send_membership_event(event, context, from_client=False)
@defer.inlineCallbacks @defer.inlineCallbacks
def add_display_name_to_third_party_invite(self, event_dict, event, context): def add_display_name_to_third_party_invite(self, event_dict, event, context):

View file

@ -16,7 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
@ -216,7 +216,7 @@ class MessageHandler(BaseHandler):
defer.returnValue((event, context)) defer.returnValue((event, context))
@defer.inlineCallbacks @defer.inlineCallbacks
def send_event(self, event, context, ratelimit=True, is_guest=False): def send_nonmember_event(self, event, context, ratelimit=True):
""" """
Persists and notifies local clients and federation of an event. Persists and notifies local clients and federation of an event.
@ -226,55 +226,69 @@ class MessageHandler(BaseHandler):
ratelimit (bool): Whether to rate limit this send. ratelimit (bool): Whether to rate limit this send.
is_guest (bool): Whether the sender is a guest. is_guest (bool): Whether the sender is a guest.
""" """
if event.type == EventTypes.Member:
raise SynapseError(
500,
"Tried to send member event through non-member codepath"
)
user = UserID.from_string(event.sender) user = UserID.from_string(event.sender)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,) assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
if ratelimit:
self.ratelimit(event.sender)
if event.is_state(): if event.is_state():
prev_state = context.current_state.get((event.type, event.state_key)) prev_state = self.deduplicate_state_event(event, context)
if prev_state and event.user_id == prev_state.user_id: if prev_state is not None:
prev_content = encode_canonical_json(prev_state.content) defer.returnValue(prev_state)
next_content = encode_canonical_json(event.content)
if prev_content == next_content:
# Duplicate suppression for state updates with same sender
# and content.
defer.returnValue(prev_state)
if event.type == EventTypes.Member: yield self.handle_new_client_event(
member_handler = self.hs.get_handlers().room_member_handler event=event,
yield member_handler.send_membership_event(event, context, is_guest=is_guest) context=context,
else: ratelimit=ratelimit,
yield self.handle_new_client_event( )
event=event,
context=context,
)
if event.type == EventTypes.Message: if event.type == EventTypes.Message:
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_handlers().presence_handler
with PreserveLoggingContext(): with PreserveLoggingContext():
presence.bump_presence_active_time(user) presence.bump_presence_active_time(user)
def deduplicate_state_event(self, event, context):
"""
Checks whether event is in the latest resolved state in context.
If so, returns the version of the event in context.
Otherwise, returns None.
"""
prev_event = context.current_state.get((event.type, event.state_key))
if prev_event and event.user_id == prev_event.user_id:
prev_content = encode_canonical_json(prev_event.content)
next_content = encode_canonical_json(event.content)
if prev_content == next_content:
return prev_event
return None
@defer.inlineCallbacks @defer.inlineCallbacks
def create_and_send_event(self, event_dict, ratelimit=True, def create_and_send_nonmember_event(
token_id=None, txn_id=None, is_guest=False): self,
event_dict,
ratelimit=True,
token_id=None,
txn_id=None
):
""" """
Creates an event, then sends it. Creates an event, then sends it.
See self.create_event and self.send_event. See self.create_event and self.send_nonmember_event.
""" """
event, context = yield self.create_event( event, context = yield self.create_event(
event_dict, event_dict,
token_id=token_id, token_id=token_id,
txn_id=txn_id txn_id=txn_id
) )
yield self.send_event( yield self.send_nonmember_event(
event, event,
context, context,
ratelimit=ratelimit, ratelimit=ratelimit,
is_guest=is_guest
) )
defer.returnValue(event) defer.returnValue(event)

View file

@ -16,8 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.api.constants import EventTypes, Membership from synapse.types import UserID, Requester
from synapse.types import UserID
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from ._base import BaseHandler from ._base import BaseHandler
@ -208,21 +207,18 @@ class ProfileHandler(BaseHandler):
) )
for j in joins: for j in joins:
content = { handler = self.hs.get_handlers().room_member_handler
"membership": Membership.JOIN,
}
yield collect_presencelike_data(self.distributor, user, content)
msg_handler = self.hs.get_handlers().message_handler
try: try:
yield msg_handler.create_and_send_event({ # Assume the user isn't a guest because we don't let guests set
"type": EventTypes.Member, # profile or avatar data.
"room_id": j.room_id, requester = Requester(user, "", False)
"state_key": user.to_string(), yield handler.update_membership(
"content": content, requester,
"sender": user.to_string() user,
}, ratelimit=False) j.room_id,
"join", # We treat a profile update like a join.
ratelimit=False, # Try to hide that these events aren't atomic.
)
except Exception as e: except Exception as e:
logger.warn( logger.warn(
"Failed to update join event for room %s - %s", "Failed to update join event for room %s - %s",

View file

@ -24,7 +24,6 @@ from synapse.api.constants import (
) )
from synapse.api.errors import AuthError, StoreError, SynapseError, Codes from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
from synapse.util import stringutils, unwrapFirstError from synapse.util import stringutils, unwrapFirstError
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn from synapse.util.logcontext import preserve_context_over_fn
from signedjson.sign import verify_signed_json from signedjson.sign import verify_signed_json
@ -42,10 +41,6 @@ logger = logging.getLogger(__name__)
id_server_scheme = "https://" id_server_scheme = "https://"
def collect_presencelike_data(distributor, user, content):
return distributor.fire("collect_presencelike_data", user, content)
def user_left_room(distributor, user, room_id): def user_left_room(distributor, user, room_id):
return preserve_context_over_fn( return preserve_context_over_fn(
distributor.fire, distributor.fire,
@ -173,9 +168,14 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {}) creation_content = config.get("creation_content", {})
user = UserID.from_string(user_id) msg_handler = self.hs.get_handlers().message_handler
creation_events = self._create_events_for_new_room( room_member_handler = self.hs.get_handlers().room_member_handler
user, room_id,
yield self._send_events_for_new_room(
requester,
room_id,
msg_handler,
room_member_handler,
preset_config=preset_config, preset_config=preset_config,
invite_list=invite_list, invite_list=invite_list,
initial_state=initial_state, initial_state=initial_state,
@ -183,14 +183,9 @@ class RoomCreationHandler(BaseHandler):
room_alias=room_alias, room_alias=room_alias,
) )
msg_handler = self.hs.get_handlers().message_handler
for event in creation_events:
yield msg_handler.create_and_send_event(event, ratelimit=False)
if "name" in config: if "name" in config:
name = config["name"] name = config["name"]
yield msg_handler.create_and_send_event({ yield msg_handler.create_and_send_nonmember_event({
"type": EventTypes.Name, "type": EventTypes.Name,
"room_id": room_id, "room_id": room_id,
"sender": user_id, "sender": user_id,
@ -200,7 +195,7 @@ class RoomCreationHandler(BaseHandler):
if "topic" in config: if "topic" in config:
topic = config["topic"] topic = config["topic"]
yield msg_handler.create_and_send_event({ yield msg_handler.create_and_send_nonmember_event({
"type": EventTypes.Topic, "type": EventTypes.Topic,
"room_id": room_id, "room_id": room_id,
"sender": user_id, "sender": user_id,
@ -209,13 +204,13 @@ class RoomCreationHandler(BaseHandler):
}, ratelimit=False) }, ratelimit=False)
for invitee in invite_list: for invitee in invite_list:
yield msg_handler.create_and_send_event({ room_member_handler.update_membership(
"type": EventTypes.Member, requester,
"state_key": invitee, UserID.from_string(invitee),
"room_id": room_id, room_id,
"sender": user_id, "invite",
"content": {"membership": Membership.INVITE}, ratelimit=False,
}, ratelimit=False) )
for invite_3pid in invite_3pid_list: for invite_3pid in invite_3pid_list:
id_server = invite_3pid["id_server"] id_server = invite_3pid["id_server"]
@ -223,11 +218,11 @@ class RoomCreationHandler(BaseHandler):
medium = invite_3pid["medium"] medium = invite_3pid["medium"]
yield self.hs.get_handlers().room_member_handler.do_3pid_invite( yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
room_id, room_id,
user, requester.user,
medium, medium,
address, address,
id_server, id_server,
token_id=None, requester,
txn_id=None, txn_id=None,
) )
@ -241,19 +236,19 @@ class RoomCreationHandler(BaseHandler):
defer.returnValue(result) defer.returnValue(result)
def _create_events_for_new_room(self, creator, room_id, preset_config, @defer.inlineCallbacks
invite_list, initial_state, creation_content, def _send_events_for_new_room(
room_alias): self,
config = RoomCreationHandler.PRESETS_DICT[preset_config] creator, # A Requester object.
room_id,
creator_id = creator.to_string() msg_handler,
room_member_handler,
event_keys = { preset_config,
"room_id": room_id, invite_list,
"sender": creator_id, initial_state,
"state_key": "", creation_content,
} room_alias
):
def create(etype, content, **kwargs): def create(etype, content, **kwargs):
e = { e = {
"type": etype, "type": etype,
@ -265,26 +260,39 @@ class RoomCreationHandler(BaseHandler):
return e return e
creation_content.update({"creator": creator.to_string()}) @defer.inlineCallbacks
creation_event = create( def send(etype, content, **kwargs):
event = create(etype, content, **kwargs)
yield msg_handler.create_and_send_nonmember_event(event, ratelimit=False)
config = RoomCreationHandler.PRESETS_DICT[preset_config]
creator_id = creator.user.to_string()
event_keys = {
"room_id": room_id,
"sender": creator_id,
"state_key": "",
}
creation_content.update({"creator": creator_id})
yield send(
etype=EventTypes.Create, etype=EventTypes.Create,
content=creation_content, content=creation_content,
) )
join_event = create( yield room_member_handler.update_membership(
etype=EventTypes.Member, creator,
state_key=creator_id, creator.user,
content={ room_id,
"membership": Membership.JOIN, "join",
}, ratelimit=False,
) )
returned_events = [creation_event, join_event]
if (EventTypes.PowerLevels, '') not in initial_state: if (EventTypes.PowerLevels, '') not in initial_state:
power_level_content = { power_level_content = {
"users": { "users": {
creator.to_string(): 100, creator_id: 100,
}, },
"users_default": 0, "users_default": 0,
"events": { "events": {
@ -306,45 +314,35 @@ class RoomCreationHandler(BaseHandler):
for invitee in invite_list: for invitee in invite_list:
power_level_content["users"][invitee] = 100 power_level_content["users"][invitee] = 100
power_levels_event = create( yield send(
etype=EventTypes.PowerLevels, etype=EventTypes.PowerLevels,
content=power_level_content, content=power_level_content,
) )
returned_events.append(power_levels_event)
if room_alias and (EventTypes.CanonicalAlias, '') not in initial_state: if room_alias and (EventTypes.CanonicalAlias, '') not in initial_state:
room_alias_event = create( yield send(
etype=EventTypes.CanonicalAlias, etype=EventTypes.CanonicalAlias,
content={"alias": room_alias.to_string()}, content={"alias": room_alias.to_string()},
) )
returned_events.append(room_alias_event)
if (EventTypes.JoinRules, '') not in initial_state: if (EventTypes.JoinRules, '') not in initial_state:
join_rules_event = create( yield send(
etype=EventTypes.JoinRules, etype=EventTypes.JoinRules,
content={"join_rule": config["join_rules"]}, content={"join_rule": config["join_rules"]},
) )
returned_events.append(join_rules_event)
if (EventTypes.RoomHistoryVisibility, '') not in initial_state: if (EventTypes.RoomHistoryVisibility, '') not in initial_state:
history_event = create( yield send(
etype=EventTypes.RoomHistoryVisibility, etype=EventTypes.RoomHistoryVisibility,
content={"history_visibility": config["history_visibility"]} content={"history_visibility": config["history_visibility"]}
) )
returned_events.append(history_event)
for (etype, state_key), content in initial_state.items(): for (etype, state_key), content in initial_state.items():
returned_events.append(create( yield send(
etype=etype, etype=etype,
state_key=state_key, state_key=state_key,
content=content, content=content,
)) )
return returned_events
class RoomMemberHandler(BaseHandler): class RoomMemberHandler(BaseHandler):
@ -392,7 +390,16 @@ class RoomMemberHandler(BaseHandler):
remotedomains.add(member.domain) remotedomains.add(member.domain)
@defer.inlineCallbacks @defer.inlineCallbacks
def update_membership(self, requester, target, room_id, action, txn_id=None): def update_membership(
self,
requester,
target,
room_id,
action,
txn_id=None,
remote_room_hosts=None,
ratelimit=True,
):
effective_membership_state = action effective_membership_state = action
if action in ["kick", "unban"]: if action in ["kick", "unban"]:
effective_membership_state = "leave" effective_membership_state = "leave"
@ -401,7 +408,7 @@ class RoomMemberHandler(BaseHandler):
msg_handler = self.hs.get_handlers().message_handler msg_handler = self.hs.get_handlers().message_handler
content = {"membership": unicode(effective_membership_state)} content = {"membership": effective_membership_state}
if requester.is_guest: if requester.is_guest:
content["kind"] = "guest" content["kind"] = "guest"
@ -412,6 +419,9 @@ class RoomMemberHandler(BaseHandler):
"room_id": room_id, "room_id": room_id,
"sender": requester.user.to_string(), "sender": requester.user.to_string(),
"state_key": target.to_string(), "state_key": target.to_string(),
# For backwards compatibility:
"membership": effective_membership_state,
}, },
token_id=requester.access_token_id, token_id=requester.access_token_id,
txn_id=txn_id, txn_id=txn_id,
@ -432,90 +442,165 @@ class RoomMemberHandler(BaseHandler):
errcode=Codes.BAD_STATE errcode=Codes.BAD_STATE
) )
yield msg_handler.send_event( member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.send_membership_event(
event, event,
context, context,
ratelimit=True, is_guest=requester.is_guest,
is_guest=requester.is_guest ratelimit=ratelimit,
remote_room_hosts=remote_room_hosts,
from_client=True,
) )
if action == "forget": if action == "forget":
yield self.forget(requester.user, room_id) yield self.forget(requester.user, room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def send_membership_event(self, event, context, is_guest=False): def send_membership_event(
""" Change the membership status of a user in a room. self,
event,
context,
is_guest=False,
remote_room_hosts=None,
ratelimit=True,
from_client=True,
):
"""
Change the membership status of a user in a room.
Args: Args:
event (SynapseEvent): The membership event event (SynapseEvent): The membership event.
context: The context of the event.
is_guest (bool): Whether the sender is a guest.
room_hosts ([str]): Homeservers which are likely to already be in
the room, and could be danced with in order to join this
homeserver for the first time.
ratelimit (bool): Whether to rate limit this request.
from_client (bool): Whether this request is the result of a local
client request (rather than over federation). If so, we will
perform extra checks, like that this homeserver can act as this
client.
Raises: Raises:
SynapseError if there was a problem changing the membership. SynapseError if there was a problem changing the membership.
""" """
target_user_id = event.state_key target_user = UserID.from_string(event.state_key)
room_id = event.room_id
prev_state = context.current_state.get( if from_client:
(EventTypes.Member, target_user_id), sender = UserID.from_string(event.sender)
assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
message_handler = self.hs.get_handlers().message_handler
prev_event = message_handler.deduplicate_state_event(event, context)
if prev_event is not None:
return
action = "send"
if event.membership == Membership.JOIN:
if is_guest and not self._can_guest_join(context.current_state):
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
do_remote_join_dance, remote_room_hosts = self._should_do_dance(
context,
(self.get_inviter(event.state_key, context.current_state)),
remote_room_hosts,
)
if do_remote_join_dance:
action = "remote_join"
elif event.membership == Membership.LEAVE:
is_host_in_room = self.is_host_in_room(context.current_state)
if not is_host_in_room:
action = "remote_reject"
federation_handler = self.hs.get_handlers().federation_handler
if action == "remote_join":
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
# We don't do an auth check if we are doing an invite
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
yield federation_handler.do_invite_join(
remote_room_hosts,
event.room_id,
event.user_id,
event.content,
)
elif action == "remote_reject":
inviter = self.get_inviter(target_user.to_string(), context.current_state)
if not inviter:
raise SynapseError(404, "No known servers")
yield federation_handler.do_remotely_reject_invite(
[inviter.domain],
room_id,
event.user_id
)
else:
yield self.handle_new_client_event(
event,
context,
extra_users=[target_user],
ratelimit=ratelimit,
)
prev_member_event = context.current_state.get(
(EventTypes.Member, target_user.to_string()),
None None
) )
room_id = event.room_id
# If we're trying to join a room then we have to do this differently
# if this HS is not currently in the room, i.e. we have to do the
# invite/join dance.
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
if is_guest: if not prev_member_event or prev_member_event.membership != Membership.JOIN:
guest_access = context.current_state.get( # Only fire user_joined_room if the user has acutally joined the
(EventTypes.GuestAccess, ""), # room. Don't bother if the user is just changing their profile
None # info.
) yield user_joined_room(self.distributor, target_user, room_id)
is_guest_access_allowed = ( elif event.membership == Membership.LEAVE:
guest_access if prev_member_event and prev_member_event.membership == Membership.JOIN:
and guest_access.content user_left_room(self.distributor, target_user, room_id)
and "guest_access" in guest_access.content
and guest_access.content["guest_access"] == "can_join"
)
if not is_guest_access_allowed:
raise AuthError(403, "Guest access not allowed")
yield self._do_join(event, context) def _can_guest_join(self, current_state):
else: """
if event.membership == Membership.LEAVE: Returns whether a guest can join a room based on its current state.
is_host_in_room = yield self.is_host_in_room(room_id, context) """
if not is_host_in_room: guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
# Rejecting an invite, rather than leaving a joined room return (
handler = self.hs.get_handlers().federation_handler guest_access
inviter = yield self.get_inviter(event) and guest_access.content
if not inviter: and "guest_access" in guest_access.content
# return the same error as join_room_alias does and guest_access.content["guest_access"] == "can_join"
raise SynapseError(404, "No known servers") )
yield handler.do_remotely_reject_invite(
[inviter.domain],
room_id,
event.user_id
)
defer.returnValue({"room_id": room_id})
return
# FIXME: This isn't idempotency. def _should_do_dance(self, context, inviter, room_hosts=None):
if prev_state and prev_state.membership == event.membership: # TODO: Shouldn't this be remote_room_host?
# double same action, treat this event as a NOOP. room_hosts = room_hosts or []
defer.returnValue({})
return
yield self._do_local_membership_update( is_host_in_room = self.is_host_in_room(context.current_state)
event, if is_host_in_room:
context=context, return False, room_hosts
)
if prev_state and prev_state.membership == Membership.JOIN: if inviter and not self.hs.is_mine(inviter):
user = UserID.from_string(event.user_id) room_hosts.append(inviter.domain)
user_left_room(self.distributor, user, event.room_id)
defer.returnValue({"room_id": room_id}) return True, room_hosts
@defer.inlineCallbacks @defer.inlineCallbacks
def join_room_alias(self, joinee, room_alias, content={}): def lookup_room_alias(self, room_alias):
"""
Get the room ID associated with a room alias.
Args:
room_alias (RoomAlias): The alias to look up.
Returns:
A tuple of:
The room ID as a RoomID object.
Hosts likely to be participating in the room ([str]).
Raises:
SynapseError if room alias could not be found.
"""
directory_handler = self.hs.get_handlers().directory_handler directory_handler = self.hs.get_handlers().directory_handler
mapping = yield directory_handler.get_association(room_alias) mapping = yield directory_handler.get_association(room_alias)
@ -523,111 +608,15 @@ class RoomMemberHandler(BaseHandler):
raise SynapseError(404, "No such room alias") raise SynapseError(404, "No such room alias")
room_id = mapping["room_id"] room_id = mapping["room_id"]
hosts = mapping["servers"] servers = mapping["servers"]
if not hosts:
raise SynapseError(404, "No known servers")
# If event doesn't include a display name, add one. defer.returnValue((RoomID.from_string(room_id), servers))
yield collect_presencelike_data(self.distributor, joinee, content)
content.update({"membership": Membership.JOIN})
builder = self.event_builder_factory.new({
"type": EventTypes.Member,
"state_key": joinee.to_string(),
"room_id": room_id,
"sender": joinee.to_string(),
"membership": Membership.JOIN,
"content": content,
})
event, context = yield self._create_new_client_event(builder)
yield self._do_join(event, context, room_hosts=hosts)
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
def _do_join(self, event, context, room_hosts=None):
room_id = event.room_id
# XXX: We don't do an auth check if we are doing an invite
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
is_host_in_room = yield self.is_host_in_room(room_id, context)
if is_host_in_room:
should_do_dance = False
elif room_hosts: # TODO: Shouldn't this be remote_room_host?
should_do_dance = True
else:
inviter = yield self.get_inviter(event)
if not inviter:
# return the same error as join_room_alias does
raise SynapseError(404, "No known servers")
should_do_dance = not self.hs.is_mine(inviter)
room_hosts = [inviter.domain]
if should_do_dance:
handler = self.hs.get_handlers().federation_handler
yield handler.do_invite_join(
room_hosts,
room_id,
event.user_id,
event.content,
)
else:
logger.debug("Doing normal join")
yield self._do_local_membership_update(
event,
context=context,
)
prev_state = context.current_state.get((event.type, event.state_key))
if not prev_state or prev_state.membership != Membership.JOIN:
# Only fire user_joined_room if the user has acutally joined the
# room. Don't bother if the user is just changing their profile
# info.
user = UserID.from_string(event.user_id)
yield user_joined_room(self.distributor, user, room_id)
@defer.inlineCallbacks
def get_inviter(self, event):
# TODO(markjh): get prev_state from snapshot
prev_state = yield self.store.get_room_member(
event.user_id, event.room_id
)
def get_inviter(self, user_id, current_state):
prev_state = current_state.get((EventTypes.Member, user_id))
if prev_state and prev_state.membership == Membership.INVITE: if prev_state and prev_state.membership == Membership.INVITE:
defer.returnValue(UserID.from_string(prev_state.user_id)) return UserID.from_string(prev_state.user_id)
return return None
elif "third_party_invite" in event.content:
if "sender" in event.content["third_party_invite"]:
inviter = UserID.from_string(
event.content["third_party_invite"]["sender"]
)
defer.returnValue(inviter)
defer.returnValue(None)
@defer.inlineCallbacks
def is_host_in_room(self, room_id, context):
is_host_in_room = yield self.auth.check_host_in_room(
room_id,
self.hs.hostname
)
if not is_host_in_room:
# is *anyone* in the room?
room_member_keys = [
v for (k, v) in context.current_state.keys() if (
k == "m.room.member"
)
]
if len(room_member_keys) == 0:
# has the room been created so we can join it?
create_event = context.current_state.get(("m.room.create", ""))
if create_event:
is_host_in_room = True
defer.returnValue(is_host_in_room)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_joined_rooms_for_user(self, user): def get_joined_rooms_for_user(self, user):
@ -644,18 +633,6 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(room_ids) defer.returnValue(room_ids)
@defer.inlineCallbacks
def _do_local_membership_update(self, event, context):
yield run_on_reactor()
target_user = UserID.from_string(event.state_key)
yield self.handle_new_client_event(
event,
context,
extra_users=[target_user],
)
@defer.inlineCallbacks @defer.inlineCallbacks
def do_3pid_invite( def do_3pid_invite(
self, self,
@ -664,7 +641,7 @@ class RoomMemberHandler(BaseHandler):
medium, medium,
address, address,
id_server, id_server,
token_id, requester,
txn_id txn_id
): ):
invitee = yield self._lookup_3pid( invitee = yield self._lookup_3pid(
@ -672,19 +649,12 @@ class RoomMemberHandler(BaseHandler):
) )
if invitee: if invitee:
# make sure it looks like a user ID; it'll throw if it's invalid. handler = self.hs.get_handlers().room_member_handler
UserID.from_string(invitee) yield handler.update_membership(
yield self.hs.get_handlers().message_handler.create_and_send_event( requester,
{ UserID.from_string(invitee),
"type": EventTypes.Member, room_id,
"content": { "invite",
"membership": unicode("invite")
},
"room_id": room_id,
"sender": inviter.to_string(),
"state_key": invitee,
},
token_id=token_id,
txn_id=txn_id, txn_id=txn_id,
) )
else: else:
@ -694,7 +664,7 @@ class RoomMemberHandler(BaseHandler):
address, address,
room_id, room_id,
inviter, inviter,
token_id, requester.access_token_id,
txn_id=txn_id txn_id=txn_id
) )
@ -805,7 +775,7 @@ class RoomMemberHandler(BaseHandler):
) )
) )
msg_handler = self.hs.get_handlers().message_handler msg_handler = self.hs.get_handlers().message_handler
yield msg_handler.create_and_send_event( yield msg_handler.create_and_send_nonmember_event(
{ {
"type": EventTypes.ThirdPartyInvite, "type": EventTypes.ThirdPartyInvite,
"content": { "content": {

View file

@ -150,10 +150,21 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
event_dict["state_key"] = state_key event_dict["state_key"] = state_key
msg_handler = self.handlers.message_handler msg_handler = self.handlers.message_handler
event = yield msg_handler.create_and_send_event( event, context = yield msg_handler.create_event(
event_dict, token_id=requester.access_token_id, txn_id=txn_id, event_dict,
token_id=requester.access_token_id,
txn_id=txn_id,
) )
if event_type == EventTypes.Member:
yield self.handlers.room_member_handler.send_membership_event(
event,
context,
is_guest=requester.is_guest,
)
else:
yield msg_handler.send_nonmember_event(event, context)
defer.returnValue((200, {"event_id": event.event_id})) defer.returnValue((200, {"event_id": event.event_id}))
@ -171,7 +182,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
content = _parse_json(request) content = _parse_json(request)
msg_handler = self.handlers.message_handler msg_handler = self.handlers.message_handler
event = yield msg_handler.create_and_send_event( event = yield msg_handler.create_and_send_nonmember_event(
{ {
"type": event_type, "type": event_type,
"content": content, "content": content,
@ -217,46 +228,29 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
allow_guest=True, allow_guest=True,
) )
# the identifier could be a room alias or a room id. Try one then the if RoomID.is_valid(room_identifier):
# other if it fails to parse, without swallowing other valid room_id = room_identifier
# SynapseErrors. remote_room_hosts = None
elif RoomAlias.is_valid(room_identifier):
identifier = None
is_room_alias = False
try:
identifier = RoomAlias.from_string(room_identifier)
is_room_alias = True
except SynapseError:
identifier = RoomID.from_string(room_identifier)
# TODO: Support for specifying the home server to join with?
if is_room_alias:
handler = self.handlers.room_member_handler handler = self.handlers.room_member_handler
ret_dict = yield handler.join_room_alias( room_alias = RoomAlias.from_string(room_identifier)
requester.user, room_id, remote_room_hosts = yield handler.lookup_room_alias(room_alias)
identifier, room_id = room_id.to_string()
) else:
defer.returnValue((200, ret_dict)) raise SynapseError(400, "%s was not legal room ID or room alias" % (
else: # room id room_identifier,
msg_handler = self.handlers.message_handler ))
content = {"membership": Membership.JOIN}
if requester.is_guest:
content["kind"] = "guest"
yield msg_handler.create_and_send_event(
{
"type": EventTypes.Member,
"content": content,
"room_id": identifier.to_string(),
"sender": requester.user.to_string(),
"state_key": requester.user.to_string(),
},
token_id=requester.access_token_id,
txn_id=txn_id,
is_guest=requester.is_guest,
)
defer.returnValue((200, {"room_id": identifier.to_string()})) yield self.handlers.room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
action="join",
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
)
defer.returnValue((200, {"room_id": room_id}))
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, request, room_identifier, txn_id): def on_PUT(self, request, room_identifier, txn_id):
@ -451,7 +445,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
content["medium"], content["medium"],
content["address"], content["address"],
content["id_server"], content["id_server"],
requester.access_token_id, requester,
txn_id txn_id
) )
defer.returnValue((200, {})) defer.returnValue((200, {}))
@ -507,7 +501,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet):
content = _parse_json(request) content = _parse_json(request)
msg_handler = self.handlers.message_handler msg_handler = self.handlers.message_handler
event = yield msg_handler.create_and_send_event( event = yield msg_handler.create_and_send_nonmember_event(
{ {
"type": EventTypes.Redaction, "type": EventTypes.Redaction,
"content": content, "content": content,

View file

@ -114,10 +114,10 @@ class EventFederationStore(SQLBaseStore):
retcol="event_id", retcol="event_id",
) )
def get_latest_events_in_room(self, room_id): def get_latest_event_ids_and_hashes_in_room(self, room_id):
return self.runInteraction( return self.runInteraction(
"get_latest_events_in_room", "get_latest_event_ids_and_hashes_in_room",
self._get_latest_events_in_room, self._get_latest_event_ids_and_hashes_in_room,
room_id, room_id,
) )
@ -132,7 +132,7 @@ class EventFederationStore(SQLBaseStore):
desc="get_latest_event_ids_in_room", desc="get_latest_event_ids_in_room",
) )
def _get_latest_events_in_room(self, txn, room_id): def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
sql = ( sql = (
"SELECT e.event_id, e.depth FROM events as e " "SELECT e.event_id, e.depth FROM events as e "
"INNER JOIN event_forward_extremities as f " "INNER JOIN event_forward_extremities as f "

View file

@ -73,6 +73,14 @@ class DomainSpecificString(
"""Return a string encoding the fields of the structure object.""" """Return a string encoding the fields of the structure object."""
return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain)
@classmethod
def is_valid(cls, s):
try:
cls.from_string(s)
return True
except:
return False
__str__ = to_string __str__ = to_string
@classmethod @classmethod