Skip to content

Commit

Permalink
Merge pull request #6 from Headoo/cme-perf
Browse files Browse the repository at this point in the history
Fix memory leak / Handle process error
  • Loading branch information
corentinheadoo authored Jul 12, 2017
2 parents 99ca428 + 7243b6c commit afb8dfb
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 29 deletions.
50 changes: 44 additions & 6 deletions Command/AbstractCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ abstract class AbstractCommand extends ContainerAwareCommand

/** @var OutputInterface */
protected $output;
/** @var InputInterface */
protected $input;
/** @var integer */
protected $threads;
/** @var integer */
Expand All @@ -43,21 +45,57 @@ abstract class AbstractCommand extends ContainerAwareCommand
protected $verbose = false;
/** @var bool $dryRun Do not make any change on ES */
protected $dryRun = false;
/** @var string */
protected $environment;

/**
* @param InputInterface $input
* @param OutputInterface $output
*/
protected function init(InputInterface $input, OutputInterface $output)
{
$this->readOption($input);

$this->output = $output;
$this->input = $input;

$this->elasticSearchHelper = $this->getContainer()->get('headoo.elasticsearch.helper');
$this->mappings = $this->getContainer()->getParameter('elastica_mappings');
$this->entityManager = $this->getContainer()->get('doctrine.orm.entity_manager');
$this->environment = $this->getContainer()->get('kernel')->getEnvironment();

$this->aTypes = array_keys($this->mappings);

$this->readOption($input);

if ($this->environment == 'prod') {
$this->entityManager->getConnection()->getConfiguration()->setSQLLogger(null);
$this->entityManager->flush();
$this->entityManager->clear();
}
}

/**
* @param array $excludedOption
* @return string
*/
protected function getOptionsToString($excludedOption = [])
{
$aOptions = $this->input->getOptions();
$sOptions = '';

foreach ($aOptions as $key => $value) {
if ($value === false || in_array($key, $excludedOption)) {
continue;
}

if ($value === true) {
$sOptions .= " --$key";
continue;
}

$sOptions .= " --$key=$value";
}

return $sOptions;
}

/**
Expand Down Expand Up @@ -115,18 +153,18 @@ protected function getProgressBar($output, $max)
*/
protected function completeLine($msg)
{
$nbrAstrix = (self::LINE_LENGTH - strlen($msg) - 2) / 2;
$nbrAstrix = (self::LINE_LENGTH - strlen($msg) - 4) / 2;

if ($nbrAstrix <= 0) {
return $msg;
}

$sAstrix = str_repeat('*', $nbrAstrix);
$sReturn = "$sAstrix $msg $sAstrix";
$sReturn = "$sAstrix $msg $sAstrix";

return (strlen($sReturn) == self::LINE_LENGTH)
? $sReturn
: $sReturn . '*';
? self::CLEAR_LINE . $sReturn
: self::CLEAR_LINE . $sReturn . '*';
}

/**
Expand Down
15 changes: 13 additions & 2 deletions Command/ExodusElasticCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ private function removeFromElasticSearch($type, $repository, $resultSet)

# Look up into Doctrine the associated Entity with the given document
$entity = $repository->find($documentId);
$this->entityManager->clear();

if (!is_null($entity)) {
continue;
Expand All @@ -105,7 +106,10 @@ private function removeFromElasticSearch($type, $repository, $resultSet)
if ($response->hasError()&& $this->verbose) {
$this->output->writeln(self::CLEAR_LINE . "\tError: {$response->getError()}");
}

gc_collect_cycles();
}

}

/**
Expand All @@ -126,7 +130,7 @@ private function countAllResult($sType)
*/
private function processBatch($sType)
{
$this->output->writeln('<info>' . $this->completeLine("Start Exodus {$sType}") . '</info>');
$this->output->writeln('<info>' . $this->completeLine("Start Exodus '{$sType}'") . '</info>');

$repository = $this->getRepositoryFromType($sType);
$index = $this->getIndexFromType($sType);
Expand All @@ -138,12 +142,15 @@ private function processBatch($sType)
$this->initCounter();

do {
unset($resultSet);

$query = new Query();
$query->setFrom($from);
$query->setSize($this->batch);

# Get documents from ElasticSearch
$resultSet = $index->search($query);
unset($query);

$this->removeFromElasticSearch($index->getType($sType), $repository, $resultSet);

Expand All @@ -157,8 +164,12 @@ private function processBatch($sType)

$progressBar->finish();

unset($progressBar);

$this->output->writeln(self::CLEAR_LINE . "{$sType}: Documents tested: {$this->counterDocumentTested} Entities removed: {$this->counterEntitiesRemoved}");
$this->output->writeln('<info>' . $this->completeLine("Finish Exodus {$sType}") . '</info>');
$this->output->writeln('<info>' . $this->completeLine("Finish Exodus '{$sType}'") . '</info>');

gc_collect_cycles();

return AbstractCommand::EXIT_SUCCESS;
}
Expand Down
81 changes: 60 additions & 21 deletions Command/PopulateElasticCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,45 @@ protected function execute(InputInterface $input, OutputInterface $output)
}

if($input->getOption('type')){
$this->_switchType($this->type, $this->batch);
return AbstractCommand::EXIT_SUCCESS;
return $this->_switchType($this->type, $this->batch);
}

