Add Jetstream daemon to console

This commit is contained in:
Philipp 2025-01-01 23:52:48 +01:00
parent 64e66acb01
commit 3d2524532f
No known key found for this signature in database
GPG key ID: 24A7501396EB5432
5 changed files with 167 additions and 158 deletions

View file

@ -24,4 +24,7 @@ $dice = (new Dice())->addRules(require(dirname(__DIR__) . '/static/dependencies.
$app = \Friendica\App::fromDice($dice); $app = \Friendica\App::fromDice($dice);
$app->processJetstream(); $argv = $_SERVER['argv'] ?? [];
array_splice($argv, 1, 0, "jetstream");
$app->processConsole($argv);

View file

@ -217,161 +217,10 @@ class App
$this->registerTemplateEngine(); $this->registerTemplateEngine();
(new \Friendica\Core\Console($this->container, $argv))->execute();
}
public function processJetstream(): void
{
$this->setupContainerForAddons();
$this->setupContainerForLogger(LogChannel::DAEMON);
$this->setupLegacyServiceLocator();
$this->registerErrorHandler();
Addon::loadAddons(); Addon::loadAddons();
Hook::loadHooks(); Hook::loadHooks();
/** @var IManageConfigValues */ (new \Friendica\Core\Console($this->container, $argv))->execute();
$config = $this->container->create(IManageConfigValues::class);
$config->reload();
/** @var Mode */
$mode = $this->container->create(Mode::class);
if ($mode->isInstall()) {
die("Friendica isn't properly installed yet.\n");
}
if (empty($config->get('jetstream', 'pidfile'))) {
die(<<<TXT
Please set jetstream.pidfile in config/local.config.php. For example:
'jetstream' => [
'pidfile' => '/path/to/jetstream.pid',
],
TXT);
}
if (!Addon::isEnabled('bluesky')) {
die("Bluesky has to be enabled.\n");
}
$pidfile = $config->get('jetstream', 'pidfile');
if (in_array('start', (array)$_SERVER['argv'])) {
$daemonMode = 'start';
}
if (in_array('stop', (array)$_SERVER['argv'])) {
$daemonMode = 'stop';
}
if (in_array('status', (array)$_SERVER['argv'])) {
$daemonMode = 'status';
}
if (!isset($daemonMode)) {
die("Please use either 'start', 'stop' or 'status'.\n");
}
// Get options
$shortopts = 'f';
$longopts = ['foreground'];
$options = getopt($shortopts, $longopts);
$foreground = array_key_exists('f', $options) || array_key_exists('foreground', $options);
if (empty($_SERVER['argv'][0])) {
die("Unexpected script behaviour. This message should never occur.\n");
}
$pid = null;
if (is_readable($pidfile)) {
$pid = intval(file_get_contents($pidfile));
}
if (empty($pid) && in_array($daemonMode, ['stop', 'status'])) {
die("Pidfile wasn't found. Is jetstream running?\n");
}
if ($daemonMode == 'status') {
if (posix_kill($pid, 0)) {
die("Jetstream process $pid is running.\n");
}
unlink($pidfile);
die("Jetstream process $pid isn't running.\n");
}
if ($daemonMode == 'stop') {
posix_kill($pid, SIGTERM);
unlink($pidfile);
Logger::notice('Jetstream process was killed', ['pid' => $pid]);
die("Jetstream process $pid was killed.\n");
}
if (!empty($pid) && posix_kill($pid, 0)) {
die("Jetstream process $pid is already running.\n");
}
Logger::notice('Starting jetstream daemon.', ['pid' => $pid]);
if (!$foreground) {
echo "Starting jetstream daemon.\n";
DBA::disconnect();
// Fork a daemon process
$pid = pcntl_fork();
if ($pid == -1) {
echo "Daemon couldn't be forked.\n";
Logger::warning('Could not fork daemon');
exit(1);
} elseif ($pid) {
// The parent process continues here
if (!file_put_contents($pidfile, $pid)) {
echo "Pid file wasn't written.\n";
Logger::warning('Could not store pid file');
posix_kill($pid, SIGTERM);
exit(1);
}
echo 'Child process started with pid ' . $pid . ".\n";
Logger::notice('Child process started', ['pid' => $pid]);
exit(0);
}
// We now are in the child process
register_shutdown_function(function (): void {
posix_kill(posix_getpid(), SIGTERM);
posix_kill(posix_getpid(), SIGHUP);
});
// Make the child the main process, detach it from the terminal
if (posix_setsid() < 0) {
return;
}
// Closing all existing connections with the outside
fclose(STDIN);
// And now connect the database again
DBA::connect();
}
// Just to be sure that this script really runs endlessly
set_time_limit(0);
// Now running as a daemon.
$jetstream = $this->container->create(Jetstream::class);
$jetstream->listen();
} }
public function processWorker(array $options): void public function processWorker(array $options): void

