Skip to content

Commit

Permalink
BUG: Doorman runner job locking fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mfendeksilverstripe committed Mar 16, 2020
1 parent ca269d0 commit 8658f7b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 44 deletions.
2 changes: 1 addition & 1 deletion _config/queuedjobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ SilverStripe\Core\Injector\Injector:
Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner:
properties:
DefaultRules:
- '%$DefaultRule'
DefaultRule: '%$DefaultRule'

SilverStripe\SiteConfig\SiteConfig:
extensions:
Expand Down
2 changes: 1 addition & 1 deletion src/Services/QueuedJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ protected function copyDescriptorToJob($jobDescriptor, $job)
*
* @param string $type Job type
*
* @return QueuedJobDescriptor|false
* @return QueuedJobDescriptor|null
*/
public function getNextPendingJob($type = null)
{
Expand Down
150 changes: 108 additions & 42 deletions src/Tasks/Engines/DoormanRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
namespace Symbiote\QueuedJobs\Tasks\Engines;

use AsyncPHP\Doorman\Manager\ProcessManager;
use SilverStripe\Core\ClassInfo;
use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Environment;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\FieldType\DBDatetime;
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
use Symbiote\QueuedJobs\Jobs\DoormanQueuedJobTask;
use Symbiote\QueuedJobs\Services\QueuedJob;
Expand All @@ -16,11 +17,68 @@
*/
class DoormanRunner extends BaseRunner implements TaskRunnerEngine
{
use Configurable;

/**
* How many ticks are executed per one @see runQueue method call
* set 0 for unlimited ticks
*
* @config
* @var int
*/
private static $max_ticks = 0;

/**
* How many seconds between ticks
*
* @config
* @var int
*/
private static $tick_interval = 1;

/**
* Name of the dev task used to run the child process
*
* @config
* @var string
*/
private static $child_runner = 'ProcessJobQueueChildTask';

/**
* @var string[]
*/
protected $defaultRules = [];

/**
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return int
*/
protected function getMaxTicks(): int
{
return (int) $this->config()->get('max_ticks');
}

/**
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return int
*/
protected function getTickInterval(): int
{
return (int) $this->config()->get('tick_interval');
}

/**
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return string
*/
protected function getChildRunner(): string
{
return (string) $this->config()->get('child_runner');
}

/**
* Assign default rules for this task
*
Expand Down Expand Up @@ -48,10 +106,13 @@ public function getDefaultRules()
*/
public function runQueue($queue)
{
// check if queue can be processed
$service = QueuedJobService::singleton();
$logger = $service->getLogger();

// check if queue can be processed
if ($service->isAtMaxJobs()) {
$service->getLogger()->info('Not processing queue as jobs are at max initialisation limit.');
$logger->info('Not processing queue as jobs are at max initialisation limit.');

return;
}

Expand All @@ -60,68 +121,73 @@ public function runQueue($queue)
/** @var ProcessManager $manager */
$manager = Injector::inst()->create(ProcessManager::class);
$manager->setWorker(
BASE_PATH . "/vendor/silverstripe/framework/cli-script.php dev/tasks/ProcessJobQueueChildTask"
sprintf(
'%s/vendor/silverstripe/framework/cli-script.php dev/tasks/%s',
BASE_PATH,
$this->getChildRunner()
)
);

$logPath = Environment::getEnv('SS_DOORMAN_LOGPATH');

if ($logPath) {
$manager->setLogPath($logPath);
}

// Assign default rules
$defaultRules = $this->getDefaultRules();

if ($defaultRules) {
foreach ($defaultRules as $rule) {
if (!$rule) {
continue;
}

$manager->addRule($rule);
}
}

$descriptor = $this->getNextJobDescriptorWithoutMutex($queue);
$tickCount = 0;
$maxTicks = $this->getMaxTicks();
$descriptor = $service->getNextPendingJob($queue);

while ($manager->tick() || $descriptor) {
if (QueuedJobService::singleton()->isMaintenanceLockActive()) {
$service->getLogger()->info('Skipped queued job descriptor since maintenance log is active.');
if ($service->isMaintenanceLockActive()) {
$logger->info('Skipped queued job descriptor since maintenance log is active.');

return;
}

$this->logDescriptorStatus($descriptor, $queue);
if ($maxTicks > 0 && $tickCount >= $maxTicks) {
$logger->info(sprintf('Tick count has hit max ticks %d >= %d', $tickCount, $maxTicks));

if ($descriptor instanceof QueuedJobDescriptor) {
$descriptor->JobStatus = QueuedJob::STATUS_INIT;
$descriptor->write();

$manager->addTask(new DoormanQueuedJobTask($descriptor));
return;
}

sleep(1);

$descriptor = $this->getNextJobDescriptorWithoutMutex($queue);
}
}

/**
* @param string $queue
* @return null|QueuedJobDescriptor
*/
protected function getNextJobDescriptorWithoutMutex($queue)
{
$list = QueuedJobDescriptor::get()
->filter('JobType', $queue)
->sort('ID', 'ASC');

$descriptor = $list
->filter('JobStatus', QueuedJob::STATUS_WAIT)
->first();
if ($service->isAtMaxJobs()) {
$logger->info(
sprintf(
'Not processing queue as all job are at max limit. %s',
ClassInfo::shortName($service)
)
);
} elseif ($descriptor) {
$logger->info(sprintf('Next pending job is: %d', $descriptor->ID));
$this->logDescriptorStatus($descriptor, $queue);

if ($descriptor instanceof QueuedJobDescriptor) {
$descriptor->JobStatus = QueuedJob::STATUS_INIT;
$descriptor->write();

$manager->addTask(new DoormanQueuedJobTask($descriptor));
}
} else {
$logger->info('Next pending job could NOT be found or lock could NOT be obtained.');
}

if ($descriptor) {
return $descriptor;
$tickCount += 1;
sleep($this->getTickInterval());
$descriptor = $service->getNextPendingJob($queue);
}

return $list
->filter('JobStatus', QueuedJob::STATUS_NEW)
->where(sprintf(
'"StartAfter" < \'%s\' OR "StartAfter" IS NULL',
DBDatetime::now()->getValue()
))
->first();
}
}

0 comments on commit 8658f7b

Please sign in to comment.