mirror of
https://codeberg.org/streams/streams.git
synced 2024-09-19 16:35:19 +00:00
refactor fetching code
This commit is contained in:
parent
aea775c999
commit
253c82bbfb
1 changed files with 103 additions and 106 deletions
|
@ -4559,37 +4559,6 @@ class Activity
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// experimental code that needs more work. What this did was once we fetched a conversation to find the root node,
|
|
||||||
// start at that root node and fetch children, so you get all the branches and not just the branch related to the current node.
|
|
||||||
// Unfortunately there is no standard method for achieving this. Mastodon provides a 'replies' collection and Nomad projects
|
|
||||||
// can fetch the 'context'. For other platforms it's a wild guess. Additionally, when we tested this, it started an infinite
|
|
||||||
// recursion and has been disabled until the recursive behaviour is tracked down and fixed.
|
|
||||||
|
|
||||||
// if ($fetch_parents && $parent && ! intval($parent_item['item_private'])) {
|
|
||||||
// logger('topfetch', LOGGER_DEBUG);
|
|
||||||
// // if the thread owner is a connnection, we will already receive any additional comments to their posts
|
|
||||||
// // but if they are not we can try to fetch others in the background
|
|
||||||
// $x = q("SELECT abook.*, xchan.* FROM abook left join xchan on abook_xchan = xchan_hash
|
|
||||||
// WHERE abook_channel = %d and abook_xchan = '%s' LIMIT 1",
|
|
||||||
// intval($channel['channel_id']),
|
|
||||||
// dbesc($parent_item['owner_xchan'])
|
|
||||||
// );
|
|
||||||
// if (! $x) {
|
|
||||||
// // determine if the top-level post provides a replies collection
|
|
||||||
// if ($parent_item['obj']) {
|
|
||||||
// $parent_item['obj'] = json_decode($parent_item['obj'],true);
|
|
||||||
// }
|
|
||||||
// logger('topfetch: ' . print_r($parent_item,true), LOGGER_ALL);
|
|
||||||
// $id = ((array_path_exists('obj/replies/id',$parent_item)) ? $parent_item['obj']['replies']['id'] : false);
|
|
||||||
// if (! $id) {
|
|
||||||
// $id = ((array_path_exists('obj/replies',$parent_item) && is_string($parent_item['obj']['replies'])) ? $parent_item['obj']['replies'] : false);
|
|
||||||
// }
|
|
||||||
// if ($id) {
|
|
||||||
// Run::Summon( [ 'Convo', $id, $channel['channel_id'], $observer_hash ] );
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (is_array($x) && $x['item_id']) {
|
if (is_array($x) && $x['item_id']) {
|
||||||
tag_deliver($channel['channel_id'], $x['item_id']);
|
tag_deliver($channel['channel_id'], $x['item_id']);
|
||||||
if ($is_child_node) {
|
if ($is_child_node) {
|
||||||
|
@ -4682,102 +4651,130 @@ class Activity
|
||||||
$current_item = $item;
|
$current_item = $item;
|
||||||
|
|
||||||
while ($current_item['parent_mid'] !== $current_item['mid']) {
|
while ($current_item['parent_mid'] !== $current_item['mid']) {
|
||||||
$isCollectionOperation = false;
|
|
||||||
// recursion breaker
|
// recursion breaker
|
||||||
if (in_array($current_item['parent_mid'], $seen_mids)) {
|
if (in_array($current_item['parent_mid'], $seen_mids)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
$json = self::fetch($current_item['parent_mid']);
|
|
||||||
|
$fetched = self::fetch_and_parse($current_item['parent_mid']);
|
||||||
$seen_mids[] = $current_item['parent_mid'];
|
$seen_mids[] = $current_item['parent_mid'];
|
||||||
if (!$json) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// set client flag to convert objects to implied activities
|
|
||||||
$activity = new ActivityStreams($json, null, true);
|
|
||||||
if (
|
|
||||||
$activity->type === 'Announce' && is_array($activity->obj)
|
|
||||||
&& array_key_exists('object', $activity->obj) && array_key_exists('actor', $activity->obj)
|
|
||||||
) {
|
|
||||||
// This is a relayed/forwarded Activity (as opposed to a shared/boosted object)
|
|
||||||
// Reparse the encapsulated Activity and use that instead
|
|
||||||
logger('relayed activity', LOGGER_DEBUG);
|
|
||||||
$rawActivity = json_decode($activity->raw, true);
|
|
||||||
$activity = new ActivityStreams($rawActivity['object'], null, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (in_array($activity->type, ['Add', 'Remove'])
|
if (! $fetched) {
|
||||||
&& is_array($activity->obj)
|
|
||||||
&& array_key_exists('object', $activity->obj)
|
|
||||||
&& array_key_exists('actor', $activity->obj)
|
|
||||||
&& !empty($activity->tgt)) {
|
|
||||||
logger('relayed collection operation', LOGGER_DEBUG);
|
|
||||||
$isCollectionOperation = true;
|
|
||||||
$rawActivity = json_decode($activity->raw, true);
|
|
||||||
$activity = new ActivityStreams($rawActivity['object'], null, true);
|
|
||||||
$activity->setApproveId($rawActivity['id']);
|
|
||||||
}
|
|
||||||
|
|
||||||
logger($activity->debug(), LOGGER_DATA);
|
|
||||||
|
|
||||||
if (!$activity->is_valid()) {
|
|
||||||
logger('not a valid activity');
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (is_array($activity->actor) && array_key_exists('id', $activity->actor)) {
|
|
||||||
self::actor_store($activity->actor['id'], $activity->actor);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ActivityPub sourced items are cacheable
|
|
||||||
$item = self::decode_note($activity, true);
|
|
||||||
|
|
||||||
if (!$item) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger('decoded_note: ' . print_r($item,true), LOGGER_DATA);
|
$item = $fetched['item'];
|
||||||
|
$activity = $fetched['activity'];
|
||||||
|
$collection = $fetched['collection'];
|
||||||
|
|
||||||
$hookinfo = [
|
// don't leak any private conversations to the public stream
|
||||||
'activity' => $activity,
|
// even if they contain publicly addressed comments/reactions
|
||||||
'item' => $item
|
|
||||||
];
|
|
||||||
|
|
||||||
Hook::call('fetch_and_store', $hookinfo);
|
if (intval($channel['channel_system']) && intval($item['item_private'])) {
|
||||||
|
logger('private conversation ignored');
|
||||||
|
$conversation = [];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// We're fetching upstream starting with the initial post,
|
||||||
|
// so push each fetched activity to the head of the conversation.
|
||||||
|
array_unshift($conversation, ['activity' => $activity, 'item' => $item, 'collection' => $collection]);
|
||||||
|
|
||||||
$item = $hookinfo['item'];
|
if ($item['parent_mid'] === $item['mid']) {
|
||||||
|
break;
|
||||||
if ($item) {
|
|
||||||
// don't leak any private conversations to the public stream
|
|
||||||
// even if they contain publicly addressed comments/reactions
|
|
||||||
|
|
||||||
if (intval($channel['channel_system']) && intval($item['item_private'])) {
|
|
||||||
logger('private conversation ignored');
|
|
||||||
$conversation = [];
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// We're fetching upstream starting with the initial post,
|
|
||||||
// so push each fetched activity to the head of the conversation.
|
|
||||||
array_unshift($conversation, ['activity' => $activity, 'item' => $item, 'collection' => $isCollectionOperation]);
|
|
||||||
|
|
||||||
if ($item['parent_mid'] === $item['mid']) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$current_item = $item;
|
$current_item = $item;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($conversation && $conversation[0]['item']['mid'] === $conversation[0]['item']['parent_mid']) {
|
if ($conversation) {
|
||||||
foreach ($conversation as $post) {
|
$parent_item = $conversation[0]['item'];
|
||||||
if ($post['activity']->is_valid()) {
|
|
||||||
self::store($channel, $observer_hash, $post['activity'], $post['item'], false, isCollectionOperation: $post['collection']);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// // determine if the top-level post provides a replies collection
|
||||||
|
// if ($parent_item['obj']) {
|
||||||
|
// $parent_item['obj'] = json_decode($parent_item['obj'],true);
|
||||||
|
// }
|
||||||
|
// logger('topfetch: ' . print_r($parent_item,true), LOGGER_ALL);
|
||||||
|
// $id = ((array_path_exists('obj/replies/id',$parent_item)) ? $parent_item['obj']['replies']['id'] : false);
|
||||||
|
// if (! $id) {
|
||||||
|
// $id = ((array_path_exists('obj/replies',$parent_item) && is_string($parent_item['obj']['replies'])) ? $parent_item['obj']['replies'] : false);
|
||||||
|
// }
|
||||||
|
// if ($id) {
|
||||||
|
// Run::Summon( [ 'Convo', $id, $channel['channel_id'], $observer_hash ] );
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
if ($conversation[0]['item']['mid'] === $conversation[0]['item']['parent_mid']) {
|
||||||
|
foreach ($conversation as $post) {
|
||||||
|
if ($post['activity']->is_valid()) {
|
||||||
|
self::store($channel, $observer_hash, $post['activity'], $post['item'], false, isCollectionOperation: $post['collection']);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static function fetch_and_parse($message_id)
|
||||||
|
{
|
||||||
|
$json = self::fetch($message_id);
|
||||||
|
if (!$json) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// set client flag to convert objects to implied activities
|
||||||
|
$activity = new ActivityStreams($json, null, true);
|
||||||
|
if (
|
||||||
|
$activity->type === 'Announce' && is_array($activity->obj)
|
||||||
|
&& array_key_exists('object', $activity->obj) && array_key_exists('actor', $activity->obj)
|
||||||
|
) {
|
||||||
|
// This is a relayed/forwarded Activity (as opposed to a shared/boosted object)
|
||||||
|
// Reparse the encapsulated Activity and use that instead
|
||||||
|
logger('relayed activity', LOGGER_DEBUG);
|
||||||
|
$rawActivity = json_decode($activity->raw, true);
|
||||||
|
$activity = new ActivityStreams($rawActivity['object'], null, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (in_array($activity->type, ['Add', 'Remove'])
|
||||||
|
&& is_array($activity->obj)
|
||||||
|
&& array_key_exists('object', $activity->obj)
|
||||||
|
&& array_key_exists('actor', $activity->obj)
|
||||||
|
&& !empty($activity->tgt)) {
|
||||||
|
logger('relayed collection operation', LOGGER_DEBUG);
|
||||||
|
$isCollectionOperation = true;
|
||||||
|
$rawActivity = json_decode($activity->raw, true);
|
||||||
|
$activity = new ActivityStreams($rawActivity['object'], null, true);
|
||||||
|
$activity->setApproveId($rawActivity['id']);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger($activity->debug(), LOGGER_DATA);
|
||||||
|
|
||||||
|
if (!$activity->is_valid()) {
|
||||||
|
logger('not a valid activity');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (is_array($activity->actor) && array_key_exists('id', $activity->actor)) {
|
||||||
|
self::actor_store($activity->actor['id'], $activity->actor);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ActivityPub sourced items are cacheable
|
||||||
|
$item = self::decode_note($activity, true);
|
||||||
|
|
||||||
|
if (!$item) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger('decoded_note: ' . print_r($item,true), LOGGER_DATA);
|
||||||
|
|
||||||
|
$hookinfo = [
|
||||||
|
'activity' => $activity,
|
||||||
|
'item' => $item,
|
||||||
|
'collection' => $isCollectionOperation,
|
||||||
|
];
|
||||||
|
|
||||||
|
Hook::call('fetch_and_store', $hookinfo);
|
||||||
|
|
||||||
|
return $hookinfo;
|
||||||
|
}
|
||||||
|
|
||||||
// This function is designed to work with Nomad attachments and item body
|
// This function is designed to work with Nomad attachments and item body
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue