mirror of
https://codeberg.org/streams/streams.git
synced 2024-09-20 06:35:15 +00:00
fetch collections
This commit is contained in:
parent
3a73d2ffbf
commit
038b1cbe06
3 changed files with 116 additions and 13 deletions
|
@ -50,7 +50,7 @@ class Convo {
|
|||
$AS = new ActivityStreams($message, null, true);
|
||||
if ($AS->is_valid() && is_array($AS->obj)) {
|
||||
$item = Activity::decode_note($AS,true);
|
||||
Activity::store($channel,$contact['abook_xchan'],$AS,$item);
|
||||
Activity::store($channel,$contact['abook_xchan'],$AS,$item,true,true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ namespace Zotlabs\Lib;
|
|||
|
||||
use Zotlabs\Lib\Libzot;
|
||||
use Zotlabs\Web\HTTPSig;
|
||||
use Zotlabs\Lib\Activity;
|
||||
use Zotlabs\Lib\ActivityStreams;
|
||||
|
||||
class Queue {
|
||||
|
||||
|
@ -122,7 +124,7 @@ class Queue {
|
|||
intval((isset($arr['priority'])) ? $arr['priority'] : 0),
|
||||
dbesc(datetime_convert()),
|
||||
dbesc(datetime_convert()),
|
||||
dbesc(datetime_convert()),
|
||||
dbesc((isset($arr['scheduled'])) ? $arr['scheduled'] : datetime_convert()),
|
||||
dbesc($arr['notify']),
|
||||
dbesc(($arr['msg']) ? $arr['msg'] : '')
|
||||
);
|
||||
|
@ -226,6 +228,59 @@ class Queue {
|
|||
return;
|
||||
}
|
||||
|
||||
if ($outq['outq_driver'] === 'asfetch') {
|
||||
|
||||
$channel = channelx_by_n($outq['outq_channel']);
|
||||
if (! $channel) {
|
||||
logger('missing channel: ' . $outq['outq_channel']);
|
||||
return;
|
||||
}
|
||||
|
||||
$j = Activity::fetch($outq['outq_posturl'],$channel);
|
||||
if ($j) {
|
||||
$AS = new ActivityStreams($j, null, true);
|
||||
if ($AS->is_valid() && isset($AS->data['type'])) {
|
||||
if (ActivityStreams::is_an_actor($AS->data['type'])) {
|
||||
Activity::actor_store($AS->data['id'],$AS->data);
|
||||
}
|
||||
if (strpos($AS->data['type'],'Collection')) {
|
||||
self::remove($outq['outq_hash']);
|
||||
return;
|
||||
}
|
||||
$item = Activity::decode_note($AS,true);
|
||||
if ($item) {
|
||||
Activity::store($channel,$channel['channnel_hash'],$AS,$item,true,true);
|
||||
}
|
||||
}
|
||||
logger('deliver: queue fetch success from ' . $outq['outq_posturl'], LOGGER_DEBUG);
|
||||
self::remove($outq['outq_hash']);
|
||||
|
||||
// server is responding - see if anything else is going to this destination and is piled up
|
||||
// and try to send some more. We're relying on the fact that do_delivery() results in an
|
||||
// immediate delivery otherwise we could get into a queue loop.
|
||||
|
||||
if (! $immediate) {
|
||||
$x = q("select outq_hash from outq where outq_driver = 'asfetch' and outq_channel = %d and outq_delivered = 0",
|
||||
dbesc($outq['outq_channel'])
|
||||
);
|
||||
|
||||
$piled_up = [];
|
||||
if ($x) {
|
||||
foreach ($x as $xx) {
|
||||
$piled_up[] = $xx['outq_hash'];
|
||||
}
|
||||
}
|
||||
if ($piled_up) {
|
||||
do_delivery($piled_up,true);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
logger('deliver: queue fetch failed' . ' from ' . $outq['outq_posturl'],LOGGER_DEBUG);
|
||||
self::update($outq['outq_hash'],10);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if($outq['outq_driver'] === 'activitypub') {
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ use Zotlabs\Web\Controller;
|
|||
use Zotlabs\Lib\Activity;
|
||||
use Zotlabs\Lib\ActivityStreams;
|
||||
use Zotlabs\Lib\ASCollection;
|
||||
use Zotlabs\Daemon\Run;
|
||||
|
||||
require_once("include/bbcode.php");
|
||||
require_once('include/security.php');
|
||||
|
@ -63,25 +64,62 @@ class Search extends Controller {
|
|||
|
||||
$o .= search($search,'search-box','/search',((local_channel()) ? true : false));
|
||||
|
||||
// ActivityStreams object fetches from the navbar
|
||||
|
||||
if (local_channel() && strpos($search,'https://') === 0) {
|
||||
$j = Activity::fetch($search,App::get_channel());
|
||||
$channel = App::get_channel();
|
||||
$hash = EMPTY_STR;
|
||||
$j = Activity::fetch($search,$channel);
|
||||
if ($j) {
|
||||
if (isset($j['type']) && ActivityStreams::is_an_actor($j['type'])) {
|
||||
Activity::actor_store($j['id'],$j);
|
||||
goaway(z_root() . '/directory' . '?f=1&navsearch=1&search=' . $search);
|
||||
}
|
||||
$AS = new ActivityStreams($j, null, true);
|
||||
if ($AS->is_valid() && isset($AS->data['type'])) {
|
||||
if (ActivityStreams::is_an_actor($AS->data['type'])) {
|
||||
Activity::actor_store($AS->data['id'],$AS->data);
|
||||
goaway(z_root() . '/directory' . '?f=1&navsearch=1&search=' . $search);
|
||||
}
|
||||
if (is_array($AS->obj)) {
|
||||
// matches Collection and orderedCollection
|
||||
if (isset($AS->obj['type']) && strpos($AS->obj['type'],'Collection')) {
|
||||
$max = intval(get_config('system','max_imported_search_collection',10));
|
||||
|
||||
// Collections are awkward to process because they can be huge.
|
||||
// Our strategy is to limit a navbar search to 100 Collection items
|
||||
// and only fetch the first 10 conversations in the foreground.
|
||||
// We'll queue the rest, and then send you to a page where
|
||||
// you can see something we've imported.
|
||||
// In theory you'll start to see notifications as other conversations
|
||||
// are fetched in the background while you're looking at the first ones.
|
||||
|
||||
$max = intval(get_config('system','max_imported_search_collection',100));
|
||||
if (intval($max)) {
|
||||
$obj = new ASCollection($search, App::get_channel(), 0, $max);
|
||||
$obj = new ASCollection($search, $channel, 0, $max);
|
||||
$messages = $obj->get();
|
||||
$author = null;
|
||||
if ($messages) {
|
||||
if ($messages) {
|
||||
$processed = 0;
|
||||
foreach ($messages as $message) {
|
||||
$processed ++;
|
||||
// only process the first several items in the foreground and
|
||||
// queue the remainder.
|
||||
if ($processed > 10) {
|
||||
$fetch_url = ((is_string($message)) ? $message : EMPTY_STR);
|
||||
$fetch_url = ((is_array($message) && array_key_exists('id',$message)) ? $message_id : $fetch_url);
|
||||
if (! $fetch_url) {
|
||||
continue;
|
||||
}
|
||||
$hash = new_uuid();
|
||||
Queue::insert(
|
||||
[
|
||||
'hash' => $hash,
|
||||
'account_id' => $channel['channel_account_id'],
|
||||
'channel_id' => $channel['channel_id'],
|
||||
'posturl' => $fetch_url,
|
||||
'notify' => EMPTY_STR,
|
||||
'msg' => EMPTY_STR,
|
||||
'driver' => 'asfetch'
|
||||
]
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if (is_string($message)) {
|
||||
$message = Activity::fetch($message,App::get_channel());
|
||||
}
|
||||
|
@ -96,18 +134,28 @@ class Search extends Controller {
|
|||
Activity::store(App::get_channel(),get_observer_hash(),$AS,$item);
|
||||
}
|
||||
}
|
||||
if ($hash) {
|
||||
Run::Summon('Deliver', $hash);
|
||||
}
|
||||
}
|
||||
|
||||
// This will go to the right place most but not all of the time.
|
||||
// It will go to a relevant place all of the time, so we'll use it.
|
||||
|
||||
if ($author) {
|
||||
goaway(z_root() . '/stream/?xid=' . urlencode($author));
|
||||
goaway(z_root() . '/stream/?xchan=' . urlencode($author));
|
||||
}
|
||||
goaway(z_root() . '/stream');
|
||||
}
|
||||
}
|
||||
else {
|
||||
// The boolean flag enables html cache of the item
|
||||
|
||||
// It wasn't a Collection object and wasn't an Actor object,
|
||||
// so let's see if it decodes. The boolean flag enables html
|
||||
// cache of the item
|
||||
|
||||
$item = Activity::decode_note($AS,true);
|
||||
if ($item) {
|
||||
logger('parsed_item: ' . print_r($item,true),LOGGER_DATA);
|
||||
Activity::store(App::get_channel(),get_observer_hash(),$AS,$item, true, true);
|
||||
goaway(z_root() . '/display/' . gen_link_id($item['mid']));
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue