From b702ef8340c26432e152b137d0b760ca0e1a4e48 Mon Sep 17 00:00:00 2001 From: Manuele Menozzi Date: Fri, 20 Nov 2020 18:13:00 +0100 Subject: [PATCH] Add lock mechanism to consume and enqueue commands (#22) --- README.md | 2 ++ composer.json | 3 ++- src/Command/ConsumeCommand.php | 13 +++++++++++++ src/Command/EnqueueCommand.php | 11 +++++++++++ 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index cf8b59d8..53299d1e 100644 --- a/README.md +++ b/README.md @@ -403,6 +403,8 @@ To make all importers work automatically the following is the suggested crontab: It will enqueue the update of all attribute options every hour and it will import, every minute, all products that have been modified since the last execution, along with their associations. +Both enqueue and consume commands uses a [lock mechanism](https://symfony.com/doc/current/console/lockable_trait.html) which prevents to run them multiple times. + ## Architecture & customization This plugin has basically two main entry points: diff --git a/composer.json b/composer.json index 703e570b..877e1406 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,8 @@ "cocur/slugify": "^4.0", "guzzlehttp/guzzle": "^6.5", "sylius/sylius": "^1.8", - "symfony/deprecation-contracts": "^2.2" + "symfony/deprecation-contracts": "^2.2", + "symfony/lock": "^4.4|^5.0" }, "require-dev": { "ext-json": "*", diff --git a/src/Command/ConsumeCommand.php b/src/Command/ConsumeCommand.php index 3457893d..0b8319d6 100644 --- a/src/Command/ConsumeCommand.php +++ b/src/Command/ConsumeCommand.php @@ -7,6 +7,7 @@ use Doctrine\ORM\EntityManagerInterface; use Doctrine\Persistence\ManagerRegistry; use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Command\LockableTrait; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Webgriffe\SyliusAkeneoPlugin\ImporterInterface; @@ -15,6 +16,8 @@ final class ConsumeCommand extends Command { + use LockableTrait; + protected static $defaultName = 'webgriffe:akeneo:consume'; /** @var QueueItemRepositoryInterface */ @@ -47,6 +50,12 @@ protected function configure(): void */ protected function execute(InputInterface $input, OutputInterface $output): int { + if (!$this->lock()) { + $output->writeln('The command is already running in another process.'); + + return 0; + } + $queueItems = $this->queueItemRepository->findAllToImport(); foreach ($queueItems as $queueItem) { $akeneoIdentifier = $queueItem->getAkeneoIdentifier(); @@ -60,6 +69,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** @var EntityManagerInterface $objectManager */ $objectManager = $this->managerRegistry->getManager(); if (!$objectManager->isOpen()) { + $this->release(); + throw $t; } $queueItem->setErrorMessage($t->getMessage() . \PHP_EOL . $t->getTraceAsString()); @@ -84,6 +95,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int ); } + $this->release(); + return 0; } diff --git a/src/Command/EnqueueCommand.php b/src/Command/EnqueueCommand.php index dbd90174..2e7c4539 100644 --- a/src/Command/EnqueueCommand.php +++ b/src/Command/EnqueueCommand.php @@ -6,6 +6,7 @@ use Sylius\Component\Resource\Factory\FactoryInterface; use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Command\LockableTrait; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; @@ -18,6 +19,8 @@ final class EnqueueCommand extends Command { + use LockableTrait; + public const SINCE_OPTION_NAME = 'since'; public const SINCE_FILE_OPTION_NAME = 'since-file'; @@ -114,6 +117,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int ); } + if (!$this->lock()) { + $output->writeln('The command is already running in another process.'); + + return 0; + } + $runDate = $this->dateTimeBuilder->build(); foreach ($this->getImporters($input) as $importer) { $identifiers = $importer->getIdentifiersModifiedSince($sinceDate); @@ -153,6 +162,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int $this->writeSinceDateFile($sinceFilePath, $runDate); } + $this->release(); + return 0; }