From 3d2524532fe744e79e3c747c493affa2cb496ad5 Mon Sep 17 00:00:00 2001 From: Philipp Date: Wed, 1 Jan 2025 23:52:48 +0100 Subject: [PATCH] Add Jetstream daemon to console --- bin/jetstream.php | 5 +- src/App.php | 153 +------------------------ src/Console/JetstreamDaemon.php | 154 ++++++++++++++++++++++++++ src/Core/Console.php | 2 + src/Protocol/ATProtocol/Jetstream.php | 11 +- 5 files changed, 167 insertions(+), 158 deletions(-) create mode 100644 src/Console/JetstreamDaemon.php diff --git a/bin/jetstream.php b/bin/jetstream.php index b2df4d38dc..c696d044b9 100755 --- a/bin/jetstream.php +++ b/bin/jetstream.php @@ -24,4 +24,7 @@ $dice = (new Dice())->addRules(require(dirname(__DIR__) . '/static/dependencies. $app = \Friendica\App::fromDice($dice); -$app->processJetstream(); +$argv = $_SERVER['argv'] ?? []; +array_splice($argv, 1, 0, "jetstream"); + +$app->processConsole($argv); diff --git a/src/App.php b/src/App.php index a233e079f1..57b32e082b 100644 --- a/src/App.php +++ b/src/App.php @@ -217,161 +217,10 @@ class App $this->registerTemplateEngine(); - (new \Friendica\Core\Console($this->container, $argv))->execute(); - } - - public function processJetstream(): void - { - $this->setupContainerForAddons(); - - $this->setupContainerForLogger(LogChannel::DAEMON); - - $this->setupLegacyServiceLocator(); - - $this->registerErrorHandler(); - Addon::loadAddons(); Hook::loadHooks(); - /** @var IManageConfigValues */ - $config = $this->container->create(IManageConfigValues::class); - - $config->reload(); - - /** @var Mode */ - $mode = $this->container->create(Mode::class); - - if ($mode->isInstall()) { - die("Friendica isn't properly installed yet.\n"); - } - - if (empty($config->get('jetstream', 'pidfile'))) { - die(<< [ - 'pidfile' => '/path/to/jetstream.pid', - ], - TXT); - } - - if (!Addon::isEnabled('bluesky')) { - die("Bluesky has to be enabled.\n"); - } - - $pidfile = $config->get('jetstream', 'pidfile'); - - if (in_array('start', (array)$_SERVER['argv'])) { - $daemonMode = 'start'; - } - - if (in_array('stop', (array)$_SERVER['argv'])) { - $daemonMode = 'stop'; - } - - if (in_array('status', (array)$_SERVER['argv'])) { - $daemonMode = 'status'; - } - - if (!isset($daemonMode)) { - die("Please use either 'start', 'stop' or 'status'.\n"); - } - - // Get options - $shortopts = 'f'; - $longopts = ['foreground']; - $options = getopt($shortopts, $longopts); - - $foreground = array_key_exists('f', $options) || array_key_exists('foreground', $options); - - if (empty($_SERVER['argv'][0])) { - die("Unexpected script behaviour. This message should never occur.\n"); - } - - $pid = null; - - if (is_readable($pidfile)) { - $pid = intval(file_get_contents($pidfile)); - } - - if (empty($pid) && in_array($daemonMode, ['stop', 'status'])) { - die("Pidfile wasn't found. Is jetstream running?\n"); - } - - if ($daemonMode == 'status') { - if (posix_kill($pid, 0)) { - die("Jetstream process $pid is running.\n"); - } - - unlink($pidfile); - - die("Jetstream process $pid isn't running.\n"); - } - - if ($daemonMode == 'stop') { - posix_kill($pid, SIGTERM); - - unlink($pidfile); - - Logger::notice('Jetstream process was killed', ['pid' => $pid]); - - die("Jetstream process $pid was killed.\n"); - } - - if (!empty($pid) && posix_kill($pid, 0)) { - die("Jetstream process $pid is already running.\n"); - } - - Logger::notice('Starting jetstream daemon.', ['pid' => $pid]); - - if (!$foreground) { - echo "Starting jetstream daemon.\n"; - - DBA::disconnect(); - - // Fork a daemon process - $pid = pcntl_fork(); - if ($pid == -1) { - echo "Daemon couldn't be forked.\n"; - Logger::warning('Could not fork daemon'); - exit(1); - } elseif ($pid) { - // The parent process continues here - if (!file_put_contents($pidfile, $pid)) { - echo "Pid file wasn't written.\n"; - Logger::warning('Could not store pid file'); - posix_kill($pid, SIGTERM); - exit(1); - } - echo 'Child process started with pid ' . $pid . ".\n"; - Logger::notice('Child process started', ['pid' => $pid]); - exit(0); - } - - // We now are in the child process - register_shutdown_function(function (): void { - posix_kill(posix_getpid(), SIGTERM); - posix_kill(posix_getpid(), SIGHUP); - }); - - // Make the child the main process, detach it from the terminal - if (posix_setsid() < 0) { - return; - } - - // Closing all existing connections with the outside - fclose(STDIN); - - // And now connect the database again - DBA::connect(); - } - - // Just to be sure that this script really runs endlessly - set_time_limit(0); - - // Now running as a daemon. - $jetstream = $this->container->create(Jetstream::class); - $jetstream->listen(); + (new \Friendica\Core\Console($this->container, $argv))->execute(); } public function processWorker(array $options): void diff --git a/src/Console/JetstreamDaemon.php b/src/Console/JetstreamDaemon.php new file mode 100644 index 0000000000..1f50a92e50 --- /dev/null +++ b/src/Console/JetstreamDaemon.php @@ -0,0 +1,154 @@ +mode = $mode; + $this->config = $config; + $this->keyValue = $keyValue; + $this->jetstream = $jetstream; + $this->daemon = $daemon; + } + + protected function getHelp(): string + { + return <<mode->isInstall()) { + throw new RuntimeException("Friendica isn't properly installed yet"); + } + + $this->config->reload(); + + if (empty($this->config->get('jetstream', 'pidfile'))) { + throw new RuntimeException(<<< TXT + Please set jetstream.pidfile in config/local.config.php. For example: + + 'jetstream' => [ + 'pidfile' => '/path/to/jetstream.pid', + ], + TXT + ); + } + + if (!Addon::isEnabled('bluesky')) { + throw new RuntimeException("Bluesky has to be enabled.\n"); + } + + $pidfile = $this->config->get('jetstream', 'pidfile'); + + $daemonMode = $this->getArgument(0); + $foreground = $this->getOption(['f', 'foreground']) ?? false; + + if (empty($daemonMode)) { + throw new RuntimeException("Please use either 'start', 'stop' or 'status'"); + } + + $this->daemon->init($pidfile); + + if ($daemonMode == 'status') { + if ($this->daemon->isRunning()) { + $this->out(sprintf("Daemon process %s is running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile())); + } else { + $this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile())); + } + return 0; + } + + if ($daemonMode == 'stop') { + if (!$this->daemon->isRunning()) { + $this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile())); + return 0; + } + + if ($this->daemon->stop()) { + $this->keyValue->set('worker_daemon_mode', false); + $this->out(sprintf("Daemon process %s was killed (%s)", $this->daemon->getPid(), $this->daemon->getPidfile())); + return 0; + } + + return 1; + } + + if ($this->daemon->isRunning()) { + $this->out(sprintf("Daemon process %s is already running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile())); + return 1; + } + + if ($daemonMode == "start") { + $this->out("Starting worker daemon"); + + $this->daemon->start(function () { + $this->jetstream->listen(); + }, $foreground); + + return 0; + } + + $this->err('Invalid command'); + $this->out($this->getHelp()); + return 1; + } +} diff --git a/src/Core/Console.php b/src/Core/Console.php index eec8b5fb67..3fabd92502 100644 --- a/src/Core/Console.php +++ b/src/Core/Console.php @@ -38,6 +38,7 @@ Commands: contact Contact management createdoxygen Generate Doxygen headers daemon Interact with the Friendica daemon + jetstream Interact with the Jetstream daemon dbstructure Do database updates docbloxerrorchecker Check the file tree for DocBlox errors extract Generate translation string file for the Friendica project (deprecated) @@ -77,6 +78,7 @@ HELP; 'contact' => Friendica\Console\Contact::class, 'createdoxygen' => Friendica\Console\CreateDoxygen::class, 'daemon' => Friendica\Console\Daemon::class, + 'jetstream' => Friendica\Console\JetstreamDaemon::class, 'docbloxerrorchecker' => Friendica\Console\DocBloxErrorChecker::class, 'dbstructure' => Friendica\Console\DatabaseStructure::class, 'extract' => Friendica\Console\Extract::class, diff --git a/src/Protocol/ATProtocol/Jetstream.php b/src/Protocol/ATProtocol/Jetstream.php index 64a8397ddf..1326991f24 100755 --- a/src/Protocol/ATProtocol/Jetstream.php +++ b/src/Protocol/ATProtocol/Jetstream.php @@ -95,6 +95,7 @@ class Jetstream // @todo make the path configurable $this->client = new \WebSocket\Client('wss://jetstream1.us-west.bsky.network/subscribe?requireHello=true' . $cursor); $this->client->setTimeout($timeout); + $this->client->setLogger($this->logger); } catch (\WebSocket\ConnectionException $e) { $this->logger->error('Error while trying to establish the connection', ['code' => $e->getCode(), 'message' => $e->getMessage()]); echo "Connection wasn't established.\n"; @@ -365,7 +366,7 @@ class Jetstream } /** - * Route app.bsky.feed.post commits + * Route app.bsky.feed.post commits * * @param stdClass $data message object * @param integer $drift @@ -389,7 +390,7 @@ class Jetstream } /** - * Route app.bsky.feed.repost commits + * Route app.bsky.feed.repost commits * * @param stdClass $data message object * @param integer $drift @@ -413,7 +414,7 @@ class Jetstream } /** - * Route app.bsky.feed.like commits + * Route app.bsky.feed.like commits * * @param stdClass $data message object * @return void @@ -436,7 +437,7 @@ class Jetstream } /** - * Route app.bsky.actor.profile commits + * Route app.bsky.actor.profile commits * * @param stdClass $data message object * @return void @@ -463,7 +464,7 @@ class Jetstream } /** - * Route app.bsky.graph.follow commits + * Route app.bsky.graph.follow commits * * @param stdClass $data message object * @return void