$base, 'site_update' => datetime_convert(), 'site_dead' => 0, 'site_type' => ((in_array($outq['outq_driver'], ['post', 'activitypub'])) ? SITE_TYPE_NOTZOT : SITE_TYPE_UNKNOWN), 'site_crypto' => '' ] ); } } $arr = array('outq' => $outq, 'base' => $base, 'handled' => false, 'immediate' => $immediate); Hook::call('queue_deliver', $arr); if ($arr['handled']) { return; } // "post" queue driver - used for diaspora and friendica-over-diaspora communications. if ($outq['outq_driver'] === 'post') { $result = Url::post($outq['outq_posturl'], $outq['outq_msg']); if ($result['success'] && $result['return_code'] < 300) { logger('deliver: queue post success to ' . $outq['outq_posturl'], LOGGER_DEBUG); if ($base) { q( "update site set site_update = '%s', site_dead = 0 where site_url = '%s' ", dbesc(datetime_convert()), dbesc($base) ); } q( "update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s'", dbesc('accepted for delivery'), dbesc(datetime_convert()), dbesc($outq['outq_hash']) ); self::remove($outq['outq_hash']); // server is responding - see if anything else is going to this destination and is piled up // and try to send some more. We're relying on the fact that do_delivery() results in an // immediate delivery otherwise we could get into a queue loop. if (!$immediate) { $x = q( "select outq_hash from outq where outq_posturl = '%s' and outq_delivered = 0", dbesc($outq['outq_posturl']) ); $piled_up = []; if ($x) { foreach ($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if ($piled_up) { // call do_delivery() with the force flag do_delivery($piled_up, true); } } } else { logger('deliver: queue post returned ' . $result['return_code'] . ' from ' . $outq['outq_posturl'], LOGGER_DEBUG); self::update($outq['outq_hash'], 10); } return; } if ($outq['outq_driver'] === 'asfetch') { $channel = Channel::from_id($outq['outq_channel']); if (!$channel) { logger('missing channel: ' . $outq['outq_channel']); return; } if (!ActivityStreams::is_url($outq['outq_posturl'])) { logger('fetch item is not url: ' . $outq['outq_posturl']); self::remove($outq['outq_hash']); return; } $j = Activity::fetch($outq['outq_posturl'], $channel); if ($j) { $AS = new ActivityStreams($j, null, true); if ($AS->is_valid() && isset($AS->data['type'])) { if (ActivityStreams::is_an_actor($AS->data['type'])) { Activity::actor_store($AS->data['id'], $AS->data); } if (strpos($AS->data['type'], 'Collection') !== false) { // we are probably fetching a collection already - and do not support collection recursion at this time self::remove($outq['outq_hash']); return; } $item = Activity::decode_note($AS, true); if ($item) { Activity::store($channel, $channel['channnel_hash'], $AS, $item, true, true); } } logger('deliver: queue fetch success from ' . $outq['outq_posturl'], LOGGER_DEBUG); self::remove($outq['outq_hash']); // server is responding - see if anything else is going to this destination and is piled up // and try to send some more. We're relying on the fact that do_delivery() results in an // immediate delivery otherwise we could get into a queue loop. if (!$immediate) { $x = q( "select outq_hash from outq where outq_driver = 'asfetch' and outq_channel = %d and outq_delivered = 0", dbesc($outq['outq_channel']) ); $piled_up = []; if ($x) { foreach ($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if ($piled_up) { do_delivery($piled_up, true); } } } else { logger('deliver: queue fetch failed' . ' from ' . $outq['outq_posturl'], LOGGER_DEBUG); self::update($outq['outq_hash'], 10); } return; } if ($outq['outq_driver'] === 'activitypub') { $channel = Channel::from_id($outq['outq_channel']); if (!$channel) { logger('missing channel: ' . $outq['outq_channel']); return; } $m = parse_url($outq['outq_posturl']); $headers = []; $headers['Content-Type'] = 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'; $ret = $outq['outq_msg']; logger('ActivityPub send: ' . jindent($ret), LOGGER_DATA); $headers['Date'] = datetime_convert('UTC', 'UTC', 'now', 'D, d M Y H:i:s \\G\\M\\T'); $headers['Digest'] = HTTPSig::generate_digest_header($ret); $headers['Host'] = $m['host']; $headers['(request-target)'] = 'post ' . get_request_string($outq['outq_posturl']); $xhead = HTTPSig::create_sig($headers, $channel['channel_prvkey'], Channel::url($channel)); if (strpos($outq['outq_posturl'], 'http') !== 0) { logger('bad url: ' . $outq['outq_posturl']); self::remove($outq['outq_hash']); } $result = Url::post($outq['outq_posturl'], $outq['outq_msg'], ['headers' => $xhead]); if ($result['success'] && $result['return_code'] < 300) { logger('deliver: queue post success to ' . $outq['outq_posturl'], LOGGER_DEBUG); if ($base) { q( "update site set site_update = '%s', site_dead = 0 where site_url = '%s' ", dbesc(datetime_convert()), dbesc($base) ); } q( "update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s'", dbesc('accepted for delivery'), dbesc(datetime_convert()), dbesc($outq['outq_hash']) ); self::remove($outq['outq_hash']); // server is responding - see if anything else is going to this destination and is piled up // and try to send some more. We're relying on the fact that do_delivery() results in an // immediate delivery otherwise we could get into a queue loop. if (!$immediate) { $x = q( "select outq_hash from outq where outq_posturl = '%s' and outq_delivered = 0", dbesc($outq['outq_posturl']) ); $piled_up = []; if ($x) { foreach ($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if ($piled_up) { do_delivery($piled_up, true); } } } elseif ($result['return_code'] >= 400 && $result['return_code'] < 500) { q( "update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s'", dbesc('delivery rejected' . ' ' . $result['return_code']), dbesc(datetime_convert()), dbesc($outq['outq_hash']) ); self::remove($outq['outq_hash']); } else { $dr = q( "select * from dreport where dreport_queue = '%s'", dbesc($outq['outq_hash']) ); if ($dr) { // update every queue entry going to this site with the most recent communication error q( "update dreport set dreport_log = '%s' where dreport_site = '%s'", dbesc(Url::format_error($result)), dbesc($dr[0]['dreport_site']) ); } self::update($outq['outq_hash'], 10); } logger('deliver: queue post returned ' . $result['return_code'] . ' from ' . $outq['outq_posturl'], LOGGER_DEBUG); return; } // normal zot delivery logger('deliver: dest: ' . $outq['outq_posturl'], LOGGER_DEBUG); if ($outq['outq_posturl'] === z_root() . '/zot') { // local delivery $zot = new Receiver(new NomadHandler(), $outq['outq_notify']); $result = $zot->run(); logger('returned_json: ' . json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES), LOGGER_DATA); logger('deliver: local zot delivery succeeded to ' . $outq['outq_posturl']); Libzot::process_response($outq['outq_posturl'], ['success' => true, 'body' => json_encode($result)], $outq); if (!$immediate) { $x = q( "select outq_hash from outq where outq_posturl = '%s' and outq_delivered = 0", dbesc($outq['outq_posturl']) ); $piled_up = []; if ($x) { foreach ($x as $xx) { $piled_up[] = $xx['outq_hash']; } } if ($piled_up) { do_delivery($piled_up, true); } } } else { logger('remote'); $channel = null; if ($outq['outq_channel']) { $channel = Channel::from_id($outq['outq_channel'], true); } $host_crypto = null; if ($channel && $base) { $h = q( "select hubloc_sitekey, site_crypto from hubloc left join site on hubloc_url = site_url where site_url = '%s' and hubloc_network in ('zot6','nomad') and hubloc_deleted = 0 order by hubloc_id desc limit 1", dbesc($base) ); if ($h) { $host_crypto = $h[0]; } } $msg = $outq['outq_notify']; if ($outq['outq_driver'] === 'nomad') { $result = Libzot::nomad($outq['outq_posturl'],$msg,$channel,$host_crypto); } else { $result = Libzot::zot($outq['outq_posturl'],$msg,$channel,$host_crypto); } if ($result['success']) { logger('deliver: remote nomad/zot delivery succeeded to ' . $outq['outq_posturl']); Libzot::process_response($outq['outq_posturl'], $result, $outq); } else { $dr = q( "select * from dreport where dreport_queue = '%s'", dbesc($outq['outq_hash']) ); // update every queue entry going to this site with the most recent communication error q( "update dreport set dreport_log = '%s' where dreport_site = '%s'", dbesc(Url::format_error($result)), dbesc($dr[0]['dreport_site']) ); logger('deliver: remote nomad/zot delivery failed to ' . $outq['outq_posturl']); logger('deliver: remote nomad/zot delivery fail data: ' . print_r($result, true), LOGGER_DATA); self::update($outq['outq_hash'], 10); } } return; } }