mirror of
https://github.com/friendica/friendica
synced 2025-01-11 00:04:54 +00:00
Merge pull request #2774 from annando/1609-worker-processes
New process table for a better detection of running workers
This commit is contained in:
commit
949a5f722a
9 changed files with 74 additions and 19 deletions
39
boot.php
39
boot.php
|
@ -38,7 +38,7 @@ define ( 'FRIENDICA_PLATFORM', 'Friendica');
|
||||||
define ( 'FRIENDICA_CODENAME', 'Asparagus');
|
define ( 'FRIENDICA_CODENAME', 'Asparagus');
|
||||||
define ( 'FRIENDICA_VERSION', '3.5-dev' );
|
define ( 'FRIENDICA_VERSION', '3.5-dev' );
|
||||||
define ( 'DFRN_PROTOCOL_VERSION', '2.23' );
|
define ( 'DFRN_PROTOCOL_VERSION', '2.23' );
|
||||||
define ( 'DB_UPDATE_VERSION', 1201 );
|
define ( 'DB_UPDATE_VERSION', 1202 );
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Constant with a HTML line break.
|
* @brief Constant with a HTML line break.
|
||||||
|
@ -1099,6 +1099,42 @@ class App {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Log active processes into the "process" table
|
||||||
|
*/
|
||||||
|
function start_process() {
|
||||||
|
$trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1);
|
||||||
|
|
||||||
|
$command = basename($trace[0]["file"]);
|
||||||
|
|
||||||
|
$this->remove_inactive_processes();
|
||||||
|
|
||||||
|
$r = q("SELECT `pid` FROM `process` WHERE `pid` = %d", intval(getmypid()));
|
||||||
|
if(!dbm::is_result($r))
|
||||||
|
q("INSERT INTO `process` (`pid`,`command`,`created`) VALUES (%d, '%s', '%s')",
|
||||||
|
intval(getmypid()),
|
||||||
|
dbesc($command),
|
||||||
|
dbesc(datetime_convert()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Remove inactive processes
|
||||||
|
*/
|
||||||
|
function remove_inactive_processes() {
|
||||||
|
$r = q("SELECT `pid` FROM `process`");
|
||||||
|
if(dbm::is_result($r))
|
||||||
|
foreach ($r AS $process)
|
||||||
|
if (!posix_kill($process["pid"], 0))
|
||||||
|
q("DELETE FROM `process` WHERE `pid` = %d", intval($process["pid"]));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Remove the active process from the "process" table
|
||||||
|
*/
|
||||||
|
function end_process() {
|
||||||
|
q("DELETE FROM `process` WHERE `pid` = %d", intval(getmypid()));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Returns a string with a callstack. Can be used for logging.
|
* @brief Returns a string with a callstack. Can be used for logging.
|
||||||
*
|
*
|
||||||
|
@ -1699,6 +1735,7 @@ function login($register = false, $hiddens=false) {
|
||||||
* @brief Used to end the current process, after saving session state.
|
* @brief Used to end the current process, after saving session state.
|
||||||
*/
|
*/
|
||||||
function killme() {
|
function killme() {
|
||||||
|
|
||||||
if (!get_app()->is_backend())
|
if (!get_app()->is_backend())
|
||||||
session_write_close();
|
session_write_close();
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,6 @@ function cron_run(&$argv, &$argc){
|
||||||
unset($db_host, $db_user, $db_pass, $db_data);
|
unset($db_host, $db_user, $db_pass, $db_data);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
require_once('include/session.php');
|
require_once('include/session.php');
|
||||||
require_once('include/datetime.php');
|
require_once('include/datetime.php');
|
||||||
require_once('include/items.php');
|
require_once('include/items.php');
|
||||||
|
|
|
@ -27,7 +27,6 @@ function cronjobs_run(&$argv, &$argc){
|
||||||
unset($db_host, $db_user, $db_pass, $db_data);
|
unset($db_host, $db_user, $db_pass, $db_data);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
require_once('include/session.php');
|
require_once('include/session.php');
|
||||||
require_once('include/datetime.php');
|
require_once('include/datetime.php');
|
||||||
require_once('include/ostatus.php');
|
require_once('include/ostatus.php');
|
||||||
|
|
|
@ -1114,6 +1114,17 @@ function db_definition() {
|
||||||
"choice" => array("choice"),
|
"choice" => array("choice"),
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
$database["process"] = array(
|
||||||
|
"fields" => array(
|
||||||
|
"pid" => array("type" => "int(10) unsigned", "not null" => "1", "primary" => "1"),
|
||||||
|
"command" => array("type" => "varchar(32)", "not null" => "1", "default" => ""),
|
||||||
|
"created" => array("type" => "datetime", "not null" => "1", "default" => "0000-00-00 00:00:00"),
|
||||||
|
),
|
||||||
|
"indexes" => array(
|
||||||
|
"PRIMARY" => array("pid"),
|
||||||
|
"command" => array("command"),
|
||||||
|
)
|
||||||
|
);
|
||||||
$database["profile"] = array(
|
$database["profile"] = array(
|
||||||
"fields" => array(
|
"fields" => array(
|
||||||
"id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"),
|
"id" => array("type" => "int(11)", "not null" => "1", "extra" => "auto_increment", "primary" => "1"),
|
||||||
|
|
|
@ -24,7 +24,6 @@ function onepoll_run(&$argv, &$argc){
|
||||||
unset($db_host, $db_user, $db_pass, $db_data);
|
unset($db_host, $db_user, $db_pass, $db_data);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
require_once('include/session.php');
|
require_once('include/session.php');
|
||||||
require_once('include/datetime.php');
|
require_once('include/datetime.php');
|
||||||
require_once('include/items.php');
|
require_once('include/items.php');
|
||||||
|
|
|
@ -29,6 +29,10 @@ function poller_run(&$argv, &$argc){
|
||||||
unset($db_host, $db_user, $db_pass, $db_data);
|
unset($db_host, $db_user, $db_pass, $db_data);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
$a->start_process();
|
||||||
|
|
||||||
|
$mypid = getmypid();
|
||||||
|
|
||||||
if ($a->max_processes_reached())
|
if ($a->max_processes_reached())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -81,15 +85,19 @@ function poller_run(&$argv, &$argc){
|
||||||
|
|
||||||
q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `executed` = '0000-00-00 00:00:00'",
|
q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `executed` = '0000-00-00 00:00:00'",
|
||||||
dbesc(datetime_convert()),
|
dbesc(datetime_convert()),
|
||||||
intval(getmypid()),
|
intval($mypid),
|
||||||
intval($r[0]["id"]));
|
intval($r[0]["id"]));
|
||||||
|
|
||||||
// Assure that there are no tasks executed twice
|
// Assure that there are no tasks executed twice
|
||||||
$id = q("SELECT `id` FROM `workerqueue` WHERE `id` = %d AND `pid` = %d",
|
$id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
|
||||||
intval($r[0]["id"]),
|
|
||||||
intval(getmypid()));
|
|
||||||
if (!$id) {
|
if (!$id) {
|
||||||
logger("Queue item ".$r[0]["id"]." was executed multiple times - skip this execution", LOGGER_DEBUG);
|
logger("Queue item ".$r[0]["id"]." vanished - skip this execution", LOGGER_DEBUG);
|
||||||
|
continue;
|
||||||
|
} elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) {
|
||||||
|
logger("Entry for queue item ".$r[0]["id"]." wasn't stored - we better stop here", LOGGER_DEBUG);
|
||||||
|
return;
|
||||||
|
} elseif ($id[0]["pid"] != $mypid) {
|
||||||
|
logger("Queue item ".$r[0]["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,15 +119,15 @@ function poller_run(&$argv, &$argc){
|
||||||
$funcname = str_replace(".php", "", basename($argv[0]))."_run";
|
$funcname = str_replace(".php", "", basename($argv[0]))."_run";
|
||||||
|
|
||||||
if (function_exists($funcname)) {
|
if (function_exists($funcname)) {
|
||||||
logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]);
|
logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]);
|
||||||
$funcname($argv, $argc);
|
$funcname($argv, $argc);
|
||||||
|
|
||||||
if ($cooldown > 0) {
|
if ($cooldown > 0) {
|
||||||
logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds");
|
logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds");
|
||||||
sleep($cooldown);
|
sleep($cooldown);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger("Process ".getmypid()." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done");
|
logger("Process ".$mypid." - Prio ".$r[0]["priority"]." - ID ".$r[0]["id"].": ".$funcname." - done");
|
||||||
|
|
||||||
q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
|
q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($r[0]["id"]));
|
||||||
} else
|
} else
|
||||||
|
@ -319,13 +327,16 @@ function poller_too_much_workers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
function poller_active_workers() {
|
function poller_active_workers() {
|
||||||
$workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");
|
$workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");
|
||||||
|
|
||||||
return($workers[0]["workers"]);
|
return($workers[0]["processes"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (array_search(__file__,get_included_files())===0){
|
if (array_search(__file__,get_included_files())===0){
|
||||||
poller_run($_SERVER["argv"],$_SERVER["argc"]);
|
poller_run($_SERVER["argv"],$_SERVER["argc"]);
|
||||||
killme();
|
|
||||||
|
get_app()->end_process();
|
||||||
|
|
||||||
|
killme();
|
||||||
}
|
}
|
||||||
?>
|
?>
|
||||||
|
|
|
@ -17,7 +17,6 @@ function queue_run(&$argv, &$argc){
|
||||||
unset($db_host, $db_user, $db_pass, $db_data);
|
unset($db_host, $db_user, $db_pass, $db_data);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
require_once("include/session.php");
|
require_once("include/session.php");
|
||||||
require_once("include/datetime.php");
|
require_once("include/datetime.php");
|
||||||
require_once('include/items.php');
|
require_once('include/items.php');
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
define('UPDATE_VERSION' , 1201);
|
define('UPDATE_VERSION' , 1202);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in a new issue