Bulk delivery added for all protocols

This commit is contained in:
Michael 2022-12-31 12:19:34 +00:00
parent 3fcc45a720
commit 259b99e6e9
12 changed files with 398 additions and 49 deletions

View file

@ -1264,7 +1264,7 @@ class Worker
$command = array_shift($args);
$parameters = json_encode($args);
$found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$queue = DBA::selectFirst('workerqueue', [], ['command' => $command, 'parameter' => $parameters, 'done' => false]);
$added = 0;
if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) {
@ -1277,14 +1277,17 @@ class Worker
return 0;
}
if (!$found) {
if (empty($queue)) {
if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
'priority' => $priority, 'next_try' => $delayed])) {
return 0;
}
$added = DBA::lastInsertId();
} elseif ($force_priority) {
DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
$ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
if ($ret && ($priority != $queue['priority'])) {
$added = $queue['id'];
}
}
// Set the IPC flag to ensure an immediate process execution via daemon

View file

@ -26,8 +26,10 @@ use Friendica\Core\Worker;
use Friendica\Database\DBA;
use Friendica\DI;
use Friendica\Model\Contact;
use Friendica\Model\GServer;
use Friendica\Model\Post;
use Friendica\Protocol\ActivityPub;
use Friendica\Protocol\Delivery;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\Strings;
@ -58,7 +60,10 @@ class Cron
// Remove old entries from the workerqueue
self::cleanWorkerQueue();
// Directly deliver or requeue posts
// Directly deliver or requeue posts to ActivityPub systems
self::deliverAPPosts();
// Directly deliver or requeue posts to other systems
self::deliverPosts();
}
@ -157,7 +162,7 @@ class Cron
*
* This function is placed here as a safeguard. Even when the worker queue is completely blocked, messages will be delivered.
*/
private static function deliverPosts()
private static function deliverAPPosts()
{
$deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox` ORDER BY RAND()");
while ($delivery = DBA::fetch($deliveries)) {
@ -181,7 +186,7 @@ class Cron
}
if (Worker::add(['priority' => $priority, 'force_priority' => true], 'APDelivery', '', 0, $delivery['inbox'], 0)) {
Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
Logger::info('Priority for APDelivery worker adjusted', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
}
}
@ -193,6 +198,41 @@ class Cron
}
}
/**
* Directly deliver messages or requeue them.
*/
private static function deliverPosts()
{
$deliveries = DBA::p("SELECT `gsid`, MAX(`failed`) AS `failed` FROM `delivery-queue` GROUP BY `gsid` ORDER BY RAND()");
while ($delivery = DBA::fetch($deliveries)) {
if ($delivery['failed'] > 0) {
Logger::info('Removing failed deliveries', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed']]);
Delivery::removeFailedQueue($delivery['gsid']);
}
if (($delivery['failed'] < 3) || GServer::reachableById($delivery['gsid'])) {
$priority = Worker::PRIORITY_HIGH;
} elseif ($delivery['failed'] < 6) {
$priority = Worker::PRIORITY_MEDIUM;
} elseif ($delivery['failed'] < 8) {
$priority = Worker::PRIORITY_LOW;
} else {
$priority = Worker::PRIORITY_NEGLIGIBLE;
}
if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery['gsid'])) {
Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed'], 'priority' => $priority]);
}
}
// Optimizing this table only last seconds
if (DI::config()->get('system', 'optimize_tables')) {
Logger::info('Optimize start');
DBA::e("OPTIMIZE TABLE `delivery-queue`");
Logger::info('Optimize end');
}
}
/**
* Add missing "intro" records.
*