Use deferred tasks

This commit is contained in:
Michael 2022-07-21 06:23:55 +00:00
parent d4a536137d
commit c775833117
4 changed files with 29 additions and 6 deletions

View file

@ -284,9 +284,16 @@ class Processor
$recursion_depth = $activity['recursion-depth'] ?? 0; $recursion_depth = $activity['recursion-depth'] ?? 0;
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
if ($recursion_depth < 10) { if ($recursion_depth < 10) {
self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
$fetch_by_worker = empty($result);
} else { } else {
Logger::notice('Recursion level is too high, fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
$activity['recursion-depth'] = 0;
$fetch_by_worker = true;
}
if ($fetch_by_worker) {
Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
return []; return [];
} }

View file

@ -126,8 +126,16 @@ class Queue
while ($entry = DBA::fetch($entries)) { while ($entry = DBA::fetch($entries)) {
self::process($entry['id']); self::process($entry['id']);
} }
}
DBA::delete('inbox-entry', ["`received` < ?", DateTimeFormat::utc('now - 1 days')]); /**
* Process all activities
*
* @return void
*/
public static function clear()
{
DBA::delete('inbox-entry', ["`received` < ?", DateTimeFormat::utc('now - 2 days')]);
} }
/** /**

View file

@ -89,8 +89,8 @@ class Cron
Tag::setLocalTrendingHashtags(24, 20); Tag::setLocalTrendingHashtags(24, 20);
Tag::setGlobalTrendingHashtags(24, 20); Tag::setGlobalTrendingHashtags(24, 20);
// Process pending posts in the queue // Remove old pending posts from the queue
Queue::processAll(); Queue::clear();
// Search for new contacts in the directory // Search for new contacts in the directory
if (DI::config()->get('system', 'synchronize_directory')) { if (DI::config()->get('system', 'synchronize_directory')) {

View file

@ -22,6 +22,7 @@
namespace Friendica\Worker; namespace Friendica\Worker;
use Friendica\Core\Logger; use Friendica\Core\Logger;
use Friendica\Core\Worker;
use Friendica\Protocol\ActivityPub; use Friendica\Protocol\ActivityPub;
use Friendica\Protocol\ActivityPub\Receiver; use Friendica\Protocol\ActivityPub\Receiver;
@ -35,6 +36,13 @@ class FetchMissingActivity
{ {
Logger::info('Start fetching missing activity', ['url' => $url]); Logger::info('Start fetching missing activity', ['url' => $url]);
$result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion); $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
Logger::info('Finished fetching missing activity', ['url' => $url, 'result' => $result]); if ($result) {
Logger::info('Successfully fetched missing activity', ['url' => $url]);
} elseif (!Worker::defer()) {
Logger::info('Activity could not be fetched', ['url' => $url]);
// Possibly we should recursively remove child activities at this point.
} else {
Logger::info('Fetching deferred', ['url' => $url]);
}
} }
} }