From 4e476f3d1ff682b83c88a69b652f7d23b8eeeb69 Mon Sep 17 00:00:00 2001 From: Mojmir Fendek Date: Tue, 17 Mar 2020 09:26:57 +1300 Subject: [PATCH] BUG: Doorman runner job locking fix --- _config/queuedjobs.yml | 2 +- src/DataObjects/QueuedJobDescriptor.php | 18 ++- src/Tasks/Engines/DoormanRunner.php | 144 ++++++++++++++++++------ 3 files changed, 124 insertions(+), 40 deletions(-) diff --git a/_config/queuedjobs.yml b/_config/queuedjobs.yml index a2863b6a..f0e3785b 100644 --- a/_config/queuedjobs.yml +++ b/_config/queuedjobs.yml @@ -24,7 +24,7 @@ SilverStripe\Core\Injector\Injector: Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner: properties: DefaultRules: - - '%$DefaultRule' + DefaultRule: '%$DefaultRule' SilverStripe\SiteConfig\SiteConfig: extensions: diff --git a/src/DataObjects/QueuedJobDescriptor.php b/src/DataObjects/QueuedJobDescriptor.php index 16af124e..9bca9d08 100644 --- a/src/DataObjects/QueuedJobDescriptor.php +++ b/src/DataObjects/QueuedJobDescriptor.php @@ -357,13 +357,13 @@ public function getJobTypeValues() } /** - * @return FieldList + * List all possible job statuses, useful for forms and filters + * + * @return array */ - public function getCMSFields() + public function getJobStatusValues(): array { - $fields = parent::getCMSFields(); - - $statuses = [ + return [ QueuedJob::STATUS_NEW, QueuedJob::STATUS_INIT, QueuedJob::STATUS_RUN, @@ -373,7 +373,15 @@ public function getCMSFields() QueuedJob::STATUS_CANCELLED, QueuedJob::STATUS_BROKEN, ]; + } + /** + * @return FieldList + */ + public function getCMSFields() + { + $fields = parent::getCMSFields(); + $statuses = $this->getJobStatusValues(); $runAs = $fields->fieldByName('Root.Main.RunAsID'); $fields->removeByName([ diff --git a/src/Tasks/Engines/DoormanRunner.php b/src/Tasks/Engines/DoormanRunner.php index 502620ab..f8454520 100644 --- a/src/Tasks/Engines/DoormanRunner.php +++ b/src/Tasks/Engines/DoormanRunner.php @@ -3,9 +3,10 @@ namespace Symbiote\QueuedJobs\Tasks\Engines; use AsyncPHP\Doorman\Manager\ProcessManager; +use SilverStripe\Core\ClassInfo; +use SilverStripe\Core\Config\Configurable; use SilverStripe\Core\Environment; use SilverStripe\Core\Injector\Injector; -use SilverStripe\ORM\FieldType\DBDatetime; use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor; use Symbiote\QueuedJobs\Jobs\DoormanQueuedJobTask; use Symbiote\QueuedJobs\Services\QueuedJob; @@ -16,6 +17,33 @@ */ class DoormanRunner extends BaseRunner implements TaskRunnerEngine { + use Configurable; + + /** + * How many ticks are executed per one @see runQueue method call + * set 0 for unlimited ticks + * + * @config + * @var int + */ + private static $max_ticks = 0; + + /** + * How many seconds between ticks + * + * @config + * @var int + */ + private static $tick_interval = 1; + + /** + * Name of the dev task used to run the child process + * + * @config + * @var string + */ + private static $child_runner = 'ProcessJobQueueChildTask'; + /** * @var string[] */ @@ -48,10 +76,13 @@ public function getDefaultRules() */ public function runQueue($queue) { - // check if queue can be processed $service = QueuedJobService::singleton(); + $logger = $service->getLogger(); + + // check if queue can be processed if ($service->isAtMaxJobs()) { - $service->getLogger()->info('Not processing queue as jobs are at max initialisation limit.'); + $logger->info('Not processing queue as jobs are at max initialisation limit.'); + return; } @@ -60,68 +91,113 @@ public function runQueue($queue) /** @var ProcessManager $manager */ $manager = Injector::inst()->create(ProcessManager::class); $manager->setWorker( - BASE_PATH . "/vendor/silverstripe/framework/cli-script.php dev/tasks/ProcessJobQueueChildTask" + sprintf( + '%s/vendor/silverstripe/framework/cli-script.php dev/tasks/%s', + BASE_PATH, + $this->getChildRunner() + ) ); + $logPath = Environment::getEnv('SS_DOORMAN_LOGPATH'); + if ($logPath) { $manager->setLogPath($logPath); } // Assign default rules $defaultRules = $this->getDefaultRules(); + if ($defaultRules) { foreach ($defaultRules as $rule) { + if (!$rule) { + continue; + } + $manager->addRule($rule); } } - $descriptor = $this->getNextJobDescriptorWithoutMutex($queue); + $tickCount = 0; + $maxTicks = $this->getMaxTicks(); + $descriptor = $service->getNextPendingJob($queue); while ($manager->tick() || $descriptor) { - if (QueuedJobService::singleton()->isMaintenanceLockActive()) { - $service->getLogger()->info('Skipped queued job descriptor since maintenance log is active.'); + if ($service->isMaintenanceLockActive()) { + $logger->info('Skipped queued job descriptor since maintenance log is active.'); + return; } - $this->logDescriptorStatus($descriptor, $queue); - - if ($descriptor instanceof QueuedJobDescriptor) { - $descriptor->JobStatus = QueuedJob::STATUS_INIT; - $descriptor->write(); + if ($maxTicks > 0 && $tickCount >= $maxTicks) { + $logger->info(sprintf('Tick count has hit max ticks %d >= %d', $tickCount, $maxTicks)); - $manager->addTask(new DoormanQueuedJobTask($descriptor)); + return; } - sleep(1); + if ($service->isAtMaxJobs()) { + $logger->info( + sprintf( + 'Not processing queue as all job are at max limit. %s', + ClassInfo::shortName($service) + ) + ); + } elseif ($descriptor) { + $logger->info(sprintf('Next pending job is: %d', $descriptor->ID)); + $this->logDescriptorStatus($descriptor, $queue); + + if ($descriptor instanceof QueuedJobDescriptor) { + $descriptor->JobStatus = QueuedJob::STATUS_INIT; + $descriptor->write(); + + $manager->addTask(new DoormanQueuedJobTask($descriptor)); + } + } else { + $logger->info('Next pending job could NOT be found or lock could NOT be obtained.'); + } - $descriptor = $this->getNextJobDescriptorWithoutMutex($queue); + $tickCount += 1; + sleep($this->getTickInterval()); + $descriptor = $service->getNextPendingJob($queue); } } /** - * @param string $queue - * @return null|QueuedJobDescriptor + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return int */ - protected function getNextJobDescriptorWithoutMutex($queue) + protected function getMaxTicks(): int { - $list = QueuedJobDescriptor::get() - ->filter('JobType', $queue) - ->sort('ID', 'ASC'); + return (int) $this->config()->get('max_ticks'); + } - $descriptor = $list - ->filter('JobStatus', QueuedJob::STATUS_WAIT) - ->first(); + /** + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return int + */ + protected function getTickInterval(): int + { + return (int) $this->config()->get('tick_interval'); + } - if ($descriptor) { - return $descriptor; - } + /** + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return string + */ + protected function getChildRunner(): string + { + return (string) $this->config()->get('child_runner'); + } - return $list - ->filter('JobStatus', QueuedJob::STATUS_NEW) - ->where(sprintf( - '"StartAfter" < \'%s\' OR "StartAfter" IS NULL', - DBDatetime::now()->getValue() - )) - ->first(); + /** + * @param string $queue + * @return QueuedJobDescriptor|null + * @deprecated 5.0 + */ + protected function getNextJobDescriptorWithoutMutex($queue) + { + return $this->getService()->getNextPendingJob($queue); } }