Skip to content

Commit

Permalink
Merge pull request #7 from Headoo/cme-exodus
Browse files Browse the repository at this point in the history
Command Exodus uses Scroll
  • Loading branch information
corentinheadoo authored Jul 27, 2017
2 parents afb8dfb + 5d52143 commit 85a0fdf
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 138 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

app/
vendor/

coverage\.clover
Expand Down
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ install:

script:
- mkdir -p build/logs
- phpunit --coverage-text --coverage-clover=coverage.clover
- phpunit --coverage-clover build/logs/clover.xml
- vendor/bin/phpunit --coverage-text --coverage-clover build/logs/clover.xml

after_success:
- vendor/bin/test-reporter
Expand Down
42 changes: 28 additions & 14 deletions Command/AbstractCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ abstract class AbstractCommand extends ContainerAwareCommand
/** @var integer */
protected $threads;
/** @var integer */
protected $limit;
protected $limit = null;
/** @var integer */
protected $offset;
protected $offset = 0;
/** @var bool */
protected $batch = false;
/** @var string */
Expand Down Expand Up @@ -103,11 +103,17 @@ protected function getOptionsToString($excludedOption = [])
*/
protected function readOption(InputInterface $input)
{
$this->limit = $input->getOption('limit') ?: null;
$this->offset = $input->getOption('offset') ?: 0;
$this->type = $input->getOption('type');
$this->batch = $input->getOption('batch') ?: $this->batch;

if ($input->hasOption('limit')) {
$this->limit = $input->getOption('limit');
}

if ($input->hasOption('offset')) {
$this->offset = $input->getOption('offset');
}

if ($input->hasOption('reset')) {
$this->reset = $input->getOption('reset');
}
Expand Down Expand Up @@ -135,12 +141,11 @@ protected function getProgressBar($output, $max)
$max = ($max > 0) ? $max : 1;
$progressBar = new ProgressBar($output, $max);

if ($this->verbose) {
$progressBar->setFormat('%message% Doc. %percent%% [%bar%] (%elapsed% - %estimated%) (%memory%)' . "\r");
} else {
$progressBar->setFormat('%message% %percent%% [%bar%] (%elapsed% - %estimated%)' . "\r");
}
$sFormat = ($this->verbose)
? '%message% Doc. %percent%% [%bar%] (%elapsed% - %remaining%) (%memory%)' . "\r"
: '%message% %percent%% [%bar%] (%elapsed% - %remaining%)' . "\r";

$progressBar->setFormat($sFormat);
$progressBar->setMessage('');
$progressBar->start();

Expand All @@ -151,7 +156,7 @@ protected function getProgressBar($output, $max)
* @param $msg
* @return string
*/
protected function completeLine($msg)
static public function completeLine($msg)
{
$nbrAstrix = (self::LINE_LENGTH - strlen($msg) - 4) / 2;

Expand All @@ -173,14 +178,12 @@ protected function completeLine($msg)
*/
protected function getIndexFromType($sType)
{
$connection = $this->mappings[$sType]['connection'];

$index_name = $this->getContainer()
->get('headoo.elasticsearch.handler')
->getIndexName($sType);

return $this->elasticSearchHelper
->getClient($connection)
return $this
->getClient($sType)
->getIndex($index_name);
}

Expand All @@ -193,4 +196,15 @@ protected function getRepositoryFromType($sType)
return $this->entityManager->getRepository($this->mappings[$sType]['class']);
}

/**
 * Fetch the Elastic Search client associated with a mapped document type.
 *
 * The connection name is read from the 'connection' entry of the type's
 * mapping and handed to the Elastic Search helper, which supplies the client.
 *
 * @param string $sType Key of the type in $this->mappings
 * @return \Elastica\Client
 */
protected function getClient($sType)
{
    return $this->elasticSearchHelper->getClient(
        $this->mappings[$sType]['connection']
    );
}

}
69 changes: 21 additions & 48 deletions Command/ExodusElasticCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use Elastica\Query;
use Elastica\ResultSet;
use Elastica\Scroll;
use Elastica\Search;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
Expand All @@ -13,17 +15,15 @@ class ExodusElasticCommand extends AbstractCommand
protected $nbrDocumentsFound = 0;
protected $counterDocumentTested = 0;
protected $counterEntitiesRemoved = 0;
protected $nbrDone = 0;

/** @var int */
protected $batch = 100;

