From d9aee0b3ea0167dec06092e0d474dc44fdf9faab Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 10 Aug 2022 09:28:18 +0000 Subject: [PATCH 1/2] Conversation entries will now be stored asynchronous if possible --- database.sql | 10 +-- doc/database.md | 2 +- doc/database/db_fetched-activity.md | 22 +++++ doc/database/db_processed-activity.md | 22 ----- src/Content/Conversation.php | 2 +- src/Model/Item.php | 2 +- src/Protocol/ActivityPub/Processor.php | 108 ++++++++++++++++++++----- static/dbstructure.config.php | 8 +- 8 files changed, 120 insertions(+), 56 deletions(-) create mode 100644 doc/database/db_fetched-activity.md delete mode 100644 doc/database/db_processed-activity.md diff --git a/database.sql b/database.sql index 6126e58a87..6df09b14b9 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ -- Friendica 2022.09-dev (Giant Rhubarb) --- DB_UPDATE_VERSION 1478 +-- DB_UPDATE_VERSION 1479 -- ------------------------------------------ @@ -1726,13 +1726,13 @@ CREATE TABLE IF NOT EXISTS `arrived-activity` ( ) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities'; -- --- TABLE processed-activity +-- TABLE fetched-activity -- -CREATE TABLE IF NOT EXISTS `processed-activity` ( - `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity', +CREATE TABLE IF NOT EXISTS `fetched-activity` ( + `object-id` varbinary(255) NOT NULL COMMENT 'object id of fetched activity', `received` datetime COMMENT 'Receiving date', PRIMARY KEY(`object-id`) -) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities'; +) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of fetched activities'; -- -- TABLE worker-ipc diff --git a/doc/database.md b/doc/database.md index fe505ce18b..5aed24e91a 100644 --- a/doc/database.md +++ b/doc/database.md @@ -26,6 +26,7 @@ Database Tables | [event](help/database/db_event) | Events | | [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation | | [fetch-entry](help/database/db_fetch-entry) | | +| [fetched-activity](help/database/db_fetched-activity) | Id of fetched activities | | [fsuggest](help/database/db_fsuggest) | friend suggestion stuff | | [group](help/database/db_group) | privacy groups, group info | | [group_member](help/database/db_group_member) | privacy groups, member info | @@ -68,7 +69,6 @@ Database Tables | [post-user](help/database/db_post-user) | User specific post data | | [post-user-notification](help/database/db_post-user-notification) | User post notifications | | [process](help/database/db_process) | Currently running system processes | -| [processed-activity](help/database/db_processed-activity) | Id of processed activities | | [profile](help/database/db_profile) | user profiles data | | [profile_field](help/database/db_profile_field) | Custom profile fields | | [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers | diff --git a/doc/database/db_fetched-activity.md b/doc/database/db_fetched-activity.md new file mode 100644 index 0000000000..0a3ae7f89a --- /dev/null +++ b/doc/database/db_fetched-activity.md @@ -0,0 +1,22 @@ +Table fetched-activity +=========== + +Id of fetched activities + +Fields +------ + +| Field | Description | Type | Null | Key | Default | Extra | +| --------- | ----------------------------- | -------------- | ---- | --- | ------- | ----- | +| object-id | object id of fetched activity | varbinary(255) | NO | PRI | NULL | | +| received | Receiving date | datetime | YES | | NULL | | + +Indexes +------------ + +| Name | Fields | +| ------- | --------- | +| PRIMARY | object-id | + + +Return to [database documentation](help/database) diff --git a/doc/database/db_processed-activity.md b/doc/database/db_processed-activity.md deleted file mode 100644 index 618b46f972..0000000000 --- a/doc/database/db_processed-activity.md +++ /dev/null @@ -1,22 +0,0 @@ -Table processed-activity -=========== - -Id of processed activities - -Fields ------- - -| Field | Description | Type | Null | Key | Default | Extra | -| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- | -| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | | -| received | Receiving date | datetime | YES | | NULL | | - -Indexes ------------- - -| Name | Fields | -| ------- | --------- | -| PRIMARY | object-id | - - -Return to [database documentation](help/database) diff --git a/src/Content/Conversation.php b/src/Content/Conversation.php index 3941c514a9..744e9ed18f 100644 --- a/src/Content/Conversation.php +++ b/src/Content/Conversation.php @@ -854,7 +854,7 @@ class Conversation $row['direction'] = ['direction' => 7, 'title' => $this->l10n->t('You had been addressed (%s).', 'bcc')]; break; case ItemModel::PR_FOLLOWER: - $row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['author-name'])]; + $row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['causer-name'] ?: $row['author-name'])]; break; case ItemModel::PR_TAG: $row['direction'] = ['direction' => 4, 'title' => $this->l10n->t('You subscribed to one or more tags in this post.')]; diff --git a/src/Model/Item.php b/src/Model/Item.php index 8566643ed2..c5b8bcf3c2 100644 --- a/src/Model/Item.php +++ b/src/Model/Item.php @@ -1513,7 +1513,7 @@ class Item $is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE); - if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) && + if (($uid != 0) && (($item['gravity'] == GRAVITY_PARENT) || $is_reshare) && DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE && !in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) { Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]); diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index f142b7e91d..c029d1bc6e 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -69,20 +69,20 @@ class Processor */ private static function addActivityId(string $id) { - DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]); - DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]); + DBA::delete('fetched-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]); + DBA::insert('fetched-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]); } /** - * Checks if the given object id has just been processed + * Checks if the given object id has just been fetched * * @param string $id * * @return boolean */ - private static function isProcessed(string $id): bool + private static function isFetched(string $id): bool { - return DBA::exists('processed-activity', ['object-id' => $id]); + return DBA::exists('fetched-activity', ['object-id' => $id]); } /** @@ -303,13 +303,6 @@ class Processor */ public static function createItem(array $activity, bool $fetch_parents): array { - if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) { - Logger::info('Id is already processed', ['id' => $activity['id']]); - return []; - } - - self::addActivityId($activity['id']); - $item = []; $item['verb'] = Activity::POST; $item['thr-parent'] = $activity['reply-to-id']; @@ -333,6 +326,7 @@ class Processor if (!empty($conversation)) { Logger::debug('Got conversation', ['conversation' => $item['conversation'], 'parent' => $conversation]); $item['parent-uri'] = $conversation['uri']; + $item['parent-uri-id'] = ItemURI::getIdByURI($item['parent-uri']); } } else { $conversation = []; @@ -350,7 +344,7 @@ class Processor } if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { - $result = self::fetchParent($activity); + $result = self::fetchParent($activity, !empty($conversation)); if (!empty($result)) { if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) { $item['thr-parent'] = $result; @@ -457,6 +451,8 @@ class Processor return []; } + $item['thr-parent-id'] = ItemURI::getIdByURI($item['thr-parent']); + $item = self::processContent($activity, $item); if (empty($item)) { Logger::info('Message was not processed'); @@ -489,14 +485,26 @@ class Processor * Fetch and process parent posts for the given activity * * @param array $activity + * @param bool $in_background * * @return string */ - private static function fetchParent(array $activity): string + private static function fetchParent(array $activity, bool $in_background = false): string { + if (self::isFetched($activity['reply-to-id'])) { + Logger::info('Id is already fetched', ['id' => $activity['reply-to-id']]); + return ''; + } + + self::addActivityId($activity['reply-to-id']); + + if (!DI::config()->get('system', 'fetch_by_worker')) { + $in_background = false; + } + $recursion_depth = $activity['recursion-depth'] ?? 0; - if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { + if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) { Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { @@ -525,17 +533,19 @@ class Processor } } elseif (self::isActivityGone($activity['reply-to-id'])) { Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]); - if (!empty($activity['entry-id'])) { + if ($in_background) { + // fetching in background is done for all activities where we have got the conversation + // There we only delete the single activity and not thr whole thread since we can store the + // other posts in the thread even with missing posts. + Queue::remove($activity); + } elseif (!empty($activity['entry-id'])) { Queue::deleteById($activity['entry-id']); } return ''; - } else { + } elseif (!$in_background) { Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - } - - if (Queue::hasWorker($activity['worker-id'] ?? 0)) { - Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]); - return ''; + } elseif ($in_background) { + Logger::notice('Fetching is done in the background.', ['parent' => $activity['reply-to-id']]); } if (!Fetch::hasWorker($activity['reply-to-id'])) { @@ -1056,6 +1066,10 @@ class Processor Logger::info('Accepting post', ['uid' => $receiver, 'url' => $item['uri']]); } + if (!self::hasParents($item, $receiver)) { + continue; + } + if (($item['gravity'] != GRAVITY_ACTIVITY) && ($activity['object_type'] == 'as:Event')) { $event_id = self::createEvent($activity, $item); @@ -1096,6 +1110,56 @@ class Processor } } + /** + * Checks if there are parent posts for the given receiver. + * If not, then the system will try to add them. + * + * @param array $item + * @param integer $receiver + * @return boolean + */ + private static function hasParents(array $item, int $receiver) + { + if (($receiver == 0) || ($item['gravity'] == GRAVITY_PARENT)) { + return true; + } + + $fields = ['causer-id' => $item['causer-id'] ?? $item['author-id'], 'post-reason' => Item::PR_FETCHED]; + + $has_parents = false; + + if (!empty($item['parent-uri-id'])) { + if (Post::exists(['uri-id' => $item['parent-uri-id'], 'uid' => $receiver])) { + $has_parents = true; + } elseif (Post::exists(['uri-id' => $item['parent-uri'], 'uid' => 0])) { + $stored = Item::storeForUserByUriId($item['parent-uri-id'], $receiver, $fields); + $has_parents = (bool)$stored; + if ($stored) { + Logger::notice('Inserted missing parent post', ['stored' => $stored, 'uid' => $receiver, 'parent' => $item['parent-uri']]); + } else { + Logger::notice('Parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'parent' => $item['parent-uri']]); + return false; + } + } + } + + if (empty($item['parent-uri-id']) || ($item['thr-parent-id'] != $item['parent-uri-id'])) { + if (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => $receiver])) { + $has_parents = true; + } elseif (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => 0])) { + $stored = Item::storeForUserByUriId($item['thr-parent-id'], $receiver, $fields); + $has_parents = $has_parents || (bool)$stored; + if ($stored) { + Logger::notice('Inserted missing thread parent post', ['stored' => $stored, 'uid' => $receiver, 'thread-parent' => $item['thr-parent']]); + } else { + Logger::notice('Thread parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'thread-parent' => $item['thr-parent']]); + } + } + } + + return $has_parents; + } + /** * Store tags and mentions into the tag table * diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index df5115dcb4..eb751d5b31 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1478); + define('DB_UPDATE_VERSION', 1479); } return [ @@ -1734,10 +1734,10 @@ return [ ], "engine" => "MEMORY", ], - "processed-activity" => [ - "comment" => "Id of processed activities", + "fetched-activity" => [ + "comment" => "Id of fetched activities", "fields" => [ - "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"], + "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of fetched activity"], "received" => ["type" => "datetime", "comment" => "Receiving date"], ], "indexes" => [ From e2896b449b544e451d9a55f398b5990072cee644 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 10 Aug 2022 13:43:00 +0000 Subject: [PATCH 2/2] Changes after code review --- src/Protocol/ActivityPub/Processor.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index c029d1bc6e..76bab6b81c 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -535,17 +535,17 @@ class Processor Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]); if ($in_background) { // fetching in background is done for all activities where we have got the conversation - // There we only delete the single activity and not thr whole thread since we can store the + // There we only delete the single activity and not the whole thread since we can store the // other posts in the thread even with missing posts. Queue::remove($activity); } elseif (!empty($activity['entry-id'])) { Queue::deleteById($activity['entry-id']); } return ''; - } elseif (!$in_background) { - Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); } elseif ($in_background) { Logger::notice('Fetching is done in the background.', ['parent' => $activity['reply-to-id']]); + } else { + Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); } if (!Fetch::hasWorker($activity['reply-to-id'])) {