From 253c82bbfbd7a254db0ace9c9a8e39970a9b3f24 Mon Sep 17 00:00:00 2001 From: Mike Macgirvin Date: Wed, 10 Jul 2024 09:16:36 +1000 Subject: [PATCH] refactor fetching code --- src/Lib/Activity.php | 209 +++++++++++++++++++++---------------------- 1 file changed, 103 insertions(+), 106 deletions(-) diff --git a/src/Lib/Activity.php b/src/Lib/Activity.php index 40b0a9598..e858814bf 100644 --- a/src/Lib/Activity.php +++ b/src/Lib/Activity.php @@ -4559,37 +4559,6 @@ class Activity } -// experimental code that needs more work. What this did was once we fetched a conversation to find the root node, -// start at that root node and fetch children, so you get all the branches and not just the branch related to the current node. -// Unfortunately there is no standard method for achieving this. Mastodon provides a 'replies' collection and Nomad projects -// can fetch the 'context'. For other platforms it's a wild guess. Additionally, when we tested this, it started an infinite -// recursion and has been disabled until the recursive behaviour is tracked down and fixed. - -// if ($fetch_parents && $parent && ! intval($parent_item['item_private'])) { -// logger('topfetch', LOGGER_DEBUG); -// // if the thread owner is a connnection, we will already receive any additional comments to their posts -// // but if they are not we can try to fetch others in the background -// $x = q("SELECT abook.*, xchan.* FROM abook left join xchan on abook_xchan = xchan_hash -// WHERE abook_channel = %d and abook_xchan = '%s' LIMIT 1", -// intval($channel['channel_id']), -// dbesc($parent_item['owner_xchan']) -// ); -// if (! $x) { -// // determine if the top-level post provides a replies collection -// if ($parent_item['obj']) { -// $parent_item['obj'] = json_decode($parent_item['obj'],true); -// } -// logger('topfetch: ' . print_r($parent_item,true), LOGGER_ALL); -// $id = ((array_path_exists('obj/replies/id',$parent_item)) ? $parent_item['obj']['replies']['id'] : false); -// if (! $id) { -// $id = ((array_path_exists('obj/replies',$parent_item) && is_string($parent_item['obj']['replies'])) ? $parent_item['obj']['replies'] : false); -// } -// if ($id) { -// Run::Summon( [ 'Convo', $id, $channel['channel_id'], $observer_hash ] ); -// } -// } -// } - if (is_array($x) && $x['item_id']) { tag_deliver($channel['channel_id'], $x['item_id']); if ($is_child_node) { @@ -4682,102 +4651,130 @@ class Activity $current_item = $item; while ($current_item['parent_mid'] !== $current_item['mid']) { - $isCollectionOperation = false; // recursion breaker if (in_array($current_item['parent_mid'], $seen_mids)) { break; } - $json = self::fetch($current_item['parent_mid']); + + $fetched = self::fetch_and_parse($current_item['parent_mid']); $seen_mids[] = $current_item['parent_mid']; - if (!$json) { - break; - } - // set client flag to convert objects to implied activities - $activity = new ActivityStreams($json, null, true); - if ( - $activity->type === 'Announce' && is_array($activity->obj) - && array_key_exists('object', $activity->obj) && array_key_exists('actor', $activity->obj) - ) { - // This is a relayed/forwarded Activity (as opposed to a shared/boosted object) - // Reparse the encapsulated Activity and use that instead - logger('relayed activity', LOGGER_DEBUG); - $rawActivity = json_decode($activity->raw, true); - $activity = new ActivityStreams($rawActivity['object'], null, true); - } - if (in_array($activity->type, ['Add', 'Remove']) - && is_array($activity->obj) - && array_key_exists('object', $activity->obj) - && array_key_exists('actor', $activity->obj) - && !empty($activity->tgt)) { - logger('relayed collection operation', LOGGER_DEBUG); - $isCollectionOperation = true; - $rawActivity = json_decode($activity->raw, true); - $activity = new ActivityStreams($rawActivity['object'], null, true); - $activity->setApproveId($rawActivity['id']); - } - - logger($activity->debug(), LOGGER_DATA); - - if (!$activity->is_valid()) { - logger('not a valid activity'); - break; - } - if (is_array($activity->actor) && array_key_exists('id', $activity->actor)) { - self::actor_store($activity->actor['id'], $activity->actor); - } - - // ActivityPub sourced items are cacheable - $item = self::decode_note($activity, true); - - if (!$item) { + if (! $fetched) { break; } - logger('decoded_note: ' . print_r($item,true), LOGGER_DATA); + $item = $fetched['item']; + $activity = $fetched['activity']; + $collection = $fetched['collection']; - $hookinfo = [ - 'activity' => $activity, - 'item' => $item - ]; + // don't leak any private conversations to the public stream + // even if they contain publicly addressed comments/reactions - Hook::call('fetch_and_store', $hookinfo); + if (intval($channel['channel_system']) && intval($item['item_private'])) { + logger('private conversation ignored'); + $conversation = []; + break; + } + // We're fetching upstream starting with the initial post, + // so push each fetched activity to the head of the conversation. + array_unshift($conversation, ['activity' => $activity, 'item' => $item, 'collection' => $collection]); - $item = $hookinfo['item']; - - if ($item) { - // don't leak any private conversations to the public stream - // even if they contain publicly addressed comments/reactions - - if (intval($channel['channel_system']) && intval($item['item_private'])) { - logger('private conversation ignored'); - $conversation = []; - break; - } - // We're fetching upstream starting with the initial post, - // so push each fetched activity to the head of the conversation. - array_unshift($conversation, ['activity' => $activity, 'item' => $item, 'collection' => $isCollectionOperation]); - - if ($item['parent_mid'] === $item['mid']) { - break; - } + if ($item['parent_mid'] === $item['mid']) { + break; } $current_item = $item; } - if ($conversation && $conversation[0]['item']['mid'] === $conversation[0]['item']['parent_mid']) { - foreach ($conversation as $post) { - if ($post['activity']->is_valid()) { - self::store($channel, $observer_hash, $post['activity'], $post['item'], false, isCollectionOperation: $post['collection']); - } - } - return true; - } + if ($conversation) { + $parent_item = $conversation[0]['item']; +// // determine if the top-level post provides a replies collection +// if ($parent_item['obj']) { +// $parent_item['obj'] = json_decode($parent_item['obj'],true); +// } +// logger('topfetch: ' . print_r($parent_item,true), LOGGER_ALL); +// $id = ((array_path_exists('obj/replies/id',$parent_item)) ? $parent_item['obj']['replies']['id'] : false); +// if (! $id) { +// $id = ((array_path_exists('obj/replies',$parent_item) && is_string($parent_item['obj']['replies'])) ? $parent_item['obj']['replies'] : false); +// } +// if ($id) { +// Run::Summon( [ 'Convo', $id, $channel['channel_id'], $observer_hash ] ); +// } + + + if ($conversation[0]['item']['mid'] === $conversation[0]['item']['parent_mid']) { + foreach ($conversation as $post) { + if ($post['activity']->is_valid()) { + self::store($channel, $observer_hash, $post['activity'], $post['item'], false, isCollectionOperation: $post['collection']); + } + } + return true; + } + } return false; } + public static function fetch_and_parse($message_id) + { + $json = self::fetch($message_id); + if (!$json) { + return false; + } + // set client flag to convert objects to implied activities + $activity = new ActivityStreams($json, null, true); + if ( + $activity->type === 'Announce' && is_array($activity->obj) + && array_key_exists('object', $activity->obj) && array_key_exists('actor', $activity->obj) + ) { + // This is a relayed/forwarded Activity (as opposed to a shared/boosted object) + // Reparse the encapsulated Activity and use that instead + logger('relayed activity', LOGGER_DEBUG); + $rawActivity = json_decode($activity->raw, true); + $activity = new ActivityStreams($rawActivity['object'], null, true); + } + + if (in_array($activity->type, ['Add', 'Remove']) + && is_array($activity->obj) + && array_key_exists('object', $activity->obj) + && array_key_exists('actor', $activity->obj) + && !empty($activity->tgt)) { + logger('relayed collection operation', LOGGER_DEBUG); + $isCollectionOperation = true; + $rawActivity = json_decode($activity->raw, true); + $activity = new ActivityStreams($rawActivity['object'], null, true); + $activity->setApproveId($rawActivity['id']); + } + + logger($activity->debug(), LOGGER_DATA); + + if (!$activity->is_valid()) { + logger('not a valid activity'); + return false; + } + if (is_array($activity->actor) && array_key_exists('id', $activity->actor)) { + self::actor_store($activity->actor['id'], $activity->actor); + } + + // ActivityPub sourced items are cacheable + $item = self::decode_note($activity, true); + + if (!$item) { + return false; + } + + logger('decoded_note: ' . print_r($item,true), LOGGER_DATA); + + $hookinfo = [ + 'activity' => $activity, + 'item' => $item, + 'collection' => $isCollectionOperation, + ]; + + Hook::call('fetch_and_store', $hookinfo); + + return $hookinfo; + } // This function is designed to work with Nomad attachments and item body