-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #5 from Headoo/cme-softdeletable-tu
Cme softdeletable tu
- Loading branch information
Showing
11 changed files
with
642 additions
and
177 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
<?php | ||
|
||
namespace Headoo\ElasticSearchBundle\Command; | ||
|
||
use Doctrine\ORM\EntityManager; | ||
use Headoo\ElasticSearchBundle\Helper\ElasticSearchHelper; | ||
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; | ||
use Symfony\Component\Console\Helper\ProgressBar; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
|
||
abstract class AbstractCommand extends ContainerAwareCommand | ||
{ | ||
const LINE_LENGTH = 70; | ||
const CLEAR_LINE = "\r\e[2K\r"; | ||
|
||
const EXIT_SUCCESS = 0; | ||
const EXIT_FAILED = 127; | ||
|
||
/** @var OutputInterface */ | ||
protected $output; | ||
/** @var integer */ | ||
protected $threads; | ||
/** @var integer */ | ||
protected $limit; | ||
/** @var integer */ | ||
protected $offset; | ||
/** @var bool */ | ||
protected $batch = false; | ||
/** @var string */ | ||
protected $type; | ||
/** @var array */ | ||
protected $aTypes; | ||
/** @var ElasticSearchHelper */ | ||
protected $elasticSearchHelper; | ||
/** @var array */ | ||
protected $mappings; | ||
/** @var EntityManager */ | ||
protected $entityManager; | ||
/** @var bool */ | ||
protected $reset = false; | ||
/** @var bool $verbose More verbose */ | ||
protected $verbose = false; | ||
/** @var bool $dryRun Do not make any change on ES */ | ||
protected $dryRun = false; | ||
|
||
/** | ||
* @param InputInterface $input | ||
* @param OutputInterface $output | ||
*/ | ||
protected function init(InputInterface $input, OutputInterface $output) | ||
{ | ||
$this->readOption($input); | ||
|
||
$this->output = $output; | ||
$this->elasticSearchHelper = $this->getContainer()->get('headoo.elasticsearch.helper'); | ||
$this->mappings = $this->getContainer()->getParameter('elastica_mappings'); | ||
$this->entityManager = $this->getContainer()->get('doctrine.orm.entity_manager'); | ||
|
||
$this->aTypes = array_keys($this->mappings); | ||
} | ||
|
||
/** | ||
* @param InputInterface $input | ||
*/ | ||
protected function readOption(InputInterface $input) | ||
{ | ||
$this->limit = $input->getOption('limit') ?: null; | ||
$this->offset = $input->getOption('offset') ?: 0; | ||
$this->type = $input->getOption('type'); | ||
$this->batch = $input->getOption('batch'); | ||
|
||
if ($input->hasOption('reset')) { | ||
$this->reset = $input->getOption('reset'); | ||
} | ||
|
||
if ($input->hasOption('threads')) { | ||
$this->threads = $input->getOption('threads') ?: 2; | ||
} | ||
|
||
if ($input->hasOption('dry-run')) { | ||
$this->dryRun = $input->getOption('dry-run'); | ||
} | ||
|
||
if ($input->hasOption('verbose')) { | ||
$this->verbose = $input->getOption('verbose'); | ||
} | ||
} | ||
|
||
/** | ||
* @param OutputInterface $output | ||
* @param int $max | ||
* @return ProgressBar | ||
*/ | ||
protected function getProgressBar($output, $max) | ||
{ | ||
$max = ($max > 0) ? $max : 1; | ||
$progressBar = new ProgressBar($output, $max); | ||
|
||
if ($this->verbose) { | ||
$progressBar->setFormat('%message% Doc. %percent%% [%bar%] (%elapsed% - %estimated%) (%memory%)' . "\r"); | ||
} else { | ||
$progressBar->setFormat('%message% %percent%% [%bar%] (%elapsed% - %estimated%)' . "\r"); | ||
} | ||
|
||
$progressBar->setMessage(''); | ||
$progressBar->start(); | ||
|
||
return $progressBar; | ||
} | ||
|
||
/** | ||
* @param $msg | ||
* @return string | ||
*/ | ||
protected function completeLine($msg) | ||
{ | ||
$nbrAstrix = (self::LINE_LENGTH - strlen($msg) - 2) / 2; | ||
|
||
if ($nbrAstrix <= 0) { | ||
return $msg; | ||
} | ||
|
||
$sAstrix = str_repeat('*', $nbrAstrix); | ||
$sReturn = "$sAstrix $msg $sAstrix"; | ||
|
||
return (strlen($sReturn) == self::LINE_LENGTH) | ||
? $sReturn | ||
: $sReturn . '*'; | ||
} | ||
|
||
/** | ||
* @param string $sType | ||
* @return \Elastica\Index | ||
*/ | ||
protected function getIndexFromType($sType) | ||
{ | ||
$connection = $this->mappings[$sType]['connection']; | ||
|
||
$index_name = $this->getContainer() | ||
->get('headoo.elasticsearch.handler') | ||
->getIndexName($sType); | ||
|
||
return $this->elasticSearchHelper | ||
->getClient($connection) | ||
->getIndex($index_name); | ||
} | ||
|
||
/** | ||
* @param string $sType | ||
* @return \Doctrine\ORM\EntityRepository | ||
*/ | ||
protected function getRepositoryFromType($sType) | ||
{ | ||
return $this->entityManager->getRepository($this->mappings[$sType]['class']); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
<?php | ||
|
||
namespace Headoo\ElasticSearchBundle\Command; | ||
|
||
use Elastica\Query; | ||
use Elastica\ResultSet; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Input\InputOption; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
|
||
class ExodusElasticCommand extends AbstractCommand | ||
{ | ||
protected $nbrDocumentsFound = 0; | ||
protected $counterDocumentTested = 0; | ||
protected $counterEntitiesRemoved = 0; | ||
|
||
protected function configure() | ||
{ | ||
$this | ||
->setName('headoo:elastic:exodus') | ||
->setDescription('Remove not linked entities from Elastic Search') | ||
->addOption('limit', 'l', InputOption::VALUE_OPTIONAL, 'Limit For selected Type', 0) | ||
->addOption('offset', 'o', InputOption::VALUE_OPTIONAL, 'Offset For selected Type', 0) | ||
->addOption('type', 't', InputOption::VALUE_OPTIONAL, 'Type of document you want to exodus. You must to have configure it before use', null) | ||
->addOption('batch', 'b', InputOption::VALUE_OPTIONAL, 'Number of Documents per batch', null) | ||
->addOption('dry-run', 'd', InputOption::VALUE_OPTIONAL, 'Dry run: do not make any change on ES', false); | ||
} | ||
|
||
/** | ||
* @param InputInterface $input | ||
* @param OutputInterface $output | ||
* @return int | ||
*/ | ||
protected function execute(InputInterface $input, OutputInterface $output) | ||
{ | ||
$this->init($input, $output); | ||
|
||
$this->output->writeln("<info>" . $this->completeLine($this->getName() . " " . date('H:i:s')) . "</info>"); | ||
|
||
if ($this->verbose) { | ||
$sMsg = ($this->type) ? "Type: {$this->type}\n" : ""; | ||
$sMsg .= "Offset: {$this->offset}\nBatch: {$this->batch}"; | ||
$this->output->writeln($sMsg); | ||
} | ||
|
||
if ($this->type) { | ||
return $this->_switchType($this->type); | ||
} | ||
|
||
$returnValue = AbstractCommand::EXIT_SUCCESS; | ||
|
||
foreach ($this->aTypes as $type) { | ||
$result = $this->_switchType($type); | ||
$returnValue = ($result == AbstractCommand::EXIT_SUCCESS) ? | ||
$returnValue : | ||
$result; | ||
} | ||
|
||
return $returnValue; | ||
} | ||
|
||
/** | ||
* @param $type | ||
* @return int | ||
*/ | ||
private function _switchType($type) | ||
{ | ||
if (!in_array($type, $this->aTypes)) { | ||
$this->output->writeln($this->completeLine("Wrong Type")); | ||
return AbstractCommand::EXIT_FAILED; | ||
} | ||
|
||
return $this->processBatch($type); | ||
} | ||
|
||
/** | ||
* @param \Elastica\Type $type | ||
* @param \Doctrine\ORM\EntityRepository $repository | ||
* @param ResultSet $resultSet | ||
*/ | ||
private function removeFromElasticSearch($type, $repository, $resultSet) | ||
{ | ||
foreach ($resultSet as $result) { | ||
$this->counterDocumentTested++; | ||
$documentId = $result->getDocument()->getId(); | ||
|
||
# Look up into Doctrine the associated Entity with the given document | ||
$entity = $repository->find($documentId); | ||
|
||
if (!is_null($entity)) { | ||
continue; | ||
} | ||
|
||
if ($this->verbose) { | ||
$this->output->writeln(self::CLEAR_LINE . "Entity not found: {$documentId}"); | ||
} | ||
|
||
# Remove document from ElasticSearch | ||
$this->counterEntitiesRemoved++; | ||
$response = $type->deleteById($documentId); | ||
|
||
if ($response->hasError()&& $this->verbose) { | ||
$this->output->writeln(self::CLEAR_LINE . "\tError: {$response->getError()}"); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* @param string $sType | ||
* @return int | ||
*/ | ||
private function countAllResult($sType) | ||
{ | ||
$index = $this->getIndexFromType($sType); | ||
$query = new Query(); | ||
|
||
return $index->count($query); | ||
} | ||
|
||
/** | ||
* @param string $sType | ||
* @return int | ||
*/ | ||
private function processBatch($sType) | ||
{ | ||
$this->output->writeln('<info>' . $this->completeLine("Start Exodus {$sType}") . '</info>'); | ||
|
||
$repository = $this->getRepositoryFromType($sType); | ||
$index = $this->getIndexFromType($sType); | ||
|
||
$from = $this->offset; | ||
$countTotalDocuments = $this->countAllResult($sType); | ||
|
||
$progressBar = $this->getProgressBar($this->output, $countTotalDocuments - $this->offset); | ||
$this->initCounter(); | ||
|
||
do { | ||
$query = new Query(); | ||
$query->setFrom($from); | ||
$query->setSize($this->batch); | ||
|
||
# Get documents from ElasticSearch | ||
$resultSet = $index->search($query); | ||
|
||
$this->removeFromElasticSearch($index->getType($sType), $repository, $resultSet); | ||
|
||
$addMore = $this->getNextStep($this->batch, $this->offset, $from, $this->limit); | ||
$from += $this->batch; | ||
|
||
$progressBar->setMessage("$from/$countTotalDocuments"); | ||
$progressBar->advance(count($resultSet)); | ||
|
||
} while ((count($resultSet) > 0) && ($addMore > 0)); | ||
|
||
$progressBar->finish(); | ||
|
||
$this->output->writeln(self::CLEAR_LINE . "{$sType}: Documents tested: {$this->counterDocumentTested} Entities removed: {$this->counterEntitiesRemoved}"); | ||
$this->output->writeln('<info>' . $this->completeLine("Finish Exodus {$sType}") . '</info>'); | ||
|
||
return AbstractCommand::EXIT_SUCCESS; | ||
} | ||
|
||
/** | ||
* @param int $batch Number of document to get each loop | ||
* @param int $offset Offset | ||
* @param int $from The last query FROM | ||
* @param int $limit The number of total result | ||
* @return mixed | ||
*/ | ||
private function getNextStep($batch, $offset, $from, $limit) | ||
{ | ||
# No limit | ||
if (empty($limit) || $limit <= 0) { | ||
return $batch; | ||
} | ||
|
||
$resultRest = ($offset + $limit) - $from; | ||
|
||
return ($resultRest > $batch) ? | ||
$batch : | ||
$resultRest; | ||
} | ||
|
||
private function initCounter() | ||
{ | ||
$this->counterEntitiesRemoved = 0; | ||
$this->counterDocumentTested = 0; | ||
$this->nbrDocumentsFound = 0; | ||
} | ||
|
||
} |
Oops, something went wrong.