2017-11-05 10:33:46 +00:00
< ? php
2017-11-19 17:04:40 -05:00
/**
2022-01-02 08:27:47 +01:00
* @ copyright Copyright ( C ) 2010 - 2022 , the Friendica project
2020-02-09 15:45:36 +01:00
*
* @ license GNU AGPL version 3 or any later version
*
* This program is free software : you can redistribute it and / or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation , either version 3 of the
* License , or ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU Affero General Public License for more details .
*
* You should have received a copy of the GNU Affero General Public License
* along with this program . If not , see < https :// www . gnu . org / licenses />.
*
2017-11-19 17:04:40 -05:00
*/
2020-02-09 15:45:36 +01:00
2017-11-05 10:33:46 +00:00
namespace Friendica\Core ;
2021-10-24 20:43:59 +02:00
use Friendica\Core\Worker\Entity\Process ;
2018-07-20 08:19:26 -04:00
use Friendica\Database\DBA ;
2019-12-15 22:34:11 +01:00
use Friendica\DI ;
2018-01-26 21:38:34 -05:00
use Friendica\Util\DateTimeFormat ;
2017-11-05 10:33:46 +00:00
/**
2020-01-19 06:05:23 +00:00
* Contains the class for the worker background job processing
2017-11-05 10:33:46 +00:00
*/
2017-11-19 17:04:40 -05:00
class Worker
{
2022-10-17 05:49:55 +00:00
/**
* @ name Priority
*
* Process priority for the worker
* @ {
*/
const PRIORITY_UNDEFINED = 0 ;
const PRIORITY_CRITICAL = 10 ;
const PRIORITY_HIGH = 20 ;
const PRIORITY_MEDIUM = 30 ;
const PRIORITY_LOW = 40 ;
const PRIORITY_NEGLIGIBLE = 50 ;
const PRIORITIES = [ self :: PRIORITY_CRITICAL , self :: PRIORITY_HIGH , self :: PRIORITY_MEDIUM , self :: PRIORITY_LOW , self :: PRIORITY_NEGLIGIBLE ];
/* @}*/
2022-09-26 13:33:31 +00:00
2019-02-27 09:49:26 +00:00
const STATE_STARTUP = 1 ; // Worker is in startup. This takes most time.
2019-03-08 20:39:58 +00:00
const STATE_LONG_LOOP = 2 ; // Worker is processing the whole - long - loop.
2019-02-27 09:49:26 +00:00
const STATE_REFETCH = 3 ; // Worker had refetched jobs in the execution loop.
2019-03-08 20:39:58 +00:00
const STATE_SHORT_LOOP = 4 ; // Worker is processing preassigned jobs, thus saving much time.
2019-02-27 06:55:04 +00:00
2020-10-24 08:05:03 +00:00
const FAST_COMMANDS = [ 'APDelivery' , 'Delivery' ];
2019-03-23 06:08:18 +00:00
2020-08-06 18:53:45 +00:00
const LOCK_PROCESS = 'worker_process' ;
const LOCK_WORKER = 'worker' ;
2019-03-23 06:08:18 +00:00
2017-11-05 10:33:46 +00:00
private static $up_start ;
2019-02-09 23:10:15 +00:00
private static $db_duration = 0 ;
private static $db_duration_count = 0 ;
private static $db_duration_write = 0 ;
private static $db_duration_stat = 0 ;
private static $lock_duration = 0 ;
2017-11-05 10:33:46 +00:00
private static $last_update ;
2019-02-27 06:55:04 +00:00
private static $state ;
2021-11-01 07:53:34 +01:00
/** @var Process */
2021-10-24 20:43:59 +02:00
private static $process ;
2017-11-05 10:33:46 +00:00
/**
2020-01-19 06:05:23 +00:00
* Processes the tasks that are in the workerqueue table
2017-11-05 10:33:46 +00:00
*
* @ param boolean $run_cron Should the cron processes be executed ?
2021-10-24 20:43:59 +02:00
* @ param Process $process The current running process
2017-11-19 17:04:40 -05:00
* @ return void
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
public static function processQueue ( bool $run_cron , Process $process )
2017-11-19 17:04:40 -05:00
{
2017-11-05 10:33:46 +00:00
self :: $up_start = microtime ( true );
2017-11-05 15:28:55 +00:00
// At first check the maximum load. We shouldn't continue with a high load
2021-10-24 20:43:59 +02:00
if ( DI :: system () -> isMaxLoadReached ()) {
2020-09-17 17:57:41 +00:00
Logger :: notice ( 'Pre check: maximum load reached, quitting.' );
2017-11-05 15:28:55 +00:00
return ;
}
// We now start the process. This is done after the load check since this could increase the load.
2021-10-24 20:43:59 +02:00
self :: $process = $process ;
2017-11-05 15:28:55 +00:00
2017-11-05 10:33:46 +00:00
// Kill stale processes every 5 minutes
2022-12-29 20:30:19 +01:00
$last_cleanup = DI :: keyValue () -> get ( 'worker_last_cleaned' ) ? ? 0 ;
2017-11-05 10:33:46 +00:00
if ( time () > ( $last_cleanup + 300 )) {
2022-12-29 20:30:19 +01:00
DI :: keyValue () -> set ( 'worker_last_cleaned' , time ());
2022-05-19 19:24:21 +00:00
Worker\Cron :: killStaleWorkers ();
2017-11-05 10:33:46 +00:00
}
2020-08-19 18:21:40 +00:00
// Check if the system is ready
if ( ! self :: isReady ()) {
2017-11-05 10:33:46 +00:00
return ;
}
// Now we start additional cron processes if we should do so
if ( $run_cron ) {
2022-05-19 19:24:21 +00:00
Worker\Cron :: run ();
2017-11-05 10:33:46 +00:00
}
2020-08-29 13:01:58 +00:00
$last_check = $starttime = time ();
2019-02-27 06:55:04 +00:00
self :: $state = self :: STATE_STARTUP ;
2017-11-05 10:33:46 +00:00
// We fetch the next queue entry that is about to be executed
2019-02-17 18:55:17 +00:00
while ( $r = self :: workerProcess ()) {
2022-05-19 19:24:21 +00:00
if ( Worker\IPC :: JobsExists ( getmypid ())) {
Worker\IPC :: DeleteJobState ( getmypid ());
2021-01-05 10:18:25 +00:00
}
2021-01-05 16:01:05 +00:00
2020-08-29 11:26:40 +00:00
// Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI :: config () -> get ( 'system' , 'worker_multiple_fetch' );
2017-11-19 17:04:40 -05:00
foreach ( $r as $entry ) {
2017-11-05 10:33:46 +00:00
// The work will be done
if ( ! self :: execute ( $entry )) {
2022-08-31 19:03:37 +00:00
Logger :: warning ( 'Process execution failed, quitting.' , [ 'entry' => $entry ]);
2017-11-05 10:33:46 +00:00
return ;
}
2019-03-08 20:39:58 +00:00
// Trying to fetch new processes - but only once when successful
2020-08-06 18:53:45 +00:00
if ( ! $refetched && DI :: lock () -> acquire ( self :: LOCK_PROCESS , 0 )) {
2019-02-17 18:55:17 +00:00
self :: findWorkerProcesses ();
2020-08-06 18:53:45 +00:00
DI :: lock () -> release ( self :: LOCK_PROCESS );
2019-02-27 06:55:04 +00:00
self :: $state = self :: STATE_REFETCH ;
2019-03-08 20:39:58 +00:00
$refetched = true ;
} else {
self :: $state = self :: STATE_SHORT_LOOP ;
2017-11-05 10:33:46 +00:00
}
}
2019-03-08 20:39:58 +00:00
// To avoid the quitting of multiple workers only one worker at a time will execute the check
2020-08-29 13:01:58 +00:00
if (( time () > $last_check + 5 ) && ! self :: getWaitingJobForPID ()) {
2019-02-27 06:55:04 +00:00
self :: $state = self :: STATE_LONG_LOOP ;
2019-02-27 06:36:19 +00:00
2020-08-06 18:53:45 +00:00
if ( DI :: lock () -> acquire ( self :: LOCK_WORKER , 0 )) {
2017-11-05 10:33:46 +00:00
// Count active workers and compare them with a maximum value that depends on the load
2019-03-08 20:39:58 +00:00
if ( self :: tooMuchWorkers ()) {
2020-12-31 20:14:13 +00:00
Logger :: notice ( 'Active worker limit reached, quitting.' );
2020-08-06 18:53:45 +00:00
DI :: lock () -> release ( self :: LOCK_WORKER );
2019-03-08 20:39:58 +00:00
return ;
}
// Check free memory
2021-10-24 20:43:59 +02:00
if ( DI :: system () -> isMinMemoryReached ()) {
2020-12-31 20:14:13 +00:00
Logger :: warning ( 'Memory limit reached, quitting.' );
2020-08-06 18:53:45 +00:00
DI :: lock () -> release ( self :: LOCK_WORKER );
2019-03-08 20:39:58 +00:00
return ;
}
2020-08-06 18:53:45 +00:00
DI :: lock () -> release ( self :: LOCK_WORKER );
2017-11-05 10:33:46 +00:00
}
2020-08-29 13:01:58 +00:00
$last_check = time ();
2017-11-05 10:33:46 +00:00
}
2019-02-27 07:08:44 +00:00
// Quit the worker once every cron interval
2022-09-22 05:45:42 +00:00
if ( time () > ( $starttime + ( DI :: config () -> get ( 'system' , 'cron_interval' ) * 60 )) && ! self :: systemLimitReached ()) {
2019-02-27 09:49:26 +00:00
Logger :: info ( 'Process lifetime reached, respawning.' );
2021-10-24 20:43:59 +02:00
self :: unclaimProcess ( $process );
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode ()) {
Worker\IPC :: SetJobState ( true );
2021-01-05 10:18:25 +00:00
} else {
self :: spawnWorker ();
2021-01-04 09:20:44 +00:00
}
2019-02-27 07:08:44 +00:00
return ;
}
2017-11-05 10:33:46 +00:00
}
2018-06-15 18:18:20 +00:00
// Cleaning up. Possibly not needed, but it doesn't harm anything.
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode ()) {
Worker\IPC :: SetJobState ( false );
2018-06-01 22:09:27 +00:00
}
2020-06-01 13:51:58 +00:00
Logger :: info ( " Couldn't select a workerqueue entry, quitting process " , [ 'pid' => getmypid ()]);
2017-11-05 10:33:46 +00:00
}
2020-08-19 18:21:40 +00:00
/**
* Checks if the system is ready .
*
* Several system parameters like memory , connections and processes are checked .
*
* @ return boolean
*/
2022-06-16 22:12:38 +02:00
public static function isReady () : bool
2020-08-19 18:21:40 +00:00
{
// Count active workers and compare them with a maximum value that depends on the load
if ( self :: tooMuchWorkers ()) {
2020-12-31 20:14:13 +00:00
Logger :: notice ( 'Active worker limit reached, quitting.' );
2020-08-19 18:21:40 +00:00
return false ;
}
// Do we have too few memory?
2021-10-24 20:43:59 +02:00
if ( DI :: system () -> isMinMemoryReached ()) {
2020-12-31 20:14:13 +00:00
Logger :: warning ( 'Memory limit reached, quitting.' );
2020-08-19 18:21:40 +00:00
return false ;
}
// Possibly there are too much database connections
if ( self :: maxConnectionsReached ()) {
2020-12-31 20:14:13 +00:00
Logger :: warning ( 'Maximum connections reached, quitting.' );
2020-08-19 18:21:40 +00:00
return false ;
}
// Possibly there are too much database processes that block the system
2021-10-24 20:43:59 +02:00
if ( DI :: system () -> isMaxProcessesReached ()) {
2020-12-31 20:14:13 +00:00
Logger :: warning ( 'Maximum processes reached, quitting.' );
2020-08-19 18:21:40 +00:00
return false ;
}
2021-01-02 08:43:55 +00:00
2020-08-19 18:21:40 +00:00
return true ;
}
2019-02-09 23:10:15 +00:00
/**
2020-01-19 06:05:23 +00:00
* Check if non executed tasks do exist in the worker queue
2019-02-09 23:10:15 +00:00
*
* @ return boolean Returns " true " if tasks are existing
* @ throws \Exception
*/
2022-06-16 22:12:38 +02:00
public static function entriesExists () : bool
2019-02-09 23:10:15 +00:00
{
$stamp = ( float ) microtime ( true );
$exists = DBA :: exists ( 'workerqueue' , [ " NOT `done` AND `pid` = 0 AND `next_try` < ? " , DateTimeFormat :: utcNow ()]);
self :: $db_duration += ( microtime ( true ) - $stamp );
return $exists ;
}
2018-10-23 20:38:28 +00:00
/**
2020-01-19 06:05:23 +00:00
* Returns the number of deferred entries in the worker queue
2018-10-23 20:38:28 +00:00
*
* @ return integer Number of deferred entries in the worker queue
2019-01-06 16:06:53 -05:00
* @ throws \Exception
2018-10-23 20:38:28 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function deferredEntries () : int
2018-10-23 20:38:28 +00:00
{
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2019-08-13 06:43:08 +02:00
$count = DBA :: count ( 'workerqueue' , [ " NOT `done` AND `pid` = 0 AND `retrial` > ? " , 0 ]);
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_count += ( microtime ( true ) - $stamp );
return $count ;
2018-10-23 20:38:28 +00:00
}
2017-11-05 10:33:46 +00:00
/**
2020-01-19 06:05:23 +00:00
* Returns the number of non executed entries in the worker queue
2017-11-05 10:33:46 +00:00
*
* @ return integer Number of non executed entries in the worker queue
2019-01-06 16:06:53 -05:00
* @ throws \Exception
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function totalEntries () : int
2017-11-19 17:04:40 -05:00
{
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
$count = DBA :: count ( 'workerqueue' , [ 'done' => false , 'pid' => 0 ]);
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_count += ( microtime ( true ) - $stamp );
return $count ;
2017-11-05 10:33:46 +00:00
}
/**
2020-01-19 06:05:23 +00:00
* Returns the highest priority in the worker queue that isn ' t executed
2017-11-05 10:33:46 +00:00
*
2017-11-19 21:47:21 +00:00
* @ return integer Number of active worker processes
2019-01-06 16:06:53 -05:00
* @ throws \Exception
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function highestPriority () : int
2017-11-19 17:04:40 -05:00
{
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2019-02-08 21:48:41 +00:00
$condition = [ " `pid` = 0 AND NOT `done` AND `next_try` < ? " , DateTimeFormat :: utcNow ()];
2018-07-20 08:19:26 -04:00
$workerqueue = DBA :: selectFirst ( 'workerqueue' , [ 'priority' ], $condition , [ 'order' => [ 'priority' ]]);
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-07-21 08:46:04 -04:00
if ( DBA :: isResult ( $workerqueue )) {
2022-05-01 08:58:48 +00:00
return $workerqueue [ 'priority' ];
2017-11-05 10:33:46 +00:00
} else {
return 0 ;
}
}
/**
2020-01-19 06:05:23 +00:00
* Returns if a process with the given priority is running
2017-11-05 10:33:46 +00:00
*
* @ param integer $priority The priority that should be checked
*
* @ return integer Is there a process running with that priority ?
2019-01-06 16:06:53 -05:00
* @ throws \Exception
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function processWithPriorityActive ( int $priority ) : int
2017-11-19 17:04:40 -05:00
{
2019-02-08 21:48:41 +00:00
$condition = [ " `priority` <= ? AND `pid` != 0 AND NOT `done` " , $priority ];
2018-07-20 08:19:26 -04:00
return DBA :: exists ( 'workerqueue' , $condition );
2017-11-05 10:33:46 +00:00
}
2021-11-04 20:29:59 +00:00
/**
* Checks if the given file is valid to be included
*
2022-05-02 14:36:21 +00:00
* @ param mixed $file
* @ return bool
2021-11-04 20:29:59 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function validateInclude ( & $file ) : bool
2021-11-04 20:29:59 +00:00
{
$orig_file = $file ;
2022-05-02 14:36:21 +00:00
2021-11-04 20:29:59 +00:00
$file = realpath ( $file );
2022-05-02 14:36:21 +00:00
2021-11-04 20:29:59 +00:00
if ( strpos ( $file , getcwd ()) !== 0 ) {
return false ;
}
2022-05-02 14:36:21 +00:00
2022-09-04 13:54:32 +00:00
$file = str_replace ( getcwd () . '/' , '' , $file , $count );
2021-11-04 20:29:59 +00:00
if ( $count != 1 ) {
return false ;
}
2022-05-02 14:36:21 +00:00
2021-11-04 20:29:59 +00:00
if ( $orig_file !== $file ) {
return false ;
}
2022-05-02 14:36:21 +00:00
2022-10-20 04:21:00 +00:00
return ( strpos ( $file , 'addon/' ) === 0 );
2021-11-04 20:29:59 +00:00
}
2022-05-02 14:36:21 +00:00
2017-11-05 10:33:46 +00:00
/**
2020-01-19 06:05:23 +00:00
* Execute a worker entry
2017-11-05 10:33:46 +00:00
*
* @ param array $queue Workerqueue entry
*
* @ return boolean " true " if further processing should be stopped
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
public static function execute ( array $queue ) : bool
2017-11-19 17:04:40 -05:00
{
2017-11-05 10:33:46 +00:00
$mypid = getmypid ();
// Quit when in maintenance
2020-01-19 21:21:13 +01:00
if ( DI :: config () -> get ( 'system' , 'maintenance' , false , true )) {
2022-09-04 13:54:32 +00:00
Logger :: notice ( 'Maintenance mode - quit process' , [ 'pid' => $mypid ]);
2017-11-05 10:33:46 +00:00
return false ;
}
// Constantly check the number of parallel database processes
2021-10-24 20:43:59 +02:00
if ( DI :: system () -> isMaxProcessesReached ()) {
2022-09-04 13:54:32 +00:00
Logger :: warning ( 'Max processes reached for process' , [ 'pid' => $mypid ]);
2017-11-05 10:33:46 +00:00
return false ;
}
// Constantly check the number of available database connections to let the frontend be accessible at any time
if ( self :: maxConnectionsReached ()) {
2022-09-04 13:54:32 +00:00
Logger :: warning ( 'Max connection reached for process' , [ 'pid' => $mypid ]);
2017-11-05 10:33:46 +00:00
return false ;
}
2020-12-03 15:47:50 +00:00
$argv = json_decode ( $queue [ 'parameter' ], true );
2021-03-24 19:52:53 +00:00
if ( ! is_array ( $argv )) {
$argv = [];
}
2020-12-03 15:47:50 +00:00
if ( ! empty ( $queue [ 'command' ])) {
array_unshift ( $argv , $queue [ 'command' ]);
}
2020-06-01 13:51:58 +00:00
if ( empty ( $argv )) {
2020-09-17 17:57:41 +00:00
Logger :: warning ( 'Parameter is empty' , [ 'queue' => $queue ]);
2020-06-01 13:51:58 +00:00
return false ;
}
2017-11-05 10:33:46 +00:00
// Check for existance and validity of the include file
$include = $argv [ 0 ];
2017-11-12 07:21:23 +00:00
if ( method_exists ( sprintf ( 'Friendica\Worker\%s' , $include ), 'execute' )) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if ( ! isset ( self :: $last_update )) {
2022-09-04 13:54:32 +00:00
self :: $last_update = strtotime ( $queue [ 'executed' ]);
2017-11-12 07:21:23 +00:00
}
$age = ( time () - self :: $last_update ) / 60 ;
self :: $last_update = time ();
if ( $age > 1 ) {
$stamp = ( float ) microtime ( true );
2018-07-20 08:19:26 -04:00
DBA :: update ( 'workerqueue' , [ 'executed' => DateTimeFormat :: utcNow ()], [ 'pid' => $mypid , 'done' => false ]);
2017-11-12 07:21:23 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-09 23:10:15 +00:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-12 07:21:23 +00:00
}
array_shift ( $argv );
self :: execFunction ( $queue , $include , $argv , true );
$stamp = ( float ) microtime ( true );
2018-10-23 03:54:18 +00:00
$condition = [ " `id` = ? AND `next_try` < ? " , $queue [ 'id' ], DateTimeFormat :: utcNow ()];
if ( DBA :: update ( 'workerqueue' , [ 'done' => true ], $condition )) {
2022-12-29 20:30:19 +01:00
DI :: keyValue () -> set ( 'last_worker_execution' , DateTimeFormat :: utcNow ());
2017-11-12 07:21:23 +00:00
}
self :: $db_duration = ( microtime ( true ) - $stamp );
2019-02-09 23:10:15 +00:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-12 07:21:23 +00:00
return true ;
}
2021-11-04 20:29:59 +00:00
if ( ! self :: validateInclude ( $include )) {
2022-09-04 13:54:32 +00:00
Logger :: warning ( 'Include file is not valid' , [ 'file' => $argv [ 0 ]]);
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2022-09-04 13:54:32 +00:00
DBA :: delete ( 'workerqueue' , [ 'id' => $queue [ 'id' ]]);
2019-02-09 23:10:15 +00:00
self :: $db_duration = ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
return true ;
}
2017-11-19 17:04:40 -05:00
require_once $include ;
2017-11-05 10:33:46 +00:00
2022-09-04 13:54:32 +00:00
$funcname = str_replace ( '.php' , '' , basename ( $argv [ 0 ])) . '_run' ;
2017-11-05 10:33:46 +00:00
if ( function_exists ( $funcname )) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if ( ! isset ( self :: $last_update )) {
2022-09-04 13:54:32 +00:00
self :: $last_update = strtotime ( $queue [ 'executed' ]);
2017-11-05 10:33:46 +00:00
}
$age = ( time () - self :: $last_update ) / 60 ;
self :: $last_update = time ();
if ( $age > 1 ) {
$stamp = ( float ) microtime ( true );
2018-07-20 08:19:26 -04:00
DBA :: update ( 'workerqueue' , [ 'executed' => DateTimeFormat :: utcNow ()], [ 'pid' => $mypid , 'done' => false ]);
2017-11-05 10:33:46 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-09 23:10:15 +00:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
}
2017-11-12 07:21:23 +00:00
self :: execFunction ( $queue , $funcname , $argv , false );
2017-11-05 10:33:46 +00:00
$stamp = ( float ) microtime ( true );
2022-09-04 13:54:32 +00:00
if ( DBA :: update ( 'workerqueue' , [ 'done' => true ], [ 'id' => $queue [ 'id' ]])) {
2022-12-29 20:30:19 +01:00
DI :: keyValue () -> set ( 'last_worker_execution' , DateTimeFormat :: utcNow ());
2017-11-05 10:33:46 +00:00
}
self :: $db_duration = ( microtime ( true ) - $stamp );
2019-02-09 23:10:15 +00:00
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
} else {
2022-09-04 13:54:32 +00:00
Logger :: warning ( 'Function does not exist' , [ 'function' => $funcname ]);
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2022-09-04 13:54:32 +00:00
DBA :: delete ( 'workerqueue' , [ 'id' => $queue [ 'id' ]]);
2019-02-09 23:10:15 +00:00
self :: $db_duration = ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
}
return true ;
}
2022-09-22 05:45:42 +00:00
/**
* Checks if system limits are reached .
*
* @ return boolean
*/
private static function systemLimitReached () : bool
{
$load_cooldown = DI :: config () -> get ( 'system' , 'worker_load_cooldown' );
$processes_cooldown = DI :: config () -> get ( 'system' , 'worker_processes_cooldown' );
2022-10-10 06:01:07 +00:00
if ( $load_cooldown == 0 ) {
$load_cooldown = DI :: config () -> get ( 'system' , 'maxloadavg' );
}
2022-09-22 05:45:42 +00:00
if (( $load_cooldown == 0 ) && ( $processes_cooldown == 0 )) {
return false ;
}
2022-12-03 19:44:50 +00:00
$load = System :: getLoadAvg ( $processes_cooldown != 0 );
2022-09-22 05:45:42 +00:00
if ( empty ( $load )) {
return false ;
}
if (( $load_cooldown > 0 ) && ( $load [ 'average1' ] > $load_cooldown )) {
return true ;
}
if (( $processes_cooldown > 0 ) && ( $load [ 'scheduled' ] > $processes_cooldown )) {
return true ;
}
return false ;
}
2017-11-05 10:33:46 +00:00
/**
2022-09-21 21:03:07 +02:00
* Slow the execution down if the system load is too high
2017-11-05 10:33:46 +00:00
*
2017-11-19 17:04:40 -05:00
* @ return void
2017-11-05 10:33:46 +00:00
*/
2022-09-21 21:03:07 +02:00
public static function coolDown ()
2017-11-19 17:04:40 -05:00
{
2022-09-22 22:46:59 +00:00
$cooldown = DI :: config () -> get ( 'system' , 'worker_cooldown' , 0 );
if ( $cooldown > 0 ) {
Logger :: debug ( 'Wait for cooldown.' , [ 'cooldown' => $cooldown ]);
if ( $cooldown < 1 ) {
usleep ( $cooldown * 1000000 );
} else {
sleep ( $cooldown );
}
}
2022-09-21 21:03:07 +02:00
$load_cooldown = DI :: config () -> get ( 'system' , 'worker_load_cooldown' );
$processes_cooldown = DI :: config () -> get ( 'system' , 'worker_processes_cooldown' );
2017-11-05 10:33:46 +00:00
2022-10-10 06:01:07 +00:00
if ( $load_cooldown == 0 ) {
$load_cooldown = DI :: config () -> get ( 'system' , 'maxloadavg' );
}
2022-09-21 21:03:07 +02:00
if (( $load_cooldown == 0 ) && ( $processes_cooldown == 0 )) {
return ;
2021-01-02 19:33:50 +00:00
}
2022-09-21 21:03:07 +02:00
$sleeping = false ;
2022-09-04 13:54:32 +00:00
2022-12-03 19:44:50 +00:00
while ( $load = System :: getLoadAvg ( $processes_cooldown != 0 )) {
2022-09-04 13:54:32 +00:00
if (( $load_cooldown > 0 ) && ( $load [ 'average1' ] > $load_cooldown )) {
2022-09-21 21:03:07 +02:00
if ( ! $sleeping ) {
Logger :: notice ( 'Load induced pre execution cooldown.' , [ 'max' => $load_cooldown , 'load' => $load , 'called-by' => System :: callstack ( 1 )]);
$sleeping = true ;
}
2022-09-04 13:54:32 +00:00
sleep ( 1 );
continue ;
}
if (( $processes_cooldown > 0 ) && ( $load [ 'scheduled' ] > $processes_cooldown )) {
2022-09-21 21:03:07 +02:00
if ( ! $sleeping ) {
Logger :: notice ( 'Process induced pre execution cooldown.' , [ 'max' => $processes_cooldown , 'load' => $load , 'called-by' => System :: callstack ( 1 )]);
$sleeping = true ;
}
2022-09-04 13:54:32 +00:00
sleep ( 1 );
continue ;
}
break ;
}
2022-09-21 21:03:07 +02:00
if ( $sleeping ) {
Logger :: notice ( 'Cooldown ended.' , [ 'max-load' => $load_cooldown , 'max-processes' => $processes_cooldown , 'load' => $load , 'called-by' => System :: callstack ( 1 )]);
}
}
/**
* Execute a function from the queue
*
* @ param array $queue Workerqueue entry
* @ param string $funcname name of the function
* @ param array $argv Array of values to be passed to the function
* @ param boolean $method_call boolean
* @ return void
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
*/
private static function execFunction ( array $queue , string $funcname , array $argv , bool $method_call )
{
$a = DI :: app ();
self :: coolDown ();
2019-07-21 20:24:16 +02:00
Logger :: enableWorker ( $funcname );
2017-11-05 10:33:46 +00:00
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Process start.' , [ 'priority' => $queue [ 'priority' ], 'id' => $queue [ 'id' ]]);
2017-11-05 10:33:46 +00:00
$stamp = ( float ) microtime ( true );
// We use the callstack here to analyze the performance of executed worker entries.
// For this reason the variables have to be initialized.
2019-12-15 23:50:35 +01:00
DI :: profiler () -> reset ();
2017-11-05 10:33:46 +00:00
2021-07-24 22:08:33 +00:00
$a -> setQueue ( $queue );
2017-11-05 10:33:46 +00:00
2019-02-09 23:10:15 +00:00
$up_duration = microtime ( true ) - self :: $up_start ;
2017-11-05 10:33:46 +00:00
// Reset global data to avoid interferences
unset ( $_SESSION );
2019-02-22 20:41:13 +01:00
// Set the workerLogger as new default logger
2017-11-12 07:21:23 +00:00
if ( $method_call ) {
2022-11-26 15:21:46 -05:00
try {
call_user_func_array ( sprintf ( 'Friendica\Worker\%s::execute' , $funcname ), $argv );
} catch ( \TypeError $e ) {
// No need to defer a worker queue entry if the arguments are invalid
Logger :: notice ( 'Wrong worker arguments' , [ 'class' => $funcname , 'argv' => $argv , 'queue' => $queue , 'message' => $e -> getMessage ()]);
} catch ( \Throwable $e ) {
2022-12-30 22:25:19 -05:00
Logger :: error ( 'Uncaught exception in worker execution' , [ 'class' => get_class ( $e ), 'message' => $e -> getMessage (), 'code' => $e -> getCode (), 'file' => $e -> getFile () . ':' . $e -> getLine (), 'trace' => $e -> getTraceAsString (), 'previous' => $e -> getPrevious ()]);
2022-11-26 15:21:46 -05:00
Worker :: defer ();
}
2017-11-12 07:21:23 +00:00
} else {
2020-06-01 13:51:58 +00:00
$funcname ( $argv , count ( $argv ));
2017-11-12 07:21:23 +00:00
}
2019-07-21 20:24:16 +02:00
Logger :: disableWorker ();
2017-11-05 10:33:46 +00:00
2021-07-24 22:08:33 +00:00
$a -> setQueue ([]);
2017-11-05 10:33:46 +00:00
2018-07-15 20:36:20 +02:00
$duration = ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
/* With these values we can analyze how effective the worker is .
* The database and rest time should be low since this is the unproductive time .
* The execution time is the productive time .
* By changing parameters like the maximum number of workers we can check the effectivness .
*/
2019-02-27 06:36:19 +00:00
$dbtotal = round ( self :: $db_duration , 2 );
$dbread = round ( self :: $db_duration - ( self :: $db_duration_count + self :: $db_duration_write + self :: $db_duration_stat ), 2 );
$dbcount = round ( self :: $db_duration_count , 2 );
$dbstat = round ( self :: $db_duration_stat , 2 );
$dbwrite = round ( self :: $db_duration_write , 2 );
$dblock = round ( self :: $lock_duration , 2 );
$rest = round ( max ( 0 , $up_duration - ( self :: $db_duration + self :: $lock_duration )), 2 );
$exec = round ( $duration , 2 );
2019-02-21 19:32:31 +00:00
2019-07-21 20:24:16 +02:00
Logger :: info ( 'Performance:' , [ 'state' => self :: $state , 'count' => $dbcount , 'stat' => $dbstat , 'write' => $dbwrite , 'lock' => $dblock , 'total' => $dbtotal , 'rest' => $rest , 'exec' => $exec ]);
2017-11-19 17:04:40 -05:00
2022-09-21 21:03:07 +02:00
self :: coolDown ();
2019-02-09 23:10:15 +00:00
self :: $up_start = microtime ( true );
self :: $db_duration = 0 ;
self :: $db_duration_count = 0 ;
self :: $db_duration_stat = 0 ;
self :: $db_duration_write = 0 ;
2017-11-05 10:33:46 +00:00
self :: $lock_duration = 0 ;
if ( $duration > 3600 ) {
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Longer than 1 hour.' , [ 'priority' => $queue [ 'priority' ], 'id' => $queue [ 'id' ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 10:33:46 +00:00
} elseif ( $duration > 600 ) {
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Longer than 10 minutes.' , [ 'priority' => $queue [ 'priority' ], 'id' => $queue [ 'id' ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 10:33:46 +00:00
} elseif ( $duration > 300 ) {
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Longer than 5 minutes.' , [ 'priority' => $queue [ 'priority' ], 'id' => $queue [ 'id' ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 10:33:46 +00:00
} elseif ( $duration > 120 ) {
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Longer than 2 minutes.' , [ 'priority' => $queue [ 'priority' ], 'id' => $queue [ 'id' ], 'duration' => round ( $duration / 60 , 3 )]);
2017-11-05 10:33:46 +00:00
}
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Process done.' , [ 'priority' => $queue [ 'priority' ], 'id' => $queue [ 'id' ], 'duration' => round ( $duration , 3 )]);
2017-11-05 10:33:46 +00:00
2022-09-04 13:54:32 +00:00
DI :: profiler () -> saveLog ( DI :: logger (), 'ID ' . $queue [ 'id' ] . ': ' . $funcname );
2017-11-05 10:33:46 +00:00
}
/**
2020-01-19 06:05:23 +00:00
* Checks if the number of database connections has reached a critical limit .
2017-11-05 10:33:46 +00:00
*
* @ return bool Are more than 3 / 4 of the maximum connections used ?
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function maxConnectionsReached () : bool
2017-11-19 17:04:40 -05:00
{
2017-11-05 10:33:46 +00:00
// Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
2022-09-04 13:54:32 +00:00
$max = DI :: config () -> get ( 'system' , 'max_connections' );
2017-11-05 10:33:46 +00:00
2017-11-19 21:47:21 +00:00
// Fetch the percentage level where the worker will get active
2022-09-04 13:54:32 +00:00
$maxlevel = DI :: config () -> get ( 'system' , 'max_connections_level' , 75 );
2017-11-05 10:33:46 +00:00
if ( $max == 0 ) {
// the maximum number of possible user connections can be a system variable
2018-07-20 22:01:53 -04:00
$r = DBA :: fetchFirst ( " SHOW VARIABLES WHERE `variable_name` = 'max_user_connections' " );
2018-07-21 08:46:04 -04:00
if ( DBA :: isResult ( $r )) {
2022-09-04 13:54:32 +00:00
$max = $r [ 'Value' ];
2017-11-05 10:33:46 +00:00
}
// Or it can be granted. This overrides the system variable
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2018-07-20 08:19:26 -04:00
$r = DBA :: p ( 'SHOW GRANTS' );
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-07-20 08:19:26 -04:00
while ( $grants = DBA :: fetch ( $r )) {
2017-11-05 17:13:37 +00:00
$grant = array_pop ( $grants );
if ( stristr ( $grant , " GRANT USAGE ON " )) {
if ( preg_match ( " /WITH MAX_USER_CONNECTIONS ( \ d*)/ " , $grant , $match )) {
$max = $match [ 1 ];
2017-11-05 10:33:46 +00:00
}
}
}
2018-07-20 08:19:26 -04:00
DBA :: close ( $r );
2017-11-05 10:33:46 +00:00
}
// If $max is set we will use the processlist to determine the current number of connections
// The processlist only shows entries of the current user
if ( $max != 0 ) {
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2018-07-20 08:19:26 -04:00
$r = DBA :: p ( 'SHOW PROCESSLIST' );
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2018-07-20 22:05:12 -04:00
$used = DBA :: numRows ( $r );
2018-07-20 08:19:26 -04:00
DBA :: close ( $r );
2017-11-05 10:33:46 +00:00
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Connection usage (user values)' , [ 'usage' => $used , 'max' => $max ]);
2017-11-05 10:33:46 +00:00
$level = ( $used / $max ) * 100 ;
if ( $level >= $maxlevel ) {
2022-09-04 13:54:32 +00:00
Logger :: warning ( 'Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used . '/' . $max );
2017-11-05 10:33:46 +00:00
return true ;
}
}
// We will now check for the system values.
// This limit could be reached although the user limits are fine.
2018-07-20 22:01:53 -04:00
$r = DBA :: fetchFirst ( " SHOW VARIABLES WHERE `variable_name` = 'max_connections' " );
2018-07-21 08:46:04 -04:00
if ( ! DBA :: isResult ( $r )) {
2017-11-05 10:33:46 +00:00
return false ;
}
2022-09-04 13:54:32 +00:00
$max = intval ( $r [ 'Value' ]);
2017-11-05 10:33:46 +00:00
if ( $max == 0 ) {
return false ;
}
2018-07-20 22:01:53 -04:00
$r = DBA :: fetchFirst ( " SHOW STATUS WHERE `variable_name` = 'Threads_connected' " );
2018-07-21 08:46:04 -04:00
if ( ! DBA :: isResult ( $r )) {
2017-11-05 10:33:46 +00:00
return false ;
}
2022-09-04 13:54:32 +00:00
$used = intval ( $r [ 'Value' ]);
2017-11-05 10:33:46 +00:00
if ( $used == 0 ) {
return false ;
}
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Connection usage (system values)' , [ 'used' => $used , 'max' => $max ]);
2017-11-05 10:33:46 +00:00
$level = $used / $max * 100 ;
if ( $level < $maxlevel ) {
return false ;
}
2022-09-04 13:54:32 +00:00
Logger :: warning ( 'Maximum level (' . $level . '%) of system connections reached: ' . $used . '/' . $max );
2017-11-05 10:33:46 +00:00
return true ;
}
/**
2020-01-19 06:05:23 +00:00
* Checks if the number of active workers exceeds the given limits
2017-11-05 10:33:46 +00:00
*
* @ return bool Are there too much workers running ?
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function tooMuchWorkers () : bool
2017-11-19 17:04:40 -05:00
{
2022-09-04 13:54:32 +00:00
$queues = DI :: config () -> get ( 'system' , 'worker_queues' , 10 );
2017-11-05 10:33:46 +00:00
$maxqueues = $queues ;
$active = self :: activeWorkers ();
// Decrease the number of workers at higher load
2018-10-13 18:57:31 +02:00
$load = System :: currentLoad ();
2017-11-05 10:33:46 +00:00
if ( $load ) {
2022-09-04 13:54:32 +00:00
$maxsysload = intval ( DI :: config () -> get ( 'system' , 'maxloadavg' , 20 ));
2018-06-20 04:38:50 +00:00
2018-06-20 06:43:57 -04:00
/* Default exponent 3 causes queues to rapidly decrease as load increases .
* If you have 20 max queues at idle , then you get only 5 queues at 37.1 % of $maxsysload .
* For some environments , this rapid decrease is not needed .
* With exponent 1 , you could have 20 max queues at idle and 13 at 37 % of $maxsysload .
*/
2020-01-19 21:21:13 +01:00
$exponent = intval ( DI :: config () -> get ( 'system' , 'worker_load_exponent' , 3 ));
2018-06-20 06:06:20 -04:00
$slope = pow ( max ( 0 , $maxsysload - $load ) / $maxsysload , $exponent );
$queues = intval ( ceil ( $slope * $maxqueues ));
2018-01-01 15:51:02 -05:00
$processlist = '' ;
2017-11-05 10:33:46 +00:00
2020-01-19 21:21:13 +01:00
if ( DI :: config () -> get ( 'system' , 'worker_jpm' )) {
$intervals = explode ( ',' , DI :: config () -> get ( 'system' , 'worker_jpm_range' ));
2019-02-06 07:37:45 +00:00
$jobs_per_minute = [];
foreach ( $intervals as $interval ) {
2019-02-11 04:39:24 +00:00
if ( $interval == 0 ) {
continue ;
} else {
$interval = ( int ) $interval ;
}
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2021-12-02 09:19:01 -05:00
$jobs = DBA :: count ( 'workerqueue' , [ " `done` AND `executed` > ? " , DateTimeFormat :: utc ( 'now - ' . $interval . ' minute' )]);
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-17 19:20:24 +00:00
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
2021-10-12 05:53:29 +00:00
$jobs_per_minute [ $interval ] = number_format ( $jobs / $interval , 0 );
2019-02-06 07:37:45 +00:00
}
$processlist = ' - jpm: ' . implode ( '/' , $jobs_per_minute );
}
2019-02-09 23:10:15 +00:00
// Create a list of queue entries grouped by their priority
$listitem = [ 0 => '' ];
2017-11-19 17:04:40 -05:00
2019-02-09 23:10:15 +00:00
$idle_workers = $active ;
2017-11-05 10:33:46 +00:00
2019-02-17 18:55:17 +00:00
$deferred = self :: deferredEntries ();
2019-02-11 08:59:14 +00:00
2020-01-19 21:21:13 +01:00
if ( DI :: config () -> get ( 'system' , 'worker_debug' )) {
2019-02-11 08:59:14 +00:00
$waiting_processes = 0 ;
2017-11-05 10:33:46 +00:00
// Now adding all processes with workerqueue entries
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2019-08-28 06:44:37 +02:00
$jobs = DBA :: p ( " SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority` " );
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
while ( $entry = DBA :: fetch ( $jobs )) {
$stamp = ( float ) microtime ( true );
2022-05-01 08:58:48 +00:00
$running = DBA :: count ( 'workerqueue-view' , [ 'priority' => $entry [ 'priority' ]]);
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
2021-10-12 05:53:29 +00:00
$idle_workers -= $running ;
2022-09-04 13:54:32 +00:00
$waiting_processes += $entry [ 'entries' ];
$listitem [ $entry [ 'priority' ]] = $entry [ 'priority' ] . ':' . $running . '/' . $entry [ 'entries' ];
2017-11-05 10:33:46 +00:00
}
2019-02-09 23:10:15 +00:00
DBA :: close ( $jobs );
} else {
2019-08-13 06:43:08 +02:00
$waiting_processes = self :: totalEntries ();
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2020-04-24 18:50:36 +00:00
$jobs = DBA :: p ( " SELECT COUNT(*) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority` ORDER BY `priority` " );
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2019-02-17 18:55:17 +00:00
self :: $db_duration_stat += ( microtime ( true ) - $stamp );
2019-02-08 21:48:41 +00:00
2019-02-09 23:10:15 +00:00
while ( $entry = DBA :: fetch ( $jobs )) {
2022-09-04 13:54:32 +00:00
$idle_workers -= $entry [ 'running' ];
$listitem [ $entry [ 'priority' ]] = $entry [ 'priority' ] . ':' . $entry [ 'running' ];
2019-02-09 23:10:15 +00:00
}
DBA :: close ( $jobs );
2017-11-05 10:33:46 +00:00
}
2019-08-23 07:23:32 +02:00
$waiting_processes -= $deferred ;
2022-09-04 13:54:32 +00:00
$listitem [ 0 ] = '0:' . max ( 0 , $idle_workers );
2019-02-09 23:10:15 +00:00
$processlist .= ' (' . implode ( ', ' , $listitem ) . ')' ;
2017-11-05 10:33:46 +00:00
2022-09-04 13:54:32 +00:00
if ( DI :: config () -> get ( 'system' , 'worker_fastlane' , false ) && ( $queues > 0 ) && ( $active >= $queues ) && self :: entriesExists ()) {
2017-11-05 10:33:46 +00:00
$top_priority = self :: highestPriority ();
$high_running = self :: processWithPriorityActive ( $top_priority );
2022-10-17 05:49:55 +00:00
if ( ! $high_running && ( $top_priority > self :: PRIORITY_UNDEFINED ) && ( $top_priority < self :: PRIORITY_NEGLIGIBLE )) {
2022-09-04 13:54:32 +00:00
Logger :: info ( 'Jobs with a higher priority are waiting but none is executed. Open a fastlane.' , [ 'priority' => $top_priority ]);
2017-11-05 10:33:46 +00:00
$queues = $active + 1 ;
}
}
2022-09-04 13:54:32 +00:00
Logger :: notice ( 'Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues );
2017-11-05 10:33:46 +00:00
// Are there fewer workers running as possible? Then fork a new one.
2022-09-22 05:45:42 +00:00
if ( ! DI :: config () -> get ( 'system' , 'worker_dont_fork' , false ) && ( $queues > ( $active + 1 )) && self :: entriesExists () && ! self :: systemLimitReached ()) {
2022-09-04 13:54:32 +00:00
Logger :: info ( 'There are fewer workers as possible, fork a new worker.' , [ 'active' => $active , 'queues' => $queues ]);
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode ()) {
Worker\IPC :: SetJobState ( true );
2018-06-15 18:18:20 +00:00
} else {
self :: spawnWorker ();
}
2017-11-05 10:33:46 +00:00
}
}
2018-06-19 18:53:02 -04:00
// if there are too much worker, we don't spawn a new one.
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode () && ( $active > $queues )) {
Worker\IPC :: SetJobState ( false );
2018-06-15 18:18:20 +00:00
}
2018-06-19 18:53:02 -04:00
return $active > $queues ;
2017-11-05 10:33:46 +00:00
}
/**
2020-01-19 06:05:23 +00:00
* Returns the number of active worker processes
2017-11-05 10:33:46 +00:00
*
2017-11-19 21:47:21 +00:00
* @ return integer Number of active worker processes
2019-01-06 16:06:53 -05:00
* @ throws \Exception
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
private static function activeWorkers () : int
2017-11-19 17:04:40 -05:00
{
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2021-10-31 14:31:02 +01:00
$count = DI :: process () -> countCommand ( 'Worker.php' );
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2020-08-29 13:01:58 +00:00
self :: $db_duration_count += ( microtime ( true ) - $stamp );
2019-02-09 23:10:15 +00:00
return $count ;
2017-11-05 10:33:46 +00:00
}
2020-08-29 09:03:50 +00:00
/**
* Returns the number of active worker processes
*
* @ return array List of worker process ids
* @ throws \Exception
*/
2022-06-16 22:12:38 +02:00
private static function getWorkerPIDList () : array
2020-08-29 09:03:50 +00:00
{
$ids = [];
$stamp = ( float ) microtime ( true );
$queues = DBA :: p ( " SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
2021-07-24 22:08:33 +00:00
LEFT JOIN `workerqueue` ON `workerqueue` . `pid` = `process` . `pid` AND NOT `workerqueue` . `done`
2020-08-29 09:03:50 +00:00
GROUP BY `process` . `pid` " );
while ( $queue = DBA :: fetch ( $queues )) {
$ids [ $queue [ 'pid' ]] = $queue [ 'entries' ];
}
DBA :: close ( $queues );
self :: $db_duration += ( microtime ( true ) - $stamp );
2020-08-29 13:01:58 +00:00
self :: $db_duration_count += ( microtime ( true ) - $stamp );
2020-08-29 09:03:50 +00:00
return $ids ;
}
2019-02-17 19:20:24 +00:00
/**
2020-01-19 06:05:23 +00:00
* Returns waiting jobs for the current process id
2019-02-17 19:20:24 +00:00
*
2022-06-16 22:17:37 +02:00
* @ return array | bool waiting workerqueue jobs or FALSE on failture
2019-02-17 19:20:24 +00:00
* @ throws \Exception
*/
2022-06-16 22:17:37 +02:00
private static function getWaitingJobForPID ()
2019-02-17 18:55:17 +00:00
{
$stamp = ( float ) microtime ( true );
$r = DBA :: select ( 'workerqueue' , [], [ 'pid' => getmypid (), 'done' => false ]);
self :: $db_duration += ( microtime ( true ) - $stamp );
if ( DBA :: isResult ( $r )) {
return DBA :: toArray ( $r );
}
DBA :: close ( $r );
return false ;
}
2019-02-17 19:20:24 +00:00
/**
2020-01-19 06:05:23 +00:00
* Returns the next jobs that should be executed
2020-08-29 09:03:50 +00:00
* @ param int $limit
2019-02-17 19:20:24 +00:00
* @ return array array with next jobs
* @ throws \Exception
*/
2022-06-16 22:12:38 +02:00
private static function nextProcess ( int $limit ) : array
2019-02-16 15:03:37 +00:00
{
$priority = self :: nextPriority ();
if ( empty ( $priority )) {
2019-02-21 19:32:31 +00:00
Logger :: info ( 'No tasks found' );
2019-02-16 15:03:37 +00:00
return [];
}
$ids = [];
$stamp = ( float ) microtime ( true );
$condition = [ " `priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ? " , $priority , DateTimeFormat :: utcNow ()];
2020-12-03 15:47:50 +00:00
$tasks = DBA :: select ( 'workerqueue' , [ 'id' , 'command' , 'parameter' ], $condition , [ 'limit' => $limit , 'order' => [ 'retrial' , 'created' ]]);
2019-02-16 15:03:37 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
while ( $task = DBA :: fetch ( $tasks )) {
$ids [] = $task [ 'id' ];
2019-03-23 06:08:18 +00:00
// Only continue that loop while we are storing commands that can be processed quickly
2020-12-03 15:47:50 +00:00
if ( ! empty ( $task [ 'command' ])) {
$command = $task [ 'command' ];
} else {
$command = json_decode ( $task [ 'parameter' ])[ 0 ];
}
2019-03-23 06:08:18 +00:00
if ( ! in_array ( $command , self :: FAST_COMMANDS )) {
break ;
}
2019-02-16 15:03:37 +00:00
}
DBA :: close ( $tasks );
2019-03-23 06:08:18 +00:00
Logger :: info ( 'Found:' , [ 'priority' => $priority , 'id' => $ids ]);
2019-02-16 15:03:37 +00:00
return $ids ;
}
2019-02-17 19:20:24 +00:00
/**
2020-01-19 06:05:23 +00:00
* Returns the priority of the next workerqueue job
2019-02-17 19:20:24 +00:00
*
2022-06-16 22:12:38 +02:00
* @ return string | bool priority or FALSE on failure
2019-02-17 19:20:24 +00:00
* @ throws \Exception
*/
2019-02-17 18:55:17 +00:00
private static function nextPriority ()
2019-02-16 15:03:37 +00:00
{
$waiting = [];
2022-10-17 05:49:55 +00:00
$priorities = [ self :: PRIORITY_CRITICAL , self :: PRIORITY_HIGH , self :: PRIORITY_MEDIUM , self :: PRIORITY_LOW , self :: PRIORITY_NEGLIGIBLE ];
2019-02-16 15:03:37 +00:00
foreach ( $priorities as $priority ) {
$stamp = ( float ) microtime ( true );
if ( DBA :: exists ( 'workerqueue' , [ " `priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ? " , $priority , DateTimeFormat :: utcNow ()])) {
$waiting [ $priority ] = true ;
}
self :: $db_duration += ( microtime ( true ) - $stamp );
}
2022-10-17 05:49:55 +00:00
if ( ! empty ( $waiting [ self :: PRIORITY_CRITICAL ])) {
return self :: PRIORITY_CRITICAL ;
2019-02-16 15:03:37 +00:00
}
$running = [];
2019-02-17 03:22:29 +00:00
$running_total = 0 ;
2019-02-16 15:03:37 +00:00
$stamp = ( float ) microtime ( true );
2020-04-24 18:50:36 +00:00
$processes = DBA :: p ( " SELECT COUNT(DISTINCT(`pid`)) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority` " );
2019-02-16 15:03:37 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
while ( $process = DBA :: fetch ( $processes )) {
$running [ $process [ 'priority' ]] = $process [ 'running' ];
2019-02-17 03:22:29 +00:00
$running_total += $process [ 'running' ];
2019-02-16 15:03:37 +00:00
}
DBA :: close ( $processes );
foreach ( $priorities as $priority ) {
if ( ! empty ( $waiting [ $priority ]) && empty ( $running [ $priority ])) {
2019-02-21 19:32:31 +00:00
Logger :: info ( 'No running worker found with priority {priority} - assigning it.' , [ 'priority' => $priority ]);
2019-02-16 15:03:37 +00:00
return $priority ;
}
}
2019-02-17 03:22:29 +00:00
$active = max ( self :: activeWorkers (), $running_total );
$priorities = max ( count ( $waiting ), count ( $running ));
$exponent = 2 ;
$total = 0 ;
for ( $i = 1 ; $i <= $priorities ; ++ $i ) {
$total += pow ( $i , $exponent );
2019-02-16 15:03:37 +00:00
}
2019-02-17 03:22:29 +00:00
$limit = [];
for ( $i = 1 ; $i <= $priorities ; ++ $i ) {
$limit [ $priorities - $i ] = max ( 1 , round ( $active * ( pow ( $i , $exponent ) / $total )));
}
$i = 0 ;
foreach ( $running as $priority => $workers ) {
if ( $workers < $limit [ $i ++ ]) {
2019-02-21 19:32:31 +00:00
Logger :: info ( 'Priority {priority} has got {workers} workers out of a limit of {limit}' , [ 'priority' => $priority , 'workers' => $workers , 'limit' => $limit [ $i - 1 ]]);
2019-02-17 03:22:29 +00:00
return $priority ;
}
2019-02-16 15:03:37 +00:00
}
if ( ! empty ( $waiting )) {
2019-02-25 07:11:35 -05:00
$priority = array_keys ( $waiting )[ 0 ];
2019-02-21 19:32:31 +00:00
Logger :: info ( 'No underassigned priority found, now taking the highest priority.' , [ 'priority' => $priority ]);
2019-02-17 03:22:29 +00:00
return $priority ;
2019-02-16 15:03:37 +00:00
}
return false ;
}
2017-11-05 10:33:46 +00:00
/**
2020-01-19 06:05:23 +00:00
* Find and claim the next worker process for us
2017-11-05 10:33:46 +00:00
*
2022-06-16 22:12:38 +02:00
* @ return void
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 10:33:46 +00:00
*/
2019-02-17 18:55:17 +00:00
private static function findWorkerProcesses ()
2017-11-19 17:04:40 -05:00
{
2020-08-29 09:03:50 +00:00
$fetch_limit = DI :: config () -> get ( 'system' , 'worker_fetch_limit' , 1 );
if ( DI :: config () -> get ( 'system' , 'worker_multiple_fetch' )) {
$pids = [];
2020-08-29 10:44:38 +00:00
foreach ( self :: getWorkerPIDList () as $pid => $count ) {
2020-08-29 09:03:50 +00:00
if ( $count <= $fetch_limit ) {
$pids [] = $pid ;
}
}
if ( empty ( $pids )) {
return ;
}
$limit = $fetch_limit * count ( $pids );
} else {
$pids = [ getmypid ()];
$limit = $fetch_limit ;
}
2019-02-16 15:03:37 +00:00
2020-08-29 09:03:50 +00:00
$ids = self :: nextProcess ( $limit );
$limit -= count ( $ids );
2019-03-23 06:08:18 +00:00
2020-08-29 09:03:50 +00:00
// If there is not enough results we check without priority limit
if ( $limit > 0 ) {
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2019-02-17 19:20:24 +00:00
$condition = [ " `pid` = 0 AND NOT `done` AND `next_try` < ? " , DateTimeFormat :: utcNow ()];
2020-12-03 15:47:50 +00:00
$tasks = DBA :: select ( 'workerqueue' , [ 'id' , 'command' , 'parameter' ], $condition , [ 'limit' => $limit , 'order' => [ 'priority' , 'retrial' , 'created' ]]);
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
2019-03-23 06:08:18 +00:00
while ( $task = DBA :: fetch ( $tasks )) {
$ids [] = $task [ 'id' ];
// Only continue that loop while we are storing commands that can be processed quickly
2020-12-03 15:47:50 +00:00
if ( ! empty ( $task [ 'command' ])) {
$command = $task [ 'command' ];
} else {
$command = json_decode ( $task [ 'parameter' ])[ 0 ];
}
2019-03-23 06:08:18 +00:00
if ( ! in_array ( $command , self :: FAST_COMMANDS )) {
break ;
}
2017-11-05 10:33:46 +00:00
}
2019-03-23 06:08:18 +00:00
DBA :: close ( $tasks );
2017-11-05 10:33:46 +00:00
}
2020-08-29 10:44:38 +00:00
if ( empty ( $ids )) {
return ;
}
2020-08-29 09:03:50 +00:00
2020-08-29 10:44:38 +00:00
// Assign the task ids to the workers
$worker = [];
foreach ( array_unique ( $ids ) as $id ) {
$pid = next ( $pids );
if ( ! $pid ) {
$pid = reset ( $pids );
2020-08-29 09:03:50 +00:00
}
2020-08-29 10:44:38 +00:00
$worker [ $pid ][] = $id ;
2017-11-05 10:33:46 +00:00
}
2020-08-29 10:44:38 +00:00
$stamp = ( float ) microtime ( true );
foreach ( $worker as $worker_pid => $worker_ids ) {
Logger :: info ( 'Set queue entry' , [ 'pid' => $worker_pid , 'ids' => $worker_ids ]);
DBA :: update ( 'workerqueue' , [ 'executed' => DateTimeFormat :: utcNow (), 'pid' => $worker_pid ],
[ 'id' => $worker_ids , 'done' => false , 'pid' => 0 ]);
}
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
}
/**
2020-01-19 06:05:23 +00:00
* Returns the next worker process
2017-11-05 10:33:46 +00:00
*
2020-06-01 13:51:58 +00:00
* @ return array worker processes
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-05 10:33:46 +00:00
*/
2022-06-16 22:12:38 +02:00
public static function workerProcess () : array
2017-11-19 17:04:40 -05:00
{
2017-11-05 10:33:46 +00:00
// There can already be jobs for us in the queue.
2019-02-17 18:55:17 +00:00
$waiting = self :: getWaitingJobForPID ();
if ( ! empty ( $waiting )) {
return $waiting ;
2017-11-05 10:33:46 +00:00
}
2019-02-08 21:48:41 +00:00
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2020-08-06 18:53:45 +00:00
if ( ! DI :: lock () -> acquire ( self :: LOCK_PROCESS )) {
2022-06-16 22:12:38 +02:00
return [];
2017-11-05 10:33:46 +00:00
}
2019-02-09 23:10:15 +00:00
self :: $lock_duration += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
2020-08-29 10:44:38 +00:00
self :: findWorkerProcesses ();
2017-11-05 10:33:46 +00:00
2020-08-06 18:53:45 +00:00
DI :: lock () -> release ( self :: LOCK_PROCESS );
2017-11-05 10:33:46 +00:00
2022-06-18 15:56:58 +02:00
// Prevents "Return value of Friendica\Core\Worker::workerProcess() must be of the type array, bool returned"
$process = self :: getWaitingJobForPID ();
return ( is_array ( $process ) ? $process : []);
2017-11-05 10:33:46 +00:00
}
/**
2020-01-19 06:05:23 +00:00
* Removes a workerqueue entry from the current process
2020-01-19 09:51:37 +00:00
*
2021-10-24 20:43:59 +02:00
* @ param Process $process the process behind the workerqueue
*
2017-11-19 17:04:40 -05:00
* @ return void
2019-01-06 16:06:53 -05:00
* @ throws \Exception
2017-11-05 10:33:46 +00:00
*/
2021-10-24 20:43:59 +02:00
public static function unclaimProcess ( Process $process )
2017-11-19 17:04:40 -05:00
{
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2021-10-24 20:43:59 +02:00
DBA :: update ( 'workerqueue' , [ 'executed' => DBA :: NULL_DATETIME , 'pid' => 0 ], [ 'pid' => $process -> pid , 'done' => false ]);
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 10:33:46 +00:00
}
2021-01-01 19:35:29 +00:00
/**
* Fork a child process
*
* @ param boolean $do_cron
* @ return void
*/
private static function forkProcess ( bool $do_cron )
{
2021-10-24 20:43:59 +02:00
if ( DI :: system () -> isMinMemoryReached ()) {
2021-01-01 23:10:38 +00:00
Logger :: warning ( 'Memory limit reached - quitting' );
return ;
}
2021-01-01 19:35:29 +00:00
// Children inherit their parent's database connection.
// To avoid problems we disconnect and connect both parent and child
DBA :: disconnect ();
$pid = pcntl_fork ();
if ( $pid == - 1 ) {
DBA :: connect ();
Logger :: warning ( 'Could not spawn worker' );
return ;
} elseif ( $pid ) {
// The parent process continues here
DBA :: connect ();
2021-01-05 16:01:05 +00:00
2022-05-19 19:24:21 +00:00
Worker\IPC :: SetJobState ( true , $pid );
2021-01-05 16:01:05 +00:00
Logger :: info ( 'Spawned new worker' , [ 'pid' => $pid ]);
2021-01-05 10:18:25 +00:00
$cycles = 0 ;
2022-05-19 19:24:21 +00:00
while ( Worker\IPC :: JobsExists ( $pid ) && ( ++ $cycles < 100 )) {
2021-01-05 10:18:25 +00:00
usleep ( 10000 );
}
Logger :: info ( 'Spawned worker is ready' , [ 'pid' => $pid , 'wait_cycles' => $cycles ]);
2021-01-01 19:35:29 +00:00
return ;
}
2021-01-02 08:43:55 +00:00
2021-01-01 19:35:29 +00:00
// We now are in the new worker
DBA :: connect ();
2021-10-24 20:43:59 +02:00
2021-10-31 20:15:57 +01:00
DI :: flushLogger ();
2021-11-01 13:54:18 +01:00
$process = DI :: process () -> create ( getmypid (), basename ( __FILE__ ));
2021-01-01 19:35:29 +00:00
2021-01-05 16:47:55 +00:00
$cycles = 0 ;
2022-05-19 19:24:21 +00:00
while ( ! Worker\IPC :: JobsExists ( $process -> pid ) && ( ++ $cycles < 100 )) {
2021-01-05 16:47:55 +00:00
usleep ( 10000 );
}
2021-10-31 20:23:23 +01:00
Logger :: info ( 'Worker spawned' , [ 'pid' => $process -> pid , 'wait_cycles' => $cycles ]);
2021-01-01 19:35:29 +00:00
2021-10-24 20:43:59 +02:00
self :: processQueue ( $do_cron , $process );
2021-01-01 19:35:29 +00:00
2021-10-24 20:43:59 +02:00
self :: unclaimProcess ( $process );
2021-01-01 19:35:29 +00:00
2022-05-19 19:24:21 +00:00
Worker\IPC :: SetJobState ( false , $process -> pid );
2021-10-24 20:43:59 +02:00
DI :: process () -> delete ( $process );
2021-10-31 20:23:23 +01:00
Logger :: info ( 'Worker ended' , [ 'pid' => $process -> pid ]);
2021-01-01 19:35:29 +00:00
exit ();
}
2017-11-19 17:33:07 -05:00
/**
2020-01-19 06:05:23 +00:00
* Spawns a new worker
2020-01-19 09:51:37 +00:00
*
2019-01-06 16:06:53 -05:00
* @ param bool $do_cron
2017-11-19 17:33:07 -05:00
* @ return void
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2017-11-19 17:33:07 -05:00
*/
2022-06-16 22:12:38 +02:00
public static function spawnWorker ( bool $do_cron = false )
2017-11-19 17:33:07 -05:00
{
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode () && DI :: config () -> get ( 'system' , 'worker_fork' )) {
2021-01-01 19:35:29 +00:00
self :: forkProcess ( $do_cron );
} else {
2021-10-24 20:43:59 +02:00
DI :: system () -> run ( 'bin/worker.php' , [ 'no_cron' => ! $do_cron ]);
2021-01-01 19:35:29 +00:00
}
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode ()) {
Worker\IPC :: SetJobState ( false );
2018-06-15 18:18:20 +00:00
}
2017-11-05 15:28:55 +00:00
}
2017-11-05 10:33:46 +00:00
/**
2020-01-19 06:05:23 +00:00
* Adds tasks to the worker queue
2017-11-05 10:33:46 +00:00
*
2017-11-06 15:38:15 +00:00
* @ param ( integer | array ) priority or parameter array , strings are deprecated and are ignored
2017-11-05 10:33:46 +00:00
*
* next args are passed as $cmd command line
2022-10-17 05:49:55 +00:00
* or : Worker :: add ( Worker :: PRIORITY_HIGH , 'Notifier' , Delivery :: DELETION , $drop_id );
* or : Worker :: add ( array ( 'priority' => Worker :: PRIORITY_HIGH , 'dont_fork' => true ), 'Delivery' , $post_id );
2017-11-05 10:33:46 +00:00
*
2022-09-04 13:54:32 +00:00
* @ return int '0' if worker queue entry already existed or there had been an error , otherwise the ID of the worker task
2019-01-06 16:06:53 -05:00
* @ throws \Friendica\Network\HTTPException\InternalServerErrorException
2022-09-04 13:54:32 +00:00
* @ note $cmd and string args are surrounded with ''
2017-11-05 10:33:46 +00:00
*
* @ hooks 'proc_run'
2019-01-06 16:06:53 -05:00
* array $arr
2017-11-05 10:33:46 +00:00
*
*/
2021-10-16 19:06:43 -04:00
public static function add ( ... $args )
2017-11-19 17:04:40 -05:00
{
2018-02-11 16:18:39 +00:00
if ( ! count ( $args )) {
2021-07-28 22:22:00 +00:00
return 0 ;
2017-11-05 10:33:46 +00:00
}
2018-01-15 08:05:12 -05:00
$arr = [ 'args' => $args , 'run_cmd' => true ];
2017-11-05 10:33:46 +00:00
2022-09-04 13:54:32 +00:00
Hook :: callAll ( 'proc_run' , $arr );
2017-11-05 10:33:46 +00:00
if ( ! $arr [ 'run_cmd' ] || ! count ( $args )) {
2021-07-28 22:22:00 +00:00
return 1 ;
2017-11-05 10:33:46 +00:00
}
2022-10-17 05:49:55 +00:00
$priority = self :: PRIORITY_MEDIUM ;
2019-08-08 22:42:12 +02:00
// Don't fork from frontend tasks by default
2022-09-04 13:54:32 +00:00
$dont_fork = DI :: config () -> get ( 'system' , 'worker_dont_fork' , false ) || ! DI :: mode () -> isBackend ();
2018-01-26 21:38:34 -05:00
$created = DateTimeFormat :: utcNow ();
2020-11-25 19:56:39 +00:00
$delayed = DBA :: NULL_DATETIME ;
2019-02-08 21:48:41 +00:00
$force_priority = false ;
2017-11-05 10:33:46 +00:00
2018-02-13 02:26:35 +00:00
$run_parameter = array_shift ( $args );
2017-11-05 10:33:46 +00:00
if ( is_int ( $run_parameter )) {
$priority = $run_parameter ;
} elseif ( is_array ( $run_parameter )) {
2020-11-25 19:56:39 +00:00
if ( isset ( $run_parameter [ 'delayed' ])) {
2020-11-30 05:39:12 +00:00
$delayed = $run_parameter [ 'delayed' ];
2020-11-25 19:56:39 +00:00
}
2017-11-05 10:33:46 +00:00
if ( isset ( $run_parameter [ 'priority' ])) {
$priority = $run_parameter [ 'priority' ];
}
if ( isset ( $run_parameter [ 'created' ])) {
$created = $run_parameter [ 'created' ];
}
if ( isset ( $run_parameter [ 'dont_fork' ])) {
$dont_fork = $run_parameter [ 'dont_fork' ];
}
2019-02-08 21:48:41 +00:00
if ( isset ( $run_parameter [ 'force_priority' ])) {
$force_priority = $run_parameter [ 'force_priority' ];
}
2021-10-16 19:06:43 -04:00
} else {
throw new \InvalidArgumentException ( 'Priority number or task parameter array expected as first argument' );
2017-11-05 10:33:46 +00:00
}
2020-12-03 15:47:50 +00:00
$command = array_shift ( $args );
2018-02-11 16:18:39 +00:00
$parameters = json_encode ( $args );
2022-12-31 16:20:18 +00:00
$queue = DBA :: selectFirst ( 'workerqueue' , [ 'id' , 'priority' ], [ 'command' => $command , 'parameter' => $parameters , 'done' => false ]);
2021-07-28 22:22:00 +00:00
$added = 0 ;
2017-11-05 10:33:46 +00:00
2022-10-17 05:49:55 +00:00
if ( ! is_int ( $priority ) || ! in_array ( $priority , self :: PRIORITIES )) {
2020-12-09 06:58:19 +01:00
Logger :: warning ( 'Invalid priority' , [ 'priority' => $priority , 'command' => $command , 'callstack' => System :: callstack ( 20 )]);
2022-10-17 05:49:55 +00:00
$priority = self :: PRIORITY_MEDIUM ;
2020-12-08 21:58:32 +00:00
}
2017-11-05 10:33:46 +00:00
// Quit if there was a database error - a precaution for the update process to 3.5.3
2018-07-20 08:19:26 -04:00
if ( DBA :: errorNo () != 0 ) {
2021-07-28 22:22:00 +00:00
return 0 ;
2017-11-05 10:33:46 +00:00
}
2022-12-31 12:19:34 +00:00
if ( empty ( $queue )) {
2021-07-28 22:22:00 +00:00
if ( ! DBA :: insert ( 'workerqueue' , [ 'command' => $command , 'parameter' => $parameters , 'created' => $created ,
'priority' => $priority , 'next_try' => $delayed ])) {
return 0 ;
2019-09-02 03:25:05 +00:00
}
2021-07-28 22:22:00 +00:00
$added = DBA :: lastInsertId ();
2019-02-08 21:48:41 +00:00
} elseif ( $force_priority ) {
2022-12-31 12:19:34 +00:00
$ret = DBA :: update ( 'workerqueue' , [ 'priority' => $priority ], [ 'command' => $command , 'parameter' => $parameters , 'done' => false , 'pid' => 0 ]);
if ( $ret && ( $priority != $queue [ 'priority' ])) {
$added = $queue [ 'id' ];
}
2017-11-05 10:33:46 +00:00
}
2020-11-18 13:29:10 +00:00
// Set the IPC flag to ensure an immediate process execution via daemon
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode ()) {
Worker\IPC :: SetJobState ( true );
2020-11-18 13:29:10 +00:00
}
2022-05-19 19:24:21 +00:00
Worker\Daemon :: checkState ();
2020-11-20 19:50:08 +00:00
2018-06-02 05:17:32 +00:00
// Should we quit and wait for the worker to be called as a cronjob?
2022-09-22 05:45:42 +00:00
if ( $dont_fork || self :: systemLimitReached ()) {
2019-09-02 03:25:05 +00:00
return $added ;
2018-06-01 22:09:27 +00:00
}
2017-11-05 10:33:46 +00:00
// If there is a lock then we don't have to check for too much worker
2020-08-06 18:53:45 +00:00
if ( ! DI :: lock () -> acquire ( self :: LOCK_WORKER , 0 )) {
2019-09-02 03:25:05 +00:00
return $added ;
2017-11-05 10:33:46 +00:00
}
// If there are already enough workers running, don't fork another one
$quit = self :: tooMuchWorkers ();
2020-08-06 18:53:45 +00:00
DI :: lock () -> release ( self :: LOCK_WORKER );
2017-11-05 10:33:46 +00:00
if ( $quit ) {
2019-09-02 03:25:05 +00:00
return $added ;
2017-11-05 10:33:46 +00:00
}
2020-11-18 13:29:10 +00:00
// Quit on daemon mode
2022-05-19 19:24:21 +00:00
if ( Worker\Daemon :: isMode ()) {
2019-09-02 03:25:05 +00:00
return $added ;
2018-06-15 18:18:20 +00:00
}
2017-11-19 21:47:21 +00:00
// Now call the worker to execute the jobs that we just added to the queue
2017-11-05 15:28:55 +00:00
self :: spawnWorker ();
2017-11-05 10:33:46 +00:00
2019-09-02 03:25:05 +00:00
return $added ;
2017-11-05 10:33:46 +00:00
}
2018-01-15 19:08:28 -05:00
2022-06-16 22:12:38 +02:00
public static function countWorkersByCommand ( string $command ) : int
2020-12-03 15:47:50 +00:00
{
return DBA :: count ( 'workerqueue' , [ 'done' => false , 'pid' => 0 , 'command' => $command ]);
}
2019-08-11 10:28:52 +02:00
/**
* Returns the next retrial level for worker jobs .
* This function will skip levels when jobs are older .
*
* @ param array $queue Worker queue entry
* @ param integer $max_level maximum retrial level
* @ return integer the next retrial level value
*/
2022-06-16 22:12:38 +02:00
private static function getNextRetrial ( array $queue , int $max_level ) : int
2019-08-11 10:28:52 +02:00
{
$created = strtotime ( $queue [ 'created' ]);
$retrial_time = time () - $created ;
$new_retrial = $queue [ 'retrial' ] + 1 ;
$total = 0 ;
for ( $retrial = 0 ; $retrial <= $max_level + 1 ; ++ $retrial ) {
$delay = (( $retrial + 3 ) ** 4 ) + ( rand ( 1 , 30 ) * ( $retrial + 1 ));
$total += $delay ;
if (( $total < $retrial_time ) && ( $retrial > $queue [ 'retrial' ])) {
$new_retrial = $retrial ;
}
}
2020-12-31 20:14:13 +00:00
Logger :: notice ( 'New retrial for task' , [ 'id' => $queue [ 'id' ], 'created' => $queue [ 'created' ], 'old' => $queue [ 'retrial' ], 'new' => $new_retrial ]);
2019-08-11 10:28:52 +02:00
return $new_retrial ;
}
2018-10-15 05:19:35 +00:00
/**
* Defers the current worker entry
2020-01-19 09:51:37 +00:00
*
2019-08-20 07:39:13 +00:00
* @ return boolean had the entry been deferred ?
2021-10-16 21:07:31 -04:00
* @ throws \Exception
2018-10-15 05:19:35 +00:00
*/
2021-10-16 21:07:31 -04:00
public static function defer () : bool
2018-10-15 05:19:35 +00:00
{
2021-07-24 22:08:33 +00:00
$queue = DI :: app () -> getQueue ();
if ( empty ( $queue )) {
2019-08-20 07:39:13 +00:00
return false ;
2018-10-15 05:19:35 +00:00
}
$id = $queue [ 'id' ];
2019-02-08 21:48:41 +00:00
$priority = $queue [ 'priority' ];
2018-10-15 05:19:35 +00:00
2020-01-19 21:21:13 +01:00
$max_level = DI :: config () -> get ( 'system' , 'worker_defer_limit' );
2019-08-11 23:07:06 +02:00
2019-08-11 10:28:52 +02:00
$new_retrial = self :: getNextRetrial ( $queue , $max_level );
if ( $new_retrial > $max_level ) {
2020-12-31 20:14:13 +00:00
Logger :: notice ( 'The task exceeded the maximum retry count' , [ 'id' => $id , 'created' => $queue [ 'created' ], 'old_prio' => $queue [ 'priority' ], 'old_retrial' => $queue [ 'retrial' ], 'max_level' => $max_level , 'retrial' => $new_retrial ]);
2019-08-20 07:39:13 +00:00
return false ;
2018-10-15 05:19:35 +00:00
}
// Calculate the delay until the next trial
2019-08-11 10:28:52 +02:00
$delay = (( $new_retrial + 2 ) ** 4 ) + ( rand ( 1 , 30 ) * ( $new_retrial ));
2018-10-15 05:19:35 +00:00
$next = DateTimeFormat :: utc ( 'now + ' . $delay . ' seconds' );
2022-10-17 05:49:55 +00:00
if (( $priority < self :: PRIORITY_MEDIUM ) && ( $new_retrial > 3 )) {
$priority = self :: PRIORITY_MEDIUM ;
} elseif (( $priority < self :: PRIORITY_LOW ) && ( $new_retrial > 6 )) {
$priority = self :: PRIORITY_LOW ;
} elseif (( $priority < self :: PRIORITY_NEGLIGIBLE ) && ( $new_retrial > 8 )) {
$priority = self :: PRIORITY_NEGLIGIBLE ;
2019-02-08 21:48:41 +00:00
}
2019-08-27 21:01:11 +02:00
Logger :: info ( 'Deferred task' , [ 'id' => $id , 'retrial' => $new_retrial , 'created' => $queue [ 'created' ], 'next_execution' => $next , 'old_prio' => $queue [ 'priority' ], 'new_prio' => $priority ]);
2018-10-15 05:19:35 +00:00
2019-02-09 23:10:15 +00:00
$stamp = ( float ) microtime ( true );
2019-08-11 10:28:52 +02:00
$fields = [ 'retrial' => $new_retrial , 'next_try' => $next , 'executed' => DBA :: NULL_DATETIME , 'pid' => 0 , 'priority' => $priority ];
2018-10-15 05:19:35 +00:00
DBA :: update ( 'workerqueue' , $fields , [ 'id' => $id ]);
2019-02-09 23:10:15 +00:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2019-08-20 07:39:13 +00:00
return true ;
2018-10-15 05:19:35 +00:00
}
2020-10-17 12:39:42 +00:00
/**
* Check if the system is inside the defined maintenance window
*
2022-06-16 22:12:38 +02:00
* @ param bool $check_last_execution Whether check last execution
2020-10-17 12:39:42 +00:00
* @ return boolean
*/
2022-06-16 22:12:38 +02:00
public static function isInMaintenanceWindow ( bool $check_last_execution = false ) : bool
2020-10-17 12:39:42 +00:00
{
// Calculate the seconds of the start end end of the maintenance window
$start = strtotime ( DI :: config () -> get ( 'system' , 'maintenance_start' )) % 86400 ;
$end = strtotime ( DI :: config () -> get ( 'system' , 'maintenance_end' )) % 86400 ;
Logger :: info ( 'Maintenance window' , [ 'start' => date ( 'H:i:s' , $start ), 'end' => date ( 'H:i:s' , $end )]);
if ( $check_last_execution ) {
// Calculate the window duration
$duration = max ( $start , $end ) - min ( $start , $end );
// Quit when the last cron execution had been after the previous window
2022-12-29 20:30:19 +01:00
$last_cron = DI :: keyValue () -> get ( 'last_cron_daily' );
2020-10-17 12:39:42 +00:00
if ( $last_cron + $duration > time ()) {
Logger :: info ( 'The Daily cron had been executed recently' , [ 'last' => date ( DateTimeFormat :: MYSQL , $last_cron ), 'start' => date ( 'H:i:s' , $start ), 'end' => date ( 'H:i:s' , $end )]);
return false ;
}
}
$current = time () % 86400 ;
if ( $start < $end ) {
// Execute if we are inside the window
$execute = ( $current >= $start ) && ( $current <= $end );
} else {
// Don't execute if we are outside the window
$execute = ! (( $current > $end ) && ( $current < $start ));
}
2020-10-17 15:54:52 +02:00
if ( $execute ) {
2020-10-17 12:39:42 +00:00
Logger :: info ( 'We are inside the maintenance window' , [ 'current' => date ( 'H:i:s' , $current ), 'start' => date ( 'H:i:s' , $start ), 'end' => date ( 'H:i:s' , $end )]);
2020-10-17 15:54:52 +02:00
} else {
Logger :: info ( 'We are outside the maintenance window' , [ 'current' => date ( 'H:i:s' , $current ), 'start' => date ( 'H:i:s' , $start ), 'end' => date ( 'H:i:s' , $end )]);
2020-10-17 12:39:42 +00:00
}
2021-07-24 22:08:33 +00:00
2020-10-17 12:39:42 +00:00
return $execute ;
}
2017-11-05 10:33:46 +00:00
}