Retrial counter for the inbox queue

This commit is contained in:
Michael 2024-06-22 02:29:24 +00:00
parent cd8189e859
commit a433a4623a
7 changed files with 57 additions and 50 deletions

View file

@ -1,6 +1,6 @@
-- ------------------------------------------ -- ------------------------------------------
-- Friendica 2024.06-rc (Yellow Archangel) -- Friendica 2024.06-rc (Yellow Archangel)
-- DB_UPDATE_VERSION 1566 -- DB_UPDATE_VERSION 1567
-- ------------------------------------------ -- ------------------------------------------
@ -825,6 +825,7 @@ CREATE TABLE IF NOT EXISTS `inbox-entry` (
`push` boolean COMMENT 'Is the entry pushed or have pulled it?', `push` boolean COMMENT 'Is the entry pushed or have pulled it?',
`trust` boolean COMMENT 'Do we trust this entry?', `trust` boolean COMMENT 'Do we trust this entry?',
`wid` int unsigned COMMENT 'Workerqueue id', `wid` int unsigned COMMENT 'Workerqueue id',
`retrial` tinyint unsigned DEFAULT 0 COMMENT 'Retrial counter',
PRIMARY KEY(`id`), PRIMARY KEY(`id`),
UNIQUE INDEX `activity-id` (`activity-id`), UNIQUE INDEX `activity-id` (`activity-id`),
INDEX `object-id` (`object-id`), INDEX `object-id` (`object-id`),

View file

@ -6,22 +6,23 @@ Incoming activity
Fields Fields
------ ------
| Field | Description | Type | Null | Key | Default | Extra | | Field | Description | Type | Null | Key | Default | Extra |
| ------------------ | -------------------------------------- | -------------- | ---- | --- | ------- | -------------- | | ------------------ | -------------------------------------- | ---------------- | ---- | --- | ------- | -------------- |
| id | sequential ID | int unsigned | NO | PRI | NULL | auto_increment | | id | sequential ID | int unsigned | NO | PRI | NULL | auto_increment |
| activity-id | id of the incoming activity | varbinary(383) | YES | | NULL | | | activity-id | id of the incoming activity | varbinary(383) | YES | | NULL | |
| object-id | | varbinary(383) | YES | | NULL | | | object-id | | varbinary(383) | YES | | NULL | |
| in-reply-to-id | | varbinary(383) | YES | | NULL | | | in-reply-to-id | | varbinary(383) | YES | | NULL | |
| conversation | | varbinary(383) | YES | | NULL | | | conversation | | varbinary(383) | YES | | NULL | |
| type | Type of the activity | varchar(64) | YES | | NULL | | | type | Type of the activity | varchar(64) | YES | | NULL | |
| object-type | Type of the object activity | varchar(64) | YES | | NULL | | | object-type | Type of the object activity | varchar(64) | YES | | NULL | |
| object-object-type | Type of the object's object activity | varchar(64) | YES | | NULL | | | object-object-type | Type of the object's object activity | varchar(64) | YES | | NULL | |
| received | Receiving date | datetime | YES | | NULL | | | received | Receiving date | datetime | YES | | NULL | |
| activity | The JSON activity | mediumtext | YES | | NULL | | | activity | The JSON activity | mediumtext | YES | | NULL | |
| signer | | varchar(255) | YES | | NULL | | | signer | | varchar(255) | YES | | NULL | |
| push | Is the entry pushed or have pulled it? | boolean | YES | | NULL | | | push | Is the entry pushed or have pulled it? | boolean | YES | | NULL | |
| trust | Do we trust this entry? | boolean | YES | | NULL | | | trust | Do we trust this entry? | boolean | YES | | NULL | |
| wid | Workerqueue id | int unsigned | YES | | NULL | | | wid | Workerqueue id | int unsigned | YES | | NULL | |
| retrial | Retrial counter | tinyint unsigned | YES | | 0 | |
Indexes Indexes
------------ ------------

View file

@ -250,15 +250,34 @@ class Queue
*/ */
public static function processAll() public static function processAll()
{ {
$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`trust` AND `wid` IS NULL"], ['order' => ['id' => true]]); $expired_days = max(1, DI::config()->get('system', 'queue_expired_days'));
$max_retrial = max(3, DI::config()->get('system', 'queue_retrial'));
$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id', 'received', 'trust', 'retrial'], ["`wid` IS NULL"], ['order' => ['retrial', 'id' => true]]);
while ($entry = DBA::fetch($entries)) { while ($entry = DBA::fetch($entries)) {
if (!self::isProcessable($entry['id'])) { // We delete all entries that aren't associated with a worker entry after a given amount of days or retrials
if (($entry['retrial'] > $max_retrial) || ($entry['received'] < DateTimeFormat::utc('now - ' . $expired_days . ' days'))) {
self::deleteById($entry['id']);
}
if (!$entry['trust'] || !self::isProcessable($entry['id'])) {
continue; continue;
} }
Logger::debug('Process leftover entry', $entry); Logger::debug('Process leftover entry', $entry);
self::process($entry['id'], false); self::process($entry['id'], false);
} }
DBA::close($entries); DBA::close($entries);
// Optimizing this table only last seconds
if (DI::config()->get('system', 'optimize_tables')) {
Logger::info('Optimize start');
DBA::optimizeTable('inbox-entry');
Logger::info('Optimize end');
}
}
private static function retrial(int $id)
{
DBA::update('inbox-entry', ["`retrial` = `retrial` + 1"], ['id' => $id]);
} }
public static function isProcessable(int $id): bool public static function isProcessable(int $id): bool
@ -286,14 +305,16 @@ class Queue
} }
if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id'])) { if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id'])) {
if (DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) { if (DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
// This entry belongs to some other entry that should be processed first // This entry belongs to some other entry that should be processed first
self::retrial($id);
return false; return false;
} }
if (!Post::exists(['uri' => $entry['in-reply-to-id']])) { if (!Post::exists(['uri' => $entry['in-reply-to-id']])) {
// This entry belongs to some other entry that need to be fetched first // This entry belongs to some other entry that need to be fetched first
if (Fetch::hasWorker($entry['in-reply-to-id'])) { if (Fetch::hasWorker($entry['in-reply-to-id'])) {
Logger::debug('Fetching of the activity is already queued', ['id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]); Logger::debug('Fetching of the activity is already queued', ['id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]);
self::retrial($id);
return false; return false;
} }
Fetch::add($entry['in-reply-to-id']); Fetch::add($entry['in-reply-to-id']);
@ -302,6 +323,7 @@ class Queue
$wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $entry['in-reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC); $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $entry['in-reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC);
Fetch::setWorkerId($entry['in-reply-to-id'], $wid); Fetch::setWorkerId($entry['in-reply-to-id'], $wid);
Logger::debug('Fetch missing activity', ['wid' => $wid, 'id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]); Logger::debug('Fetch missing activity', ['wid' => $wid, 'id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]);
self::retrial($id);
return false; return false;
} }
} }
@ -309,29 +331,6 @@ class Queue
return true; return true;
} }
/**
* Clear old activities
*
* @return void
*/
public static function clear()
{
// We delete all entries that aren't associated with a worker entry after seven days.
// The other entries are deleted when the worker deferred for too long.
$entries = DBA::select('inbox-entry', ['id'], ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
while ($entry = DBA::fetch($entries)) {
self::deleteById($entry['id']);
}
DBA::close($entries);
// Optimizing this table only last seconds
if (DI::config()->get('system', 'optimize_tables')) {
Logger::info('Optimize start');
DBA::optimizeTable('inbox-entry');
Logger::info('Optimize end');
}
}
/** /**
* Process all activities that are children of a given post url * Process all activities that are children of a given post url
* *

View file

@ -92,9 +92,6 @@ class Cron
Tag::setLocalTrendingHashtags(24, 20); Tag::setLocalTrendingHashtags(24, 20);
Tag::setGlobalTrendingHashtags(24, 20); Tag::setGlobalTrendingHashtags(24, 20);
// Remove old pending posts from the queue
Queue::clear();
// Process all unprocessed entries // Process all unprocessed entries
Queue::processAll(); Queue::processAll();

View file

@ -56,7 +56,7 @@ use Friendica\Database\DBA;
// This file is required several times during the test in DbaDefinition which justifies this condition // This file is required several times during the test in DbaDefinition which justifies this condition
if (!defined('DB_UPDATE_VERSION')) { if (!defined('DB_UPDATE_VERSION')) {
define('DB_UPDATE_VERSION', 1566); define('DB_UPDATE_VERSION', 1567);
} }
return [ return [
@ -875,6 +875,7 @@ return [
"push" => ["type" => "boolean", "comment" => "Is the entry pushed or have pulled it?"], "push" => ["type" => "boolean", "comment" => "Is the entry pushed or have pulled it?"],
"trust" => ["type" => "boolean", "comment" => "Do we trust this entry?"], "trust" => ["type" => "boolean", "comment" => "Do we trust this entry?"],
"wid" => ["type" => "int unsigned", "foreign" => ["workerqueue" => "id"], "comment" => "Workerqueue id"], "wid" => ["type" => "int unsigned", "foreign" => ["workerqueue" => "id"], "comment" => "Workerqueue id"],
"retrial" => ["type" => "tinyint unsigned", "default" => "0", "comment" => "Retrial counter"],
], ],
"indexes" => [ "indexes" => [
"PRIMARY" => ["id"], "PRIMARY" => ["id"],

View file

@ -478,6 +478,14 @@ return [
// Enable internal timings to help optimize code. Needed for "rendertime" addon. // Enable internal timings to help optimize code. Needed for "rendertime" addon.
'profiler' => false, 'profiler' => false,
// queue_expired_days (Integer)
// Number of days after unprocessed inbox items are removed from the queue. Minimum is 1.
'queue_expired_days' => 7,
// queue_retrial (Integer)
// Number of retrial after unprocessed inbox items are removed from the queue. Minimum is 3.
'queue_retrial' => 10,
// redis_host (String) // redis_host (String)
// Host name or the path to the Unix domain socket of the Redis daemon. // Host name or the path to the Unix domain socket of the Redis daemon.
'redis_host' => '127.0.0.1', 'redis_host' => '127.0.0.1',

View file

@ -8,7 +8,7 @@ msgid ""
msgstr "" msgstr ""
"Project-Id-Version: 2024.06-rc\n" "Project-Id-Version: 2024.06-rc\n"
"Report-Msgid-Bugs-To: \n" "Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-06-23 13:19+0200\n" "POT-Creation-Date: 2024-06-23 14:42+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n" "Language-Team: LANGUAGE <LL@li.org>\n"
@ -3965,8 +3965,8 @@ msgid ""
"profile\n" "profile\n"
"\t\t\t(on the \"Profiles\" page) so that other people can easily find you.\n" "\t\t\t(on the \"Profiles\" page) so that other people can easily find you.\n"
"\n" "\n"
"\t\t\tWe recommend adding a profile photo, adding some profile \"keywords\" " "\t\t\tWe recommend adding a profile photo, adding some profile "
"(very useful\n" "\"keywords\" (very useful\n"
"\t\t\tin making new friends) - and perhaps what country you live in; if you " "\t\t\tin making new friends) - and perhaps what country you live in; if you "
"do not wish\n" "do not wish\n"
"\t\t\tto be more specific than that.\n" "\t\t\tto be more specific than that.\n"