Add room_version param to get_pdu

When we add new event format we'll need to know the event format or room
version when parsing events.
This commit is contained in:
Erik Johnston 2019-01-23 17:19:58 +00:00
parent 6b90ae6efc
commit 6a41d2a187
4 changed files with 60 additions and 13 deletions

View file

@ -43,8 +43,8 @@ class FederationBase(object):
self._clock = hs.get_clock() self._clock = hs.get_clock()
@defer.inlineCallbacks @defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version,
include_none=False): outlier=False, include_none=False):
"""Takes a list of PDUs and checks the signatures and hashs of each """Takes a list of PDUs and checks the signatures and hashs of each
one. If a PDU fails its signature check then we check if we have it in one. If a PDU fails its signature check then we check if we have it in
the database and if not then request if from the originating server of the database and if not then request if from the originating server of
@ -56,8 +56,12 @@ class FederationBase(object):
a new list. a new list.
Args: Args:
origin (str)
pdu (list) pdu (list)
outlier (bool) room_version (str)
outlier (bool): Whether the events are outliers or not
include_none (str): Whether to include None in the returned list
for events that have failed their checks
Returns: Returns:
Deferred : A list of PDUs that have valid signatures and hashes. Deferred : A list of PDUs that have valid signatures and hashes.
@ -84,6 +88,7 @@ class FederationBase(object):
res = yield self.get_pdu( res = yield self.get_pdu(
destinations=[pdu.origin], destinations=[pdu.origin],
event_id=pdu.event_id, event_id=pdu.event_id,
room_version=room_version,
outlier=outlier, outlier=outlier,
timeout=10000, timeout=10000,
) )

View file

