$base, 'site_update' => datetime_convert(), 'site_dead' => 0, 'site_type' => ((in_array($outq['outq_driver'], [ 'post', 'activitypub' ])) ? SITE_TYPE_NOTZOT : SITE_TYPE_UNKNOWN), 'site_crypto' => '' ] ); } } $arr = array('outq' => $outq, 'base' => $base, 'handled' => false, 'immediate' => $immediate); call_hooks('queue_deliver',$arr); if($arr['handled']) return; // "post" queue driver - used for diaspora and friendica-over-diaspora communications. if($outq['outq_driver'] === 'post') { $result = z_post_url($outq['outq_posturl'],$outq['outq_msg']); if($result['success'] && $result['return_code'] < 300) { logger('deliver: queue post success to ' . $outq['outq_posturl'], LOGGER_DEBUG); if($base) { q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ", dbesc(datetime_convert()), dbesc($base) ); } q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s'", dbesc('accepted for delivery'), dbesc(datetime_convert()), dbesc($outq['outq_hash']) ); self::remove($outq['outq_hash']); // server is responding - see if anything else is going to this destination and is piled up // and try to send some more. We're relying on the fact that do_delivery() results in an // immediate delivery otherwise we could get into a queue loop. if(! $immediate) { $x = q("select outq_hash from outq where outq_posturl = '%s' and outq_delivered = 0", dbesc($outq['outq_posturl']) ); $piled_up = []; if($x) { foreach($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if($piled_up) { // call do_delivery() with the force flag do_delivery($piled_up, true); } } } else { logger('deliver: queue post returned ' . $result['return_code'] . ' from ' . $outq['outq_posturl'],LOGGER_DEBUG); self::update($outq['outq_hash'],10); } return; } if ($outq['outq_driver'] === 'asfetch') { $channel = channelx_by_n($outq['outq_channel']); if (! $channel) { logger('missing channel: ' . $outq['outq_channel']); return; } if (! ActivityStreams::is_url($outq['outq_posturl'])) { logger('fetch item is not url: ' . $outq['outq_posturl']); self::remove($outq['outq_hash']); return; } $j = Activity::fetch($outq['outq_posturl'],$channel); if ($j) { $AS = new ActivityStreams($j, null, true); if ($AS->is_valid() && isset($AS->data['type'])) { if (ActivityStreams::is_an_actor($AS->data['type'])) { Activity::actor_store($AS->data['id'],$AS->data); } if (strpos($AS->data['type'],'Collection') !== false) { // we are probably fetching a collection already - and do not support collection recursion at this time self::remove($outq['outq_hash']); return; } $item = Activity::decode_note($AS,true); if ($item) { Activity::store($channel,$channel['channnel_hash'],$AS,$item,true,true); } } logger('deliver: queue fetch success from ' . $outq['outq_posturl'], LOGGER_DEBUG); self::remove($outq['outq_hash']); // server is responding - see if anything else is going to this destination and is piled up // and try to send some more. We're relying on the fact that do_delivery() results in an // immediate delivery otherwise we could get into a queue loop. if (! $immediate) { $x = q("select outq_hash from outq where outq_driver = 'asfetch' and outq_channel = %d and outq_delivered = 0", dbesc($outq['outq_channel']) ); $piled_up = []; if ($x) { foreach ($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if ($piled_up) { do_delivery($piled_up,true); } } } else { logger('deliver: queue fetch failed' . ' from ' . $outq['outq_posturl'],LOGGER_DEBUG); self::update($outq['outq_hash'],10); } return; } if($outq['outq_driver'] === 'activitypub') { $channel = channelx_by_n($outq['outq_channel']); if (! $channel) { logger('missing channel: ' . $outq['outq_channel']); return; } $retries = 0; $m = parse_url($outq['outq_posturl']); $headers = []; $headers['Content-Type'] = 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' ; $ret = $outq['outq_msg']; logger('ActivityPub send: ' . jindent($ret), LOGGER_DATA); $headers['Date'] = datetime_convert('UTC','UTC', 'now', 'D, d M Y H:i:s \\G\\M\\T'); $headers['Digest'] = HTTPSig::generate_digest_header($ret); $headers['Host'] = $m['host']; $headers['(request-target)'] = 'post ' . get_request_string($outq['outq_posturl']); $xhead = HTTPSig::create_sig($headers,$channel['channel_prvkey'],channel_url($channel)); if(strpos($outq['outq_posturl'],'http') !== 0) { logger('bad url: ' . $outq['outq_posturl']); self::remove($outq['outq_hash']); } $result = z_post_url($outq['outq_posturl'],$outq['outq_msg'],$retries,[ 'headers' => $xhead ]); if($result['success'] && $result['return_code'] < 300) { logger('deliver: queue post success to ' . $outq['outq_posturl'], LOGGER_DEBUG); if($base) { q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ", dbesc(datetime_convert()), dbesc($base) ); } q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s'", dbesc('accepted for delivery'), dbesc(datetime_convert()), dbesc($outq['outq_hash']) ); self::remove($outq['outq_hash']); // server is responding - see if anything else is going to this destination and is piled up // and try to send some more. We're relying on the fact that do_delivery() results in an // immediate delivery otherwise we could get into a queue loop. if(! $immediate) { $x = q("select outq_hash from outq where outq_posturl = '%s' and outq_delivered = 0", dbesc($outq['outq_posturl']) ); $piled_up = []; if($x) { foreach($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if($piled_up) { do_delivery($piled_up,true); } } } else { if ($result['return_code'] >= 300) { q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s'", dbesc('delivery rejected' . ' ' . $result['return_code']), dbesc(datetime_convert()), dbesc($outq['outq_hash']) ); } else { $dr = q("select * from dreport where dreport_queue = '%s'", dbesc($outq['outq_hash']) ); if ($dr) { // update every queue entry going to this site with the most recent communication error q("update dreport set dreport_log = '%s' where dreport_site = '%s'", dbesc(z_curl_error($result)), dbesc($dr[0]['dreport_site']) ); } } logger('deliver: queue post returned ' . $result['return_code'] . ' from ' . $outq['outq_posturl'],LOGGER_DEBUG); self::update($outq['outq_hash'],10); } return; } // normal zot delivery logger('deliver: dest: ' . $outq['outq_posturl'], LOGGER_DEBUG); if($outq['outq_posturl'] === z_root() . '/zot') { // local delivery $zot = new \Zotlabs\Zot6\Receiver(new \Zotlabs\Zot6\Zot6Handler(),$outq['outq_notify']); $result = $zot->run(); logger('returned_json: ' . json_encode($result,JSON_PRETTY_PRINT|JSON_UNESCAPED_SLASHES), LOGGER_DATA); logger('deliver: local zot delivery succeeded to ' . $outq['outq_posturl']); Libzot::process_response($outq['outq_posturl'],[ 'success' => true, 'body' => json_encode($result) ], $outq); if(! $immediate) { $x = q("select outq_hash from outq where outq_posturl = '%s' and outq_delivered = 0", dbesc($outq['outq_posturl']) ); $piled_up = []; if($x) { foreach($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if($piled_up) { do_delivery($piled_up,true); } } } else { logger('remote'); $channel = null; if($outq['outq_channel']) { $channel = channelx_by_n($outq['outq_channel'],true); } $host_crypto = null; if($channel && $base) { $h = q("select hubloc_sitekey, site_crypto from hubloc left join site on hubloc_url = site_url where site_url = '%s' and hubloc_network in ('nomad','zot6') order by hubloc_id desc limit 1", dbesc($base) ); if($h) { $host_crypto = $h[0]; } } $msg = $outq['outq_notify']; if ($outq['outq_driver'] === 'nomad') { $result = Libzot::nomad($outq['outq_posturl'],$msg,$channel,$host_crypto); } else { $result = Libzot::zot($outq['outq_posturl'],$msg,$channel,$host_crypto); } if($result['success']) { logger('deliver: remote nomad/zot delivery succeeded to ' . $outq['outq_posturl']); Libzot::process_response($outq['outq_posturl'],$result, $outq); } else { $dr = q("select * from dreport where dreport_queue = '%s'", dbesc($outq['outq_hash']) ); // update every queue entry going to this site with the most recent communication error q("update dreport set dreport_log = '%s' where dreport_site = '%s'", dbesc(z_curl_error($result)), dbesc($dr[0]['dreport_site']) ); logger('deliver: remote zot delivery failed to ' . $outq['outq_posturl']); logger('deliver: remote zot delivery fail data: ' . print_r($result,true), LOGGER_DATA); self::update($outq['outq_hash'],10); } } return; } }