Skip to content

Commit

Permalink
Merge pull request #23 from webgriffe/issue-22
Browse files Browse the repository at this point in the history
Add lock mechanism to consume and enqueue commands (#22)
  • Loading branch information
mmenozzi authored Nov 23, 2020
2 parents aec0a92 + b702ef8 commit f1e125e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ To make all importers work automatically the following is the suggested crontab:

It will enqueue the update of all attribute options every hour and it will import, every minute, all products that have been modified since the last execution, along with their associations.

Both enqueue and consume commands uses a [lock mechanism](https://symfony.com/doc/current/console/lockable_trait.html) which prevents to run them multiple times.

## Architecture & customization

This plugin has basically two main entry points:
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"cocur/slugify": "^4.0",
"guzzlehttp/guzzle": "^6.5",
"sylius/sylius": "^1.8",
"symfony/deprecation-contracts": "^2.2"
"symfony/deprecation-contracts": "^2.2",
"symfony/lock": "^4.4|^5.0"
},
"require-dev": {
"ext-json": "*",
Expand Down
13 changes: 13 additions & 0 deletions src/Command/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\Persistence\ManagerRegistry;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Webgriffe\SyliusAkeneoPlugin\ImporterInterface;
Expand All @@ -15,6 +16,8 @@

final class ConsumeCommand extends Command
{
use LockableTrait;

protected static $defaultName = 'webgriffe:akeneo:consume';

/** @var QueueItemRepositoryInterface */
Expand Down Expand Up @@ -47,6 +50,12 @@ protected function configure(): void
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->lock()) {
$output->writeln('The command is already running in another process.');

return 0;
}

$queueItems = $this->queueItemRepository->findAllToImport();
foreach ($queueItems as $queueItem) {
$akeneoIdentifier = $queueItem->getAkeneoIdentifier();
Expand All @@ -60,6 +69,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
/** @var EntityManagerInterface $objectManager */
$objectManager = $this->managerRegistry->getManager();
if (!$objectManager->isOpen()) {
$this->release();

throw $t;
}
$queueItem->setErrorMessage($t->getMessage() . \PHP_EOL . $t->getTraceAsString());
Expand All @@ -84,6 +95,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
);
}

$this->release();

return 0;
}

Expand Down
11 changes: 11 additions & 0 deletions src/Command/EnqueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Sylius\Component\Resource\Factory\FactoryInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
Expand All @@ -18,6 +19,8 @@

final class EnqueueCommand extends Command
{
use LockableTrait;

public const SINCE_OPTION_NAME = 'since';

public const SINCE_FILE_OPTION_NAME = 'since-file';
Expand Down Expand Up @@ -114,6 +117,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int
);
}

if (!$this->lock()) {
$output->writeln('The command is already running in another process.');

return 0;
}

$runDate = $this->dateTimeBuilder->build();
foreach ($this->getImporters($input) as $importer) {
$identifiers = $importer->getIdentifiersModifiedSince($sinceDate);
Expand Down Expand Up @@ -153,6 +162,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$this->writeSinceDateFile($sinceFilePath, $runDate);
}

$this->release();

return 0;
}

Expand Down

0 comments on commit f1e125e

Please sign in to comment.