protected function configure()
{
$this
->setName('headoo:elastic:exodus')
$this->setName('headoo:elastic:exodus')
->setDescription('Remove not linked entities from Elastic Search')
->addOption('limit', 'l', InputOption::VALUE_OPTIONAL, 'Limit For selected Type', 0)
->addOption('offset', 'o', InputOption::VALUE_OPTIONAL, 'Offset For selected Type', 0)
->addOption('type', 't', InputOption::VALUE_OPTIONAL, 'Type of document you want to exodus. You must to have configure it before use', null)
->addOption('batch', 'b', InputOption::VALUE_OPTIONAL, 'Number of Documents per batch', null)
->addOption('dry-run', 'd', InputOption::VALUE_OPTIONAL, 'Dry run: do not make any change on ES', false);
Expand All @@ -38,11 +38,11 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$this->init($input, $output);

$this->output->writeln("<info>" . $this->completeLine($this->getName() . " " . date('H:i:s')) . "</info>");
$this->output->writeln("<info>" . self::completeLine($this->getName() . " " . date('H:i:s')) . "</info>");

if ($this->verbose) {
$sMsg = ($this->type) ? "Type: {$this->type}\n" : "";
$sMsg .= "Offset: {$this->offset}\nBatch: {$this->batch}";
$sMsg .= "Batch: {$this->batch}";
$this->output->writeln($sMsg);
}

Expand All @@ -69,7 +69,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
private function _switchType($type)
{
if (!in_array($type, $this->aTypes)) {
$this->output->writeln($this->completeLine("Wrong Type"));
$this->output->writeln(self::completeLine("Wrong Type"));
return AbstractCommand::EXIT_FAILED;
}

Expand Down Expand Up @@ -130,76 +130,49 @@ private function countAllResult($sType)
*/
private function processBatch($sType)
{
$this->output->writeln('<info>' . $this->completeLine("Start Exodus '{$sType}'") . '</info>');
$this->output->writeln('<info>' . self::completeLine("Start Exodus '{$sType}'") . '</info>');

$repository = $this->getRepositoryFromType($sType);
$index = $this->getIndexFromType($sType);

$from = $this->offset;
$countTotalDocuments = $this->countAllResult($sType);

$progressBar = $this->getProgressBar($this->output, $countTotalDocuments - $this->offset);
$progressBar = $this->getProgressBar($this->output, $countTotalDocuments);
$this->initCounter();

do {
unset($resultSet);

$query = new Query();
$query->setFrom($from);
$query->setSize($this->batch);
$search = new Search($this->getClient($sType));
$search->addIndex($index)->addType($sType);
$search->scroll('10m');

# Get documents from ElasticSearch
$resultSet = $index->search($query);
unset($query);
$scroll = new Scroll($search);

foreach ($scroll as $scrollId => $resultSet) {
$this->removeFromElasticSearch($index->getType($sType), $repository, $resultSet);

$addMore = $this->getNextStep($this->batch, $this->offset, $from, $this->limit);
$from += $this->batch;

$progressBar->setMessage("$from/$countTotalDocuments");
$progressBar->advance(count($resultSet));

} while ((count($resultSet) > 0) && ($addMore > 0));
$this->nbrDone += $this->batch;
$progressBar->setMessage("{$this->nbrDone}/{$countTotalDocuments}");
$progressBar->advance($resultSet->count());
unset($resultSet);
}

$progressBar->finish();

unset($progressBar);

$this->output->writeln(self::CLEAR_LINE . "{$sType}: Documents tested: {$this->counterDocumentTested} Entities removed: {$this->counterEntitiesRemoved}");
$this->output->writeln('<info>' . $this->completeLine("Finish Exodus '{$sType}'") . '</info>');
$this->output->writeln('<info>' . self::completeLine("Finish Exodus '{$sType}'") . '</info>');

gc_collect_cycles();

return AbstractCommand::EXIT_SUCCESS;
}

/**
 * Work out how many documents the next loop iteration may still fetch.
 *
 * Without a positive limit the full batch size is always returned. With a
 * limit, the result is capped to what is left in the window
 * ($offset + $limit - $from); that remainder can be zero or negative once
 * the window has been consumed.
 *
 * @param int $batch Number of document to get each loop
 * @param int $offset Offset
 * @param int $from The last query FROM
 * @param int $limit The number of total result
 * @return mixed
 */
private function getNextStep($batch, $offset, $from, $limit)
{
    // An unset, empty or non-positive limit means "no limit": keep full batches.
    if (empty($limit) || $limit <= 0) {
        return $batch;
    }

    // Documents still allowed inside the [offset, offset + limit) window.
    $remaining = ($offset + $limit) - $from;

    if ($remaining > $batch) {
        return $batch;
    }

    return $remaining;
}

/**
 * Reset every per-type counter to zero.
 */
private function initCounter()
{
    $this->nbrDone
        = $this->nbrDocumentsFound
        = $this->counterDocumentTested
        = $this->counterEntitiesRemoved
        = 0;
}

}
22 changes: 11 additions & 11 deletions Command/PopulateElasticCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ class PopulateElasticCommand extends AbstractCommand

protected function configure()
{
$this
->setName('headoo:elastic:populate')
$this->setName('headoo:elastic:populate')
->setDescription('Repopulate Elastic Search')
->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Limit For selected Type', 0)
->addOption('offset', null, InputOption::VALUE_OPTIONAL, 'Offset For selected Type', 0)
Expand Down Expand Up @@ -61,7 +60,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
private function _switchType($type, $batch)
{
if(in_array($type, $this->aTypes)){
$this->output->writeln($this->completeLine("BEGIN {$type}"));
$this->output->writeln(self::completeLine("BEGIN {$type}"));
if($this->reset){
$this->_resetType($type);
}
Expand All @@ -70,12 +69,12 @@ private function _switchType($type, $batch)
$this->beginBatch($type) :
$this->processBatch($type, $this->getContainer()->get($this->mappings[$type]['transformer']));

$this->output->writeln($this->completeLine("FINISH {$type}"));
$this->output->writeln(self::completeLine("FINISH {$type}"));

return $returnValue;
}

$this->output->writeln($this->completeLine("Wrong Type"));
$this->output->writeln(self::completeLine("Wrong Type"));

return self::EXIT_FAILED;
}
Expand Down Expand Up @@ -163,6 +162,7 @@ public function runParallel(ProgressBar $progressBar, array $processes, $maxPara
// continue loop while there are processes being executed or waiting for execution
} while (count($processesQueue) > 0 || count($currentProcesses) > 0);

$progressBar->setProgress($numberOfEntities);
$progressBar->finish();

return $returnValue;
Expand Down Expand Up @@ -200,13 +200,13 @@ public function beginBatch($type)
*/
public function processBatch($type, $transformer)
{
$this->output->writeln($this->completeLine("Creating Type {$type} and Mapping"));
$this->output->writeln(self::completeLine("Creating Type {$type} and Mapping"));

$objectType = $this->getIndexFromType($type)->getType($type);
$this->_mappingFields($objectType, $this->mappings[$type]['mapping']);

$this->output->writeln($this->completeLine("Finish Type {$type} and Mapping"));
$this->output->writeln($this->completeLine("Start populate {$type}"));
$this->output->writeln(self::completeLine("Finish Type {$type} and Mapping"));
$this->output->writeln(self::completeLine("Start populate {$type}"));

$iResults = $this->entityManager->createQuery("SELECT COUNT(u) FROM {$this->mappings[$type]['class']} u")->getResult()[0][1];
$q = $this->entityManager->createQuery("select u from {$this->mappings[$type]['class']} u");
Expand Down Expand Up @@ -246,11 +246,11 @@ public function processBatch($type, $transformer)
}

$this->_bulk($objectType, $aDocuments);
$this->output->writeln($this->completeLine("Start populate '{$type}'"));
$this->output->writeln(self::completeLine("Start populate '{$type}'"));

$progressBar->finish();
$this->output->writeln('');
$this->output->writeln("<info>" . $this->completeLine("Finish populate {$type}") . "</info>");
$this->output->writeln("<info>" . self::completeLine("Finish populate {$type}") . "</info>");
}

/**
Expand All @@ -259,7 +259,7 @@ public function processBatch($type, $transformer)
*/
private function _resetType($type)
{
$this->output->writeln($this->completeLine("RESET INDEX"));
$this->output->writeln(self::completeLine("RESET INDEX"));

$index_name = $this->getContainer()->get('headoo.elasticsearch.handler')->getIndexName($type);
$connection = $this->mappings[$type]['connection'];
Expand Down
6 changes: 2 additions & 4 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public function getConfigTreeBuilder()
$treeBuilder = new TreeBuilder();
$rootNode = $treeBuilder->root('headoo_elastic_search');

$rootNode
->children()
$rootNode->children()
->arrayNode('connections')
->isRequired()
->cannotBeEmpty()
Expand All @@ -47,8 +46,7 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->end()
;
->end();

return $treeBuilder;
}
Expand Down
Loading

0 comments on commit 85a0fdf

Please sign in to comment.