From 8658f7bef489499ad3a8724427240ffedc4b3e02 Mon Sep 17 00:00:00 2001 From: Mojmir Fendek Date: Tue, 17 Mar 2020 09:26:57 +1300 Subject: [PATCH] BUG: Doorman runner job locking fix --- _config/queuedjobs.yml | 2 +- src/Services/QueuedJobService.php | 2 +- src/Tasks/Engines/DoormanRunner.php | 150 ++++++++++++++++++++-------- 3 files changed, 110 insertions(+), 44 deletions(-) diff --git a/_config/queuedjobs.yml b/_config/queuedjobs.yml index da3da2a4..e97c9f99 100644 --- a/_config/queuedjobs.yml +++ b/_config/queuedjobs.yml @@ -23,7 +23,7 @@ SilverStripe\Core\Injector\Injector: Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner: properties: DefaultRules: - - '%$DefaultRule' + DefaultRule: '%$DefaultRule' SilverStripe\SiteConfig\SiteConfig: extensions: diff --git a/src/Services/QueuedJobService.php b/src/Services/QueuedJobService.php index 9fa76732..1c80ddcd 100644 --- a/src/Services/QueuedJobService.php +++ b/src/Services/QueuedJobService.php @@ -372,7 +372,7 @@ protected function copyDescriptorToJob($jobDescriptor, $job) * * @param string $type Job type * - * @return QueuedJobDescriptor|false + * @return QueuedJobDescriptor|null */ public function getNextPendingJob($type = null) { diff --git a/src/Tasks/Engines/DoormanRunner.php b/src/Tasks/Engines/DoormanRunner.php index 502620ab..09f97994 100644 --- a/src/Tasks/Engines/DoormanRunner.php +++ b/src/Tasks/Engines/DoormanRunner.php @@ -3,9 +3,10 @@ namespace Symbiote\QueuedJobs\Tasks\Engines; use AsyncPHP\Doorman\Manager\ProcessManager; +use SilverStripe\Core\ClassInfo; +use SilverStripe\Core\Config\Configurable; use SilverStripe\Core\Environment; use SilverStripe\Core\Injector\Injector; -use SilverStripe\ORM\FieldType\DBDatetime; use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor; use Symbiote\QueuedJobs\Jobs\DoormanQueuedJobTask; use Symbiote\QueuedJobs\Services\QueuedJob; @@ -16,11 +17,68 @@ */ class DoormanRunner extends BaseRunner implements TaskRunnerEngine { + use Configurable; + + /** + * How many ticks are executed per one @see runQueue method call + * set 0 for unlimited ticks + * + * @config + * @var int + */ + private static $max_ticks = 0; + + /** + * How many seconds between ticks + * + * @config + * @var int + */ + private static $tick_interval = 1; + + /** + * Name of the dev task used to run the child process + * + * @config + * @var string + */ + private static $child_runner = 'ProcessJobQueueChildTask'; + /** * @var string[] */ protected $defaultRules = []; + /** + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return int + */ + protected function getMaxTicks(): int + { + return (int) $this->config()->get('max_ticks'); + } + + /** + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return int + */ + protected function getTickInterval(): int + { + return (int) $this->config()->get('tick_interval'); + } + + /** + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return string + */ + protected function getChildRunner(): string + { + return (string) $this->config()->get('child_runner'); + } + /** * Assign default rules for this task * @@ -48,10 +106,13 @@ public function getDefaultRules() */ public function runQueue($queue) { - // check if queue can be processed $service = QueuedJobService::singleton(); + $logger = $service->getLogger(); + + // check if queue can be processed if ($service->isAtMaxJobs()) { - $service->getLogger()->info('Not processing queue as jobs are at max initialisation limit.'); + $logger->info('Not processing queue as jobs are at max initialisation limit.'); + return; } @@ -60,68 +121,73 @@ public function runQueue($queue) /** @var ProcessManager $manager */ $manager = Injector::inst()->create(ProcessManager::class); $manager->setWorker( - BASE_PATH . "/vendor/silverstripe/framework/cli-script.php dev/tasks/ProcessJobQueueChildTask" + sprintf( + '%s/vendor/silverstripe/framework/cli-script.php dev/tasks/%s', + BASE_PATH, + $this->getChildRunner() + ) ); + $logPath = Environment::getEnv('SS_DOORMAN_LOGPATH'); + if ($logPath) { $manager->setLogPath($logPath); } // Assign default rules $defaultRules = $this->getDefaultRules(); + if ($defaultRules) { foreach ($defaultRules as $rule) { + if (!$rule) { + continue; + } + $manager->addRule($rule); } } - $descriptor = $this->getNextJobDescriptorWithoutMutex($queue); + $tickCount = 0; + $maxTicks = $this->getMaxTicks(); + $descriptor = $service->getNextPendingJob($queue); while ($manager->tick() || $descriptor) { - if (QueuedJobService::singleton()->isMaintenanceLockActive()) { - $service->getLogger()->info('Skipped queued job descriptor since maintenance log is active.'); + if ($service->isMaintenanceLockActive()) { + $logger->info('Skipped queued job descriptor since maintenance log is active.'); + return; } - $this->logDescriptorStatus($descriptor, $queue); + if ($maxTicks > 0 && $tickCount >= $maxTicks) { + $logger->info(sprintf('Tick count has hit max ticks %d >= %d', $tickCount, $maxTicks)); - if ($descriptor instanceof QueuedJobDescriptor) { - $descriptor->JobStatus = QueuedJob::STATUS_INIT; - $descriptor->write(); - - $manager->addTask(new DoormanQueuedJobTask($descriptor)); + return; } - sleep(1); - - $descriptor = $this->getNextJobDescriptorWithoutMutex($queue); - } - } - - /** - * @param string $queue - * @return null|QueuedJobDescriptor - */ - protected function getNextJobDescriptorWithoutMutex($queue) - { - $list = QueuedJobDescriptor::get() - ->filter('JobType', $queue) - ->sort('ID', 'ASC'); - - $descriptor = $list - ->filter('JobStatus', QueuedJob::STATUS_WAIT) - ->first(); + if ($service->isAtMaxJobs()) { + $logger->info( + sprintf( + 'Not processing queue as all job are at max limit. %s', + ClassInfo::shortName($service) + ) + ); + } elseif ($descriptor) { + $logger->info(sprintf('Next pending job is: %d', $descriptor->ID)); + $this->logDescriptorStatus($descriptor, $queue); + + if ($descriptor instanceof QueuedJobDescriptor) { + $descriptor->JobStatus = QueuedJob::STATUS_INIT; + $descriptor->write(); + + $manager->addTask(new DoormanQueuedJobTask($descriptor)); + } + } else { + $logger->info('Next pending job could NOT be found or lock could NOT be obtained.'); + } - if ($descriptor) { - return $descriptor; + $tickCount += 1; + sleep($this->getTickInterval()); + $descriptor = $service->getNextPendingJob($queue); } - - return $list - ->filter('JobStatus', QueuedJob::STATUS_NEW) - ->where(sprintf( - '"StartAfter" < \'%s\' OR "StartAfter" IS NULL', - DBDatetime::now()->getValue() - )) - ->first(); } }