View file

@ -0,0 +1,154 @@
<?php
// Copyright (C) 2010-2024, the Friendica project
// SPDX-FileCopyrightText: 2010-2024 the Friendica project
//
// SPDX-License-Identifier: AGPL-3.0-or-later
declare(strict_types=1);
namespace Friendica\Console;
use Friendica\App\Mode;
use Friendica\Core\Addon;
use Friendica\Core\Config\Capability\IManageConfigValues;
use Asika\SimpleConsole\Console;
use Friendica\Core\KeyValueStorage\Capability\IManageKeyValuePairs;
use Friendica\Protocol\ATProtocol\Jetstream;
use Friendica\System\Daemon as SysDaemon;
use RuntimeException;
/**
* Console command for interacting with the daemon
*/
final class JetstreamDaemon extends Console
{
private Mode $mode;
private IManageConfigValues $config;
private IManageKeyValuePairs $keyValue;
private SysDaemon $daemon;
private Jetstream $jetstream;
/**
* @param Mode $mode
* @param IManageConfigValues $config
* @param IManageKeyValuePairs $keyValue
* @param SysDaemon $daemon
* @param Jetstream $jetstream
* @param array|null $argv
*/
public function __construct(Mode $mode, IManageConfigValues $config, IManageKeyValuePairs $keyValue, SysDaemon $daemon, Jetstream $jetstream, array $argv = null)
{
parent::__construct($argv);
$this->mode = $mode;
$this->config = $config;
$this->keyValue = $keyValue;
$this->jetstream = $jetstream;
$this->daemon = $daemon;
}
protected function getHelp(): string
{
return <<<HELP
jetstream - Interact with the Jetstream daemon
Synopsis
bin/console jetstream start [-h|--help|-?] [-v] [-f]
bin/console jetstream stop [-h|--help|-?] [-v]
bin/console jetstream status [-h|--help|-?] [-v]
Description
Interact with the Jetstream daemon
Options
-h|--help|-? Show help information
-v Show more debug information.
-f|--foreground Runs the daemon in the foreground
Examples
bin/console jetstream start -f
Starts the daemon in the foreground
bin/console jetstream status
Gets the status of the daemon
HELP;
}
protected function doExecute()
{
if ($this->mode->isInstall()) {
throw new RuntimeException("Friendica isn't properly installed yet");
}
$this->config->reload();
if (empty($this->config->get('jetstream', 'pidfile'))) {
throw new RuntimeException(<<< TXT
Please set jetstream.pidfile in config/local.config.php. For example:
'jetstream' => [
'pidfile' => '/path/to/jetstream.pid',
],
TXT
);
}
if (!Addon::isEnabled('bluesky')) {
throw new RuntimeException("Bluesky has to be enabled.\n");
}
$pidfile = $this->config->get('jetstream', 'pidfile');
$daemonMode = $this->getArgument(0);
$foreground = $this->getOption(['f', 'foreground']) ?? false;
if (empty($daemonMode)) {
throw new RuntimeException("Please use either 'start', 'stop' or 'status'");
}
$this->daemon->init($pidfile);
if ($daemonMode == 'status') {
if ($this->daemon->isRunning()) {
$this->out(sprintf("Daemon process %s is running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
} else {
$this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
}
return 0;
}
if ($daemonMode == 'stop') {
if (!$this->daemon->isRunning()) {
$this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 0;
}
if ($this->daemon->stop()) {
$this->keyValue->set('worker_daemon_mode', false);
$this->out(sprintf("Daemon process %s was killed (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 0;
}
return 1;
}
if ($this->daemon->isRunning()) {
$this->out(sprintf("Daemon process %s is already running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 1;
}
if ($daemonMode == "start") {
$this->out("Starting worker daemon");
$this->daemon->start(function () {
$this->jetstream->listen();
}, $foreground);
return 0;
}
$this->err('Invalid command');
$this->out($this->getHelp());
return 1;
}
}

View file

@ -38,6 +38,7 @@ Commands:
contact Contact management contact Contact management
createdoxygen Generate Doxygen headers createdoxygen Generate Doxygen headers
daemon Interact with the Friendica daemon daemon Interact with the Friendica daemon
jetstream Interact with the Jetstream daemon
dbstructure Do database updates dbstructure Do database updates
docbloxerrorchecker Check the file tree for DocBlox errors docbloxerrorchecker Check the file tree for DocBlox errors
extract Generate translation string file for the Friendica project (deprecated) extract Generate translation string file for the Friendica project (deprecated)
@ -77,6 +78,7 @@ HELP;
'contact' => Friendica\Console\Contact::class, 'contact' => Friendica\Console\Contact::class,
'createdoxygen' => Friendica\Console\CreateDoxygen::class, 'createdoxygen' => Friendica\Console\CreateDoxygen::class,
'daemon' => Friendica\Console\Daemon::class, 'daemon' => Friendica\Console\Daemon::class,
'jetstream' => Friendica\Console\JetstreamDaemon::class,
'docbloxerrorchecker' => Friendica\Console\DocBloxErrorChecker::class, 'docbloxerrorchecker' => Friendica\Console\DocBloxErrorChecker::class,
'dbstructure' => Friendica\Console\DatabaseStructure::class, 'dbstructure' => Friendica\Console\DatabaseStructure::class,
'extract' => Friendica\Console\Extract::class, 'extract' => Friendica\Console\Extract::class,

View file

@ -95,6 +95,7 @@ class Jetstream
// @todo make the path configurable // @todo make the path configurable
$this->client = new \WebSocket\Client('wss://jetstream1.us-west.bsky.network/subscribe?requireHello=true' . $cursor); $this->client = new \WebSocket\Client('wss://jetstream1.us-west.bsky.network/subscribe?requireHello=true' . $cursor);
$this->client->setTimeout($timeout); $this->client->setTimeout($timeout);
$this->client->setLogger($this->logger);
} catch (\WebSocket\ConnectionException $e) { } catch (\WebSocket\ConnectionException $e) {
$this->logger->error('Error while trying to establish the connection', ['code' => $e->getCode(), 'message' => $e->getMessage()]); $this->logger->error('Error while trying to establish the connection', ['code' => $e->getCode(), 'message' => $e->getMessage()]);
echo "Connection wasn't established.\n"; echo "Connection wasn't established.\n";
@ -365,7 +366,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.feed.post commits * Route app.bsky.feed.post commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @param integer $drift * @param integer $drift
@ -389,7 +390,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.feed.repost commits * Route app.bsky.feed.repost commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @param integer $drift * @param integer $drift
@ -413,7 +414,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.feed.like commits * Route app.bsky.feed.like commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @return void * @return void
@ -436,7 +437,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.actor.profile commits * Route app.bsky.actor.profile commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @return void * @return void
@ -463,7 +464,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.graph.follow commits * Route app.bsky.graph.follow commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @return void * @return void