Configurable recursion depth when fetching replies

This commit is contained in:
Michael 2024-08-14 08:16:33 +00:00
parent 25cad5aae8
commit d6b37591f5
2 changed files with 54 additions and 22 deletions

View file

@ -267,6 +267,7 @@ class Processor
self::updateEvent($post['event-id'], $activity); self::updateEvent($post['event-id'], $activity);
} }
} }
self::processReplies($activity, $item);
} }
/** /**
@ -524,28 +525,34 @@ class Processor
} }
} }
if (DI::config()->get('system', 'decoupled_receiver') && ($activity['completion-mode'] ?? Receiver::COMPLETION_NONE != Receiver::COMPLETION_REPLIES)) { return $item;
$replies = [$item['thr-parent']]; }
if (!empty($item['parent-uri'])) {
$replies[] = $item['parent-uri'];
}
$condition = DBA::mergeConditions(['uri' => $replies], ["`replies-id` IS NOT NULL"]);
$posts = Post::select(['replies', 'replies-id'], $condition);
while ($post = Post::fetch($posts)) {
$cachekey = 'Processor-CreateItem-Replies-' . $post['replies-id'];
if (!DI::cache()->get($cachekey)) {
self::fetchReplies($post['replies'], $activity);
DI::cache()->set($cachekey, true);
}
}
DBA::close($replies);
if (!empty($item['replies'])) { private static function processReplies(array $activity, array $item)
self::fetchReplies($item['replies'], $activity); {
// @todo fetch replies not only in the decoupled mode
if (!DI::config()->get('system', 'decoupled_receiver')) {
return;
}
$replies = [$item['thr-parent']];
if (!empty($item['parent-uri'])) {
$replies[] = $item['parent-uri'];
}
$condition = DBA::mergeConditions(['uri' => $replies], ["`replies-id` IS NOT NULL"]);
$posts = Post::select(['replies', 'replies-id'], $condition);
while ($post = Post::fetch($posts)) {
$cachekey = 'Processor-CreateItem-Replies-' . $post['replies-id'];
if (!DI::cache()->get($cachekey)) {
self::fetchReplies($post['replies'], $activity);
DI::cache()->set($cachekey, true);
} }
} }
DBA::close($replies);
return $item; if (!empty($item['replies'])) {
self::fetchReplies($item['replies'], $activity);
}
} }
/** /**
@ -1247,7 +1254,7 @@ class Processor
Queue::remove($activity); Queue::remove($activity);
if ($success && Queue::hasChildren($item['uri']) && Post::exists(['uri' => $item['uri']])) { if ($success && Queue::hasChildren($item['uri']) && Post::exists(['uri' => $item['uri']])) {
Queue::processReplyByUri($item['uri']); Queue::processReplyByUri($item['uri'], $activity);
} }
// Store send a follow request for every reshare - but only when the item had been stored // Store send a follow request for every reshare - but only when the item had been stored
@ -1259,6 +1266,10 @@ class Processor
ActivityPub\Transmitter::sendFollowObject($item['uri'], $item['author-link']); ActivityPub\Transmitter::sendFollowObject($item['uri'], $item['author-link']);
} }
} }
if ($success) {
self::processReplies($activity, $item);
}
} }
/** /**
@ -1794,10 +1805,27 @@ class Processor
private static function fetchReplies(string $url, array $child) private static function fetchReplies(string $url, array $child)
{ {
if (in_array(__FUNCTION__, $child['callstack'] ?? [])) { $callstack_count = 0;
Logger::notice('Callstack already contains "' . __FUNCTION__ . '"', ['callstack' => $child['callstack']]); foreach ($child['callstack'] ?? [] as $function) {
if ($function == __FUNCTION__) {
++$callstack_count;
}
}
$callstack = array_slice(array_column(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS), 'function'), 1);
$system_count = 0;
foreach ($callstack as $function) {
if ($function == __FUNCTION__) {
++$system_count;
}
}
$maximum_fetchreplies_depth = DI::config()->get('system', 'max_fetchreplies_depth');
if (max($callstack_count, $system_count) == $maximum_fetchreplies_depth) {
Logger::notice('Maximum callstack depth reached', ['max' => $maximum_fetchreplies_depth, 'count' => $callstack_count, 'system-count' => $system_count, 'replies' => $url, 'callstack' => $child['callstack'] ?? [], 'system' => $callstack]);
return; return;
} }
$child['callstack'] = self::addToCallstack($child['callstack'] ?? []); $child['callstack'] = self::addToCallstack($child['callstack'] ?? []);
$replies = ActivityPub::fetchItems($url); $replies = ActivityPub::fetchItems($url);
@ -1805,7 +1833,7 @@ class Processor
Logger::notice('No replies', ['replies' => $url]); Logger::notice('No replies', ['replies' => $url]);
return; return;
} }
Logger::notice('Fetch replies - start', ['replies' => $url]); Logger::notice('Fetch replies - start', ['replies' => $url, 'callstack' => $child['callstack'], 'system' => $callstack]);
$fetched = 0; $fetched = 0;
foreach ($replies as $reply) { foreach ($replies as $reply) {
if (is_array($reply)) { if (is_array($reply)) {

View file

@ -388,6 +388,10 @@ return [
// Maximum number of feed items that are fetched and processed. For unlimited items set to 0. // Maximum number of feed items that are fetched and processed. For unlimited items set to 0.
'max_feed_items' => 20, 'max_feed_items' => 20,
// max_fetchreplies_depth (Integer)
// Maximum number of "fetchreplies" activities in the callstack. The higher, the more complete a thread will be.
'max_fetchreplies_depth' => 2,
// max_image_length (Integer) // max_image_length (Integer)
// An alternate way of limiting picture upload sizes. // An alternate way of limiting picture upload sizes.
// Specify the maximum pixel length that pictures are allowed to be (for non-square pictures, it will apply to the longest side). // Specify the maximum pixel length that pictures are allowed to be (for non-square pictures, it will apply to the longest side).