Skip to content

Commit

Permalink
BUG: Doorman runner job locking fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mfendeksilverstripe committed Jan 14, 2021
1 parent e259183 commit b83fc4e
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 40 deletions.
2 changes: 1 addition & 1 deletion _config/queuedjobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ SilverStripe\Core\Injector\Injector:
Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner:
properties:
DefaultRules:
- '%$DefaultRule'
DefaultRule: '%$DefaultRule'

SilverStripe\SiteConfig\SiteConfig:
extensions:
Expand Down
18 changes: 13 additions & 5 deletions src/DataObjects/QueuedJobDescriptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,13 @@ public function getJobTypeValues()
}

/**
* @return FieldList
* List all possible job statuses, useful for forms and filters
*
* @return array
*/
public function getCMSFields()
public function getJobStatusValues(): array
{
$fields = parent::getCMSFields();

$statuses = [
return [
QueuedJob::STATUS_NEW,
QueuedJob::STATUS_INIT,
QueuedJob::STATUS_RUN,
Expand All @@ -373,7 +373,15 @@ public function getCMSFields()
QueuedJob::STATUS_CANCELLED,
QueuedJob::STATUS_BROKEN,
];
}

/**
* @return FieldList
*/
public function getCMSFields()
{
$fields = parent::getCMSFields();
$statuses = $this->getJobStatusValues();
$runAs = $fields->fieldByName('Root.Main.RunAsID');

$fields->removeByName([
Expand Down
144 changes: 110 additions & 34 deletions src/Tasks/Engines/DoormanRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
namespace Symbiote\QueuedJobs\Tasks\Engines;

use AsyncPHP\Doorman\Manager\ProcessManager;
use SilverStripe\Core\ClassInfo;
use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Environment;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\FieldType\DBDatetime;
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
use Symbiote\QueuedJobs\Jobs\DoormanQueuedJobTask;
use Symbiote\QueuedJobs\Services\QueuedJob;
Expand All @@ -16,6 +17,33 @@
*/
class DoormanRunner extends BaseRunner implements TaskRunnerEngine
{
use Configurable;

/**
* How many ticks are executed per one @see runQueue method call
* set 0 for unlimited ticks
*
* @config
* @var int
*/
private static $max_ticks = 0;

/**
* How many seconds between ticks
*
* @config
* @var int
*/
private static $tick_interval = 1;

/**
* Name of the dev task used to run the child process
*
* @config
* @var string
*/
private static $child_runner = 'ProcessJobQueueChildTask';

/**
* @var string[]
*/
Expand Down Expand Up @@ -48,10 +76,13 @@ public function getDefaultRules()
*/
public function runQueue($queue)
{
// check if queue can be processed
$service = QueuedJobService::singleton();
$logger = $service->getLogger();

// check if queue can be processed
if ($service->isAtMaxJobs()) {
$service->getLogger()->info('Not processing queue as jobs are at max initialisation limit.');
$logger->info('Not processing queue as jobs are at max initialisation limit.');

return;
}

Expand All @@ -60,68 +91,113 @@ public function runQueue($queue)
/** @var ProcessManager $manager */
$manager = Injector::inst()->create(ProcessManager::class);
$manager->setWorker(
BASE_PATH . "/vendor/silverstripe/framework/cli-script.php dev/tasks/ProcessJobQueueChildTask"
sprintf(
'%s/vendor/silverstripe/framework/cli-script.php dev/tasks/%s',
BASE_PATH,
$this->getChildRunner()
)
);

$logPath = Environment::getEnv('SS_DOORMAN_LOGPATH');

if ($logPath) {
$manager->setLogPath($logPath);
}

// Assign default rules
$defaultRules = $this->getDefaultRules();

if ($defaultRules) {
foreach ($defaultRules as $rule) {
if (!$rule) {
continue;
}

$manager->addRule($rule);
}
}

$descriptor = $this->getNextJobDescriptorWithoutMutex($queue);
$tickCount = 0;
$maxTicks = $this->getMaxTicks();
$descriptor = $service->getNextPendingJob($queue);

while ($manager->tick() || $descriptor) {
if (QueuedJobService::singleton()->isMaintenanceLockActive()) {
$service->getLogger()->info('Skipped queued job descriptor since maintenance log is active.');
if ($service->isMaintenanceLockActive()) {
$logger->info('Skipped queued job descriptor since maintenance log is active.');

return;
}

$this->logDescriptorStatus($descriptor, $queue);

if ($descriptor instanceof QueuedJobDescriptor) {
$descriptor->JobStatus = QueuedJob::STATUS_INIT;
$descriptor->write();
if ($maxTicks > 0 && $tickCount >= $maxTicks) {
$logger->info(sprintf('Tick count has hit max ticks %d >= %d', $tickCount, $maxTicks));

$manager->addTask(new DoormanQueuedJobTask($descriptor));
return;
}

sleep(1);
if ($service->isAtMaxJobs()) {
$logger->info(
sprintf(
'Not processing queue as all job are at max limit. %s',
ClassInfo::shortName($service)
)
);
} elseif ($descriptor) {
$logger->info(sprintf('Next pending job is: %d', $descriptor->ID));
$this->logDescriptorStatus($descriptor, $queue);

if ($descriptor instanceof QueuedJobDescriptor) {
$descriptor->JobStatus = QueuedJob::STATUS_INIT;
$descriptor->write();

$manager->addTask(new DoormanQueuedJobTask($descriptor));
}
} else {
$logger->info('Next pending job could NOT be found or lock could NOT be obtained.');
}

$descriptor = $this->getNextJobDescriptorWithoutMutex($queue);
$tickCount += 1;
sleep($this->getTickInterval());
$descriptor = $service->getNextPendingJob($queue);
}
}

/**
* @param string $queue
* @return null|QueuedJobDescriptor
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return int
*/
protected function getNextJobDescriptorWithoutMutex($queue)
protected function getMaxTicks(): int
{
$list = QueuedJobDescriptor::get()
->filter('JobType', $queue)
->sort('ID', 'ASC');
return (int) $this->config()->get('max_ticks');
}

$descriptor = $list
->filter('JobStatus', QueuedJob::STATUS_WAIT)
->first();
/**
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return int
*/
protected function getTickInterval(): int
{
return (int) $this->config()->get('tick_interval');
}

if ($descriptor) {
return $descriptor;
}
/**
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return string
*/
protected function getChildRunner(): string
{
return (string) $this->config()->get('child_runner');
}

return $list
->filter('JobStatus', QueuedJob::STATUS_NEW)
->where(sprintf(
'"StartAfter" < \'%s\' OR "StartAfter" IS NULL',
DBDatetime::now()->getValue()
))
->first();
/**
* @param string $queue
* @return QueuedJobDescriptor|null
* @deprecated 5.0
*/
protected function getNextJobDescriptorWithoutMutex($queue)
{
return $this->getService()->getNextPendingJob($queue);
}
}

0 comments on commit b83fc4e

Please sign in to comment.