Merge pull request #7499 from annando/new_defer
Worker: The retrial value can now skip retrial levels
This commit is contained in:
commit
e9fc2af1c3
2 changed files with 42 additions and 8 deletions
|
@ -1193,6 +1193,32 @@ class Worker
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the next retrial level for worker jobs.
|
||||||
|
* This function will skip levels when jobs are older.
|
||||||
|
*
|
||||||
|
* @param array $queue Worker queue entry
|
||||||
|
* @param integer $max_level maximum retrial level
|
||||||
|
* @return integer the next retrial level value
|
||||||
|
*/
|
||||||
|
private static function getNextRetrial($queue, $max_level)
|
||||||
|
{
|
||||||
|
$created = strtotime($queue['created']);
|
||||||
|
$retrial_time = time() - $created;
|
||||||
|
|
||||||
|
$new_retrial = $queue['retrial'] + 1;
|
||||||
|
$total = 0;
|
||||||
|
for ($retrial = 0; $retrial <= $max_level + 1; ++$retrial) {
|
||||||
|
$delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
|
||||||
|
$total += $delay;
|
||||||
|
if (($total < $retrial_time) && ($retrial > $queue['retrial'])) {
|
||||||
|
$new_retrial = $retrial;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Logger::info('New retrial for task', ['id' => $queue['id'], 'created' => $queue['created'], 'old' => $queue['retrial'], 'new' => $new_retrial]);
|
||||||
|
return $new_retrial;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defers the current worker entry
|
* Defers the current worker entry
|
||||||
*/
|
*/
|
||||||
|
@ -1208,27 +1234,31 @@ class Worker
|
||||||
$id = $queue['id'];
|
$id = $queue['id'];
|
||||||
$priority = $queue['priority'];
|
$priority = $queue['priority'];
|
||||||
|
|
||||||
if ($retrial > 14) {
|
$max_level = Config::get('system', 'worker_defer_limit');
|
||||||
Logger::log('Id ' . $id . ' had been tried 14 times. We stop now.', Logger::DEBUG);
|
|
||||||
|
$new_retrial = self::getNextRetrial($queue, $max_level);
|
||||||
|
|
||||||
|
if ($new_retrial > $max_level) {
|
||||||
|
Logger::info('The task exceeded the maximum retry count', ['id' => $id, 'max_level' => $max_level, 'retrial' => $new_retrial]);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the delay until the next trial
|
// Calculate the delay until the next trial
|
||||||
$delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
|
$delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
|
||||||
$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
|
$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
|
||||||
|
|
||||||
if (($priority < PRIORITY_MEDIUM) && ($retrial > 2)) {
|
if (($priority < PRIORITY_MEDIUM) && ($new_retrial > 3)) {
|
||||||
$priority = PRIORITY_MEDIUM;
|
$priority = PRIORITY_MEDIUM;
|
||||||
} elseif (($priority < PRIORITY_LOW) && ($retrial > 5)) {
|
} elseif (($priority < PRIORITY_LOW) && ($new_retrial > 6)) {
|
||||||
$priority = PRIORITY_LOW;
|
$priority = PRIORITY_LOW;
|
||||||
} elseif (($priority < PRIORITY_NEGLIGIBLE) && ($retrial > 7)) {
|
} elseif (($priority < PRIORITY_NEGLIGIBLE) && ($new_retrial > 8)) {
|
||||||
$priority = PRIORITY_NEGLIGIBLE;
|
$priority = PRIORITY_NEGLIGIBLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger::log('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next . ' - priority old/new: ' . $queue['priority'] . '/' . $priority, Logger::DEBUG);
|
Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);
|
||||||
|
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
$fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority];
|
$fields = ['retrial' => $new_retrial, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority];
|
||||||
DBA::update('workerqueue', $fields, ['id' => $id]);
|
DBA::update('workerqueue', $fields, ['id' => $id]);
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
self::$db_duration_write += (microtime(true) - $stamp);
|
self::$db_duration_write += (microtime(true) - $stamp);
|
||||||
|
|
|
@ -427,6 +427,10 @@ return [
|
||||||
// Setting 0 would allow maximum worker queues at all times, which is not recommended.
|
// Setting 0 would allow maximum worker queues at all times, which is not recommended.
|
||||||
'worker_load_exponent' => 3,
|
'worker_load_exponent' => 3,
|
||||||
|
|
||||||
|
// worker_defer_limit (Integer)
|
||||||
|
// Per default the systems tries delivering for 15 times before dropping it.
|
||||||
|
'worker_defer_limit' => 15,
|
||||||
|
|
||||||
// xrd_timeout (Integer)
|
// xrd_timeout (Integer)
|
||||||
// Timeout in seconds for fetching the XRD links.
|
// Timeout in seconds for fetching the XRD links.
|
||||||
'xrd_timeout' => 20,
|
'xrd_timeout' => 20,
|
||||||
|
|
Loading…
Reference in a new issue