Reduce the number of HTTP requests in the media handling

This commit is contained in:
Michael 2024-12-09 13:40:47 +00:00
parent 69345432e1
commit e7d9c6c254
11 changed files with 207 additions and 89 deletions

View file

@ -147,7 +147,7 @@ final class ATProtocol
return $data;
}
private function post(int $uid, string $url, string $params, array $headers): ?stdClass
public function post(int $uid, string $url, string $params, array $headers): ?stdClass
{
$pds = $this->getUserPds($uid);
if (empty($pds)) {
@ -172,8 +172,11 @@ final class ATProtocol
$data->code = $curlResult->getReturnCode();
}
$this->pConfig->set($uid, 'bluesky', 'status', self::STATUS_SUCCESS);
Item::incrementOutbound(Protocol::BLUESKY);
if (!empty($data->code) && ($data->code >= 200) && ($data->code < 400)) {
$this->pConfig->set($uid, 'bluesky', 'status', self::STATUS_SUCCESS);
} else {
$this->pConfig->set($uid, 'bluesky', 'status', self::STATUS_API_FAIL);
}
return $data;
}
@ -230,7 +233,7 @@ final class ATProtocol
return $did;
}
private function getDid(string $handle): string
public function getDid(string $handle): string
{
if ($handle == '') {
return '';
@ -373,7 +376,7 @@ final class ATProtocol
return in_array('at://' . $handle, $data->alsoKnownAs);
}
private function getUserToken(int $uid): string
public function getUserToken(int $uid): string
{
$token = $this->pConfig->get($uid, 'bluesky', 'access_token');
$created = $this->pConfig->get($uid, 'bluesky', 'token_created');
@ -393,6 +396,11 @@ final class ATProtocol
$data = $this->post($uid, '/xrpc/com.atproto.server.refreshSession', '', ['Authorization' => ['Bearer ' . $token]]);
if (empty($data) || empty($data->accessJwt)) {
$this->logger->debug('Refresh failed', ['return' => $data]);
$password = $this->pConfig->get($uid, 'bluesky', 'password');
if (!empty($password)) {
return $this->createUserToken($uid, $password);
}
$this->pConfig->set($uid, 'bluesky', 'status', self::STATUS_TOKEN_FAIL);
return '';
}

View file

@ -38,9 +38,10 @@ use stdClass;
*/
class Jetstream
{
private $uids = [];
private $self = [];
private $capped = false;
private $uids = [];
private $self = [];
private $capped = false;
private $next_stat = 0;
/** @var LoggerInterface */
private $logger;
@ -108,6 +109,7 @@ class Jetstream
$timestamp = $data->time_us;
$this->route($data);
$this->keyValue->set('jetstream_timestamp', $timestamp);
$this->incrementMessages();
} else {
$this->logger->warning('Unexpected return value', ['data' => $data]);
break;
@ -135,6 +137,15 @@ class Jetstream
}
}
private function incrementMessages()
{
$packets = (int)($this->keyValue->get('jetstream_messages') ?? 0);
if ($packets >= PHP_INT_MAX) {
$packets = 0;
}
$this->keyValue->set('jetstream_messages', $packets + 1);
}
private function syncContacts()
{
$active_uids = $this->atprotocol->getUids();
@ -184,10 +195,14 @@ class Jetstream
}
if (!$this->capped && count($dids) < $did_limit) {
$contacts = Contact::selectToArray(['url'], ['uid' => 0, 'network' => Protocol::BLUESKY], ['order' => ['last-item' => true], 'limit' => $did_limit]);
$condition = ["`uid` = ? AND `network` = ? AND EXISTS(SELECT `author-id` FROM `post-user` WHERE `author-id` = `contact`.`id` AND `post-user`.`uid` != ?)", 0, Protocol::BLUESKY, 0];
$contacts = Contact::selectToArray(['url'], $condition, ['order' => ['last-item' => true], 'limit' => $did_limit]);
$dids = $this->addDids($contacts, $uids, $did_limit, $dids);
}
$this->keyValue->set('jetstream_did_count', count($dids));
$this->keyValue->set('jetstream_did_limit', $did_limit);
$this->logger->debug('Selected DIDs', ['uids' => $active_uids, 'count' => count($dids), 'capped' => $this->capped]);
$update = [
'type' => 'options_update',
@ -241,17 +256,7 @@ class Jetstream
private function routeCommits(stdClass $data)
{
$drift = max(0, round(time() - $data->time_us / 1000000));
if ($drift > 60 && !$this->capped) {
$this->capped = true;
$this->setOptions();
$this->logger->notice('Drift is too high, dids will be capped');
} elseif ($drift == 0 && $this->capped) {
$this->capped = false;
$this->setOptions();
$this->logger->notice('Drift is low enough, dids will be uncapped');
}
$drift = $this->getDrift($data);
$this->logger->notice('Received commit', ['time' => date(DateTimeFormat::ATOM, $data->time_us / 1000000), 'drift' => $drift, 'capped' => $this->capped, 'did' => $data->did, 'operation' => $data->commit->operation, 'collection' => $data->commit->collection, 'timestamp' => $data->time_us]);
$timestamp = microtime(true);
@ -299,6 +304,23 @@ class Jetstream
}
}
private function getDrift(stdClass $data): int
{
$drift = max(0, round(time() - $data->time_us / 1000000));
$this->keyValue->set('jetstream_drift', $drift);
if ($drift > 60 && !$this->capped) {
$this->capped = true;
$this->setOptions();
$this->logger->notice('Drift is too high, dids will be capped');
} elseif ($drift == 0 && $this->capped) {
$this->capped = false;
$this->setOptions();
$this->logger->notice('Drift is low enough, dids will be uncapped');
}
return $drift;
}
private function routePost(stdClass $data, int $drift)
{
switch ($data->commit->operation) {

View file

@ -13,6 +13,7 @@ namespace Friendica\Protocol\ATProtocol;
use Friendica\App\BaseURL;
use Friendica\Core\Protocol;
use Friendica\Database\Database;
use Friendica\Database\DBA;
use Friendica\Model\Contact;
use Friendica\Model\Conversation;
use Friendica\Model\Item;
@ -128,12 +129,12 @@ class Processor
if (!empty($data->commit->record->reply)) {
$root = $this->getUri($data->commit->record->reply->root);
$parent = $this->getUri($data->commit->record->reply->parent);
$uids = $this->getPostUids($root);
$uids = $this->getPostUids($root, true);
if (!$uids) {
$this->logger->debug('Comment is not imported since the root post is not found.', ['root' => $root, 'parent' => $parent]);
return;
}
if ($dont_fetch && !$this->getPostUids($parent)) {
if ($dont_fetch && !$this->getPostUids($parent, false)) {
$this->logger->debug('Comment is not imported since the parent post is not found.', ['root' => $root, 'parent' => $parent]);
return;
}
@ -166,7 +167,8 @@ class Processor
return;
}
}
$item = $this->addMedia($post->thread->post->embed, $item, 0, 0, 0);
$item['source'] = json_encode($post);
$item = $this->addMedia($post->thread->post->embed, $item, 0);
}
$id = Item::insert($item);
@ -183,7 +185,7 @@ class Processor
public function createRepost(stdClass $data, array $uids, bool $dont_fetch)
{
if ($dont_fetch && !$this->getPostUids($this->getUri($data->commit->record->subject))) {
if ($dont_fetch && !$this->getPostUids($this->getUri($data->commit->record->subject), true)) {
$this->logger->debug('Repost is not imported since the subject is not found.', ['subject' => $this->getUri($data->commit->record->subject)]);
return;
}
@ -213,7 +215,7 @@ class Processor
public function createLike(stdClass $data)
{
$uids = $this->getPostUids($this->getUri($data->commit->record->subject));
$uids = $this->getPostUids($this->getUri($data->commit->record->subject), false);
if (!$uids) {
$this->logger->debug('Like is not imported since the subject is not found.', ['subject' => $this->getUri($data->commit->record->subject)]);
return;
@ -270,7 +272,7 @@ class Processor
return true;
}
private function processPost(stdClass $post, int $uid, int $post_reason, int $causer, int $level, int $protocol): int
public function processPost(stdClass $post, int $uid, int $post_reason, int $causer, int $level, int $protocol): int
{
$uri = $this->getUri($post);
@ -295,7 +297,7 @@ class Processor
}
if (!empty($post->embed)) {
$item = $this->addMedia($post->embed, $item, $uid, $level);
$item = $this->addMedia($post->embed, $item, $level);
}
$item['restrictions'] = $this->getRestrictionsForUser($post, $item, $post_reason);
@ -378,7 +380,7 @@ class Processor
return $item;
}
private function getHeaderFromPost(stdClass $post, string $uri, int $uid, int $protocol): array
public function getHeaderFromPost(stdClass $post, string $uri, int $uid, int $protocol): array
{
$parts = $this->getUriParts($uri);
if (empty($post->author) || empty($post->cid) || empty($parts->rkey)) {
@ -538,6 +540,8 @@ class Processor
'url' => $image->fullsize,
'preview' => $image->thumb,
'description' => $image->alt,
'height' => $image->aspectRatio->height ?? null,
'width' => $image->aspectRatio->width ?? null,
];
Post\Media::insert($media);
}
@ -561,6 +565,7 @@ class Processor
'uri-id' => $item['uri-id'],
'type' => Post\Media::HTML,
'url' => $embed->external->uri,
'preview' => $embed->external->thumb ?? null,
'name' => $embed->external->title,
'description' => $embed->external->description,
];
@ -686,7 +691,7 @@ class Processor
return $restrict ? Item::CANT_REPLY : null;
}
private function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $always_fetch = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $always_fetch = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
{
$timestamp = microtime(true);
$stamp = Strings::getRandomHex(30);
@ -794,7 +799,7 @@ class Processor
return $uri;
}
private function getUriParts(string $uri): ?stdClass
public function getUriParts(string $uri): ?stdClass
{
$class = $this->getUriClass($uri);
if (empty($class)) {
@ -812,7 +817,7 @@ class Processor
return $class;
}
private function getUriClass(string $uri): ?stdClass
public function getUriClass(string $uri): ?stdClass
{
if (empty($uri)) {
return null;
@ -837,7 +842,7 @@ class Processor
return $class;
}
private function fetchUriId(string $uri, int $uid): string
public function fetchUriId(string $uri, int $uid): string
{
$reply = Post::selectFirst(['uri-id'], ['uri' => $uri, 'uid' => [$uid, 0]]);
if (!empty($reply['uri-id'])) {
@ -852,16 +857,18 @@ class Processor
return 0;
}
private function getPostUids(string $uri): array
private function getPostUids(string $uri, bool $with_public_user): array
{
$condition = $with_public_user ? [] : ["`uid` != ?", 0];
$uids = [];
$posts = Post::select(['uid'], ['uri' => $uri]);
$posts = Post::select(['uid'], DBA::mergeConditions(['uri' => $uri], $condition));
while ($post = Post::fetch($posts)) {
$uids[] = $post['uid'];
}
$this->db->close($posts);
$posts = Post::select(['uid'], ['extid' => $uri]);
$posts = Post::select(['uid'], DBA::mergeConditions(['extid' => $uri], $condition));
while ($post = Post::fetch($posts)) {
$uids[] = $post['uid'];
}
@ -878,7 +885,7 @@ class Processor
return Post::exists(['extid' => $uri, 'uid' => $uids]);
}
private function getUri(stdClass $post): string
public function getUri(stdClass $post): string
{
if (empty($post->cid)) {
$this->logger->info('Invalid URI', ['post' => $post]);
@ -887,7 +894,7 @@ class Processor
return $post->uri . ':' . $post->cid;
}
private function getPostUri(string $uri, int $uid): string
public function getPostUri(string $uri, int $uid): string
{
if (Post::exists(['uri' => $uri, 'uid' => [$uid, 0]])) {
$this->logger->debug('Post exists', ['uri' => $uri]);