mirror of
https://github.com/friendica/friendica
synced 2025-04-26 15:10:11 +00:00
Refactor Process for new paradigm
This commit is contained in:
parent
0e2e488521
commit
38f70cc55a
14 changed files with 456 additions and 491 deletions
|
@ -23,6 +23,7 @@ namespace Friendica\Core;
|
|||
|
||||
use Friendica\App\Mode;
|
||||
use Friendica\Core;
|
||||
use Friendica\Core\Worker\Entity\Process;
|
||||
use Friendica\Database\DBA;
|
||||
use Friendica\DI;
|
||||
use Friendica\Util\DateTimeFormat;
|
||||
|
@ -51,26 +52,29 @@ class Worker
|
|||
private static $last_update;
|
||||
private static $state;
|
||||
private static $daemon_mode = null;
|
||||
/** @var Worker\Entity\Process */
|
||||
private static $process;
|
||||
|
||||
/**
|
||||
* Processes the tasks that are in the workerqueue table
|
||||
*
|
||||
* @param boolean $run_cron Should the cron processes be executed?
|
||||
* @param Process $process The current running process
|
||||
* @return void
|
||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||
*/
|
||||
public static function processQueue($run_cron = true)
|
||||
public static function processQueue($run_cron, Process $process)
|
||||
{
|
||||
self::$up_start = microtime(true);
|
||||
|
||||
// At first check the maximum load. We shouldn't continue with a high load
|
||||
if (DI::process()->isMaxLoadReached()) {
|
||||
if (DI::system()->isMaxLoadReached()) {
|
||||
Logger::notice('Pre check: maximum load reached, quitting.');
|
||||
return;
|
||||
}
|
||||
|
||||
// We now start the process. This is done after the load check since this could increase the load.
|
||||
DI::process()->start();
|
||||
self::$process = $process;
|
||||
|
||||
// Kill stale processes every 5 minutes
|
||||
$last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
|
||||
|
@ -134,7 +138,7 @@ class Worker
|
|||
}
|
||||
|
||||
// Check free memory
|
||||
if (DI::process()->isMinMemoryReached()) {
|
||||
if (DI::system()->isMinMemoryReached()) {
|
||||
Logger::warning('Memory limit reached, quitting.');
|
||||
DI::lock()->release(self::LOCK_WORKER);
|
||||
return;
|
||||
|
@ -147,7 +151,7 @@ class Worker
|
|||
// Quit the worker once every cron interval
|
||||
if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
|
||||
Logger::info('Process lifetime reached, respawning.');
|
||||
self::unclaimProcess();
|
||||
self::unclaimProcess($process);
|
||||
if (self::isDaemonMode()) {
|
||||
self::IPCSetJobState(true);
|
||||
} else {
|
||||
|
@ -180,7 +184,7 @@ class Worker
|
|||
}
|
||||
|
||||
// Do we have too few memory?
|
||||
if (DI::process()->isMinMemoryReached()) {
|
||||
if (DI::system()->isMinMemoryReached()) {
|
||||
Logger::warning('Memory limit reached, quitting.');
|
||||
return false;
|
||||
}
|
||||
|
@ -192,7 +196,7 @@ class Worker
|
|||
}
|
||||
|
||||
// Possibly there are too much database processes that block the system
|
||||
if (DI::process()->isMaxProcessesReached()) {
|
||||
if (DI::system()->isMaxProcessesReached()) {
|
||||
Logger::warning('Maximum processes reached, quitting.');
|
||||
return false;
|
||||
}
|
||||
|
@ -334,7 +338,7 @@ class Worker
|
|||
}
|
||||
|
||||
// Constantly check the number of parallel database processes
|
||||
if (DI::process()->isMaxProcessesReached()) {
|
||||
if (DI::system()->isMaxProcessesReached()) {
|
||||
Logger::warning("Max processes reached for process", ['pid' => $mypid]);
|
||||
return false;
|
||||
}
|
||||
|
@ -1105,15 +1109,15 @@ class Worker
|
|||
/**
|
||||
* Removes a workerqueue entry from the current process
|
||||
*
|
||||
* @param Process $process the process behind the workerqueue
|
||||
*
|
||||
* @return void
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function unclaimProcess()
|
||||
public static function unclaimProcess(Process $process)
|
||||
{
|
||||
$mypid = getmypid();
|
||||
|
||||
$stamp = (float)microtime(true);
|
||||
DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]);
|
||||
DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $process->pid, 'done' => false]);
|
||||
self::$db_duration += (microtime(true) - $stamp);
|
||||
self::$db_duration_write += (microtime(true) - $stamp);
|
||||
}
|
||||
|
@ -1146,7 +1150,7 @@ class Worker
|
|||
*/
|
||||
private static function forkProcess(bool $do_cron)
|
||||
{
|
||||
if (DI::process()->isMinMemoryReached()) {
|
||||
if (DI::system()->isMinMemoryReached()) {
|
||||
Logger::warning('Memory limit reached - quitting');
|
||||
return;
|
||||
}
|
||||
|
@ -1176,11 +1180,10 @@ class Worker
|
|||
}
|
||||
|
||||
// We now are in the new worker
|
||||
$pid = getmypid();
|
||||
|
||||
DBA::connect();
|
||||
|
||||
/// @todo Reinitialize the logger to set a new process_id and uid
|
||||
DI::process()->setPid($pid);
|
||||
$process = DI::process()->create($pid);
|
||||
|
||||
$cycles = 0;
|
||||
while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
|
||||
|
@ -1189,12 +1192,12 @@ class Worker
|
|||
|
||||
Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]);
|
||||
|
||||
self::processQueue($do_cron);
|
||||
self::processQueue($do_cron, $process);
|
||||
|
||||
self::unclaimProcess();
|
||||
self::unclaimProcess($process);
|
||||
|
||||
self::IPCSetJobState(false, $pid);
|
||||
DI::process()->end();
|
||||
DI::process()->delete($process);
|
||||
Logger::info('Worker ended', ['pid' => $pid]);
|
||||
exit();
|
||||
}
|
||||
|
@ -1211,9 +1214,7 @@ class Worker
|
|||
if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
|
||||
self::forkProcess($do_cron);
|
||||
} else {
|
||||
$process = new Core\Process(DI::logger(), DI::mode(), DI::config(),
|
||||
DI::modelProcess(), DI::app()->getBasePath(), getmypid());
|
||||
$process->run('bin/worker.php', ['no_cron' => !$do_cron]);
|
||||
DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]);
|
||||
}
|
||||
if (self::isDaemonMode()) {
|
||||
self::IPCSetJobState(false);
|
||||
|
@ -1571,8 +1572,7 @@ class Worker
|
|||
Logger::notice('Starting new daemon process');
|
||||
$command = 'bin/daemon.php';
|
||||
$a = DI::app();
|
||||
$process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid());
|
||||
$process->run($command, ['start']);
|
||||
DI::system()->run($command, ['start']);
|
||||
Logger::notice('New daemon process started');
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue