From a1947d2bb106e932cb72616e6c4cf809d249a1f6 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 25 Oct 2023 20:16:36 +0000 Subject: [PATCH] Improved asynchronous message procession --- src/Content/Item.php | 2 +- src/Model/Item.php | 9 ++--- src/Model/Post/Media.php | 3 +- src/Protocol/ActivityPub/ClientToServer.php | 2 +- src/Protocol/ActivityPub/Processor.php | 38 ++++++++------------- src/Protocol/ActivityPub/Queue.php | 23 +++++++++++-- src/Protocol/ActivityPub/Receiver.php | 3 +- static/defaults.config.php | 4 --- 8 files changed, 46 insertions(+), 38 deletions(-) diff --git a/src/Content/Item.php b/src/Content/Item.php index c604f26215..48dcb80d32 100644 --- a/src/Content/Item.php +++ b/src/Content/Item.php @@ -635,7 +635,7 @@ class Item public function addSharedPost(array $item, string $body = ''): string { if (empty($body)) { - $body = $item['body']; + $body = $item['body'] ?? ''; } if (empty($item['quote-uri-id']) || ($item['quote-uri-id'] == $item['uri-id'])) { diff --git a/src/Model/Item.php b/src/Model/Item.php index 04853314f5..0e8a04b3ef 100644 --- a/src/Model/Item.php +++ b/src/Model/Item.php @@ -3905,11 +3905,12 @@ class Item * Fetches item for given URI or plink * * @param string $uri - * @param integer $uid + * @param int $uid + * @param int $completion * * @return integer item id */ - public static function fetchByLink(string $uri, int $uid = 0): int + public static function fetchByLink(string $uri, int $uid = 0, int $completion = ActivityPub\Receiver::COMPLETION_MANUAL): int { Logger::info('Trying to fetch link', ['uid' => $uid, 'uri' => $uri]); $item_id = self::searchByLink($uri, $uid); @@ -3930,7 +3931,7 @@ class Item return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0; } - $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri, [], '', ActivityPub\Receiver::COMPLETION_MANUAL, $uid); + $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri, [], '', $completion, $uid); if ($fetched_uri) { $item_id = self::searchByLink($fetched_uri, $uid); @@ -3990,7 +3991,7 @@ class Item } $url = $shared['message_id'] ?: $shared['link']; - $id = self::fetchByLink($url); + $id = self::fetchByLink($url, 0, ActivityPub\Receiver::COMPLETION_ASYNC); if (!$id) { Logger::notice('Post could not be fetched.', ['url' => $url, 'uid' => $uid]); return 0; diff --git a/src/Model/Post/Media.php b/src/Model/Post/Media.php index 4ba67de9a8..671c19f454 100644 --- a/src/Model/Post/Media.php +++ b/src/Model/Post/Media.php @@ -35,6 +35,7 @@ use Friendica\Model\Photo; use Friendica\Model\Post; use Friendica\Network\HTTPClient\Client\HttpClientAccept; use Friendica\Network\HTTPClient\Client\HttpClientOptions; +use Friendica\Protocol\ActivityPub; use Friendica\Util\Images; use Friendica\Util\Network; use Friendica\Util\ParseUrl; @@ -253,7 +254,7 @@ class Media */ private static function addActivity(array $media): array { - $id = Item::fetchByLink($media['url']); + $id = Item::fetchByLink($media['url'], 0, ActivityPub\Receiver::COMPLETION_ASYNC); if (empty($id)) { return $media; } diff --git a/src/Protocol/ActivityPub/ClientToServer.php b/src/Protocol/ActivityPub/ClientToServer.php index 2b52d17b76..8bc4dcc059 100644 --- a/src/Protocol/ActivityPub/ClientToServer.php +++ b/src/Protocol/ActivityPub/ClientToServer.php @@ -142,7 +142,7 @@ class ClientToServer */ private static function updateContent(int $uid, string $object_id, array $application, array $ldactivity): array { - $id = Item::fetchByLink($object_id, $uid); + $id = Item::fetchByLink($object_id, $uid, ActivityPub\Receiver::COMPLETION_ASYNC); $original_post = Post::selectFirst(['uri-id'], ['uid' => $uid, 'origin' => true, 'id' => $id]); if (empty($original_post)) { Logger::debug('Item not found or does not belong to the user', ['id' => $id, 'uid' => $uid, 'object_id' => $object_id, 'activity' => $ldactivity]); diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index df19ab8b8f..2e204717b8 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -348,12 +348,8 @@ class Processor if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { $result = self::fetchParent($activity, !empty($conversation)); - if (!empty($result)) { - if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) { - $item['thr-parent'] = $result; - } - } elseif (empty($conversation)) { - return []; + if (!empty($result) && ($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) { + $item['thr-parent'] = $result; } } @@ -532,39 +528,35 @@ class Processor self::addActivityId($activity['reply-to-id']); - if (!DI::config()->get('system', 'fetch_by_worker')) { - $in_background = false; + $completion = $activity['completion-mode'] ?? Receiver::COMPLETION_NONE; + + if (DI::config()->get('system', 'decoupled_receiver') && ($completion != Receiver::COMPLETION_MANUAL)) { + $in_background = true; } $recursion_depth = $activity['recursion-depth'] ?? 0; if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) { - Logger::info('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + Logger::info('Parent not found. Try to refetch it.', ['completion' => $completion, 'recursion-depth' => $recursion_depth, 'parent' => $activity['reply-to-id']]); $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { Logger::notice('The activity is gone, the queue entry will be deleted', ['parent' => $activity['reply-to-id']]); if (!empty($activity['entry-id'])) { Queue::deleteById($activity['entry-id']); } - return ''; } elseif (!empty($result)) { - $exists = Post::exists(['uri' => [$result, $activity['reply-to-id']]]); - if ($exists) { - Logger::info('The activity has been fetched and created.', ['parent' => $result]); - return $result; - } elseif (DI::config()->get('system', 'fetch_by_worker') || DI::config()->get('system', 'decoupled_receiver')) { - Logger::info('The activity has been fetched and will hopefully be created later.', ['parent' => $result]); + $post = Post::selectFirstPost(['uri'], ['uri' => [$result, $activity['reply-to-id']]]); + if (!empty($post['uri'])) { + Logger::info('The activity has been fetched and created.', ['result' => $result, 'uri' => $post['uri']]); + return $post['uri']; } else { Logger::notice('The activity exists but has not been created, the queue entry will be deleted.', ['parent' => $result]); if (!empty($activity['entry-id'])) { Queue::deleteById($activity['entry-id']); } } - return ''; - } - if (empty($result) && !DI::config()->get('system', 'fetch_by_worker')) { - return ''; } + return ''; } elseif (self::isActivityGone($activity['reply-to-id'])) { Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]); if ($in_background) { @@ -586,7 +578,7 @@ class Processor Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); Fetch::add($activity['reply-to-id']); $activity['recursion-depth'] = 0; - $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC); Fetch::setWorkerId($activity['reply-to-id'], $wid); } else { Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]); @@ -867,7 +859,7 @@ class Processor $content = self::addMentionLinks($content, $activity['tags']); if (!empty($activity['quote-url'])) { - $id = Item::fetchByLink($activity['quote-url']); + $id = Item::fetchByLink($activity['quote-url'], 0, ActivityPub\Receiver::COMPLETION_ASYNC); if ($id) { $shared_item = Post::selectFirst(['uri-id'], ['id' => $id]); $item['quote-uri-id'] = $shared_item['uri-id']; @@ -1456,7 +1448,7 @@ class Processor if (empty($post['id'])) { continue; } - $id = Item::fetchByLink($post['id']); + $id = Item::fetchByLink($post['id'], 0, ActivityPub\Receiver::COMPLETION_ASYNC); if (!empty($id)) { $item = Post::selectFirst(['uri-id', 'featured', 'author-id'], ['id' => $id]); if (!empty($item['uri-id'])) { diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index 47c8e632a7..3a234e945a 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -22,6 +22,7 @@ namespace Friendica\Protocol\ActivityPub; use Friendica\Core\Logger; +use Friendica\Core\Worker; use Friendica\Database\Database; use Friendica\Database\DBA; use Friendica\DI; @@ -285,9 +286,25 @@ class Queue } } - if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) { - // This entry belongs to some other entry that should be processed first - return false; + if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id'])) { + if (DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) { + // This entry belongs to some other entry that should be processed first + return false; + } + if (!Post::exists(['uri' => $entry['in-reply-to-id']])) { + // This entry belongs to some other entry that need to be fetched first + if (Fetch::hasWorker($entry['in-reply-to-id'])) { + Logger::debug('Fetching of the activity is already queued', ['id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]); + return false; + } + Fetch::add($entry['in-reply-to-id']); + $activity = json_decode($entry['activity'], true); + $activity['recursion-depth'] = 0; + $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $entry['in-reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC); + Fetch::setWorkerId($entry['in-reply-to-id'], $wid); + Logger::debug('Fetch missing activity', ['wid' => $wid, 'id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]); + return false; + } } return true; diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 567fe3b86f..992b428c35 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -80,6 +80,7 @@ class Receiver const COMPLETION_RELAY = 2; const COMPLETION_MANUAL = 3; const COMPLETION_AUTO = 4; + const COMPLETION_ASYNC = 5; /** * Checks incoming message from the inbox @@ -681,7 +682,7 @@ class Receiver return true; } - if (!empty($object_data['entry-id']) && $decouple && ($push || ($completion == self::COMPLETION_RELAY))) { + if (!empty($object_data['entry-id']) && $decouple && ($push || in_array($completion, [self::COMPLETION_RELAY, self::COMPLETION_ASYNC]))) { if (Queue::isProcessable($object_data['entry-id'])) { // We delay by 5 seconds to allow to accumulate all receivers $delayed = date(DateTimeFormat::MYSQL, time() + 5); diff --git a/static/defaults.config.php b/static/defaults.config.php index b639e19166..eb39767b66 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -298,10 +298,6 @@ return [ // Priority for the expiry notification 'expire-notify-priority' => Friendica\Core\Worker::PRIORITY_LOW, - // fetch_by_worker (Boolean) - // Fetch missing posts via a background process - 'fetch_by_worker' => false, - // fetch_featured_posts (Boolean) // Fetch featured posts from all contacts 'fetch_featured_posts' => false,