streams/Code/Daemon/Notifier.php

836 lines
34 KiB
PHP
Raw Normal View History

2021-12-03 03:01:39 +00:00
<?php
2016-05-20 02:42:45 +00:00
2022-02-16 04:08:28 +00:00
namespace Code\Daemon;
2016-05-20 02:42:45 +00:00
2022-12-17 21:43:34 +00:00
use Code\Lib\Config;
2022-12-11 19:16:17 +00:00
use Code\Lib\IConfig;
use Code\Lib\JcsEddsa2022;
2022-02-16 04:08:28 +00:00
use Code\Lib\Libzot;
2022-12-28 01:51:30 +00:00
use Code\Lib\ObjCache;
2022-02-16 04:08:28 +00:00
use Code\Lib\Queue;
use Code\Lib\Activity;
use Code\Lib\ActivityStreams;
use Code\Lib\ActivityPub;
use Code\Lib\Channel;
use Code\Extend\Hook;
2024-03-10 02:40:50 +00:00
use Code\Lib\Time;
2016-05-20 02:42:45 +00:00
require_once('include/html2plain.php');
require_once('include/conversation.php');
2017-04-19 00:26:50 +00:00
require_once('include/bbcode.php');
2016-05-20 02:42:45 +00:00
/*
 * Notifier - message dispatch and preparation for delivery
 *
 * The basic flow is:
 *   Identify the type of message
 *   Collect any information that needs to be sent
 *   Convert it into a suitable generic format for sending
 *   Figure out who the recipients are and if we need to relay
 *      through a conversation owner
 *   Once we know what recipients are involved, collect a list of
 *      destination sites
 *   Build and store a queue item for each unique site and invoke
 *      a delivery process for each site or a small number of sites (1-3)
 *      and add a slight delay between each delivery invocation if desired (usually)
 *
 */
/*
 * The notifier is typically called with:
 *
 *   Code\Daemon\Run::Summon( [ 'Notifier', COMMAND, ITEM_ID ] );
 *
 * where COMMAND is one of the following:
 *
 *   activity           (in diaspora.php, dfrn_confirm.php, profiles.php)
 *   comment-import     (in diaspora.php, items.php)
 *   comment-new        (in item.php)
 *   drop               (in diaspora.php, items.php, photos.php)
 *   edit_post          (in item.php)
 *   event              (in events.php)
 *   expire             (in items.php)
 *   like               (in like.php, poke.php)
 *   mail               (in message.php)
 *   tag                (in photos.php, poke.php, tagger.php)
 *   tgroup             (in items.php)
 *   wall-new           (in photos.php, item.php)
 *
 * and ITEM_ID is the id of the item in the database that needs to be sent to others.
 *
 * Nomad
 *   permissions_create   abook_id
 *   permissions_accept   abook_id
 *   permissions_reject   abook_id
 *   permissions_update   abook_id
 *   refresh_all          channel_id
 *   purge                xchan_hash
 *   purge_all            channel_id
 *   expire               channel_id
 *   relay                item_id (item was relayed to owner, we will deliver it as owner)
 *   single_activity      item_id (deliver to a singleton network from the appropriate clone)
 *   single_mail          mail_id (deliver to a singleton network from the appropriate clone)
 *   location             channel_id
 *   request              channel_id xchan_hash message_id
 *   keychange            channel_id
 *
 */
2022-11-27 09:15:28 +00:00
/**
 * Notifier daemon: works out what has to be sent, to whom, and to which
 * sites, then stores one queue entry per destination and hands the queue
 * hashes to do_delivery().
 *
 * All working state is kept in static properties which run() resets on
 * every invocation.
 */
class Notifier implements DaemonInterface
{
    /** @var array queue hashes collected during this run and passed to do_delivery() */
    public static $deliveries = [];
    /** @var array xchan hashes (or actor ids) the message is addressed to */
    public static $recipients = [];
    /** @var array|null envelope recipients; null when the message is not private */
    public static $env_recips = [];
    /** @var string packet type handed to Libzot::build_packet() */
    public static $packet_type = 'activity';
    /** @var string encoding handed to Libzot::build_packet() */
    public static $encoding = 'activitystreams';
    /** @var array|null the encoded payload */
    public static $encoded_item = null;
    /** @var array|null the sending channel record */
    public static $channel = null;
    /** @var bool whether the message is delivered privately */
    public static $private = false;

    /**
     * Dispatch one notifier command.
     *
     * @param int   $argc number of entries in $argv; at least 3 is required
     * @param array $argv [ 'Notifier', COMMAND, ID, ...extra ] where ID is an item id,
     *                    abook id or channel id depending on COMMAND. 'request' needs
     *                    argv[3] (xchan hash) and argv[4] (message id); 'purge' needs
     *                    argv[3] (xchan hash).
     */
    public function run(int $argc, array $argv): void
    {
        if ($argc < 3) {
            return;
        }

        logger('notifier: invoked: ' . print_r($argv, true), LOGGER_DEBUG);

        $cmd = $argv[1];
        $item_id = $argv[2];

        if (! $item_id) {
            return;
        }

        // Static state survives between invocations within one process; start clean.
        self::$deliveries = [];
        self::$recipients = [];
        self::$env_recips = [];
        self::$packet_type = 'activity';
        self::$encoding = 'activitystreams';
        self::$encoded_item = null;
        self::$channel = null;
        self::$private = false;

        $sys = Channel::get_system();

        $normal_mode = true;

        // These are only assigned by the "normal items" branch below, but the generic
        // delivery section reads them for every command. Give them defined values so
        // the non-item commands (request, keychange, refresh_all, purge, ...) do not
        // operate on undefined variables.
        $target_item = null;
        $parent_item = null;
        $top_level_post = false;
        $relay_to_owner = false;
        $uplink = false;
        $upstream = false;

        if ($cmd === 'request') {
            // argv[3] and argv[4] are both required; check before reading either.
            if ($argc < 5) {
                return;
            }
            $xchan = $argv[3];

            self::$channel = Channel::from_id($item_id);
            self::$private = true;
            self::$recipients[] = $xchan;
            self::$packet_type = 'request';
            self::$encoded_item = [ 'message_id' => $argv[4] ];
            self::$encoding = 'zot';
            $normal_mode = false;
        } elseif ($cmd === 'keychange') {
            self::$channel = Channel::from_id($item_id);
            if (! self::$channel) {
                logger('notifier: keychange: channel not found: ' . $item_id, LOGGER_NORMAL, LOG_WARNING);
                return;
            }
            $r = q(
                "select abook_xchan from abook where abook_channel = %d",
                intval($item_id)
            );
            if ($r) {
                foreach ($r as $rr) {
                    self::$recipients[] = $rr['abook_xchan'];
                }
            }
            self::$private = false;
            self::$packet_type = 'keychange';
            self::$encoded_item = get_pconfig(self::$channel['channel_id'], 'system', 'keychange');
            self::$encoding = 'zot';
            $normal_mode = false;
        } elseif (in_array($cmd, [ 'permissions_update', 'permissions_reject', 'permissions_accept', 'permissions_create' ])) {
            // Get the (single) recipient
            $r = q(
                "select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0",
                intval($item_id)
            );
            if ($r) {
                $recip = array_shift($r);
                $uid = $recip['abook_channel'];
                // Get the sender
                self::$channel = Channel::from_id($uid);
                if (self::$channel) {
                    $perm_update = [ 'sender' => self::$channel, 'recipient' => $recip, 'success' => false, 'deliveries' => '' ];

                    switch ($cmd) {
                        case 'permissions_create':
                            ActivityPub::permissions_create($perm_update);
                            break;
                        case 'permissions_accept':
                            ActivityPub::permissions_accept($perm_update);
                            break;
                        case 'permissions_update':
                            ActivityPub::permissions_update($perm_update);
                            break;
                        default:
                            break;
                    }

                    // Give plugins a chance if the built-in handler did not take it.
                    if (! $perm_update['success']) {
                        Hook::call($cmd, $perm_update);
                    }

                    if ($perm_update['success']) {
                        if ($perm_update['deliveries']) {
                            self::$deliveries[] = $perm_update['deliveries'];
                            do_delivery(self::$deliveries);
                            self::$deliveries = [];
                        }
                        return;
                    } else {
                        self::$recipients[] = $recip['abook_xchan'];
                        self::$private = false;
                        self::$packet_type = 'refresh';
                        self::$env_recips = [ $recip['xchan_hash'] ];
                    }
                }
            }
        } elseif ($cmd === 'refresh_all') {
            logger('notifier: refresh_all: ' . $item_id);

            self::$channel = Channel::from_id($item_id, true);
            if (! self::$channel) {
                logger('notifier: refresh_all: channel not found: ' . $item_id, LOGGER_NORMAL, LOG_WARNING);
                return;
            }

            $perm_update = [ 'sender' => self::$channel, 'success' => false, 'deliveries' => '' ];
            ActivityPub::refresh_all($perm_update);

            if (! $perm_update['success']) {
                Hook::call($cmd, $perm_update);
            }

            if ($perm_update['success']) {
                if ($perm_update['deliveries']) {
                    self::$deliveries[] = $perm_update['deliveries'];
                    do_delivery(self::$deliveries);
                    self::$deliveries = [];
                }
            }

            // Connections on 'activitypub' are excluded here; the remainder get a refresh packet.
            $r = q(
                "select abook_xchan from abook left join xchan on abook_xchan = xchan_hash
                where abook_channel = %d and xchan_network != 'activitypub'",
                intval($item_id)
            );
            if ($r) {
                foreach ($r as $rr) {
                    self::$recipients[] = $rr['abook_xchan'];
                }
            }

            self::$recipients[] = self::$channel['channel_hash'];
            self::$private = false;
            self::$packet_type = 'refresh';
        } elseif ($cmd === 'purge') {
            // argv[3] is optional as far as the $argc < 3 check goes; treat a missing value as empty.
            $xchan = $argv[3] ?? '';
            logger('notifier: purge: ' . $item_id . ' => ' . $xchan);
            if (! $xchan) {
                return;
            }

            self::$channel = Channel::from_id($item_id, true);
            self::$recipients = [ $xchan ];
            self::$private = true;
            self::$packet_type = 'purge';
        } elseif ($cmd === 'purge_all') {
            logger('notifier: purge_all: ' . $item_id);

            self::$channel = Channel::from_id($item_id, true);
            self::$recipients = [];
            $r = q(
                "select abook_xchan from abook where abook_channel = %d and abook_self = 0",
                intval($item_id)
            );
            if (! $r) {
                return;
            }
            foreach ($r as $rr) {
                self::$recipients[] = $rr['abook_xchan'];
            }
            self::$private = false;
            self::$packet_type = 'purge';
        } else {
            // Normal items

            // Fetch the target item
            $r = q(
                "SELECT * FROM item WHERE id = %d and parent != 0 LIMIT 1",
                intval($item_id)
            );
            if (! $r) {
                return;
            }

            xchan_query($r);
            $r = fetch_post_tags($r);

            $target_item = array_shift($r);

            if ($target_item['author']['xchan_network'] === 'anon') {
                logger('notifier: target item author is not a fetchable actor', LOGGER_DEBUG);
                return;
            }

            if (intval($target_item['item_deleted'])) {
                logger('target item ITEM_DELETED', LOGGER_DEBUG);
            }

            if (! in_array(intval($target_item['item_type']), [ ITEM_TYPE_POST, ITEM_TYPE_MAIL ])) {
                // Custom item types may be made deliverable by a plugin; everything else is dropped.
                // $hookinfo is always defined so the check below never reads an unset variable.
                $hookinfo = [
                    'targetitem' => $target_item,
                    'deliver' => false
                ];
                if (intval($target_item['item_type']) === intval(ITEM_TYPE_CUSTOM)) {
                    Hook::call('customitem_deliver', $hookinfo);
                }

                if (empty($hookinfo['deliver'])) {
                    logger('notifier: target item not forwardable: type ' . $target_item['item_type'], LOGGER_DEBUG);
                    return;
                }
            }

            // Check for non published items, but allow an exclusion for transmitting hidden file activities
            if (
                intval($target_item['item_unpublished']) || intval($target_item['item_delayed']) ||
                intval($target_item['item_blocked']) ||
                ( intval($target_item['item_hidden']) && ($target_item['obj_type'] !== ACTIVITY_OBJ_FILE))
            ) {
                logger('notifier: target item not published, so not forwardable', LOGGER_DEBUG);
                return;
            }

            if (in_array($target_item['verb'], [ ACTIVITY_FOLLOW, ACTIVITY_IGNORE ])) {
                logger('not fowarding follow|unfollow->note activity');
                return;
            }

            $s = q(
                "select * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1",
                intval($target_item['uid'])
            );
            if ($s) {
                self::$channel = array_shift($s);
            }

            if (! self::$channel) {
                logger('notifier: sending channel not found: ' . $target_item['uid'], LOGGER_NORMAL, LOG_WARNING);
                return;
            }

            if (self::$channel['channel_hash'] !== $target_item['author_xchan'] && self::$channel['channel_hash'] !== $target_item['owner_xchan']) {
                logger("notifier: Sending channel is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING);
                return;
            }

            $thread_is_public = false;
            $question = false;

            if ($target_item['verb'] === 'Announce') {
                // Provide correct representation across the wire. Internally this is treated as a comment.
                $target_item['parent_mid'] = $target_item['thr_parent'] = $target_item['mid'];
            }

            if ($target_item['mid'] === $target_item['parent_mid']) {
                $parent_item = $target_item;
                $top_level_post = true;
            } else {
                // fetch the parent item
                $r = q(
                    "SELECT * from item where id = %d order by id asc",
                    intval($target_item['parent'])
                );
                if (! $r) {
                    return;
                }

                xchan_query($r);
                $r = fetch_post_tags($r);

                $parent_item = array_shift($r);
                $top_level_post = false;
                $thread_is_public = !intval($parent_item['item_private']);
                $question = $parent_item['verb'] === 'Question';
            }

            // avoid looping of discover items 12/4/2014
            if ($sys && (int)$parent_item['uid'] === (int)$sys['channel_id']) {
                return;
            }

            // Edits are always re-encoded; otherwise try the cached encoding first.
            $m = ($cmd === 'edit_post') ? '' : ObjCache::Get($target_item['mid']);

            // Re-use existing signature unless the activity type changed to a Tombstone, which won't verify.
            if ($m && (! intval($target_item['item_deleted']))) {
                self::$encoded_item = json_decode($m, true);
            } else {
                self::$encoded_item = array_merge(Activity::ap_context(), Activity::encode_activity($target_item, true));
                self::$encoded_item['proof'] = (new JcsEddsa2022())->sign(self::$encoded_item, self::$channel);
            }

            logger('target_item: ' . print_r($target_item, true), LOGGER_DEBUG);
            logger('encoded: ' . print_r(self::$encoded_item, true), LOGGER_DEBUG);

            // Send comments to the owner to re-deliver to everybody in the conversation
            // We only do this if the item in question originated on this site. This prevents looping.
            // To clarify, a site accepting a new comment is responsible for sending it to the owner for relay.
            // Relaying should never be initiated on a post that arrived from elsewhere.

            // We should normally be able to rely on ITEM_ORIGIN, but start_delivery_chain() incorrectly set this
            // flag on comments for an extended period. So we'll also call comment_local_origin() which looks at
            // the hostname in the message_id and provides a second (fallback) opinion.

            $relay_to_owner = (! $top_level_post) && (intval($target_item['item_origin'])) && comment_local_origin($target_item) && $cmd !== 'hyper';

            if (self::$channel['channel_hash'] === $target_item['owner_xchan']) {
                $relay_to_owner = false;
            }

            $uplink = false;

            // $cmd === 'relay' indicates the owner is sending it to the original recipients
            // don't allow the item in the relay command to relay to owner under any circumstances, it will loop

            logger('notifier: relay_to_owner: ' . (($relay_to_owner) ? 'true' : 'false'), LOGGER_DATA, LOG_DEBUG);
            logger('notifier: top_level_post: ' . (($top_level_post) ? 'true' : 'false'), LOGGER_DATA, LOG_DEBUG);

            // tag_deliver'd post which needs to be sent back to the original author
            if (($cmd === 'uplink') && intval($parent_item['item_uplink']) && (! $top_level_post)) {
                logger('notifier: uplink');
                $uplink = true;
                self::$packet_type = 'response';
            }

            if (($relay_to_owner || $uplink) && ($cmd !== 'relay')) {
                logger('followup relay (upstream delivery)', LOGGER_DEBUG);

                $sendto = ($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan'];
                self::$recipients = [$sendto];

                // over-ride upstream recipients if 'replyTo' was set in the parent.
                if ($parent_item['replyto'] && (!$uplink)) {
                    logger('replyto: over-riding owner ' . $sendto, LOGGER_DEBUG);
                    // unserialise is a no-op if presented with data that wasn't serialised.
                    $ptr = unserialise($parent_item['replyto']);
                    if (is_string($ptr)) {
                        // NOTE(review): this tests $sendto (the owner), not $ptr - confirm that is intended.
                        if (ActivityStreams::is_url($sendto)) {
                            $sendto = $ptr;
                            self::$recipients = [$sendto];
                        }
                    } elseif (is_array($ptr)) {
                        $sendto = [];
                        foreach ($ptr as $rto) {
                            if (is_string($rto)) {
                                $sendto[] = $rto;
                            } elseif (is_array($rto) && isset($rto['id'])) {
                                $sendto[] = $rto['id'];
                            }
                        }
                        self::$recipients = $sendto;
                    }
                }

                logger('replyto: upstream recipients ' . print_r($sendto, true), LOGGER_DEBUG);

                self::$private = true;
                $upstream = true;
                self::$packet_type = 'response';

                // A replyTo array may have yielded no usable entries, so do not assume index 0 exists.
                $first_upstream = is_array($sendto) ? ($sendto[0] ?? null) : $sendto;
                $is_moderated = their_perms_contains($parent_item['uid'], $first_upstream, 'moderated');

                if ($relay_to_owner && $thread_is_public && (! $is_moderated) && (! $question) && (! Channel::is_group($parent_item['uid']))) {
                    if (get_pconfig($target_item['uid'], 'system', 'hyperdrive', true)) {
                        Run::Summon([ 'Notifier' , 'hyper', $item_id ]);
                    }
                }
            } else {
                if ($cmd === 'relay') {
                    logger('owner relay (downstream delivery)');
                } else {
                    logger('normal (downstream) distribution', LOGGER_DEBUG);
                }

                $upstream = false;

                if ($parent_item && $parent_item['item_private'] !== $target_item['item_private']) {
                    logger('parent_item: ' . $parent_item['id'] . ' item_private: ' . $parent_item['item_private']);
                    logger('target_item: ' . $target_item['id'] . ' item_private: ' . $target_item['item_private']);
                    logger('conversation privacy mismatch - downstream delivery prevented');
                    return;
                }

                // if our parent is a tag_delivery recipient, uplink to the original author causing
                // a delivery fork.

                if (($parent_item) && intval($parent_item['item_uplink']) && (! $top_level_post) && ($cmd !== 'uplink')) {
                    // don't uplink a relayed post to the relay owner
                    if ($parent_item['source_xchan'] !== $parent_item['owner_xchan']) {
                        logger('notifier: uplinking this item');
                        Run::Summon([ 'Notifier','uplink',$item_id ]);
                    }
                }

                if ($thread_is_public && $cmd === 'hyper') {
                    // Add hyperdrive (friend-of-friend) recipients for public activities.
                    // Don't add Hubzilla (zot6) connections, since that software doesn't support
                    // hyperdrive and this would just clutter the airwaves with rejected deliveries.

                    self::$recipients = [];
                    $r = q(
                        "select abook_xchan, xchan_network from abook left join xchan on abook_xchan = xchan_hash
                        where abook_channel = %d and abook_self = 0 and abook_pending = 0 and abook_archived = 0
                        and xchan_network != 'zot6'
                        and not abook_xchan in ( '%s', '%s', '%s' ) ",
                        intval($target_item['uid']),
                        dbesc($target_item['author_xchan']),
                        dbesc($target_item['owner_xchan']),
                        dbesc($target_item['source_xchan'])
                    );
                    if ($r) {
                        foreach ($r as $rv) {
                            self::$recipients[] = $rv['abook_xchan'];
                        }
                    }
                } else {
                    self::$private = false;
                    self::$recipients = collect_recipients($parent_item, self::$private);
                }

                // @FIXME add any additional recipients such as mentions, etc.

                if ($top_level_post && intval($target_item['item_wall'])) {
                    // remove clones who will receive the post via sync
                    self::$recipients = array_values(array_diff(self::$recipients, [ $target_item['owner_xchan'] ]));
                }

                // don't send deletions onward for other people's stuff
                if (intval($target_item['item_deleted']) && (! intval($target_item['item_wall']))) {
                    logger('notifier: ignoring delete notification for non-wall item', LOGGER_NORMAL, LOG_NOTICE);
                    return;
                }
            }
        }

        // Everything below signs, queues and reports on behalf of the sending channel.
        // Without one there is nothing meaningful to deliver.
        if (! self::$channel) {
            logger('notifier: no sending channel for ' . $cmd . ' ' . $item_id, LOGGER_NORMAL, LOG_WARNING);
            return;
        }

        // Generic delivery section, we have an encoded item and recipients
        // Now start the delivery process

        logger('encoded item: ' . print_r(self::$encoded_item, true), LOGGER_DATA, LOG_DEBUG);

        // This addresses an issue that crossposting addons weren't being called if the sender had no friends
        // and only wanted to crosspost.

        $crossposting = isset($target_item['postopts']) && $target_item['postopts'];

        stringify_array_elms(self::$recipients);
        if ((! self::$recipients && ! $crossposting)) {
            logger('no recipients');
            return;
        }

        // logger('recipients: ' . print_r(self::$recipients,true), LOGGER_NORMAL, LOG_DEBUG);

        if (! count(self::$env_recips)) {
            self::$env_recips = ((self::$private) ? [] : null);
        }

        $recip_list = [];

        if (self::$recipients) {
            $details = q("select xchan_hash, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan
                where xchan_hash in (" . protect_sprintf(implode(',', self::$recipients)) . ")");
        } else {
            $details = [];
        }

        if ($details) {
            foreach ($details as $d) {
                $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')';
                if (self::$private) {
                    self::$env_recips[] = $d['xchan_hash'];
                }
            }
        }

        $narr = [
            'channel'        => self::$channel,
            'upstream'       => $upstream,
            'env_recips'     => self::$env_recips,
            'recipients'     => self::$recipients,
            'target_item'    => $target_item,
            'parent_item'    => $parent_item,
            'top_level_post' => $top_level_post,
            'private'        => self::$private,
            'relay_to_owner' => $relay_to_owner,
            'uplink'         => $uplink,
            'cmd'            => $cmd,
            'single'         => $cmd === 'single_activity',
            'normal_mode'    => $normal_mode,
            'packet_type'    => self::$packet_type,
            'queued'         => []
        ];

        Hook::call('notifier_process', $narr);
        if ($narr['queued']) {
            foreach ($narr['queued'] as $pq) {
                self::$deliveries[] = $pq;
            }
        }

        // notifier_process can alter the recipient list

        self::$recipients = $narr['recipients'];
        self::$env_recips = $narr['env_recips'];

        if ((self::$private) && (! self::$env_recips)) {
            // shouldn't happen
            logger('private message with no envelope recipients.' . print_r($argv, true), LOGGER_NORMAL, LOG_NOTICE);
            return;
        }

        logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list, true), LOGGER_DEBUG);

        // Now we have collected recipients (except for external mentions, @FIXME)
        // Let's reduce this to a set of hubs; checking that the site is not dead.

        if (self::$recipients) {
            // site_dead is selected so the dead-hub check in the loop below can actually see it.
            $hubs = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_dead from hubloc left join site on site_url = hubloc_url
                where hubloc_hash in (" . protect_sprintf(implode(',', self::$recipients)) . ")
                and hubloc_error = 0 and hubloc_deleted = 0 ");
        } else {
            $hubs = [];
        }

        // public posts won't make it to the local public stream unless there's a recipient on this site.
        // This code block sees if it's a public post and localhost is missing, and if so adds an entry for the local sys channel to the $hubs list

        if (! self::$private) {
            $found_localhost = false;
            if ($hubs) {
                foreach ($hubs as $h) {
                    if ($h['hubloc_url'] === z_root()) {
                        $found_localhost = true;
                        break;
                    }
                }
            }
            if (! $found_localhost) {
                $localhub = q(
                    "select hubloc.*, site.site_crypto, site.site_flags, site.site_dead from hubloc
                    left join site on site_url = hubloc_url where hubloc_id_url = '%s' and hubloc_error = 0 and hubloc_deleted = 0 ",
                    dbesc(z_root() . '/channel/sys')
                );
                if ($localhub) {
                    $hubs = array_merge($hubs, $localhub);
                }
            }
        }

        if (! $hubs) {
            logger('notifier: no hubs', LOGGER_NORMAL, LOG_NOTICE);
            return;
        }

        /**
         * Reduce the hubs to those that are unique. For zot hubs, we need to verify uniqueness by the sitekey,
         * since it may have been a re-install which has not yet been detected and pruned.
         * For other networks which don't have or require sitekeys, we'll have to use the URL
         */

        $hublist = []; // this provides an easily printable list for the logs
        $dhubs   = []; // delivery hubs where we store our resulting unique array
        $keys    = []; // array of keys to check uniqueness for zot hubs
        $urls    = []; // array of urls to check uniqueness of hubs from other networks
        $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all
        $dead    = []; // known dead hubs - report them as undeliverable

        foreach ($hubs as $hub) {
            if (!empty($hub['site_dead'])) {
                $dead[] = $hub;
                continue;
            }

            if (self::$env_recips) {
                foreach (self::$env_recips as $er) {
                    if ($hub['hubloc_hash'] === $er) {
                        if (! array_key_exists($hub['hubloc_site_id'], $hub_env)) {
                            $hub_env[$hub['hubloc_site_id']] = [];
                        }
                        $hub_env[$hub['hubloc_site_id']][] = $er;
                    }
                }
            }

            if (in_array($hub['hubloc_network'], ['nomad','zot6'])) {
                if (! in_array($hub['hubloc_sitekey'], $keys)) {
                    $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network'];
                    $dhubs[] = $hub;
                    $keys[] = $hub['hubloc_sitekey'];
                }
            } else {
                if (! in_array($hub['hubloc_url'], $urls)) {
                    $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network'];
                    $dhubs[] = $hub;
                    $urls[] = $hub['hubloc_url'];
                }
            }
        }

        logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist, true), LOGGER_DEBUG, LOG_DEBUG);