@ -25,7 +25,12 @@ from prometheus_client import Counter
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RoomVersions,
)
from synapse.api.errors import ( from synapse.api.errors import (
CodeMessageException, CodeMessageException,
FederationDeniedError, FederationDeniedError,
@ -202,7 +207,8 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def get_pdu(self, destinations, event_id, outlier=False, timeout=None): def get_pdu(self, destinations, event_id, room_version, outlier=False,
timeout=None):
"""Requests the PDU with given origin and ID from the remote home """Requests the PDU with given origin and ID from the remote home
servers. servers.
@ -212,6 +218,7 @@ class FederationClient(FederationBase):
Args: Args:
destinations (list): Which home servers to query destinations (list): Which home servers to query
event_id (str): event to fetch event_id (str): event to fetch
room_version (str): version of the room
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False` of the current block of PDUs. Defaults to `False`
@ -352,10 +359,13 @@ class FederationClient(FederationBase):
ev.event_id for ev in itertools.chain(pdus, auth_chain) ev.event_id for ev in itertools.chain(pdus, auth_chain)
]) ])
room_version = yield self.store.get_room_version(room_id)
signed_pdus = yield self._check_sigs_and_hash_and_fetch( signed_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, destination,
[p for p in pdus if p.event_id not in seen_events], [p for p in pdus if p.event_id not in seen_events],
outlier=True outlier=True,
room_version=room_version,
) )
signed_pdus.extend( signed_pdus.extend(
seen_events[p.event_id] for p in pdus if p.event_id in seen_events seen_events[p.event_id] for p in pdus if p.event_id in seen_events
@ -364,7 +374,8 @@ class FederationClient(FederationBase):
signed_auth = yield self._check_sigs_and_hash_and_fetch( signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, destination,
[p for p in auth_chain if p.event_id not in seen_events], [p for p in auth_chain if p.event_id not in seen_events],
outlier=True outlier=True,
room_version=room_version,
) )
signed_auth.extend( signed_auth.extend(
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
@ -411,6 +422,8 @@ class FederationClient(FederationBase):
random.shuffle(srvs) random.shuffle(srvs)
return srvs return srvs
room_version = yield self.store.get_room_version(room_id)
batch_size = 20 batch_size = 20
missing_events = list(missing_events) missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size): for i in range(0, len(missing_events), batch_size):
@ -421,6 +434,7 @@ class FederationClient(FederationBase):
self.get_pdu, self.get_pdu,
destinations=random_server_list(), destinations=random_server_list(),
event_id=e_id, event_id=e_id,
room_version=room_version,
) )
for e_id in batch for e_id in batch
] ]
@ -450,8 +464,11 @@ class FederationClient(FederationBase):
for p in res["auth_chain"] for p in res["auth_chain"]
] ]
room_version = yield self.store.get_room_version(room_id)
signed_auth = yield self._check_sigs_and_hash_and_fetch( signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True destination, auth_chain,
outlier=True, room_version=room_version,
) )
signed_auth.sort(key=lambda e: e.depth) signed_auth.sort(key=lambda e: e.depth)
@ -650,9 +667,20 @@ class FederationClient(FederationBase):
for p in itertools.chain(state, auth_chain) for p in itertools.chain(state, auth_chain)
} }
room_version = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
room_version = e.content.get("room_version", RoomVersions.V1)
break
if room_version is None:
# We use this error has that is what
raise SynapseError(400, "No create event in state")
valid_pdus = yield self._check_sigs_and_hash_and_fetch( valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, list(pdus.values()), destination, list(pdus.values()),
outlier=True, outlier=True,
room_version=room_version,
) )
valid_pdus_map = { valid_pdus_map = {
@ -790,8 +818,10 @@ class FederationClient(FederationBase):
for e in content["auth_chain"] for e in content["auth_chain"]
] ]
room_version = yield self.store.get_room_version(room_id)
signed_auth = yield self._check_sigs_and_hash_and_fetch( signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True destination, auth_chain, outlier=True, room_version=room_version,
) )
signed_auth.sort(key=lambda e: e.depth) signed_auth.sort(key=lambda e: e.depth)
@ -838,8 +868,10 @@ class FederationClient(FederationBase):
for e in content.get("events", []) for e in content.get("events", [])
] ]
room_version = yield self.store.get_room_version(room_id)
signed_events = yield self._check_sigs_and_hash_and_fetch( signed_events = yield self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False destination, events, outlier=False, room_version=room_version,
) )
except HttpResponseException as e: except HttpResponseException as e:
if not e.code == 400: if not e.code == 400:

View file

@ -457,8 +457,10 @@ class FederationServer(FederationBase):
for e in content["auth_chain"] for e in content["auth_chain"]
] ]
room_version = yield self.store.get_room_version(room_id)
signed_auth = yield self._check_sigs_and_hash_and_fetch( signed_auth = yield self._check_sigs_and_hash_and_fetch(
origin, auth_chain, outlier=True origin, auth_chain, outlier=True, room_version=room_version,
) )
ret = yield self.handler.on_query_auth( ret = yield self.handler.on_query_auth(

View file

@ -34,6 +34,7 @@ from synapse.api.constants import (
EventTypes, EventTypes,
Membership, Membership,
RejectedReason, RejectedReason,
RoomVersions,
) )
from synapse.api.errors import ( from synapse.api.errors import (
AuthError, AuthError,
@ -342,6 +343,8 @@ class FederationHandler(BaseHandler):
room_id, event_id, p, room_id, event_id, p,
) )
room_version = yield self.store.get_room_version(room_id)
with logcontext.nested_logging_context(p): with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or # note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped # auth events, the requests to fetch those events are deduped
@ -355,7 +358,7 @@ class FederationHandler(BaseHandler):
# we want the state *after* p; get_state_for_room returns the # we want the state *after* p; get_state_for_room returns the
# state *before* p. # state *before* p.
remote_event = yield self.federation_client.get_pdu( remote_event = yield self.federation_client.get_pdu(
[origin], p, outlier=True, [origin], p, room_version, outlier=True,
) )
if remote_event is None: if remote_event is None:
@ -379,7 +382,6 @@ class FederationHandler(BaseHandler):
for x in remote_state: for x in remote_state:
event_map[x.event_id] = x event_map[x.event_id] = x
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_store( state_map = yield resolve_events_with_store(
room_version, state_maps, event_map, room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store), state_res_store=StateResolutionStore(self.store),
@ -655,6 +657,8 @@ class FederationHandler(BaseHandler):
if dest == self.server_name: if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.") raise SynapseError(400, "Can't backfill from self.")
room_version = yield self.store.get_room_version(room_id)
events = yield self.federation_client.backfill( events = yield self.federation_client.backfill(
dest, dest,
room_id, room_id,
@ -748,6 +752,7 @@ class FederationHandler(BaseHandler):
self.federation_client.get_pdu, self.federation_client.get_pdu,
[dest], [dest],
event_id, event_id,
room_version=room_version,
outlier=True, outlier=True,
timeout=10000, timeout=10000,
) )
@ -1659,6 +1664,8 @@ class FederationHandler(BaseHandler):
create_event = e create_event = e
break break
room_version = create_event.content.get("room_version", RoomVersions.V1)
missing_auth_events = set() missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]): for e in itertools.chain(auth_events, state, [event]):
for e_id in e.auth_event_ids(): for e_id in e.auth_event_ids():
@ -1669,6 +1676,7 @@ class FederationHandler(BaseHandler):
m_ev = yield self.federation_client.get_pdu( m_ev = yield self.federation_client.get_pdu(
[origin], [origin],
e_id, e_id,
room_version=room_version,
outlier=True, outlier=True,
timeout=10000, timeout=10000,
) )