The distributed consumer in application tests stopped working correctly #386

Open
lifinsky opened this issue Oct 15, 2024 · 25 comments

@lifinsky
Contributor

Ecotone version(s) affected: 1.229.0-latest

Description
We have tests that send a distributed command and check its processing with a delayed retry. After an update, we noticed that tests pass individually, but when running two or more, only one test passes. The remaining tests do not receive the message; it stays in the queue.

How to reproduce
First test, for example:

    public function testResilientSending(): void
    {
        $this->logger
            ->expects(self::exactly(3))
            ->method('error')
            ->with(self::isInstanceOf(AggregateNotFoundException::class))
        ;
        $this->distributedBus
            ->expects(self::once())
            ->method('convertAndPublishEvent')
            ->with(
                BlockCardHasFailed::ROUTING_KEY,
                self::isInstanceOf(BlockCardHasFailed::class),
            )
        ;

        $ecotone = $this->bootstrapEcotoneLite();
        $ecotone->getDistributedBus()->convertAndSendCommand(
            destination: self::SERVICE_NAME,
            routingKey: BlockCardCommand::ROUTING_KEY,
            command: new BlockCardCommand(
                id: new UuidV4(),
            )
        );
        $ecotone->run(
            endpointId: self::SERVICE_NAME,
            executionPollingMetadata: ExecutionPollingMetadata::createWithFinishWhenNoMessages(false),
        );
    }

Second test (separate test case):

    public function testResilientSending(): void
    {
        $this->logger
            ->expects(self::exactly(3))
            ->method('error')
            ->with(self::isInstanceOf(AggregateNotFoundException::class))
        ;
        $this->distributedBus
            ->expects(self::once())
            ->method('convertAndPublishEvent')
            ->with(
                UnblockCardHasFailed::ROUTING_KEY,
                self::isInstanceOf(UnblockCardHasFailed::class),
            )
        ;

        $ecotone = $this->bootstrapEcotoneLite();
        $ecotone->getDistributedBus()->convertAndSendCommand(
            destination: self::SERVICE_NAME,
            routingKey: UnblockCardCommand::ROUTING_KEY,
            command: new UnblockCardCommand(
                id: new UuidV4(),
            )
        );
        $ecotone->run(
            endpointId: self::SERVICE_NAME,
            executionPollingMetadata: ExecutionPollingMetadata::createWithFinishWhenNoMessages(false),
        );
    }

Failure when both tests run together:

UnblockCardTest::testResilientSending
Expectation failed for method name is "error" when invoked 3 time(s).
Method was expected to be called 3 times, actually called 0 times.

EcotoneTestConfiguration:

class EcotoneTestConfiguration
{
    #[ServiceContext]
    #[Environment(['test'])]
    public function errorConfiguration(): ErrorHandlerConfiguration
    {
        return ErrorHandlerConfiguration::createWithDeadLetterChannel(
            errorChannelName: MessagingChannel::Error->value,
            delayedRetryTemplate: RetryTemplateBuilder::fixedBackOff(1)
                ->maxRetryAttempts(2),
            deadLetterChannel: MessagingChannel::DeadLetter->value,
        );
    }
}

    private function bootstrapEcotoneLite(): ConfiguredMessagingSystem
    {
        return EcotoneLite::bootstrap(
            classesToResolve: [EcotoneTestConfiguration::class],
            containerOrAvailableServices: self::getContainer(),
            configuration: ServiceConfiguration::createWithDefaults()
                ->withEnvironment('test')
                ->withFailFast(true)
                ->withDefaultSerializationMediaType(MediaType::APPLICATION_JSON)
                ->withDefaultErrorChannel(MessagingChannel::Error->value)
                ->withConsumerMemoryLimit(512)
                ->withServiceName(self::SERVICE_NAME),
        );
    }

When each test is run individually, everything works correctly. It also worked fine in all versions up to 1.228.

    #[ServiceActivator(inputChannelName: MessagingChannel::Error->value)]
    public function handle(
        ErrorMessage $message,
        #[Header('ecotone_retry_number')]
        null|int $retryNumber = null,
    ): void {
        try {
            if (!$this->shouldBeSentToDeadLetter($retryNumber)) {
                return;
            }

            $cause = $message->getPayload()->getCause();
            if ($cause instanceof Throwable) {
                $this->react($cause);
            }
        } catch (Throwable $e) {
            $this->logger->error($e);
        }
    }

The purpose of the tests is to verify that, after reaching the maximum number of retries with a delay, an error event is sent to other services, while logging each failure at every attempt.

@dgafka
Member

dgafka commented Oct 15, 2024

Hey @lifinsky,

I think this is caused by the switch from basic_get to consume in 1.229.0.

Consume is subscription-based, so it receives messages in an event-driven manner. It doesn't waste time the way basic_get does, which idles between messages: the consumer is notified when a message arrives instead of asking for it.

You use finish-when-no-messages, and my bet is that the consumer now learns the queue is empty sooner, so it closes before the message is available.

executionPollingMetadata: ExecutionPollingMetadata::createWithFinishWhenNoMessages(false),

Can you try giving the consumer a time limit instead?

ExecutionPollingMetadata::createWithDefaults()->withExecutionTimeLimitInMilliseconds(10000)->withStopOnError(false)
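
The difference between polling (basic_get) and subscribing (consume) described in this comment can be illustrated with a small broker-free simulation. This is only a sketch: `InMemoryQueue` and its methods are hypothetical stand-ins written for illustration, not Ecotone or AMQP extension APIs, and the sketch does not reproduce the bug itself.

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory stand-in for a broker queue (not an Ecotone or ext-amqp class). */
final class InMemoryQueue
{
    /** @var list<string> messages waiting to be fetched */
    private array $messages = [];
    /** @var list<callable(string): void> registered subscribers */
    private array $subscribers = [];

    public function publish(string $message): void
    {
        if ($this->subscribers === []) {
            // Nobody is subscribed: keep the message until someone asks for it.
            $this->messages[] = $message;
            return;
        }
        // Push model: deliver to subscribers as soon as the message arrives.
        foreach ($this->subscribers as $subscriber) {
            $subscriber($message);
        }
    }

    /** Pull model (like basic_get): returns null when the queue is empty right now. */
    public function get(): ?string
    {
        return array_shift($this->messages);
    }

    /** Push model (like consume): drain buffered messages, then receive new ones on arrival. */
    public function consume(callable $callback): void
    {
        $this->subscribers[] = $callback;
        while (($message = array_shift($this->messages)) !== null) {
            $callback($message);
        }
    }
}

$queue = new InMemoryQueue();

// Polling: an empty result cannot be told apart from "the message is not there yet".
$polled = [];
$polled[] = $queue->get();
$queue->publish('first');
$polled[] = $queue->get();

// Subscribing: the callback runs when the message is published.
$pushed = [];
$queue->consume(function (string $message) use (&$pushed): void {
    $pushed[] = $message;
});
$queue->publish('second');
```

In the polling case the first `get()` returns `null` because nothing has been published yet, which is the situation where a consumer configured to finish when there are no messages would stop.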

@lifinsky
Contributor Author

@dgafka Hi! I suspected this too, so I added a sleep between sending the command and running the consumer. It doesn't help, and a single test works (without running Ecotone multiple times between tests).

@lifinsky
Contributor Author

ExecutionPollingMetadata::createWithDefaults()
                ->withExecutionTimeLimitInMilliseconds(10000)
                ->withStopOnError(false)

This doesn't change anything.

@lifinsky
Contributor Author

I decided to debug this part of the code:

$subscriptionConsumer = $connectionFactory->getSubscriptionConsumer($this->queueName, function (EnqueueMessage $receivedMessage, Consumer $consumer) use ($queueChannel) {
                \var_dump($receivedMessage->getProperties());
                $message = $this->inboundMessageConverter->toMessage($receivedMessage, $consumer, $this->conversionService);
                $message = $this->enrichMessage($receivedMessage, $message);

                $queueChannel->send($message->build());

                return false;
            });

First test output:

.array(10) {
  ["__TypeId__"]=>
  string(52) "Fcards\App\Command\Distributed\Card\BlockCardCommand"
  ["contentType"]=>
  string(16) "application/json"
  ["correlationId"]=>
  string(36) "8ad78379-ad5a-40d8-89b8-390bc045f54a"
  ["ecotone.amqp.distributed.service_target"]=>
  string(12) "test_service"
  ["ecotone.distributed.payloadType"]=>
  string(7) "command"
  ["ecotone.distributed.routingKey"]=>
  string(11) "card.freeze"
  ["ecotoneTracingCarrier"]=>
  string(2) "[]"
  ["id"]=>
  string(36) "03cb20b8-40ab-4afc-a5ec-168700140d58"
  ["polledChannelName"]=>
  string(24) "distributed_test_service"
  ["timestamp"]=>
  int(1728977271)
}
array(18) {
  ["__TypeId__"]=>
  string(52) "Fcards\App\Command\Distributed\Card\BlockCardCommand"
  ["contentType"]=>
  string(16) "application/json"
  ["correlationId"]=>
  string(36) "8ad78379-ad5a-40d8-89b8-390bc045f54a"
  ["ecotone.amqp.distributed.service_target"]=>
  string(12) "test_service"
  ["ecotone.distributed.payloadType"]=>
  string(7) "command"
  ["ecotone.distributed.routingKey"]=>
  string(11) "card.freeze"
  ["ecotoneTracingCarrier"]=>
  string(2) "[]"
  ["ecotone_delay"]=>
  int(1)
  ["ecotone_retry_number"]=>
  int(1)
  ["id"]=>
  string(36) "03cb20b8-40ab-4afc-a5ec-168700140d58"
  ["timestamp"]=>
  int(1728977271)
  ["x-death"]=>
  array(1) {
    [0]=>
    array(6) {
      ["count"]=>
      int(1)
      ["reason"]=>
      string(7) "expired"
      ["queue"]=>
      string(15) "ecotone_1_delay"
      ["time"]=>
      object(AMQPTimestamp)#7416 (1) {
        ["timestamp":"AMQPTimestamp":private]=>
        float(1728977270)
      }
      ["exchange"]=>
      string(13) "ecotone_delay"
      ["routing-keys"]=>
      array(1) {
        [0]=>
        string(24) "distributed_test_service"
      }
    }
  }
  ["x-first-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-first-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-first-death-reason"]=>
  string(7) "expired"
  ["x-last-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-last-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-last-death-reason"]=>
  string(7) "expired"
}
array(18) {
  ["__TypeId__"]=>
  string(52) "Fcards\App\Command\Distributed\Card\BlockCardCommand"
  ["contentType"]=>
  string(16) "application/json"
  ["correlationId"]=>
  string(36) "8ad78379-ad5a-40d8-89b8-390bc045f54a"
  ["ecotone.amqp.distributed.service_target"]=>
  string(12) "test_service"
  ["ecotone.distributed.payloadType"]=>
  string(7) "command"
  ["ecotone.distributed.routingKey"]=>
  string(11) "card.freeze"
  ["ecotoneTracingCarrier"]=>
  string(2) "[]"
  ["ecotone_delay"]=>
  int(1)
  ["ecotone_retry_number"]=>
  int(2)
  ["id"]=>
  string(36) "03cb20b8-40ab-4afc-a5ec-168700140d58"
  ["timestamp"]=>
  int(1728977271)
  ["x-death"]=>
  array(1) {
    [0]=>
    array(6) {
      ["count"]=>
      int(1)
      ["reason"]=>
      string(7) "expired"
      ["queue"]=>
      string(15) "ecotone_1_delay"
      ["time"]=>
      object(AMQPTimestamp)#7709 (1) {
        ["timestamp":"AMQPTimestamp":private]=>
        float(1728977270)
      }
      ["exchange"]=>
      string(13) "ecotone_delay"
      ["routing-keys"]=>
      array(1) {
        [0]=>
        string(24) "distributed_test_service"
      }
    }
  }
  ["x-first-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-first-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-first-death-reason"]=>
  string(7) "expired"
  ["x-last-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-last-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-last-death-reason"]=>
  string(7) "expired"
}

Second test output (run after the first):

array(10) {
  ["__TypeId__"]=>
  string(54) "Fcards\App\Command\Distributed\Card\UnblockCardCommand"
  ["contentType"]=>
  string(16) "application/json"
  ["correlationId"]=>
  string(36) "abb26afc-e9dd-435a-8c3b-4e54b7d2e5e6"
  ["ecotone.amqp.distributed.service_target"]=>
  string(12) "test_service"
  ["ecotone.distributed.payloadType"]=>
  string(7) "command"
  ["ecotone.distributed.routingKey"]=>
  string(13) "card.unfreeze"
  ["ecotoneTracingCarrier"]=>
  string(2) "[]"
  ["id"]=>
  string(36) "526020e2-82c4-467e-be31-c0dcf7dd419f"
  ["polledChannelName"]=>
  string(24) "distributed_test_service"
  ["timestamp"]=>
  int(1728977281)
}

But if I run the second test on its own:

array(10) {
  ["__TypeId__"]=>
  string(54) "Fcards\App\Command\Distributed\Card\UnblockCardCommand"
  ["contentType"]=>
  string(16) "application/json"
  ["correlationId"]=>
  string(36) "9e45ad63-57b1-4308-8b94-ad0f4dfac6f4"
  ["ecotone.amqp.distributed.service_target"]=>
  string(12) "test_service"
  ["ecotone.distributed.payloadType"]=>
  string(7) "command"
  ["ecotone.distributed.routingKey"]=>
  string(13) "card.unfreeze"
  ["ecotoneTracingCarrier"]=>
  string(2) "[]"
  ["id"]=>
  string(36) "1cb3270a-1e08-4a30-bfa7-80f9d6c2e481"
  ["polledChannelName"]=>
  string(24) "distributed_test_service"
  ["timestamp"]=>
  int(1728977641)
}
array(18) {
  ["__TypeId__"]=>
  string(54) "Fcards\App\Command\Distributed\Card\UnblockCardCommand"
  ["contentType"]=>
  string(16) "application/json"
  ["correlationId"]=>
  string(36) "9e45ad63-57b1-4308-8b94-ad0f4dfac6f4"
  ["ecotone.amqp.distributed.service_target"]=>
  string(12) "test_service"
  ["ecotone.distributed.payloadType"]=>
  string(7) "command"
  ["ecotone.distributed.routingKey"]=>
  string(13) "card.unfreeze"
  ["ecotoneTracingCarrier"]=>
  string(2) "[]"
  ["ecotone_delay"]=>
  int(1)
  ["ecotone_retry_number"]=>
  int(1)
  ["id"]=>
  string(36) "1cb3270a-1e08-4a30-bfa7-80f9d6c2e481"
  ["timestamp"]=>
  int(1728977641)
  ["x-death"]=>
  array(1) {
    [0]=>
    array(6) {
      ["count"]=>
      int(1)
      ["reason"]=>
      string(7) "expired"
      ["queue"]=>
      string(15) "ecotone_1_delay"
      ["time"]=>
      object(AMQPTimestamp)#6957 (1) {
        ["timestamp":"AMQPTimestamp":private]=>
        float(1728977641)
      }
      ["exchange"]=>
      string(13) "ecotone_delay"
      ["routing-keys"]=>
      array(1) {
        [0]=>
        string(24) "distributed_test_service"
      }
    }
  }
  ["x-first-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-first-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-first-death-reason"]=>
  string(7) "expired"
  ["x-last-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-last-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-last-death-reason"]=>
  string(7) "expired"
}
array(18) {
  ["__TypeId__"]=>
  string(54) "Fcards\App\Command\Distributed\Card\UnblockCardCommand"
  ["contentType"]=>
  string(16) "application/json"
  ["correlationId"]=>
  string(36) "9e45ad63-57b1-4308-8b94-ad0f4dfac6f4"
  ["ecotone.amqp.distributed.service_target"]=>
  string(12) "test_service"
  ["ecotone.distributed.payloadType"]=>
  string(7) "command"
  ["ecotone.distributed.routingKey"]=>
  string(13) "card.unfreeze"
  ["ecotoneTracingCarrier"]=>
  string(2) "[]"
  ["ecotone_delay"]=>
  int(1)
  ["ecotone_retry_number"]=>
  int(2)
  ["id"]=>
  string(36) "1cb3270a-1e08-4a30-bfa7-80f9d6c2e481"
  ["timestamp"]=>
  int(1728977641)
  ["x-death"]=>
  array(1) {
    [0]=>
    array(6) {
      ["count"]=>
      int(1)
      ["reason"]=>
      string(7) "expired"
      ["queue"]=>
      string(15) "ecotone_1_delay"
      ["time"]=>
      object(AMQPTimestamp)#7717 (1) {
        ["timestamp":"AMQPTimestamp":private]=>
        float(1728977641)
      }
      ["exchange"]=>
      string(13) "ecotone_delay"
      ["routing-keys"]=>
      array(1) {
        [0]=>
        string(24) "distributed_test_service"
      }
    }
  }
  ["x-first-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-first-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-first-death-reason"]=>
  string(7) "expired"
  ["x-last-death-exchange"]=>
  string(13) "ecotone_delay"
  ["x-last-death-queue"]=>
  string(15) "ecotone_1_delay"
  ["x-last-death-reason"]=>
  string(7) "expired"
}

@lifinsky
Contributor Author

It looks like on the second Ecotone run the dead letter is not used and the message returns to the original queue.

@lifinsky
Contributor Author

@dgafka Even worse, the second distributed command does not reach the command handler

@dgafka
Member

dgafka commented Oct 15, 2024

@lifinsky can you provide a PR with this failure scenario?

@lifinsky
Contributor Author

@dgafka This problem arises if several tests use the same service name for the producer and the consumer

@lifinsky
Contributor Author

If I create the EcotoneLite application once and then sequentially send the distributed commands and run the consumer, everything is OK.

@lifinsky
Contributor Author

@dgafka If different Ecotone Lite instances use the same service name, then the second command message remains in the queue

@dgafka
Member

dgafka commented Oct 15, 2024

@lifinsky I see. Could you provide a scenario like this, so I have a reproducible test case?

> @dgafka If different Ecotone Lite instances use the same service name, then the second command message remains in the queue

I believe you won't face this problem in production. It is most likely due to a reused connection: the amqp extension can freeze when the same channel is reused. I would expect it to work if you close the connection between tests.

@lifinsky
Contributor Author

@dgafka What is the most correct way to close the connection after the test? How can we get the job done as efficiently as possible from the perspective of an already created EcotoneLite instance?

@lifinsky
Contributor Author

For now we have decided to generate a unique service name in each application test (temporary solution)
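
A minimal sketch of that workaround, assuming a random suffix is enough to keep the names apart. The helper `uniqueServiceName()` and the `test_service` prefix are hypothetical names chosen for illustration; only `withServiceName()`, `destination` and `endpointId` come from the snippets earlier in this thread.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of Ecotone): returns a service name that
 * differs on every call, so each test gets its own distributed queue name.
 */
function uniqueServiceName(string $prefix = 'test_service'): string
{
    // random_bytes(4) yields 8 hexadecimal characters after bin2hex().
    return $prefix . '_' . bin2hex(random_bytes(4));
}

$first = uniqueServiceName();
$second = uniqueServiceName();
```

Each test would then pass its generated name to `withServiceName()` and use the same value as `destination` and `endpointId` in place of `self::SERVICE_NAME`.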

@lifinsky
Contributor Author

lifinsky commented Oct 15, 2024

It also works if we create Ecotone Lite application once and share between tests, but I would like a safer and simpler way to test AMQP with EcotoneLite using application (system) tests.

@dgafka
Member

dgafka commented Oct 18, 2024

@lifinsky can you check if the fix with proxy generation has somehow fixed this?

@lifinsky
Contributor Author

Now we have another problem:

The "Psr\Log\LoggerInterface" service is already initialized, you cannot replace it.

namespace Tests\Application\App\Console\Command;

...

final class InitCommandTest extends KernelTestCase
{
    use WithHttpClient;

    private LoggerInterface&MockObject $logger;

    protected function setUp(): void
    {
        parent::setUp();

        $this->response = json_decode(MockLoader::load('api.company.success.get-response'), true);
        $this->logger = $this->createMock(LoggerInterface::class);
        self::getContainer()->set(LoggerInterface::class, $this->logger);
    }

But when the cache already exists in the var/cache/test directory, the exception does not appear (second test run).

cat var/cache/test/Fcards_App_KernelTestContainer.php

<?php

// This file has been auto-generated by the Symfony Dependency Injection Component for internal use.

if (\class_exists(\ContainerYPbnv5K\Fcards_App_KernelTestContainer::class, false)) {
    // no-op
} elseif (!include __DIR__.'/ContainerYPbnv5K/Fcards_App_KernelTestContainer.php') {
    touch(__DIR__.'/ContainerYPbnv5K.legacy');

    return;
}

if (!\class_exists(Fcards_App_KernelTestContainer::class, false)) {
    \class_alias(\ContainerYPbnv5K\Fcards_App_KernelTestContainer::class, Fcards_App_KernelTestContainer::class, false);
}

return new \ContainerYPbnv5K\Fcards_App_KernelTestContainer([
    'container.build_hash' => 'YPbnv5K',
    'container.build_id' => '7d2f01c9',
    'container.build_time' => 1729243145,
    'container.runtime_mode' => \in_array(\PHP_SAPI, ['cli', 'phpdbg', 'embed'], true) ? 'web=0' : 'web=1',
], __DIR__.\DIRECTORY_SEPARATOR.'ContainerYPbnv5K');

@lifinsky
Contributor Author

Without the cache, Monolog\Logger was registered as a private service.

TestContainer:

    public function set(string $id, mixed $service): void
    {
        $container = $this->getPublicContainer();
        $renamedId = $this->renamedIds[$id] ?? $id;

        try {
            $container->set($renamedId, $service);
        } catch (InvalidArgumentException $e) {
            if (!str_starts_with($e->getMessage(), "The \"$renamedId\" service is private")) {
                throw $e;
            }
            if (isset($container->privates[$renamedId])) {
                throw new InvalidArgumentException(sprintf('The "%s" service is already initialized, you cannot replace it.', $id));
            }
            $container->privates[$renamedId] = $service;
        }
    }

@dgafka
Member

dgafka commented Oct 18, 2024

Hmm, so the main problem is fixed, and it's just the Logger now?

@jlabedo we did refactor the logging. Any thoughts on this?

@lifinsky
Contributor Author

@dgafka I haven't tested it yet

@lifinsky
Contributor Author

@dgafka Without a unique service name in each test, the problem still appears on the second test, as before.

@jlabedo
Contributor

jlabedo commented Oct 18, 2024

Hello there. It is hard to follow what the problem is now.
I understand that you have an error in your test: The "Psr\Log\LoggerInterface" service is already initialized, you cannot replace it.
Was this test working in a previous Ecotone version, @lifinsky? I ask because the error does not seem to come from Ecotone.

@lifinsky
Contributor Author

This is a recurring problem in application testing with the Ecotone Symfony bundle.

@lifinsky
Contributor Author

All services are public in the test env, so I don't understand how this service was registered as private.

@lifinsky
Contributor Author

@jlabedo I found a solution for the LoggerInterface mock.

Add to services_test.yaml:

monolog.logger:
    synthetic: true

@lifinsky
Contributor Author

@dgafka #398

docker exec -it -w=/data/app/packages/Amqp ecotone_development vendor/bin/phpunit --filter AmqpMessageChannelTest
