Merge pull request #14655 from nupplaphil/feat/worker_daemon

Add JetStream to Console
This commit is contained in:
Hypolite Petovan 2025-01-02 07:40:03 -05:00 committed by GitHub
commit 8215b5de83
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 532 additions and 334 deletions

View file

@ -5,6 +5,8 @@
* SPDX-FileCopyrightText: 2010-2024 the Friendica project * SPDX-FileCopyrightText: 2010-2024 the Friendica project
* *
* SPDX-License-Identifier: AGPL-3.0-or-later * SPDX-License-Identifier: AGPL-3.0-or-later
*
* @deprecated 2025.01 use bin/console.php daemon instead
*/ */
/** /**

View file

@ -6,6 +6,7 @@
* *
* SPDX-License-Identifier: AGPL-3.0-or-later * SPDX-License-Identifier: AGPL-3.0-or-later
* *
* @deprecated 2025.01 use bin/console.php jetstream instead
*/ */
use Dice\Dice; use Dice\Dice;
@ -24,4 +25,7 @@ $dice = (new Dice())->addRules(require(dirname(__DIR__) . '/static/dependencies.
$app = \Friendica\App::fromDice($dice); $app = \Friendica\App::fromDice($dice);
$app->processJetstream(); $argv = $_SERVER['argv'] ?? [];
array_splice($argv, 1, 0, "jetstream");
$app->processConsole($argv);

View file

@ -17,21 +17,16 @@ use Friendica\App\Router;
use Friendica\Capabilities\ICanCreateResponses; use Friendica\Capabilities\ICanCreateResponses;
use Friendica\Capabilities\ICanHandleRequests; use Friendica\Capabilities\ICanHandleRequests;
use Friendica\Content\Nav; use Friendica\Content\Nav;
use Friendica\Core\Addon;
use Friendica\Core\Config\Factory\Config; use Friendica\Core\Config\Factory\Config;
use Friendica\Core\Hook;
use Friendica\Core\Renderer; use Friendica\Core\Renderer;
use Friendica\Core\Session\Capability\IHandleUserSessions; use Friendica\Core\Session\Capability\IHandleUserSessions;
use Friendica\Core\Worker\Repository\Process as ProcessRepository; use Friendica\Core\Worker\Repository\Process as ProcessRepository;
use Friendica\Database\DBA;
use Friendica\Database\Definition\DbaDefinition; use Friendica\Database\Definition\DbaDefinition;
use Friendica\Database\Definition\ViewDefinition; use Friendica\Database\Definition\ViewDefinition;
use Friendica\Module\Maintenance; use Friendica\Module\Maintenance;
use Friendica\Protocol\ATProtocol\Jetstream;
use Friendica\Security\Authentication; use Friendica\Security\Authentication;
use Friendica\Core\Config\Capability\IManageConfigValues; use Friendica\Core\Config\Capability\IManageConfigValues;
use Friendica\Core\L10n; use Friendica\Core\L10n;
use Friendica\Core\Logger;
use Friendica\Core\Logger\Capability\LogChannel; use Friendica\Core\Logger\Capability\LogChannel;
use Friendica\Core\PConfig\Capability\IManagePersonalConfigValues; use Friendica\Core\PConfig\Capability\IManagePersonalConfigValues;
use Friendica\Core\System; use Friendica\Core\System;
@ -220,160 +215,6 @@ class App
(new \Friendica\Core\Console($this->container, $argv))->execute(); (new \Friendica\Core\Console($this->container, $argv))->execute();
} }
public function processJetstream(): void
{
$this->setupContainerForAddons();
$this->setupContainerForLogger(LogChannel::DAEMON);
$this->setupLegacyServiceLocator();
$this->registerErrorHandler();
Addon::loadAddons();
Hook::loadHooks();
/** @var IManageConfigValues */
$config = $this->container->create(IManageConfigValues::class);
$config->reload();
/** @var Mode */
$mode = $this->container->create(Mode::class);
if ($mode->isInstall()) {
die("Friendica isn't properly installed yet.\n");
}
if (empty($config->get('jetstream', 'pidfile'))) {
die(<<<TXT
Please set jetstream.pidfile in config/local.config.php. For example:
'jetstream' => [
'pidfile' => '/path/to/jetstream.pid',
],
TXT);
}
if (!Addon::isEnabled('bluesky')) {
die("Bluesky has to be enabled.\n");
}
$pidfile = $config->get('jetstream', 'pidfile');
if (in_array('start', (array)$_SERVER['argv'])) {
$daemonMode = 'start';
}
if (in_array('stop', (array)$_SERVER['argv'])) {
$daemonMode = 'stop';
}
if (in_array('status', (array)$_SERVER['argv'])) {
$daemonMode = 'status';
}
if (!isset($daemonMode)) {
die("Please use either 'start', 'stop' or 'status'.\n");
}
// Get options
$shortopts = 'f';
$longopts = ['foreground'];
$options = getopt($shortopts, $longopts);
$foreground = array_key_exists('f', $options) || array_key_exists('foreground', $options);
if (empty($_SERVER['argv'][0])) {
die("Unexpected script behaviour. This message should never occur.\n");
}
$pid = null;
if (is_readable($pidfile)) {
$pid = intval(file_get_contents($pidfile));
}
if (empty($pid) && in_array($daemonMode, ['stop', 'status'])) {
die("Pidfile wasn't found. Is jetstream running?\n");
}
if ($daemonMode == 'status') {
if (posix_kill($pid, 0)) {
die("Jetstream process $pid is running.\n");
}
unlink($pidfile);
die("Jetstream process $pid isn't running.\n");
}
if ($daemonMode == 'stop') {
posix_kill($pid, SIGTERM);
unlink($pidfile);
Logger::notice('Jetstream process was killed', ['pid' => $pid]);
die("Jetstream process $pid was killed.\n");
}
if (!empty($pid) && posix_kill($pid, 0)) {
die("Jetstream process $pid is already running.\n");
}
Logger::notice('Starting jetstream daemon.', ['pid' => $pid]);
if (!$foreground) {
echo "Starting jetstream daemon.\n";
DBA::disconnect();
// Fork a daemon process
$pid = pcntl_fork();
if ($pid == -1) {
echo "Daemon couldn't be forked.\n";
Logger::warning('Could not fork daemon');
exit(1);
} elseif ($pid) {
// The parent process continues here
if (!file_put_contents($pidfile, $pid)) {
echo "Pid file wasn't written.\n";
Logger::warning('Could not store pid file');
posix_kill($pid, SIGTERM);
exit(1);
}
echo 'Child process started with pid ' . $pid . ".\n";
Logger::notice('Child process started', ['pid' => $pid]);
exit(0);
}
// We now are in the child process
register_shutdown_function(function (): void {
posix_kill(posix_getpid(), SIGTERM);
posix_kill(posix_getpid(), SIGHUP);
});
// Make the child the main process, detach it from the terminal
if (posix_setsid() < 0) {
return;
}
// Closing all existing connections with the outside
fclose(STDIN);
// And now connect the database again
DBA::connect();
}
// Just to be sure that this script really runs endlessly
set_time_limit(0);
// Now running as a daemon.
$jetstream = $this->container->create(Jetstream::class);
$jetstream->listen();
}
public function processWorker(array $options): void public function processWorker(array $options): void
{ {
$this->setupContainerForAddons(); $this->setupContainerForAddons();

View file

@ -17,6 +17,7 @@ use Friendica\Core\System;
use Friendica\Core\Update; use Friendica\Core\Update;
use Friendica\Core\Worker; use Friendica\Core\Worker;
use Friendica\Database\Database; use Friendica\Database\Database;
use Friendica\System\Daemon as SysDaemon;
use Friendica\Util\BasePath; use Friendica\Util\BasePath;
use Friendica\Util\DateTimeFormat; use Friendica\Util\DateTimeFormat;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
@ -34,6 +35,7 @@ final class Daemon extends Console
private System $system; private System $system;
private LoggerInterface $logger; private LoggerInterface $logger;
private Database $dba; private Database $dba;
private SysDaemon $daemon;
/** /**
* @param Mode $mode * @param Mode $mode
@ -43,9 +45,10 @@ final class Daemon extends Console
* @param System $system * @param System $system
* @param LoggerInterface $logger * @param LoggerInterface $logger
* @param Database $dba * @param Database $dba
* @param SysDaemon $daemon
* @param array|null $argv * @param array|null $argv
*/ */
public function __construct(Mode $mode, IManageConfigValues $config, IManageKeyValuePairs $keyValue, BasePath $basePath, System $system, LoggerInterface $logger, Database $dba, array $argv = null) public function __construct(Mode $mode, IManageConfigValues $config, IManageKeyValuePairs $keyValue, BasePath $basePath, System $system, LoggerInterface $logger, Database $dba, SysDaemon $daemon, array $argv = null)
{ {
parent::__construct($argv); parent::__construct($argv);
@ -56,6 +59,7 @@ final class Daemon extends Console
$this->system = $system; $this->system = $system;
$this->logger = $logger; $this->logger = $logger;
$this->dba = $dba; $this->dba = $dba;
$this->daemon = $daemon;
} }
protected function getHelp(): string protected function getHelp(): string
@ -63,7 +67,9 @@ final class Daemon extends Console
return <<<HELP return <<<HELP
Daemon - Interact with the Friendica daemon Daemon - Interact with the Friendica daemon
Synopsis Synopsis
bin/console daemon [-h|--help|-?] [-v] [-a] [-f] bin/console daemon start [-h|--help|-?] [-v] [-f]
bin/console daemon stop [-h|--help|-?] [-v]
bin/console daemon status [-h|--help|-?] [-v]
Description Description
Interact with the Friendica daemon Interact with the Friendica daemon
@ -84,6 +90,10 @@ HELP;
protected function doExecute() protected function doExecute()
{ {
if ($this->executable !== 'bin/console.php') {
$this->out(sprintf("'%s' is deprecated and will removed. Please use 'bin/console.php daemon' instead", $this->executable));
}
if ($this->mode->isInstall()) { if ($this->mode->isInstall()) {
throw new RuntimeException("Friendica isn't properly installed yet"); throw new RuntimeException("Friendica isn't properly installed yet");
} }
@ -93,164 +103,125 @@ HELP;
$this->config->reload(); $this->config->reload();
if (empty($this->config->get('system', 'pidfile'))) { if (empty($this->config->get('system', 'pidfile'))) {
throw new RuntimeException(<<< TXT throw new RuntimeException(
<<< TXT
Please set system.pidfile in config/local.config.php. For example: Please set system.pidfile in config/local.config.php. For example:
'system' => [ 'system' => [
'pidfile' => '/path/to/daemon.pid', 'pidfile' => '/path/to/daemon.pid',
], ],
TXT); TXT
);
} }
$pidfile = $this->config->get('system', 'pidfile'); $pidfile = $this->config->get('system', 'pidfile');
$daemonMode = $this->getArgument(0); $daemonMode = $this->getArgument(0);
$foreground = $this->getOption(['f', 'foreground']); $foreground = $this->getOption(['f', 'foreground']) ?? false;
if (empty($daemonMode)) { if (empty($daemonMode)) {
throw new RuntimeException("Please use either 'start', 'stop' or 'status'"); throw new RuntimeException("Please use either 'start', 'stop' or 'status'");
} }
$pid = null; $this->daemon->init($pidfile);
if (is_readable($pidfile)) {
$pid = intval(file_get_contents($pidfile));
}
if (empty($pid) && in_array($daemonMode, ['stop', 'status'])) {
$this->keyValue->set('worker_daemon_mode', false);
throw new RuntimeException("Pidfile wasn't found. Is the daemon running?");
}
if ($daemonMode == 'status') { if ($daemonMode == 'status') {
if (posix_kill($pid, 0)) { if ($this->daemon->isRunning()) {
$this->out("Daemon process $pid is running"); $this->out(sprintf("Daemon process %s is running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 0; } else {
$this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
} }
unlink($pidfile);
$this->keyValue->set('worker_daemon_mode', false);
$this->out("Daemon process $pid isn't running.");
return 0; return 0;
} }
if ($daemonMode == 'stop') { if ($daemonMode == 'stop') {
posix_kill($pid, SIGTERM); if (!$this->daemon->isRunning()) {
unlink($pidfile); $this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 0;
}
$this->logger->notice('Worker daemon process was killed', ['pid' => $pid]); if ($this->daemon->stop()) {
$this->keyValue->set('worker_daemon_mode', false);
$this->out(sprintf("Daemon process %s was killed (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 0;
}
return 1;
}
if ($this->daemon->isRunning()) {
$this->out(sprintf("Daemon process %s is already running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 1;
}
if ($daemonMode == "start") {
$this->out("Starting Friendica daemon");
$this->daemon->start(function () {
$wait_interval = intval($this->config->get('system', 'cron_interval', 5)) * 60;
$do_cron = true;
$last_cron = 0;
$path = $this->basePath->getPath();
// Now running as a daemon.
while (true) {
// Check the database structure and possibly fixes it
Update::check($path, true);
if (!$do_cron && ($last_cron + $wait_interval) < time()) {
$this->logger->info('Forcing cron worker call.', ['pid' => $this->daemon->getPid()]);
$do_cron = true;
}
if ($do_cron || (!$this->system->isMaxLoadReached() && Worker::entriesExists() && Worker::isReady())) {
Worker::spawnWorker($do_cron);
} else {
$this->logger->info('Cool down for 5 seconds', ['pid' => $this->daemon->getPid()]);
sleep(5);
}
if ($do_cron) {
// We force a reconnect of the database connection.
// This is done to ensure that the connection don't get lost over time.
$this->dba->reconnect();
$last_cron = time();
}
$start = time();
$this->logger->info('Sleeping', ['pid' => $this->daemon->getPid(), 'until' => gmdate(DateTimeFormat::MYSQL, $start + $wait_interval)]);
do {
$seconds = (time() - $start);
// logarithmic wait time calculation.
// Background: After jobs had been started, they often fork many workers.
// To not waste too much time, the sleep period increases.
$arg = (($seconds + 1) / ($wait_interval / 9)) + 1;
$sleep = min(1000000, round(log10($arg) * 1000000, 0));
$this->daemon->sleep((int)$sleep);
$timeout = ($seconds >= $wait_interval);
} while (!$timeout && !Worker\IPC::JobsExists());
if ($timeout) {
$do_cron = true;
$this->logger->info('Woke up after $wait_interval seconds.', ['pid' => $this->daemon->getPid(), 'sleep' => $wait_interval]);
} else {
$do_cron = false;
$this->logger->info('Worker jobs are calling to be forked.', ['pid' => $this->daemon->getPid()]);
}
}
}, $foreground);
$this->keyValue->set('worker_daemon_mode', false);
$this->out("Daemon process $pid was killed.");
return 0; return 0;
} }
$this->logger->notice('Starting worker daemon', ['pid' => $pid]); $this->err('Invalid command');
$this->out($this->getHelp());
if (!$foreground) { return 1;
$this->out("Starting worker daemon");
$this->dba->disconnect();
// Fork a daemon process
$pid = pcntl_fork();
if ($pid == -1) {
$this->logger->warning('Could not fork daemon');
throw new RuntimeException("Daemon couldn't be forked");
} elseif ($pid) {
// The parent process continues here
if (!file_put_contents($pidfile, $pid)) {
posix_kill($pid, SIGTERM);
$this->logger->warning('Could not store pid file');
throw new RuntimeException("Pid file wasn't written");
}
$this->out("Child process started with pid $pid");
$this->logger->notice('Child process started', ['pid' => $pid]);
return 0;
}
// We now are in the child process
register_shutdown_function(function () {
posix_kill(posix_getpid(), SIGTERM);
posix_kill(posix_getpid(), SIGHUP);
});
// Make the child the main process, detach it from the terminal
if (posix_setsid() < 0) {
return 0;
}
// Closing all existing connections with the outside
fclose(STDIN);
// And now connect the database again
$this->dba->connect();
}
$this->keyValue->set('worker_daemon_mode', true);
// Just to be sure that this script really runs endlessly
set_time_limit(0);
$wait_interval = intval($this->config->get('system', 'cron_interval', 5)) * 60;
$do_cron = true;
$last_cron = 0;
$path = $this->basePath->getPath();
// Now running as a daemon.
while (true) {
// Check the database structure and possibly fixes it
Update::check($path, true);
if (!$do_cron && ($last_cron + $wait_interval) < time()) {
$this->logger->info('Forcing cron worker call.', ['pid' => $pid]);
$do_cron = true;
}
if ($do_cron || (!$this->system->isMaxLoadReached() && Worker::entriesExists() && Worker::isReady())) {
Worker::spawnWorker($do_cron);
} else {
$this->logger->info('Cool down for 5 seconds', ['pid' => $pid]);
sleep(5);
}
if ($do_cron) {
// We force a reconnect of the database connection.
// This is done to ensure that the connection don't get lost over time.
$this->dba->reconnect();
$last_cron = time();
}
$start = time();
$this->logger->info('Sleeping', ['pid' => $pid, 'until' => gmdate(DateTimeFormat::MYSQL, $start + $wait_interval)]);
do {
$seconds = (time() - $start);
// logarithmic wait time calculation.
// Background: After jobs had been started, they often fork many workers.
// To not waste too much time, the sleep period increases.
$arg = (($seconds + 1) / ($wait_interval / 9)) + 1;
$sleep = min(1000000, round(log10($arg) * 1000000, 0));
usleep((int)$sleep);
$pid = pcntl_waitpid(-1, $status, WNOHANG);
if ($pid > 0) {
$this->logger->info('Children quit via pcntl_waitpid', ['pid' => $pid, 'status' => $status]);
}
$timeout = ($seconds >= $wait_interval);
} while (!$timeout && !Worker\IPC::JobsExists());
if ($timeout) {
$do_cron = true;
$this->logger->info('Woke up after $wait_interval seconds.', ['pid' => $pid, 'sleep' => $wait_interval]);
} else {
$do_cron = false;
$this->logger->info('Worker jobs are calling to be forked.', ['pid' => $pid]);
}
}
} }
} }

View file

@ -0,0 +1,163 @@
<?php
// Copyright (C) 2010-2025, the Friendica project
// SPDX-FileCopyrightText: 2010-2024 the Friendica project
//
// SPDX-License-Identifier: AGPL-3.0-or-later
declare(strict_types=1);
namespace Friendica\Console;
use Friendica\App\Mode;
use Friendica\Core\Addon;
use Friendica\Core\Config\Capability\IManageConfigValues;
use Asika\SimpleConsole\Console;
use Friendica\Core\Hook;
use Friendica\Core\KeyValueStorage\Capability\IManageKeyValuePairs;
use Friendica\Protocol\ATProtocol\Jetstream;
use Friendica\System\Daemon as SysDaemon;
use RuntimeException;
/**
* Console command for interacting with the daemon
*/
final class JetstreamDaemon extends Console
{
private Mode $mode;
private IManageConfigValues $config;
private IManageKeyValuePairs $keyValue;
private SysDaemon $daemon;
private Jetstream $jetstream;
/**
* @param Mode $mode
* @param IManageConfigValues $config
* @param IManageKeyValuePairs $keyValue
* @param SysDaemon $daemon
* @param Jetstream $jetstream
* @param array|null $argv
*/
public function __construct(Mode $mode, IManageConfigValues $config, IManageKeyValuePairs $keyValue, SysDaemon $daemon, Jetstream $jetstream, array $argv = null)
{
parent::__construct($argv);
$this->mode = $mode;
$this->config = $config;
$this->keyValue = $keyValue;
$this->jetstream = $jetstream;
$this->daemon = $daemon;
}
protected function getHelp(): string
{
return <<<HELP
jetstream - Interact with the Jetstream daemon
Synopsis
bin/console jetstream start [-h|--help|-?] [-v] [-f]
bin/console jetstream stop [-h|--help|-?] [-v]
bin/console jetstream status [-h|--help|-?] [-v]
Description
Interact with the Jetstream daemon
Options
-h|--help|-? Show help information
-v Show more debug information.
-f|--foreground Runs the daemon in the foreground
Examples
bin/console jetstream start -f
Starts the daemon in the foreground
bin/console jetstream status
Gets the status of the daemon
HELP;
}
protected function doExecute()
{
if ($this->executable !== 'bin/console.php') {
$this->out(sprintf("'%s' is deprecated and will removed. Please use 'bin/console.php jetstream' instead", $this->executable));
}
if ($this->mode->isInstall()) {
throw new RuntimeException("Friendica isn't properly installed yet");
}
$this->config->reload();
if (empty($this->config->get('jetstream', 'pidfile'))) {
throw new RuntimeException(
<<< TXT
Please set jetstream.pidfile in config/local.config.php. For example:
'jetstream' => [
'pidfile' => '/path/to/jetstream.pid',
],
TXT
);
}
Addon::loadAddons();
Hook::loadHooks();
if (!Addon::isEnabled('bluesky')) {
throw new RuntimeException("Bluesky has to be enabled.\n");
}
$pidfile = $this->config->get('jetstream', 'pidfile');
$daemonMode = $this->getArgument(0);
$foreground = $this->getOption(['f', 'foreground']) ?? false;
if (empty($daemonMode)) {
throw new RuntimeException("Please use either 'start', 'stop' or 'status'");
}
$this->daemon->init($pidfile);
if ($daemonMode == 'status') {
if ($this->daemon->isRunning()) {
$this->out(sprintf("Daemon process %s is running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
} else {
$this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
}
return 0;
}
if ($daemonMode == 'stop') {
if (!$this->daemon->isRunning()) {
$this->out(sprintf("Daemon process %s isn't running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 0;
}
if ($this->daemon->stop()) {
$this->keyValue->set('worker_daemon_mode', false);
$this->out(sprintf("Daemon process %s was killed (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 0;
}
return 1;
}
if ($this->daemon->isRunning()) {
$this->out(sprintf("Daemon process %s is already running (%s)", $this->daemon->getPid(), $this->daemon->getPidfile()));
return 1;
}
if ($daemonMode == "start") {
$this->out("Starting Jetstream daemon");
$this->daemon->start(function () {
$this->jetstream->listen();
}, $foreground);
return 0;
}
$this->err('Invalid command');
$this->out($this->getHelp());
return 1;
}
}

View file

@ -17,7 +17,7 @@ use Friendica\App;
class Console extends \Asika\SimpleConsole\Console class Console extends \Asika\SimpleConsole\Console
{ {
// Disables the default help handling // Disables the default help handling
protected $helpOptions = []; protected $helpOptions = [];
protected $customHelpOptions = ['h', 'help', '?']; protected $customHelpOptions = ['h', 'help', '?'];
/** /**
@ -38,6 +38,7 @@ Commands:
contact Contact management contact Contact management
createdoxygen Generate Doxygen headers createdoxygen Generate Doxygen headers
daemon Interact with the Friendica daemon daemon Interact with the Friendica daemon
jetstream Interact with the Jetstream daemon
dbstructure Do database updates dbstructure Do database updates
docbloxerrorchecker Check the file tree for DocBlox errors docbloxerrorchecker Check the file tree for DocBlox errors
extract Generate translation string file for the Friendica project (deprecated) extract Generate translation string file for the Friendica project (deprecated)
@ -68,35 +69,36 @@ HELP;
} }
protected $subConsoles = [ protected $subConsoles = [
'addon' => Friendica\Console\Addon::class, 'addon' => Friendica\Console\Addon::class,
'archivecontact' => Friendica\Console\ArchiveContact::class, 'archivecontact' => Friendica\Console\ArchiveContact::class,
'autoinstall' => Friendica\Console\AutomaticInstallation::class, 'autoinstall' => Friendica\Console\AutomaticInstallation::class,
'cache' => Friendica\Console\Cache::class, 'cache' => Friendica\Console\Cache::class,
'clearavatarcache' => Friendica\Console\ClearAvatarCache::class, 'clearavatarcache' => Friendica\Console\ClearAvatarCache::class,
'config' => Friendica\Console\Config::class, 'config' => Friendica\Console\Config::class,
'contact' => Friendica\Console\Contact::class, 'contact' => Friendica\Console\Contact::class,
'createdoxygen' => Friendica\Console\CreateDoxygen::class, 'createdoxygen' => Friendica\Console\CreateDoxygen::class,
'daemon' => Friendica\Console\Daemon::class, 'daemon' => Friendica\Console\Daemon::class,
'docbloxerrorchecker' => Friendica\Console\DocBloxErrorChecker::class, 'jetstream' => Friendica\Console\JetstreamDaemon::class,
'dbstructure' => Friendica\Console\DatabaseStructure::class, 'docbloxerrorchecker' => Friendica\Console\DocBloxErrorChecker::class,
'extract' => Friendica\Console\Extract::class, 'dbstructure' => Friendica\Console\DatabaseStructure::class,
'extract' => Friendica\Console\Extract::class,
'fixapdeliveryworkertaskparameters' => Friendica\Console\FixAPDeliveryWorkerTaskParameters::class, 'fixapdeliveryworkertaskparameters' => Friendica\Console\FixAPDeliveryWorkerTaskParameters::class,
'globalcommunityblock' => Friendica\Console\GlobalCommunityBlock::class, 'globalcommunityblock' => Friendica\Console\GlobalCommunityBlock::class,
'globalcommunitysilence' => Friendica\Console\GlobalCommunitySilence::class, 'globalcommunitysilence' => Friendica\Console\GlobalCommunitySilence::class,
'lock' => Friendica\Console\Lock::class, 'lock' => Friendica\Console\Lock::class,
'maintenance' => Friendica\Console\Maintenance::class, 'maintenance' => Friendica\Console\Maintenance::class,
'mergecontacts' => Friendica\Console\MergeContacts::class, 'mergecontacts' => Friendica\Console\MergeContacts::class,
'movetoavatarcache' => Friendica\Console\MoveToAvatarCache::class, 'movetoavatarcache' => Friendica\Console\MoveToAvatarCache::class,
'php2po' => Friendica\Console\PhpToPo::class, 'php2po' => Friendica\Console\PhpToPo::class,
'postupdate' => Friendica\Console\PostUpdate::class, 'postupdate' => Friendica\Console\PostUpdate::class,
'po2php' => Friendica\Console\PoToPhp::class, 'po2php' => Friendica\Console\PoToPhp::class,
'relay' => Friendica\Console\Relay::class, 'relay' => Friendica\Console\Relay::class,
'relocate' => Friendica\Console\Relocate::class, 'relocate' => Friendica\Console\Relocate::class,
'serverblock' => Friendica\Console\ServerBlock::class, 'serverblock' => Friendica\Console\ServerBlock::class,
'storage' => Friendica\Console\Storage::class, 'storage' => Friendica\Console\Storage::class,
'test' => Friendica\Console\Test::class, 'test' => Friendica\Console\Test::class,
'typo' => Friendica\Console\Typo::class, 'typo' => Friendica\Console\Typo::class,
'user' => Friendica\Console\User::class, 'user' => Friendica\Console\User::class,
]; ];
/** /**

View file

@ -38,9 +38,9 @@ use stdClass;
*/ */
class Jetstream class Jetstream
{ {
private $uids = []; private $uids = [];
private $self = []; private $self = [];
private $capped = false; private $capped = false;
/** @var LoggerInterface */ /** @var LoggerInterface */
private $logger; private $logger;
@ -95,6 +95,7 @@ class Jetstream
// @todo make the path configurable // @todo make the path configurable
$this->client = new \WebSocket\Client('wss://jetstream1.us-west.bsky.network/subscribe?requireHello=true' . $cursor); $this->client = new \WebSocket\Client('wss://jetstream1.us-west.bsky.network/subscribe?requireHello=true' . $cursor);
$this->client->setTimeout($timeout); $this->client->setTimeout($timeout);
$this->client->setLogger($this->logger);
} catch (\WebSocket\ConnectionException $e) { } catch (\WebSocket\ConnectionException $e) {
$this->logger->error('Error while trying to establish the connection', ['code' => $e->getCode(), 'message' => $e->getMessage()]); $this->logger->error('Error while trying to establish the connection', ['code' => $e->getCode(), 'message' => $e->getMessage()]);
echo "Connection wasn't established.\n"; echo "Connection wasn't established.\n";
@ -212,8 +213,8 @@ class Jetstream
if (!$this->capped && count($dids) < $did_limit) { if (!$this->capped && count($dids) < $did_limit) {
$condition = ["`uid` = ? AND `network` = ? AND EXISTS(SELECT `author-id` FROM `post-user` WHERE `author-id` = `contact`.`id` AND `post-user`.`uid` != ?)", 0, Protocol::BLUESKY, 0]; $condition = ["`uid` = ? AND `network` = ? AND EXISTS(SELECT `author-id` FROM `post-user` WHERE `author-id` = `contact`.`id` AND `post-user`.`uid` != ?)", 0, Protocol::BLUESKY, 0];
$contacts = Contact::selectToArray(['url'], $condition, ['order' => ['last-item' => true], 'limit' => $did_limit]); $contacts = Contact::selectToArray(['url'], $condition, ['order' => ['last-item' => true], 'limit' => $did_limit]);
$dids = $this->addDids($contacts, $uids, $did_limit, $dids); $dids = $this->addDids($contacts, $uids, $did_limit, $dids);
} }
$this->keyValue->set('jetstream_did_count', count($dids)); $this->keyValue->set('jetstream_did_count', count($dids));
@ -365,7 +366,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.feed.post commits * Route app.bsky.feed.post commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @param integer $drift * @param integer $drift
@ -389,7 +390,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.feed.repost commits * Route app.bsky.feed.repost commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @param integer $drift * @param integer $drift
@ -413,7 +414,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.feed.like commits * Route app.bsky.feed.like commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @return void * @return void
@ -436,7 +437,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.actor.profile commits * Route app.bsky.actor.profile commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @return void * @return void
@ -463,7 +464,7 @@ class Jetstream
} }
/** /**
* Route app.bsky.graph.follow commits * Route app.bsky.graph.follow commits
* *
* @param stdClass $data message object * @param stdClass $data message object
* @return void * @return void

210
src/System/Daemon.php Normal file
View file

@ -0,0 +1,210 @@
<?php
// Copyright (C) 2010-2025, the Friendica project
// SPDX-FileCopyrightText: 2010-2024 the Friendica project
//
// SPDX-License-Identifier: AGPL-3.0-or-later
namespace Friendica\System;
use Friendica\Database\Database;
use Psr\Log\LoggerInterface;
/**
* class for direct interacting with the daemon commands
*/
final class Daemon
{
private LoggerInterface $logger;
private Database $dba;
private ?string $pidfile = null;
private ?int $pid = null;
/**
* The PID of the current daemon (null if either not set or not found)
*
* @return int|null
*/
public function getPid(): ?int
{
return $this->pid;
}
/**
* The path to the PID file (null if not set)
*
* @return string|null
*/
public function getPidfile(): ?string
{
return $this->pidfile;
}
public function __construct(LoggerInterface $logger, Database $dba)
{
$this->logger = $logger;
$this->dba = $dba;
}
/**
* Initialize the current daemon class with a given PID file
*
* @param string|null $pidfile the path to the PID file - using a given path if not directly set here
*
* @return void
*/
public function init(string $pidfile = null): void
{
if (!empty($pidfile)) {
$this->pid = null;
$this->pidfile = $pidfile;
}
if (!empty($this->pid)) {
return;
}
if (is_readable($this->pidfile)) {
$this->pid = intval(file_get_contents($this->pidfile));
}
}
/**
* Starts the daemon
*
* @param callable $daemonLogic the business logic of the daemon
* @param bool $foreground true, if started in foreground, otherwise spawned in the background
*
* @return bool true, if successfully started, otherwise false
*/
public function start(callable $daemonLogic, bool $foreground = false): bool
{
$this->init();
if (!empty($this->pid)) {
$this->logger->notice('process is already running', ['pid' => $this->pid, 'pidfile' => $this->pidfile]);
return false;
}
$this->logger->notice('starting daemon', ['pid' => $this->pid, 'pidfile' => $this->pidfile]);
if (!$foreground) {
$this->dba->disconnect();
// fork a daemon process
$this->pid = pcntl_fork();
if ($this->pid < 0) {
$this->logger->warning('Could not fork daemon');
return false;
} elseif ($this->pid) {
// The parent process continues here
if (!file_put_contents($this->pidfile, $this->pid)) {
$this->logger->warning('Could not store pid file', ['pid' => $this->pid, 'pidfile' => $this->pidfile]);
posix_kill($this->pid, SIGTERM);
return false;
}
$this->logger->notice('Child process started', ['pid' => $this->pid, 'pidfile' => $this->pidfile]);
return true;
}
// We now are in the child process
register_shutdown_function(function (): void {
posix_kill(posix_getpid(), SIGTERM);
posix_kill(posix_getpid(), SIGHUP);
});
// Make the child the main process, detach it from the terminal
if (posix_setsid() < 0) {
return true;
}
// Closing all existing connections with the outside
fclose(STDIN);
// And now connect the database again
$this->dba->connect();
}
// Just to be sure that this script really runs endlessly
set_time_limit(0);
$daemonLogic();
return true;
}
/**
* Checks, if the current daemon is running
*
* @return bool true, if the daemon is running, otherwise false (f.e no PID found, no PID file found, PID is not bound to a running process))
*/
public function isRunning(): bool
{
$this->init();
if (empty($this->pid)) {
$this->logger->notice("Pid wasn't found");
if (is_readable($this->pidfile)) {
unlink($this->pidfile);
$this->logger->notice("Pidfile removed", ['pidfile' => $this->pidfile]);
}
return false;
}
if (posix_kill($this->pid, 0)) {
$this->logger->notice("daemon process is running");
return true;
} else {
unlink($this->pidfile);
$this->logger->notice("daemon process isn't running");
return false;
}
}
/**
* Stops the daemon, if running
*
* @return bool true, if the daemon was successfully stopped or is already stopped, otherwise false
*/
public function stop(): bool
{
$this->init();
if (empty($this->pid)) {
$this->logger->notice("Pidfile wasn't found", ['pidfile' => $this->pidfile]);
return true;
}
if (!posix_kill($this->pid, SIGTERM)) {
$this->logger->warning("Cannot kill the given PID", ['pid' => $this->pid, 'pidfile' => $this->pidfile]);
return false;
}
if (!unlink($this->pidfile)) {
$this->logger->warning("Cannot delete the given PID file", ['pid' => $this->pid, 'pidfile' => $this->pidfile]);
return false;
}
$this->logger->notice('daemon process was killed', ['pid' => $this->pid, 'pidfile' => $this->pidfile]);
return true;
}
/**
* Sets the current daemon to sleep and checks the status afterward
*
* @param int $duration the duration of time for sleeping (in milliseconds)
*
* @return void
*/
public function sleep(int $duration)
{
usleep($duration);
$this->pid = pcntl_waitpid(-1, $status, WNOHANG);
if ($this->pid > 0) {
$this->logger->info('Children quit via pcntl_waitpid', ['pid' => $this->pid, 'status' => $status]);
}
}
}

View file

@ -8,7 +8,7 @@ msgid ""
msgstr "" msgstr ""
"Project-Id-Version: 2025.02-dev\n" "Project-Id-Version: 2025.02-dev\n"
"Report-Msgid-Bugs-To: \n" "Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-01-01 19:30+0000\n" "POT-Creation-Date: 2025-01-01 23:33+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n" "Language-Team: LANGUAGE <LL@li.org>\n"
@ -646,6 +646,10 @@ msgstr ""
msgid "Map" msgid "Map"
msgstr "" msgstr ""
#: src/App.php:397
msgid "Apologies but the website is unavailable at the moment."
msgstr ""
#: src/App/Page.php:241 #: src/App/Page.php:241
msgid "Delete this item?" msgid "Delete this item?"
msgstr "" msgstr ""