This commit is contained in:
Kegan Dougal 2024-01-30 16:56:43 +00:00
parent fab5948f5b
commit 8e95932e01

View file

@ -517,15 +517,19 @@ class DeviceHandler(DeviceWorkerHandler):
deltas_by_room.setdefault(delta.room_id, []).append(delta) deltas_by_room.setdefault(delta.room_id, []).append(delta)
for room_id, deltas_for_room in deltas_by_room.items(): for room_id, deltas_for_room in deltas_by_room.items():
newly_joined_local_users = await self._get_newly_joined_local_users(room_id, deltas_for_room) newly_joined_local_users = await self._get_newly_joined_local_users(
room_id, deltas_for_room
)
if not newly_joined_local_users: if not newly_joined_local_users:
continue continue
# if a local user newly joins a room, we want to broadcast their device lists to # if a local user newly joins a room, we want to broadcast their device lists to
# federated servers in that room, if we haven't already. # federated servers in that room, if we haven't already.
hosts = await self.store.get_current_hosts_in_room(room_id) hosts = await self.store.get_current_hosts_in_room(room_id)
# filter out ourselves # filter out ourselves
hosts = [h for h in hosts if not self.hs.is_mine_server_name(h)] other_hosts = [
if len(hosts) == 0: h for h in hosts if not self.hs.is_mine_server_name(h)
]
if len(other_hosts) == 0:
continue continue
# broadcast device lists for these users in the room # broadcast device lists for these users in the room
num_pokes = 0 num_pokes = 0
@ -539,29 +543,30 @@ class DeviceHandler(DeviceWorkerHandler):
user_id=user_id, user_id=user_id,
device_id=device_id, device_id=device_id,
room_id=room_id, room_id=room_id,
hosts=hosts, hosts=other_hosts,
context=None, context=None,
) )
logger.info( logger.info(
"Found %d hosts to send device list updates to for a new room join, " + "Found %d hosts to send device list updates to for a new room join, "
"added %s device_list_outbound_pokes", + "added %s device_list_outbound_pokes",
len(hosts), num_pokes, len(other_hosts),
num_pokes,
) )
# Notify things that device lists need to be sent out. # Notify things that device lists need to be sent out.
self.notifier.notify_replication() self.notifier.notify_replication()
await self.federation_sender.send_device_messages( await self.federation_sender.send_device_messages(
hosts, immediate=False other_hosts, immediate=False
) )
self._event_pos = max_pos self._event_pos = max_pos
# Expose current event processing position to prometheus # Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("device").set( synapse.metrics.event_processing_positions.labels("device").set(max_pos)
max_pos
)
async def _get_newly_joined_local_users(self, room_id: str, deltas: List[StateDelta]) -> Optional[Set[str]]: async def _get_newly_joined_local_users(
self, room_id: str, deltas: List[StateDelta]
) -> Optional[Set[str]]:
"""Process current state deltas for the room to find new joins that need """Process current state deltas for the room to find new joins that need
to be handled. to be handled.
""" """
@ -570,7 +575,10 @@ class DeviceHandler(DeviceWorkerHandler):
for delta in deltas: for delta in deltas:
assert room_id == delta.room_id assert room_id == delta.room_id
logger.debug( logger.debug(
"device.handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id "device.handling: %r %r, %s",
delta.event_type,
delta.state_key,
delta.event_id,
) )
# Drop any event that isn't a membership join # Drop any event that isn't a membership join
if delta.event_type != EventTypes.Member: if delta.event_type != EventTypes.Member:
@ -600,7 +608,7 @@ class DeviceHandler(DeviceWorkerHandler):
if not newly_joined_local_users: if not newly_joined_local_users:
# If nobody has joined then there's nothing to do. # If nobody has joined then there's nothing to do.
return return None
return newly_joined_local_users return newly_joined_local_users
def _check_device_name_length(self, name: Optional[str]) -> None: def _check_device_name_length(self, name: Optional[str]) -> None: