diff --git a/src/Services/QueuedJobService.php b/src/Services/QueuedJobService.php index 0093cdc1..621f7ae6 100644 --- a/src/Services/QueuedJobService.php +++ b/src/Services/QueuedJobService.php @@ -31,6 +31,7 @@ use Symbiote\QueuedJobs\Jobs\RunBuildTaskJob; use Symbiote\QueuedJobs\QJUtils; use Symbiote\QueuedJobs\Tasks\Engines\TaskRunnerEngine; +use Throwable; /** * A service that can be used for starting, stopping and listing queued jobs. @@ -753,7 +754,7 @@ protected function grabMutex(QueuedJobDescriptor $jobDescriptor) }); return true; - } catch (\Throwable $e) { + } catch (Throwable $e) { // note that error here may not be an issue as failing to acquire a job lock is a valid state // which happens when other process claimed the job lock first $this->getLogger()->debug( @@ -829,7 +830,13 @@ public function runJob($jobId) $job = null; try { - $job = $this->initialiseJob($jobDescriptor); + try { + $job = $this->initialiseJob($jobDescriptor); + } catch (Throwable $e) { + $this->handleJobInitialisationException($jobDescriptor, $e); + $broken = true; + $this->finaliseLogging($logger); + } // get the job ready to begin. if (!$jobDescriptor->JobStarted) { @@ -911,7 +918,7 @@ public function runJob($jobId) try { $job->process(); - } catch (\Throwable $e) { + } catch (Throwable $e) { $logger->error($e->getMessage(), ['exception' => $e]); $this->markJobAsBroken($jobDescriptor); $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e); @@ -991,29 +998,12 @@ public function runJob($jobId) $this->extend('updateJobDescriptorAndJobOnCompletion', $jobDescriptor, $job); } - } catch (\Throwable $e) { - // PHP 7 Error handling) + } catch (Throwable $e) { $this->handleBrokenJobException($jobDescriptor, $job, $e); $broken = true; } - // Write any remaining batched messages at the end. - if ($logger instanceof Logger) { - foreach ($logger->getHandlers() as $handler) { - if ($handler instanceof BufferHandler) { - $handler->flush(); - } - } - } - - // If using a global singleton logger here, - // any messages added after this point will be auto-flushed on PHP shutdown through the handler. - // This causes a database write, and assumes the database and table will be available at this point. - if ($logger instanceof Logger) { - $logger->setHandlers(array_filter($logger->getHandlers() ?? [], function ($handler) { - return !($handler instanceof BufferHandler); - })); - } + $this->finaliseLogging($logger); }); $this->unsetRunAsUser($runAsUser, $originalUser); @@ -1021,6 +1011,27 @@ public function runJob($jobId) return !$broken; } + protected function finaliseLogging(LoggerInterface $logger) + { + if (!$logger instanceof Logger) { + return; + } + + // Write any remaining batched messages at the end. + foreach ($logger->getHandlers() as $handler) { + if ($handler instanceof BufferHandler) { + $handler->flush(); + } + } + + // If using a global singleton logger here, + // any messages added after this point will be auto-flushed on PHP shutdown through the handler. + // This causes a database write, and assumes the database and table will be available at this point. + $logger->setHandlers(array_filter($logger->getHandlers() ?? [], function ($handler) { + return !($handler instanceof BufferHandler); + })); + } + /** * Provides a wrapper when executing arbitrary code contained in job implementation * this ensures that job specific code doesn't alter the configuration of the queue runner execution @@ -1044,19 +1055,27 @@ protected function withNestedState(callable $callback) } } + protected function handleJobInitialisationException(QueuedJobDescriptor $jobDescriptor, Throwable $e) + { + $this->getLogger()->info( + $e->getMessage(), + ['exception' => $e] + ); + $this->markJobAsBroken($jobDescriptor); + $this->extend('updateJobDescriptorOnInitialisationException', $jobDescriptor, $e); + $jobDescriptor->write(); + } + /** * @param QueuedJobDescriptor $jobDescriptor * @param QueuedJob $job - * @param Exception|\Throwable $e + * @param Exception|Throwable $e */ protected function handleBrokenJobException(QueuedJobDescriptor $jobDescriptor, QueuedJob $job, $e) { - // okay, we'll just catch this exception for now $this->getLogger()->info( $e->getMessage(), - [ - 'exception' => $e, - ] + ['exception' => $e] ); $this->markJobAsBroken($jobDescriptor); $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);