Reduce federation presence replication traffic

This is mainly done by moving the calculation of where to send presence
updates from the presence handler to the transaction queue, so we only
need to send the presence event (and not the destinations) across the
replication connection. Before we were duplicating by sending the full
state across once per destination.
This commit is contained in:
Erik Johnston 2017-04-10 16:48:30 +01:00
parent 2e6f5a4910
commit 29574fd5b3
6 changed files with 139 additions and 80 deletions

View file

@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.storage.presence import PresenceStore
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@ -80,6 +81,17 @@ class FederationSenderSlaveStore(
return rows[0][0] if rows else -1
# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
# This is fine since in practice nobody uses the presence list stuff...
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted"
]
class FederationSenderServer(HomeServer):
def get_db_conn(self, run_new_connection=True):

View file

@ -206,10 +206,8 @@ class SynchrotronPresence(object):
@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
parties = yield self._get_interested_parties(
states, calculate_remote_hosts=False
)
room_ids_to_states, users_to_states, _ = parties
parties = yield self._get_interested_parties(states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),

View file

@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.presence_map = {}
self.presence_changed = sorteddict()
@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
del self.presence_changed[key]
user_ids = set(
user_id for uids in self.presence_changed.values() for _, user_id in uids
user_id
for uids in self.presence_changed.itervalues()
for user_id in uids
)
to_del = [
@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_presence(self, destination, states):
def send_presence(self, states):
"""As per TransactionQueue"""
pos = self._next_pos()
self.presence_map.update({
state.user_id: state
for state in states
})
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
self.presence_changed[pos] = [
(destination, state.user_id) for state in states
]
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object):
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
dest_user_ids = set(
(pos, dest_user_id)
dest_user_ids = [
(pos, user_id)
for pos in keys[i:j]
for dest_user_id in self.presence_changed[pos]
)
for user_id in self.presence_changed[pos]
]
for (key, (dest, user_id)) in dest_user_ids:
for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(
destination=dest,
state=self.presence_map[user_id],
)))
@ -354,7 +352,6 @@ class BaseFederationRow(object):
class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
"destination", # str
"state", # UserPresenceState
))):
TypeId = "p"
@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
@staticmethod
def from_data(data):
return PresenceRow(
destination=data["destination"],
state=UserPresenceState.from_dict(data["state"])
state=UserPresenceState.from_dict(data)
)
def to_data(self):
return {
"destination": self.destination,
"state": self.state.as_dict()
}
return self.state.as_dict()
def add_to_buffer(self, buff):
buff.presence.setdefault(self.destination, []).append(self.state)
buff.presence.append(self.state)
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@ -469,7 +462,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # dict of destination -> [UserPresenceState]
"presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
presence={},
presence=[],
keyed_edus={},
edus={},
failures={},
@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows):
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
for destination, states in buff.presence.iteritems():
transaction_queue.send_presence(destination, states)
if buff.presence:
transaction_queue.send_presence(buff.presence)
for destination, edu_map in buff.keyed_edus.iteritems():
for key, edu in edu_map.items():

View file

@ -21,7 +21,7 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
@ -78,6 +78,7 @@ class TransactionQueue(object):
self.pending_edus_by_dest = edus = {}
# Presence needs to be separate as we send single aggragate EDUs
self.pending_presence = {}
self.pending_presence_by_dest = presence = {}
self.pending_edus_keyed_by_dest = edus_keyed = {}
@ -113,6 +114,8 @@ class TransactionQueue(object):
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
def can_send_to(self, destination):
"""Can we send messages to the given server?
@ -224,17 +227,95 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
def send_presence(self, destination, states):
if not self.can_send_to(destination):
@preserve_fn
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
Args:
states (list(UserPresenceState))
"""
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
self.pending_presence.update({state.user_id: state for state in states})
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self.pending_presence_by_dest.setdefault(destination, {}).update({
state.user_id: state for state in states
})
self._processing_pending_presence = True
try:
while True:
states = self.pending_presence
self.pending_presence = {}
preserve_context_over_fn(
self._attempt_new_transaction, destination
)
if not states:
break
yield self._process_presence_inner(states)
finally:
self._processing_pending_presence = False
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
Args:
states (list(UserPresenceState))
"""
# First we look up the rooms each user is in (as well as any explicit
# subscriptions), then for each distinct room we look up the remote
# hosts in those rooms.
room_ids_to_states = {}
users_to_states = {}
for state in states.itervalues():
room_ids = yield self.store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)
plist = yield self.store.get_presence_list_observers_accepted(
state.user_id,
)
for u in plist:
users_to_states.setdefault(u, []).append(state)
hosts_and_states = []
for room_id, states in room_ids_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
hosts = yield self.store.get_hosts_in_room(room_id)
hosts_and_states.append((hosts, local_states))
for user_id, states in users_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
host = get_domain_from_id(user_id)
hosts_and_states.append(([host], local_states))
# And now finally queue up new transactions
for destinations, states in hosts_and_states:
for destination in destinations:
if not self.can_send_to(destination):
continue
self.pending_presence_by_dest.setdefault(
destination, {}
).update({
state.user_id: state for state in states
})
preserve_fn(self._attempt_new_transaction)(destination)
def send_edu(self, destination, edu_type, content, key=None):
edu = Edu(

View file

@ -318,11 +318,7 @@ class PresenceHandler(object):
if to_federation_ping:
federation_presence_out_counter.inc_by(len(to_federation_ping))
_, _, hosts_to_states = yield self._get_interested_parties(
to_federation_ping.values()
)
self._push_to_remotes(hosts_to_states)
self._push_to_remotes(to_federation_ping.values())
def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
@ -615,12 +611,12 @@ class PresenceHandler(object):
defer.returnValue(states)
@defer.inlineCallbacks
def _get_interested_parties(self, states, calculate_remote_hosts=True):
def _get_interested_parties(self, states):
"""Given a list of states return which entities (rooms, users, servers)
are interested in the given states.
Returns:
3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
2-tuple: `(room_ids_to_states, users_to_states)`,
with each item being a dict of `entity_name` -> `[UserPresenceState]`
"""
room_ids_to_states = {}
@ -637,30 +633,10 @@ class PresenceHandler(object):
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
hosts_to_states = {}
if calculate_remote_hosts:
for room_id, states in room_ids_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
hosts = yield self.store.get_hosts_in_room(room_id)
for host in hosts:
hosts_to_states.setdefault(host, []).extend(local_states)
for user_id, states in users_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
host = get_domain_from_id(user_id)
hosts_to_states.setdefault(host, []).extend(local_states)
# TODO: de-dup hosts_to_states, as a single host might have multiple
# of same presence
defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
defer.returnValue((room_ids_to_states, users_to_states))
@defer.inlineCallbacks
def _persist_and_notify(self, states):
@ -670,33 +646,32 @@ class PresenceHandler(object):
stream_id, max_token = yield self.store.update_presence(states)
parties = yield self._get_interested_parties(states)
room_ids_to_states, users_to_states, hosts_to_states = parties
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states.keys()]
users=[UserID.from_string(u) for u in users_to_states]
)
self._push_to_remotes(hosts_to_states)
self._push_to_remotes(states)
@defer.inlineCallbacks
def notify_for_states(self, state, stream_id):
parties = yield self._get_interested_parties([state])
room_ids_to_states, users_to_states, hosts_to_states = parties
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states.keys()]
users=[UserID.from_string(u) for u in users_to_states]
)
def _push_to_remotes(self, hosts_to_states):
def _push_to_remotes(self, states):
"""Sends state updates to remote servers.
Args:
hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
hosts_to_states (list): list(state)
"""
for host, states in hosts_to_states.items():
self.federation.send_presence(host, states)
self.federation.send_presence(states)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
@ -837,14 +812,13 @@ class PresenceHandler(object):
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
hosts = set(get_domain_from_id(u) for u in user_ids)
self._push_to_remotes({host: (state,) for host in hosts})
self._push_to_remotes([state])
else:
user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
self._push_to_remotes({user.domain: states.values()})
self._push_to_remotes(states.values())
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):

View file

@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore):
# to reach inside the __dict__ to extract them.
get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
get_users_who_share_room_with_user = (
RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
)