diff --git a/include/poller.php b/include/poller.php index 839e5c11bb..0c5bebbda5 100644 --- a/include/poller.php +++ b/include/poller.php @@ -73,6 +73,10 @@ function poller_run($argv, $argc){ while ($r = poller_worker_process()) { + if (!poller_claim_process($r[0])) { + continue; + } + // Check free memory if ($a->min_memory_reached()) { return; @@ -108,43 +112,22 @@ function poller_execute($queue) { // Quit when in maintenance if (Config::get('system', 'maintenance', true)) { + logger("Maintenance mode - quit process ".$mypid, LOGGER_DEBUG); return false; } // Constantly check the number of parallel database processes if ($a->max_processes_reached()) { + logger("Max processes reached for process ".$mypid, LOGGER_DEBUG); return false; } // Constantly check the number of available database connections to let the frontend be accessible at any time if (poller_max_connections_reached()) { + logger("Max connection reached for process ".$mypid, LOGGER_DEBUG); return false; } - if (!dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), - array('id' => $queue["id"], 'pid' => 0))) { - logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); - dba::commit(); - return true; - } - - // Assure that there are no tasks executed twice - $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); - if (!$id) { - logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG); - dba::commit(); - return true; - } elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) { - logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG); - dba::commit(); - return true; - } elseif ($id[0]["pid"] != $mypid) { - logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); - dba::commit(); - return true; - } - dba::commit(); - $argv = json_decode($queue["parameter"]); // Check for existance and validity of the include file @@ -566,7 +549,7 @@ function poller_worker_process() { // Are there waiting processes with a higher priority than the currently highest? $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' AND `priority` < %d - ORDER BY `priority`, `created` LIMIT 1", + ORDER BY `priority`, `created` LIMIT 1 FOR UPDATE", dbesc(NULL_DATE), intval($highest_priority)); if (dbm::is_result($r)) { @@ -575,18 +558,70 @@ function poller_worker_process() { // Give slower processes some processing time $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' AND `priority` > %d - ORDER BY `priority`, `created` LIMIT 1", + ORDER BY `priority`, `created` LIMIT 1 FOR UPDATE", dbesc(NULL_DATE), intval($highest_priority)); + + if (dbm::is_result($r)) { + return $r; + } } // If there is no result (or we shouldn't pass lower processes) we check without priority limit - if (($highest_priority == 0) OR !dbm::is_result($r)) { - $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1", dbesc(NULL_DATE)); + if (!dbm::is_result($r)) { + $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1 FOR UPDATE", dbesc(NULL_DATE)); } + return $r; } +/** + * @brief Assigns a workerqueue entry to the current process + * + * All the checks after the update are only needed with MyISAM. + * + * @param array $queue Workerqueue entry + * + * @return boolean "true" if the claiming was successful + */ +function poller_claim_process($queue) { + $mypid = getmypid(); + + if (!dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), + array('id' => $queue["id"], 'pid' => 0))) { + logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG); + dba::commit(); + return false; + } + + // Assure that there are no tasks executed twice + $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"])); + if (!$id) { + logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG); + dba::commit(); + return false; + } elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) { + logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG); + dba::commit(); + return false; + } elseif ($id[0]["pid"] != $mypid) { + logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG); + dba::commit(); + return false; + } + dba::commit(); + return true; +} + +/** + * @brief Removes a workerqueue entry from the current process + */ +function poller_unclaim_process() { + $mypid = getmypid(); + + dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid)); +} + /** * @brief Call the front end worker */ @@ -683,6 +718,8 @@ function poller_run_cron() { if (array_search(__file__,get_included_files())===0){ poller_run($_SERVER["argv"],$_SERVER["argc"]); + poller_unclaim_process(); + get_app()->end_process(); killme(); diff --git a/mod/worker.php b/mod/worker.php index 62f9bd3dde..947656ab7c 100644 --- a/mod/worker.php +++ b/mod/worker.php @@ -41,11 +41,15 @@ function worker_init($a){ // But since it doesn't destroy anything, we just try to get more execution time in any way. set_time_limit(0); - poller_execute($r[0]); + if (poller_claim_process($r[0])) { + poller_execute($r[0]); + } } call_worker(); + poller_unclaim_process(); + $a->end_process(); logger("Front end worker ended: ".getmypid());