$returnValue = self::EXIT_SUCCESS;
foreach ($this->aTypes as $type){
$this->_switchType($type, $this->batch);
$returnedValue = $this->_switchType($type, $this->batch);
if ($returnedValue != self::EXIT_SUCCESS) {
$returnValue = self::EXIT_FAILED;
}
}

return AbstractCommand::EXIT_SUCCESS;
return $returnValue;
}

/**
* @param $type
* @param $batch
* @param string $type
* @param int $batch
* @return int
*/
private function _switchType($type, $batch)
{
if(in_array($type, $this->aTypes)){
$this->output->writeln($this->completeLine("BEGIN {$type}"));
if($this->reset){
$this->output->writeln($this->completeLine("RESET INDEX"));
$index_name = $this->getContainer()->get('headoo.elasticsearch.handler')->getIndexName($type);
$connection = $this->mappings[$type]['connection'];
$index = $this->mappings[$type]['index'];
$this->elasticSearchHelper->getClient($connection)->getIndex($index_name)->create($index, true);
$this->_resetType($type);
}
if(!$batch){

$returnValue = ($batch) ?
$this->beginBatch($type) :
$this->processBatch($type, $this->getContainer()->get($this->mappings[$type]['transformer']));
}else{
$this->beginBatch($type);
}

$this->output->writeln($this->completeLine("FINISH {$type}"));
return;

return $returnValue;
}

$this->output->writeln($this->completeLine("Wrong Type"));

return self::EXIT_FAILED;
}

/**
Expand Down Expand Up @@ -110,6 +113,7 @@ private function _bulk($type, $aDocuments)
* @param $maxParallel
* @param int $poll
* @param int $numberOfEntities
* @return int
*/
public function runParallel(ProgressBar $progressBar, array $processes, $maxParallel, $poll = 1000, $numberOfEntities)
{
Expand All @@ -127,13 +131,19 @@ public function runParallel(ProgressBar $progressBar, array $processes, $maxPara

$progression = $this->offset;
$progressMax = $numberOfEntities + $this->offset;
$returnValue = self::EXIT_SUCCESS;

do {
// wait for the given time
usleep($poll);
// remove all finished processes from the stack
foreach ($currentProcesses as $index => $process) {
if (!$process->isRunning()) {
if ($process->getExitCode() != self::EXIT_SUCCESS) {
$this->output->writeln($process->getErrorOutput());
$returnValue = self::EXIT_FAILED;
}

unset($currentProcesses[$index]);

$progression += $this->limit;
Expand All @@ -152,31 +162,36 @@ public function runParallel(ProgressBar $progressBar, array $processes, $maxPara

// continue loop while there are processes being executed or waiting for execution
} while (count($processesQueue) > 0 || count($currentProcesses) > 0);

$progressBar->finish();

return $returnValue;
}

/**
* @param $type
* @return int
*/
public function beginBatch($type)
{
$numberObjects = $this->entityManager->createQuery("SELECT COUNT(u) FROM {$this->mappings[$type]['class']} u")->getResult()[0][1];
$aProcess = [];
$numberOfEntities = $numberObjects - $this->offset;
$numberOfProcess = floor($numberOfEntities / $this->limit);
$sOptions = $this->getOptionsToString(['type', 'limit', 'offset', 'threads', 'batch']);

$progressBar = $this->getProgressBar($this->output, $numberOfEntities);

for ($i = 0; $i <= $numberOfProcess; $i++) {
$_offset = $this->offset + ($this->limit * $i);
$process = new Process("php app/console headoo:elastic:populate --type={$type} --limit={$this->limit} --offset={$_offset}");
$process = new Process("php app/console headoo:elastic:populate --type={$type} --limit={$this->limit} --offset={$_offset} " . $sOptions);
$aProcess[] = $process;
}

$max_parallel_processes = $this->threads;
$polling_interval = 1000; // microseconds
$this->runParallel($progressBar, $aProcess, $max_parallel_processes, $polling_interval, $numberOfEntities);

return;
return $this->runParallel($progressBar, $aProcess, $max_parallel_processes, $polling_interval, $numberOfEntities);
}

/**
Expand Down Expand Up @@ -226,14 +241,38 @@ public function processBatch($type, $transformer)

$progressBar->setMessage(($progression++) . "/{$progressMax}");
$progressBar->advance();

gc_collect_cycles();
}

$this->_bulk($objectType,$aDocuments);
$this->output->writeln($this->completeLine("Start populate {$type}"));
$this->_bulk($objectType, $aDocuments);
$this->output->writeln($this->completeLine("Start populate '{$type}'"));

$progressBar->finish();
$this->output->writeln('');
$this->output->writeln("<info>" . $this->completeLine("Finish populate {$type}") . "</info>");
}

/**
* @param string $type
* @return bool
*/
private function _resetType($type)
{
$this->output->writeln($this->completeLine("RESET INDEX"));

$index_name = $this->getContainer()->get('headoo.elasticsearch.handler')->getIndexName($type);
$connection = $this->mappings[$type]['connection'];
$index = $this->mappings[$type]['index'];

$response = $this->elasticSearchHelper->getClient($connection)->getIndex($index_name)->create($index, true);

if ($response->hasError()) {
$this->output->writeln("Cannot reset index '{$type}': " . $response->getErrorMessage());
return false;
}

return true;
}

}

0 comments on commit afb8dfb

Please sign in to comment.