Hopefully fixes loops during message processing

This commit is contained in:
Michael 2022-08-03 03:38:03 +00:00
parent 64894f9d6f
commit f2eec66240
6 changed files with 142 additions and 76 deletions

View file

@ -84,6 +84,18 @@ class Queue
return $activity;
}
/**
* Checks if an entryy for a given url and type already exists
*
* @param string $url
* @param string $type
* @return boolean
*/
public static function exists(string $url, string $type): bool
{
return DBA::exists('inbox-entry', ['type' => $type, 'object-id' => $url]);
}
/**
* Remove activity from the queue
*
@ -132,30 +144,31 @@ class Queue
/**
* Set the worker id for the queue entry
*
* @param array $activity
* @param int $wid
* @param int $entryid
* @param int $wid
* @return void
*/
public static function setWorkerId(array $activity, int $wid)
public static function setWorkerId(int $entryid, int $wid)
{
if (empty($activity['entry-id']) || empty($wid)) {
if (empty($entryid) || empty($wid)) {
return;
}
DBA::update('inbox-entry', ['wid' => $wid], ['id' => $activity['entry-id']]);
DBA::update('inbox-entry', ['wid' => $wid], ['id' => $entryid]);
}
/**
* Check if there is an assigned worker task
*
* @param array $activity
* @param int $wid
*
* @return bool
*/
public static function hasWorker(array $activity = []): bool
public static function hasWorker(int $wid): bool
{
if (empty($activity['worker-id'])) {
if (empty($wid)) {
return false;
}
return DBA::exists('workerqueue', ['id' => $activity['worker-id'], 'done' => false]);
return DBA::exists('workerqueue', ['id' => $wid, 'done' => false]);
}
/**
@ -172,6 +185,18 @@ class Queue
return false;
}
if (!empty($entry['wid'])) {
$worker = DI::app()->getQueue();
$wid = $worker['id'] ?? 0;
if ($entry['wid'] != $wid) {
$workerqueue = DBA::selectFirst('workerqueue', ['pid'], ['id' => $entry['wid'], 'done' => false]);
if (!empty($workerqueue['pid']) && posix_kill($workerqueue['pid'], 0)) {
Logger::notice('Entry is already processed via another process.', ['current' => $wid, 'processor' => $entry['wid']]);
return false;
}
}
}
Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
$activity = json_decode($entry['activity'], true);
@ -314,6 +339,5 @@ class Queue
}
}
DBA::close($entries);
}
}