revive convo fetching

This commit is contained in:
Mike Macgirvin 2024-07-11 06:53:25 +10:00
parent ea1514651b
commit 055bb69987
2 changed files with 44 additions and 66 deletions

View file

@ -3,10 +3,15 @@
namespace Code\Daemon; namespace Code\Daemon;
use Code\Lib\Activity; use Code\Lib\Activity;
use Code\Lib\ActivityStreams;
use Code\Lib\ASCollection; use Code\Lib\ASCollection;
use Code\Lib\Channel; use Code\Lib\Channel;
/*
* Given an id of a collection of messages (such as a replies collection or context),
* fetch and store all the conversation children. At this time we will not attempt to recurse
* beyond what is provided in the immediate collection.
*/
class Convo implements DaemonInterface class Convo implements DaemonInterface
{ {
@ -17,7 +22,6 @@ class Convo implements DaemonInterface
*/ */
public function run(int $argc, array $argv): void public function run(int $argc, array $argv): void
{ {
logger('convo invoked: ' . print_r($argv, true)); logger('convo invoked: ' . print_r($argv, true));
if ($argc != 4) { if ($argc != 4) {
@ -33,35 +37,25 @@ class Convo implements DaemonInterface
killme(); killme();
} }
$r = q(
"SELECT abook.*, xchan.* FROM abook left join xchan on abook_xchan = xchan_hash
WHERE abook_channel = %d and abook_xchan = '%s' LIMIT 1",
intval($channel_id),
dbesc($contact_hash)
);
if (! $r) {
killme();
}
$contact = array_shift($r);
$obj = new ASCollection($id, $channel); $obj = new ASCollection($id, $channel);
$messages = $obj->get(); $messages = $obj->get();
if ($messages) { if ($messages) {
foreach ($messages as $message) { foreach ($messages as $message) {
if (is_string($message)) { if (is_array($message) && isset($message['id'])) {
$message = Activity::fetch($message, $channel); $messageId = $message['id'];
} }
// set client flag because comments will probably just be objects and not full-blown activities elseif (is_string($message)) {
// and that lets us use implied_create $messageId = $message;
$AS = new ActivityStreams($message, null, true); }
if ($AS->is_valid() && is_array($AS->obj)) { else {
$item = Activity::decode_note($AS, true); continue;
if ($item) { }
Activity::store($channel, $contact['abook_xchan'], $AS, $item, true, true); $fetched = Activity::fetch_and_parse($messageId, $channel);
} if ($fetched) {
Activity::store($channel, $contact_hash, $fetched['activity'], $fetched['item'], false,
false, $fetched['collection']);
} }
} }
} }

View file

@ -6,7 +6,6 @@ use App;
use Code\Access\PermissionLimits; use Code\Access\PermissionLimits;
use Code\ActivityStreams\Actor; use Code\ActivityStreams\Actor;
use Code\ActivityStreams\ASObject; use Code\ActivityStreams\ASObject;
use Code\ActivityStreams\Collection;
use Code\ActivityStreams\Link; use Code\ActivityStreams\Link;
use Code\ActivityStreams\Place; use Code\ActivityStreams\Place;
use Code\ActivityStreams\AssertionMethod; use Code\ActivityStreams\AssertionMethod;
@ -21,7 +20,6 @@ use Code\Access\PermissionRoles;
use Code\Daemon\Run; use Code\Daemon\Run;
use Code\Lib as Zlib; use Code\Lib as Zlib;
use Code\Extend\Hook; use Code\Extend\Hook;
use Emoji;
require_once('include/html2bbcode.php'); require_once('include/html2bbcode.php');
require_once('include/html2plain.php'); require_once('include/html2plain.php');
@ -4453,7 +4451,7 @@ class Activity
} else { } else {
$fetch = false; $fetch = false;
if (intval($channel['channel_system']) || (perm_is_allowed($channel['channel_id'], $observer_hash, 'send_stream') && perm_is_allowed($channel['channel_id'], $observer_hash, 'hyperdrive') && (PConfig::Get($channel['channel_id'], 'system', 'hyperdrive', true) || $act->type === 'Announce'))) { if (intval($channel['channel_system']) || (perm_is_allowed($channel['channel_id'], $observer_hash, 'send_stream') && perm_is_allowed($channel['channel_id'], $observer_hash, 'hyperdrive') && (PConfig::Get($channel['channel_id'], 'system', 'hyperdrive', true) || $act->type === 'Announce'))) {
$fetch = ($fetch_parents && self::fetch_and_store_parents($channel, $observer_hash, $item, $tag_delivery)); $fetch = ($fetch_parents && self::fetch_and_store_parents($channel, $observer_hash, $item));
} }
if ($fetch) { if ($fetch) {
$parent_item = q( $parent_item = q(
@ -4639,12 +4637,10 @@ class Activity
return $xchan; return $xchan;
} }
public static function fetch_and_store_parents($channel, $observer_hash, $item)
public static function fetch_and_store_parents($channel, $observer_hash, $item, $tag_delivery)
{ {
logger('fetching parents'); logger('fetching parents');
$conversation = []; $conversation = [];
$seen_mids = []; $seen_mids = [];
@ -4656,7 +4652,7 @@ class Activity
break; break;
} }
$fetched = self::fetch_and_parse($current_item['parent_mid']); $fetched = self::fetch_and_parse($current_item['parent_mid'], $channel);
$seen_mids[] = $current_item['parent_mid']; $seen_mids[] = $current_item['parent_mid'];
if (! $fetched) { if (! $fetched) {
@ -4688,45 +4684,40 @@ class Activity
if ($conversation) { if ($conversation) {
$parent_item = $conversation[0]['item']; $parent_item = $conversation[0]['item'];
if ($parent_item['mid'] === $parent_item['parent_mid']) {
// // determine if the top-level post provides a replies collection
// if ($parent_item['obj']) {
// $parent_item['obj'] = json_decode($parent_item['obj'],true);
// }
// logger('topfetch: ' . print_r($parent_item,true), LOGGER_ALL);
// $id = ((array_path_exists('obj/replies/id',$parent_item)) ? $parent_item['obj']['replies']['id'] : false);
// if (! $id) {
// $id = ((array_path_exists('obj/replies',$parent_item) && is_string($parent_item['obj']['replies'])) ? $parent_item['obj']['replies'] : false);
// }
// if ($id) {
// Run::Summon( [ 'Convo', $id, $channel['channel_id'], $observer_hash ] );
// }
if ($conversation[0]['item']['mid'] === $conversation[0]['item']['parent_mid']) {
foreach ($conversation as $post) { foreach ($conversation as $post) {
if ($post['activity']->is_valid()) { if ($post['activity']->is_valid()) {
self::store($channel, $observer_hash, $post['activity'], $post['item'], false, isCollectionOperation: $post['collection']); self::store($channel, $observer_hash, $post['activity'], $post['item'], false, isCollectionOperation: $post['collection']);
} }
} }
if ($parent_item['obj'] && is_string($parent_item['obj'])) {
$parent_item['obj'] = json_decode($parent_item['obj'],true);
}
$id = ((array_path_exists('obj/replies/id',$parent_item)) ? $parent_item['obj']['replies']['id'] : false);
if (! $id) {
$id = ((array_path_exists('obj/replies',$parent_item) && is_string($parent_item['obj']['replies'])) ? $parent_item['obj']['replies'] : false);
}
if ($id && is_string($id)) {
Run::Summon(['Convo', $id, $channel['channel_id'], $observer_hash]);
}
return true; return true;
} }
} }
return false; return false;
} }
public static function fetch_and_parse($message_id) public static function fetch_and_parse($message_id, $channel)
{ {
$json = self::fetch($message_id); if (ActivityStreams::is_url($message_id)) {
if (!$json) { $json = self::fetch($message_id, $channel);
return false; if (!$json) {
return false;
}
} }
// set client flag to convert objects to implied activities // set client flag to convert objects to implied activities
$activity = new ActivityStreams($json, null, true); $activity = new ActivityStreams($json, null, true);
if ( if ($activity->type === 'Announce' && is_array($activity->obj)
$activity->type === 'Announce' && is_array($activity->obj) && array_key_exists('object', $activity->obj) && array_key_exists('actor', $activity->obj)) {
&& array_key_exists('object', $activity->obj) && array_key_exists('actor', $activity->obj)
) {
// This is a relayed/forwarded Activity (as opposed to a shared/boosted object) // This is a relayed/forwarded Activity (as opposed to a shared/boosted object)
// Reparse the encapsulated Activity and use that instead // Reparse the encapsulated Activity and use that instead
logger('relayed activity', LOGGER_DEBUG); logger('relayed activity', LOGGER_DEBUG);
@ -4735,10 +4726,10 @@ class Activity
} }
if (in_array($activity->type, ['Add', 'Remove']) if (in_array($activity->type, ['Add', 'Remove'])
&& is_array($activity->obj) && is_array($activity->obj)
&& array_key_exists('object', $activity->obj) && array_key_exists('object', $activity->obj)
&& array_key_exists('actor', $activity->obj) && array_key_exists('actor', $activity->obj)
&& !empty($activity->tgt)) { && !empty($activity->tgt)) {
logger('relayed collection operation', LOGGER_DEBUG); logger('relayed collection operation', LOGGER_DEBUG);
$isCollectionOperation = true; $isCollectionOperation = true;
$rawActivity = json_decode($activity->raw, true); $rawActivity = json_decode($activity->raw, true);
@ -4746,8 +4737,6 @@ class Activity
$activity->setApproveId($rawActivity['id']); $activity->setApproveId($rawActivity['id']);
} }
logger($activity->debug(), LOGGER_DATA);
if (!$activity->is_valid()) { if (!$activity->is_valid()) {
logger('not a valid activity'); logger('not a valid activity');
return false; return false;
@ -4758,19 +4747,14 @@ class Activity
// ActivityPub sourced items are cacheable // ActivityPub sourced items are cacheable
$item = self::decode_note($activity, true); $item = self::decode_note($activity, true);
if (!$item) { if (!$item) {
return false; return false;
} }
logger('decoded_note: ' . print_r($item,true), LOGGER_DATA);
$hookinfo = [ $hookinfo = [
'activity' => $activity, 'activity' => $activity,
'item' => $item, 'item' => $item,
'collection' => $isCollectionOperation, 'collection' => $isCollectionOperation,
]; ];
Hook::call('fetch_and_store', $hookinfo); Hook::call('fetch_and_store', $hookinfo);
return $hookinfo; return $hookinfo;