Improved queue processing

This commit is contained in:
Michael 2022-08-07 19:24:50 +00:00
parent 22c1873064
commit d60d2caef6
6 changed files with 169 additions and 78 deletions

View file

@ -598,7 +598,7 @@ class Receiver
return true;
}
}
if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) {
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source);
}
@ -609,11 +609,15 @@ class Receiver
}
if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($completion == self::COMPLETION_RELAY))) {
// We delay by 5 seconds to allow to accumulate all receivers
$delayed = date(DateTimeFormat::MYSQL, time() + 5);
Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]);
$wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
Queue::setWorkerId($object_data['entry-id'], $wid);
if (Queue::isProcessable($object_data['entry-id'])) {
// We delay by 5 seconds to allow to accumulate all receivers
$delayed = date(DateTimeFormat::MYSQL, time() + 5);
Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]);
$wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
Queue::setWorkerId($object_data['entry-id'], $wid);
} else {
Logger::debug('Other queue entries need to be processed first.', ['id' => $object_data['entry-id']]);
}
return false;
}
@ -687,13 +691,18 @@ class Receiver
$object_data['thread-completion'] = Contact::getIdForURL($actor);
$object_data['completion-mode'] = self::COMPLETION_ANNOUCE;
$item = ActivityPub\Processor::createItem($object_data, $fetch_parents);
if (empty($item)) {
return false;
}
if (!Post::exists(['uri' => $object_data['id'], 'uid' => 0])) {
$item = ActivityPub\Processor::createItem($object_data, $fetch_parents);
if (empty($item)) {
return false;
}
$item['post-reason'] = Item::PR_ANNOUNCEMENT;
ActivityPub\Processor::postItem($object_data, $item);
$item['post-reason'] = Item::PR_ANNOUNCEMENT;
ActivityPub\Processor::postItem($object_data, $item);
} else {
Logger::info('Announced id already exists', ['id' => $object_data['id']]);
Queue::remove($object_data);
}
if (!empty($activity)) {
$announce_object_data = self::processObject($activity);