$argv[4] ]; self::$encoding = 'zot'; $normal_mode = false; } elseif($cmd === 'keychange') { self::$channel = channelx_by_n($item_id); $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); if($r) { foreach($r as $rr) { self::$recipients[] = $rr['abook_xchan']; } } self::$private = false; self::$packet_type = 'keychange'; self::$encoded_item = get_pconfig(self::$channel['channel_id'],'system','keychange'); self::$encoding = 'zot'; $normal_mode = false; } elseif(in_array($cmd, [ 'permissions_update', 'permissions_reject', 'permissions_accept', 'permissions_create' ])) { // Get the (single) recipient $r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0", intval($item_id) ); if($r) { $uid = $r[0]['abook_channel']; // Get the sender self::$channel = channelx_by_n($uid); if(self::$channel) { $perm_update = array('sender' => self::$channel, 'recipient' => $r[0], 'success' => false, 'deliveries' => ''); switch($cmd) { case 'permissions_create': self::activitypub_permissions_create($perm_update); break; case 'permissions_accept': self::activitypub_permissions_accept($perm_update); break; default: break; } if(! $perm_update['success']) { call_hooks($cmd,$perm_update); } if($perm_update['success']) { if($perm_update['deliveries']) { self::$deliveries[] = $perm_update['deliveries']; do_delivery(self::$deliveries); } return; } else { self::$recipients[] = $r[0]['abook_xchan']; self::$private = false; self::$packet_type = 'refresh'; self::$env_recips = [ $r[0]['xchan_hash'] ]; } } } } elseif($cmd === 'refresh_all') { logger('notifier: refresh_all: ' . $item_id); self::$channel = channelx_by_n($item_id); $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); if($r) { foreach($r as $rr) { self::$recipients[] = $rr['abook_xchan']; } } self::$private = false; self::$packet_type = 'refresh'; } elseif($cmd === 'purge_all') { logger('notifier: purge_all: ' . 
$item_id); $s = q("select * from channel where channel_id = %d limit 1", intval($item_id) ); if($s) self::$channel = $s[0]; self::$recipients = array(); $r = q("select abook_xchan from abook where abook_channel = %d and abook_self = 0", intval($item_id) ); if($r) { foreach($r as $rr) { self::$recipients[] = $rr['abook_xchan']; } } self::$private = false; self::$packet_type = 'purge'; } else { // Normal items // Fetch the target item $r = q("SELECT * FROM item WHERE id = %d and parent != 0 LIMIT 1", intval($item_id) ); if(! $r) return; xchan_query($r); $r = fetch_post_tags($r); $target_item = $r[0]; $deleted_item = false; if(intval($target_item['item_deleted'])) { logger('notifier: target item ITEM_DELETED', LOGGER_DEBUG); $deleted_item = true; } if(! in_array(intval($target_item['item_type']), [ ITEM_TYPE_POST, ITEM_TYPE_MAIL ] )) { logger('notifier: target item not forwardable: type ' . $target_item['item_type'], LOGGER_DEBUG); return; } // Check for non published items, but allow an exclusion for transmitting hidden file activities if(intval($target_item['item_unpublished']) || intval($target_item['item_delayed']) || intval($target_item['item_blocked']) || ( intval($target_item['item_hidden']) && ($target_item['obj_type'] !== ACTIVITY_OBJ_FILE))) { logger('notifier: target item not published, so not forwardable', LOGGER_DEBUG); return; } if(strpos($target_item['postopts'],'nodeliver') !== false) { logger('notifier: target item is undeliverable', LOGGER_DEBUG); return; } $s = q("select * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1", intval($target_item['uid']) ); if($s) self::$channel = $s[0]; if(self::$channel['channel_hash'] !== $target_item['author_xchan'] && self::$channel['channel_hash'] !== $target_item['owner_xchan']) { logger("notifier: Sending channel is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING); return; } if($target_item['mid'] === 
$target_item['parent_mid']) { $parent_item = $target_item; $top_level_post = true; } else { // fetch the parent item $r = q("SELECT * from item where id = %d order by id asc", intval($target_item['parent']) ); if(! $r) return; xchan_query($r); $r = fetch_post_tags($r); $parent_item = $r[0]; $top_level_post = false; } // avoid looping of discover items 12/4/2014 if($sys && $parent_item['uid'] == $sys['channel_id']) return; $m = get_iconfig($target_item,'activitystreams','signed_data'); if($m) self::$encoded_item = json_decode($m,true); else self::$encoded_item = \Zotlabs\Lib\Activity::encode_activity($target_item); logger('target_item: ' . print_r($target_item,true)); // self::$encoded_item = encode_item($target_item); logger('encoded: ' . print_r(self::$encoded_item,true)); // Send comments to the owner to re-deliver to everybody in the conversation // We only do this if the item in question originated on this site. This prevents looping. // To clarify, a site accepting a new comment is responsible for sending it to the owner for relay. // Relaying should never be initiated on a post that arrived from elsewhere. // We should normally be able to rely on ITEM_ORIGIN, but start_delivery_chain() incorrectly set this // flag on comments for an extended period. So we'll also call comment_local_origin() which looks at // the hostname in the message_id and provides a second (fallback) opinion. $relay_to_owner = (((! $top_level_post) && (intval($target_item['item_origin'])) && comment_local_origin($target_item)) ? true : false); $uplink = false; // $cmd === 'relay' indicates the owner is sending it to the original recipients // don't allow the item in the relay command to relay to owner under any circumstances, it will loop logger('notifier: relay_to_owner: ' . (($relay_to_owner) ? 'true' : 'false'), LOGGER_DATA, LOG_DEBUG); logger('notifier: top_level_post: ' . (($top_level_post) ? 
'true' : 'false'), LOGGER_DATA, LOG_DEBUG); // tag_deliver'd post which needs to be sent back to the original author if(($cmd === 'uplink') && intval($parent_item['item_uplink']) && (! $top_level_post)) { logger('notifier: uplink'); $uplink = true; self::$packet_type = 'response'; } if(($relay_to_owner || $uplink) && ($cmd !== 'relay')) { logger('followup relay (upstream delivery)', LOGGER_DEBUG); self::$recipients = [ ($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan'] ]; self::$private = true; $upstream = true; self::$packet_type = 'response'; } else { if($cmd === 'relay') { logger('owner relay (downstream delivery)'); } else { logger('normal (downstream) distribution', LOGGER_DEBUG); } $upstream = false; // if our parent is a tag_delivery recipient, uplink to the original author causing // a delivery fork. if(($parent_item) && intval($parent_item['item_uplink']) && (! $top_level_post) && ($cmd !== 'uplink')) { // don't uplink a relayed post to the relay owner if($parent_item['source_xchan'] !== $parent_item['owner_xchan']) { logger('notifier: uplinking this item'); Master::Summon(array('Notifier','uplink',$item_id)); } } self::$private = false; self::$recipients = collect_recipients($parent_item,self::$private); // FIXME add any additional recipients such as mentions, etc. // don't send deletions onward for other people's stuff // TODO verify this is needed - copied logic from same place in old code if(intval($target_item['item_deleted']) && (! intval($target_item['item_wall']))) { logger('notifier: ignoring delete notification for non-wall item', LOGGER_NORMAL, LOG_NOTICE); return; } } } // Generic delivery section, we have an encoded item and recipients // Now start the delivery process logger('encoded item: ' . print_r(self::$encoded_item,true), LOGGER_DATA, LOG_DEBUG); stringify_array_elms(self::$recipients); if(! 
self::$recipients) { logger('no recipients'); return; } // add any linked identities $l = q("select link from linkid where ident in ((" . protect_sprintf(implode(',',self::$recipients)) . ") "); if($l) { foreach($l as $lv) { if(! in_array("'" . $lv['link'] . "'", self::$recipients)) { self::$recipients[] = "'" . $lv['link'] . "'"; } } } $l = q("select ident from linkid where link in ((" . protect_sprintf(implode(',',self::$recipients)) . ") "); if($l) { foreach($l as $lv) { if(! in_array("'" . $lv['ident'] . "'", self::$recipients)) { self::$recipients[] = "'" . $lv['ident'] . "'"; } } } // logger('recipients: ' . print_r(self::$recipients,true), LOGGER_NORMAL, LOG_DEBUG); if(! count(self::$env_recips)) self::$env_recips = ((self::$private) ? [] : null); $details = q("select xchan_hash, xchan_instance_url, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . protect_sprintf(implode(',',self::$recipients)) . ")"); $recip_list = array(); if($details) { foreach($details as $d) { $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')'; if(self::$private) { self::$env_recips[] = $d['xchan_hash']; } } } $narr = [ 'channel' => self::$channel, 'upstream' => $upstream, 'env_recips' => self::$env_recips, 'recipients' => self::$recipients, 'item' => $item, 'target_item' => $target_item, 'parent_item' => $parent_item, 'top_level_post' => $top_level_post, 'private' => self::$private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, 'mail' => $mail, 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? 
true : false), 'request' => $request, 'normal_mode' => $normal_mode, 'packet_type' => self::$packet_type, 'queued' => [] ]; call_hooks('notifier_process', $narr); if($narr['queued']) { foreach($narr['queued'] as $pq) self::$deliveries[] = $pq; } // notifier_process can alter the recipient list self::$recipients = $narr['recipients']; self::$env_recips = $narr['env_recips']; if((self::$private) && (! self::$env_recips)) { // shouldn't happen logger('private message with no envelope recipients.' . print_r($argv,true), LOGGER_NORMAL, LOG_NOTICE); return; } logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list,true), LOGGER_DEBUG); // Now we have collected recipients (except for external mentions, FIXME) // Let's reduce this to a set of hubs; checking that the site is not dead. $r = q("select hubloc.*, site.site_crypto, site.site_flags from hubloc left join site on site_url = hubloc_url where hubloc_hash in (" . protect_sprintf(implode(',',self::$recipients)) . ") and hubloc_error = 0 and hubloc_deleted = 0 and ( site_dead = 0 OR site_dead is null ) " ); if(! $r) { logger('notifier: no hubs', LOGGER_NORMAL, LOG_NOTICE); return; } $hubs = $r; /** * Reduce the hubs to those that are unique. For zot hubs, we need to verify uniqueness by the sitekey, * since it may have been a re-install which has not yet been detected and pruned. * For other networks which don't have or require sitekeys, we'll have to use the URL */ $hublist = []; // this provides an easily printable list for the logs $dhubs = []; // delivery hubs where we store our resulting unique array $keys = []; // array of keys to check uniquness for zot hubs $urls = []; // array of urls to check uniqueness of hubs from other networks $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all foreach($hubs as $hub) { if(self::$env_recips) { foreach(self::$env_recips as $er) { if($hub['hubloc_hash'] === $er) { if(! 
array_key_exists($hub['hubloc_site_id'], $hub_env)) { $hub_env[$hub['hubloc_site_id']] = []; } $hub_env[$hub['hubloc_site_id']][] = $er; } } } if($hub['hubloc_network'] === 'zot6') { if(! in_array($hub['hubloc_sitekey'],$keys)) { $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; $dhubs[] = $hub; $keys[] = $hub['hubloc_sitekey']; } } else { if(! in_array($hub['hubloc_url'],$urls)) { $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; $dhubs[] = $hub; $urls[] = $hub['hubloc_url']; } } } logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG, LOG_DEBUG); foreach($dhubs as $hub) { if($hub['hubloc_network'] !== 'zot6') { $narr = [ 'channel' => self::$channel, 'upstream' => $upstream, 'env_recips' => self::$env_recips, 'recipients' => self::$recipients, 'item' => $item, 'target_item' => $target_item, 'parent_item' => $parent_item, 'hub' => $hub, 'top_level_post' => $top_level_post, 'private' => self::$private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, 'mail' => $mail, 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), 'request' => $request, 'normal_mode' => $normal_mode, 'packet_type' => self::$packet_type, 'queued' => [] ]; self::activitypub_process($narr); call_hooks('notifier_hub',$narr); if($narr['queued']) { foreach($narr['queued'] as $pq) self::$deliveries[] = $pq; } continue; } // singleton deliveries by definition 'not got zot'. // Single deliveries are other federated networks (plugins) and we're essentially // delivering only to those that have this site url in their abook_instance // and only from within a sync operation. This means if you post from a clone, // and a connection is connected to one of your other clones; assuming that hub // is running it will receive a sync packet. On receipt of this sync packet it // will invoke a delivery to those connections which are connected to just that // hub instance. 
if($cmd === 'single_mail' || $cmd === 'single_activity') {
				continue;
			}

			// default: zot protocol

			$hash = random_string();

			$env = (($hub_env && $hub_env[$hub['hubloc_site_id']]) ? $hub_env[$hub['hubloc_site_id']] : '');
			if((self::$private) && (! $env)) {
				continue;
			}

			$packet = Libzot::build_packet(self::$channel, self::$packet_type, $env, self::$encoded_item, self::$encoding, ((self::$private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto']);

			Queue::insert(
				[
					'hash'       => $hash,
					'account_id' => self::$channel['channel_account_id'],
					'channel_id' => self::$channel['channel_id'],
					'posturl'    => $hub['hubloc_callback'],
					'notify'     => $packet,
					'msg'        => EMPTY_STR
				]
			);

			// only create delivery reports for normal undeleted items
			if(is_array($target_item) && array_key_exists('postopts',$target_item) && (! $target_item['item_deleted']) && (! get_config('system','disable_dreport'))) {
				q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ",
					dbesc($target_item['mid']),
					dbesc($hub['hubloc_host']),
					dbesc($hub['hubloc_host']),
					dbesc($hub['hubloc_host']),
					dbesc('queued'),
					dbesc(datetime_convert()),
					dbesc(self::$channel['channel_hash']),
					dbesc($hash)
				);
			}

			self::$deliveries[] = $hash;
		}

		if($normal_mode) {
			$x = q("select * from hook where hook = 'notifier_normal'");
			if($x) {
				Master::Summon( [ 'Deliver_hooks', $target_item['id'] ] );
			}
		}

		if(self::$deliveries)
			do_delivery(self::$deliveries);

		logger('notifier: basic loop complete.', LOGGER_DEBUG);

		call_hooks('notifier_end',$target_item);

		logger('notifer: complete.');
		return;

	}


	/**
	 * Queue delivery of the target item to a single ActivityPub hub.
	 *
	 * Returns without doing anything unless $arr['hub']['hubloc_network'] is
	 * 'activitypub', and also when the target item is missing, obscured,
	 * flagged 'nopub' in its postopts, has no 'mid', or was authored by
	 * somebody other than the sending channel and no stored signed copy
	 * (iconfig activitypub/rawmsg) exists.
	 *
	 * When $arr['env_recips'] is non-empty the message is treated as private
	 * and queued individually for each listed recipient found on this hub
	 * (only if $arr['normal_mode'] is set). Otherwise the hub's shared inbox
	 * is used when one is known, falling back to one message per ActivityPub
	 * xchan located on the hub.
	 *
	 * @param array $arr
	 *   Per-hub notifier array. Keys read here: 'hub', 'upstream', 'channel',
	 *   'target_item', 'env_recips', 'normal_mode'. The queue hash of every
	 *   message queued is appended to $arr['queued'].
	 * @return void
	 */

	static public function activitypub_process(&$arr) {

		if($arr['hub']['hubloc_network'] !== 'activitypub')
			return;

		logger('upstream: ' . intval($arr['upstream']));

		logger('notifier_array: ' . print_r($arr,true), LOGGER_ALL, LOG_INFO);

		// Without a target item there is nothing to encode and no message id to report on.
		if(! (array_key_exists('target_item',$arr) && is_array($arr['target_item'])))
			return;

		$target_item = $arr['target_item'];

		if(intval($target_item['item_obscured'])) {
			logger('Cannot send raw data as an activitypub activity.');
			return;
		}

		if(strpos($target_item['postopts'],'nopub') !== false) {
			return;
		}

		$signed_msg = get_iconfig($target_item,'activitypub','rawmsg');

		// If we have an activity already stored with an LD-signature
		// which we are sending downstream, use that signed activity as is.
		// The channel will then sign the HTTP transaction.

		// It is unclear if Mastodon supports the federation delivery model. Initial tests were
		// inconclusive and the behaviour varied.

		if(($arr['channel']['channel_hash'] != $target_item['author_xchan']) && (! $signed_msg)) {
			return;
		}

		if(! $target_item['mid'])
			return;

		$prv_recips = $arr['env_recips'];

		if($signed_msg) {
			$jmsg = $signed_msg;
		}
		else {
			$ti = Activity::encode_activity($target_item, true);
			if(! $ti)
				return;

			$msg = array_merge(['@context' => [
				ACTIVITYSTREAMS_JSONLD_REV,
				'https://w3id.org/security/v1',
				z_root() . ZOT_APSCHEMA_REV
			]], $ti);

			$msg['signature'] = \Zotlabs\Lib\LDSignatures::sign($msg,$arr['channel']);

			logger('ActivityPub_encoded: ' . json_encode($msg,JSON_UNESCAPED_SLASHES|JSON_PRETTY_PRINT));

			$jmsg = json_encode($msg, JSON_UNESCAPED_SLASHES);
		}

		if($prv_recips) {

			// re-explode the recipients, but only for this hub/pod

			$hashes = [];

			foreach($prv_recips as $recip) {
				// run() stores bare xchan hashes in env_recips; entries shaped
				// like [ 'hash' => ... ] (e.g. supplied by a hook) are also accepted.
				if(is_array($recip))
					$recip = (array_key_exists('hash',$recip) ? $recip['hash'] : '');

				if(is_string($recip) && $recip !== '')
					$hashes[] = "'" . dbesc($recip) . "'";
			}

			if(! $hashes) {
				logger('activitypub_process_outbound: no recipients');
				return;
			}

			// The hash list becomes part of the q() format string, so any '%' must be protected.
			$r = q("select * from xchan left join hubloc on xchan_hash = hubloc_hash where hubloc_url = '%s' and xchan_hash in (" . protect_sprintf(implode(',', $hashes)) . ") and xchan_network = 'activitypub' ",
				dbesc($arr['hub']['hubloc_url'])
			);

			if(! $r) {
				logger('activitypub_process_outbound: no recipients');
				return;
			}

			foreach($r as $contact) {

				// is $contact connected with this channel - and if the channel is cloned, also on this hub?
				$single = deliverable_singleton($arr['channel']['channel_id'],$contact);

				// private activities are only queued when normal_mode is set
				if((! $arr['normal_mode']) || (! $single))
					continue;

				$qi = self::activitypub_queue_message($jmsg,$arr['channel'],$contact,$target_item['mid']);
				if($qi)
					$arr['queued'][] = $qi;
			}

		}
		else {

			// public message

			// See if we can deliver all of them at once

			$x = get_xconfig($arr['hub']['hubloc_hash'],'activitypub','collections');

			if(is_array($x) && array_key_exists('sharedInbox',$x) && $x['sharedInbox']) {
				logger('using publicInbox delivery for ' . $arr['hub']['hubloc_url'], LOGGER_DEBUG);

				// activitypub_queue_message() only reads the callback url from the recipient
				$contact = [ 'hubloc_callback' => $x['sharedInbox'] ];

				$qi = self::activitypub_queue_message($jmsg,$arr['channel'],$contact,$target_item['mid']);
				if($qi) {
					$arr['queued'][] = $qi;
				}
			}
			else {

				$r = q("select * from xchan left join hubloc on xchan_hash = hubloc_hash where hubloc_url = '%s' and xchan_network = 'activitypub' ",
					dbesc($arr['hub']['hubloc_url'])
				);

				if(! $r) {
					logger('activitypub_process_outbound: no recipients');
					return;
				}

				foreach($r as $contact) {
					$single = deliverable_singleton($arr['channel']['channel_id'],$contact);
					if($single) {
						$qi = self::activitypub_queue_message($jmsg,$arr['channel'],$contact,$target_item['mid']);
						if($qi)
							$arr['queued'][] = $qi;
					}
				}
			}
		}

		return;

	}


	/**
	 * Insert an already encoded ActivityPub message into the outbound queue.
	 *
	 * A 'queued' delivery report row is also written when $message_id is
	 * given and the system/disable_dreport config is not set.
	 *
	 * @param string $msg         JSON encoded activity
	 * @param array  $sender      channel record of the sender (reads channel_id, channel_account_id, channel_hash)
	 * @param array  $recip       recipient record; only 'hubloc_callback' (the destination url) is read
	 * @param string $message_id  optional message id used for the delivery report
	 * @return string|boolean     queue hash, or false if nothing was queued
	 *                            (no destination url, or activitypub_test mode enabled)
	 */

	static function activitypub_queue_message($msg,$sender,$recip,$message_id = '') {

		$dest_url = ((is_array($recip) && array_key_exists('hubloc_callback',$recip)) ? $recip['hubloc_callback'] : '');

		// A queue entry without a destination can never be delivered.
		if(! $dest_url) {
			logger('no destination url - not queued', LOGGER_DEBUG);
			return false;
		}

		logger('URL: ' . $dest_url, LOGGER_DEBUG);
		logger('DATA: ' . jindent($msg), LOGGER_DATA);

		if(intval(get_config('system','activitypub_test')) || intval(get_pconfig($sender['channel_id'],'system','activitypub_test'))) {
			logger('test mode - delivery disabled');
			return false;
		}

		$hash = random_string();

		logger('queue: ' . $hash . ' ' . $dest_url, LOGGER_DEBUG);

		Queue::insert(
			[
				'hash'       => $hash,
				'account_id' => $sender['channel_account_id'],
				'channel_id' => $sender['channel_id'],
				'driver'     => 'activitypub',
				'posturl'    => $dest_url,
				'notify'     => '',
				'msg'        => $msg
			]
		);

		if($message_id && (! get_config('system','disable_dreport'))) {
			q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_result, dreport_time, dreport_xchan, dreport_queue ) values ( '%s','%s','%s','%s','%s','%s','%s' ) ",
				dbesc($message_id),
				dbesc($dest_url),
				dbesc($dest_url),
				dbesc('queued'),
				dbesc(datetime_convert()),
				dbesc($sender['channel_hash']),
				dbesc($hash)
			);
		}

		return $hash;

	}


	/**
	 * Send a Follow activity to an ActivityPub connection.
	 *
	 * Does nothing (leaving $x['success'] untouched) when the recipient is
	 * not on the 'activitypub' network or the sender cannot be encoded.
	 *
	 * @param array $x
	 *   Permission update array with 'sender' (channel) and 'recipient'
	 *   (abook joined with xchan). On completion 'success' is set to true
	 *   and 'deliveries' holds the queue hash if a message was queued.
	 * @return void
	 */

	static function activitypub_permissions_create(&$x) {

		// send a follow activity to the followee's inbox

		if($x['recipient']['xchan_network'] !== 'activitypub') {
			return;
		}

		$p = asencode_person($x['sender']);
		if(! $p)
			return;

		$msg = array_merge(['@context' => [
				ACTIVITYSTREAMS_JSONLD_REV,
				'https://w3id.org/security/v1',
				z_root() . ZOT_APSCHEMA_REV
			]],
			[
				'id'     => z_root() . '/follow/' . $x['recipient']['abook_id'],
				'type'   => 'Follow',
				'actor'  => $p,
				'object' => $x['recipient']['xchan_url'],
				'to'     => [ $x['recipient']['xchan_hash'] ]
			]
		);

		self::activitypub_queue_permission_activity($x,$msg);

	}


	/**
	 * Send an Accept activity answering a Follow from an ActivityPub connection.
	 *
	 * Does nothing (leaving $x['success'] untouched) when the recipient is
	 * not on the 'activitypub' network, when no activitypub/their_follow_id
	 * abconfig value is stored for the connection (presumably the id of the
	 * Follow activity they sent - TODO confirm), or when the sender cannot
	 * be encoded.
	 *
	 * @param array $x
	 *   Permission update array with 'sender' (channel) and 'recipient'
	 *   (abook joined with xchan). On completion 'success' is set to true
	 *   and 'deliveries' holds the queue hash if a message was queued.
	 * @return void
	 */

	static function activitypub_permissions_accept(&$x) {

		// send an accept activity to the follower's inbox

		if($x['recipient']['xchan_network'] !== 'activitypub') {
			return;
		}

		// we currently are not handling send of reject follow activities; this is permitted by protocol

		$accept = get_abconfig($x['recipient']['abook_channel'],$x['recipient']['xchan_hash'],'activitypub','their_follow_id');
		if(! $accept)
			return;

		$p = asencode_person($x['sender']);
		if(! $p)
			return;

		$msg = array_merge(['@context' => [
				ACTIVITYSTREAMS_JSONLD_REV,
				'https://w3id.org/security/v1',
				z_root() . ZOT_APSCHEMA_REV
			]],
			[
				'id'     => z_root() . '/follow/' . $x['recipient']['abook_id'],
				'type'   => 'Accept',
				'actor'  => $p,
				'object' => [
					'type'   => 'Follow',
					'id'     => $accept,
					'actor'  => $x['recipient']['xchan_hash'],
					'object' => z_root() . '/channel/' . $x['sender']['channel_address']
				],
				'to' => [ $x['recipient']['xchan_hash'] ]
			]
		);

		self::activitypub_queue_permission_activity($x,$msg);

	}


	/**
	 * Sign, encode and queue a Follow/Accept activity for the recipient in $x.
	 *
	 * Shared tail of activitypub_permissions_create() and
	 * activitypub_permissions_accept().
	 *
	 * @param array $x    permission update array; 'success' is always set to
	 *                    true, 'deliveries' is set to the queue hash if queued
	 * @param array $msg  unsigned activity including its '@context'
	 * @return void
	 */

	static private function activitypub_queue_permission_activity(&$x,$msg) {

		$msg['signature'] = \Zotlabs\Lib\LDSignatures::sign($msg,$x['sender']);

		$jmsg = json_encode($msg, JSON_UNESCAPED_SLASHES);

		// is $contact connected with this channel - and if the channel is cloned, also on this hub?
		$single = deliverable_singleton($x['sender']['channel_id'],$x['recipient']);

		$h = q("select * from hubloc where hubloc_hash = '%s' limit 1",
			dbesc($x['recipient']['xchan_hash'])
		);

		if($single && $h) {
			$qi = self::activitypub_queue_message($jmsg,$x['sender'],$h[0]);
			if($qi)
				$x['deliveries'] = $qi;
		}

		// Marked handled even when nothing was queued, so the caller does not
		// fall back to its non-activitypub handling for this recipient.
		$x['success'] = true;

	}

}