        foreach ($dhubs as $hub) {
            logger('notifier_hub: ' . $hub['hubloc_url'], LOGGER_DEBUG, LOG_DEBUG);

            // deliver to any non-zot networks

            if (! in_array($hub['hubloc_network'], ['zot6', 'nomad' ])) {
                $narr = [
                    'channel'        => self::$channel,
                    'upstream'       => $upstream,
                    'env_recips'     => self::$env_recips,
                    'recipients'     => self::$recipients,
                    'target_item'    => $target_item,
                    'parent_item'    => $parent_item,
                    'hub'            => $hub,
                    'top_level_post' => $top_level_post,
                    'private'        => self::$private,
                    'relay_to_owner' => $relay_to_owner,
                    'uplink'         => $uplink,
                    'cmd'            => $cmd,
                    'single'         => $cmd === 'single_activity',
                    'normal_mode'    => $normal_mode,
                    'packet_type'    => self::$packet_type,
                    'queued'         => []
                ];

                ActivityPub::notifier_process($narr);

                Hook::call('notifier_hub', $narr);
                if ($narr['queued']) {
                    foreach ($narr['queued'] as $pq) {
                        self::$deliveries[] = $pq;
                    }
                }
                continue;
            }

            // Single deliveries are for non-nomadic federated networks and we're essentially
            // delivering only to those that have this site url in their abook_instance
            // and only from within a sync operation. This means if you post from a clone,
            // and a connection is connected to one of your other clones; assuming that hub
            // is running it will receive a sync packet. On receipt of this sync packet it
            // will invoke a delivery to those connections which are connected to just that
            // hub instance.

            if ($cmd === 'single_activity') {
                continue;
            }

            // default: zot or nomad protocol

            $hash = new_uuid();

            // A hub only has an entry in $hub_env if one of the envelope recipients lives there.
            $env = $hub_env[$hub['hubloc_site_id']] ?? '';
            if ((self::$private) && (! $env)) {
                continue;
            }

            // @FIXME do not send Add/Collection activities to Hubzilla until they support them.
            // $target_item is only an array for item commands.
            if (is_array($target_item) && in_array($target_item['verb'], ['Add', 'Remove']) && $hub['hubloc_network'] === 'zot6') {
                continue;
            }

            $packet = Libzot::build_packet(
                self::$channel,
                self::$packet_type,
                $env,
                self::$encoded_item,
                self::$encoding,
                ((self::$private) ? $hub['hubloc_sitekey'] : null),
                $hub['site_crypto']
            );

            Queue::insert(
                [
                    'hash'       => $hash,
                    'account_id' => self::$channel['channel_account_id'],
                    'channel_id' => self::$channel['channel_id'],
                    'mid'        => $target_item['mid'] ?? '',
                    'posturl'    => $hub['hubloc_callback'],
                    'driver'     => $hub['hubloc_network'],
                    'notify'     => $packet,
                    'msg'        => EMPTY_STR
                ]
            );

            // only create delivery reports for normal undeleted items
            if (is_array($target_item) && (! $target_item['item_deleted']) && (! get_config('system', 'disable_dreport'))) {
                q(
                    "insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue, dreport_log )
                    values ( '%s', '%s','%s','%s','%s','%s','%s','%s', '%s' ) ",
                    dbesc($target_item['mid']),
                    dbesc($hub['hubloc_host']),
                    dbesc($hub['hubloc_host']),
                    dbesc($hub['hubloc_host']),
                    dbesc('queued'),
                    dbesc(Time::convert()),
                    dbesc(self::$channel['channel_hash']),
                    dbesc($hash),
                    dbesc(EMPTY_STR)
                );
            }

            self::$deliveries[] = $hash;
        }

        if ($normal_mode) {
            // This wastes a process if there are no delivery hooks configured, so check this before launching the new process
            $x = q("select * from hook where hook = 'notifier_normal'");
            if ($x) {
                // Commands without a target item pass a null id, as before.
                Run::Summon([ 'Deliver_hooks', $target_item['id'] ?? null ]);
            }
        }

        if (self::$deliveries) {
            do_delivery(self::$deliveries);
        }

        if ($dead) {
            foreach ($dead as $deceased) {
                if (is_array($target_item) && (! $target_item['item_deleted']) && (! get_config('system', 'disable_dreport'))) {
                    q(
                        "insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue, dreport_log )
                        values ( '%s', '%s','%s','%s','%s','%s','%s','%s','%s' ) ",
                        dbesc($target_item['mid']),
                        dbesc($deceased['hubloc_host']),
                        dbesc($deceased['hubloc_host']),
                        dbesc($deceased['hubloc_host']),
                        dbesc('undeliverable/unresponsive site'),
                        dbesc(Time::convert()),
                        dbesc(self::$channel['channel_hash']),
                        dbesc(new_uuid()),
                        dbesc(EMPTY_STR)
                    );
                }
            }
        }

        Hook::call('notifier_end', $target_item);

        logger('notifer: complete.');
        return;
    }
}