Native probe support for AT-Proto

This commit is contained in:
Michael 2024-12-14 07:14:46 +00:00
parent 59f7f87226
commit ec702f2deb
6 changed files with 332 additions and 43 deletions

View file

@ -41,7 +41,6 @@ class Jetstream
private $uids = [];
private $self = [];
private $capped = false;
private $next_stat = 0;
/** @var LoggerInterface */
private $logger;
@ -74,10 +73,12 @@ class Jetstream
$this->processor = $processor;
}
// *****************************************
// * Listener
// *****************************************
public function listen()
/**
* Listen to incoming webstream messages from Jetstream
*
* @return void
*/
public function listen(): void
{
$timeout = 300;
$timeout_limit = 10;
@ -137,7 +138,12 @@ class Jetstream
}
}
private function incrementMessages()
/**
* Increment the message counter for the statistics page
*
* @return void
*/
private function incrementMessages(): void
{
$packets = (int)($this->keyValue->get('jetstream_messages') ?? 0);
if ($packets >= PHP_INT_MAX) {
@ -146,6 +152,11 @@ class Jetstream
$this->keyValue->set('jetstream_messages', $packets + 1);
}
/**
* Synchronize contacts for all active users
*
* @return void
*/
private function syncContacts()
{
$active_uids = $this->atprotocol->getUids();
@ -158,6 +169,11 @@ class Jetstream
}
}
/**
* Set options like the followed DIDs
*
* @return void
*/
private function setOptions()
{
$active_uids = $this->atprotocol->getUids();
@ -219,6 +235,15 @@ class Jetstream
}
}
/**
* Returns an array of DIDs provided by an array of contacts
*
* @param array $contacts Array of contact records
* @param array $uids Array with the user ids with enabled bluesky timeline import
* @param integer $did_limit Maximum limit of entries
* @param array $dids Array of DIDs that are added to the output list
* @return array DIDs
*/
private function addDids(array $contacts, array $uids, int $did_limit, array $dids): array
{
foreach ($contacts as $contact) {
@ -233,7 +258,13 @@ class Jetstream
return $dids;
}
private function route(stdClass $data)
/**
* Route incoming messages
*
* @param stdClass $data message object
* @return void
*/
private function route(stdClass $data): void
{
Item::incrementInbound(Protocol::BLUESKY);
@ -254,7 +285,13 @@ class Jetstream
}
}
private function routeCommits(stdClass $data)
/**
* Route incoming commit messages
*
* @param stdClass $data message object
* @return void
*/
private function routeCommits(stdClass $data): void
{
$drift = $this->getDrift($data);
$this->logger->notice('Received commit', ['time' => date(DateTimeFormat::ATOM, $data->time_us / 1000000), 'drift' => $drift, 'capped' => $this->capped, 'did' => $data->did, 'operation' => $data->commit->operation, 'collection' => $data->commit->collection, 'timestamp' => $data->time_us]);
@ -304,6 +341,12 @@ class Jetstream
}
}
/**
* Calculate the drift between the server timestamp and the current time.
*
* @param stdClass $data message object
* @return integer The calculated drift
*/
private function getDrift(stdClass $data): int
{
$drift = max(0, round(time() - $data->time_us / 1000000));
@ -321,7 +364,14 @@ class Jetstream
return $drift;
}
private function routePost(stdClass $data, int $drift)
/**
* Route app.bsky.feed.post commits
*
* @param stdClass $data message object
* @param integer $drift
* @return void
*/
private function routePost(stdClass $data, int $drift): void
{
switch ($data->commit->operation) {
case 'delete':
@ -338,7 +388,14 @@ class Jetstream
}
}
private function routeRepost(stdClass $data, int $drift)
/**
* Route app.bsky.feed.repost commits
*
* @param stdClass $data message object
* @param integer $drift
* @return void
*/
private function routeRepost(stdClass $data, int $drift): void
{
switch ($data->commit->operation) {
case 'delete':
@ -355,7 +412,13 @@ class Jetstream
}
}
private function routeLike(stdClass $data)
/**
* Route app.bsky.feed.like commits
*
* @param stdClass $data message object
* @return void
*/
private function routeLike(stdClass $data): void
{
switch ($data->commit->operation) {
case 'delete':
@ -372,7 +435,13 @@ class Jetstream
}
}
private function routeProfile(stdClass $data)
/**
* Route app.bsky.actor.profile commits
*
* @param stdClass $data message object
* @return void
*/
private function routeProfile(stdClass $data): void
{
switch ($data->commit->operation) {
case 'delete':
@ -380,11 +449,11 @@ class Jetstream
break;
case 'create':
$this->actor->updateContactByDID($data->did);
$this->actor->updateContactByDID($data->did, 0);
break;
case 'update':
$this->actor->updateContactByDID($data->did);
$this->actor->updateContactByDID($data->did, 0);
break;
default:
@ -393,7 +462,13 @@ class Jetstream
}
}
private function routeFollow(stdClass $data)
/**
* Route app.bsky.graph.follow commits
*
* @param stdClass $data message object
* @return void
*/
private function routeFollow(stdClass $data): void
{
switch ($data->commit->operation) {
case 'delete':
@ -416,7 +491,13 @@ class Jetstream
}
}
private function storeCommitMessage(stdClass $data)
/**
* Store commit messages for debugging purposes
*
* @param stdClass $data message object
* @return void
*/
private function storeCommitMessage(stdClass $data): void
{
if ($this->config->get('debug', 'jetstream_log')) {
$tempfile = tempnam(System::getTempPath(), 'at-proto.commit.' . $data->commit->collection . '.' . $data->commit->operation . '-');