
childContainer of previous run is stopping ConcurrentContainer after a new start #3448

Open
LokeshAlamuri opened this issue Aug 20, 2024 · 5 comments


@LokeshAlamuri
Contributor

LokeshAlamuri commented Aug 20, 2024

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3

Describe the bug

Once a ConcurrentContainer has been stopped, its child containers from that run should not be able to stop the ConcurrentContainer. However, in some scenarios they can.

Scenario:

Concurrency: 2

ConcurrentContainer: CMain
Child containers: C0, C1.

  1. ConcurrentContainer started.

    CMain -- running.
    C0 -- running.
    C1 -- running.

  2. ConcurrentContainer stopped.

    CMain -- not running.
    C0 -- delinked (still processing a message).
    C1 -- delinked.

  3. ConcurrentContainer started again. This is permitted because stop was called before, so there is nothing wrong here and it should be allowed, even though it is not good practice.

    CMain -- running.
    C2 -- running.
    C3 -- running.

    C0 -- delinked (still processing a message).

  4. C0 throws an error while processing. This stops the running ConcurrentContainer!

    CMain -- not running.
    C2 -- delinked.
    C3 -- delinked.

    C0 -- delinked.
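To make the four steps concrete, here is a minimal Kafka-free model of the scenario. It is a sketch under stated assumptions, not spring-kafka code: `Parent`, `Child`, `failWhileProcessing()` and `runScenario()` are hypothetical names, and a child's processing error is assumed to stop the parent directly, mirroring the effect the issue reports with `CommonContainerStoppingErrorHandler`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Kafka-free model of the four steps above.
// Parent and Child are illustrative stand-ins, NOT spring-kafka classes.
class FencedChildBugDemo {

    // Stand-in for the ConcurrentContainer (CMain).
    static class Parent {
        boolean running;
        final List<Child> children = new ArrayList<>();
        private int nextId;

        void start() {
            running = true;
            // Every start creates a fresh pair of children: C0, C1, then C2, C3, ...
            for (int i = 0; i < 2; i++) {
                children.add(new Child("C" + nextId++, this));
            }
        }

        void stop() {
            running = false;
            // Children are delinked, but may still be processing a record.
            children.clear();
        }
    }

    // Stand-in for a child KafkaMessageListenerContainer.
    static class Child {
        final String name;
        final Parent parent; // direct reference to the parent: the root of the problem

        Child(String name, Parent parent) {
            this.name = name;
            this.parent = parent;
        }

        // Models a container-stopping error handler: a processing error stops the parent.
        void failWhileProcessing() {
            parent.stop();
        }
    }

    // Runs steps 1-4 and returns whether CMain is still running afterwards.
    static boolean runScenario() {
        Parent cMain = new Parent();
        cMain.start();                    // step 1: C0, C1 running
        Child c0 = cMain.children.get(0);
        cMain.stop();                     // step 2: C0, C1 delinked
        cMain.start();                    // step 3: C2, C3 running
        c0.failWhileProcessing();         // step 4: stale C0 fails
        return cMain.running;
    }

    public static void main(String[] args) {
        // Prints "CMain running after step 4: false": the new run was stopped by C0.
        System.out.println("CMain running after step 4: " + runScenario());
    }
}
```

Because the stale child keeps a plain reference to the parent, nothing distinguishes it from a child of the current run when it asks the parent to stop.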

To Reproduce

	@Test
	public void testFencedContainerFailed() throws Exception {
		this.logger.info("Start testFencedContainerFailed");
		Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
				embeddedKafka);
		AtomicReference<Properties> overrides = new AtomicReference<>();
		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
			@Override
			protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
					String clientIdSuffixArg, Properties properties) {
				overrides.set(properties);
				return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
			}
		};
		ContainerProperties containerProps = new ContainerProperties(topic1);
		containerProps.setLogContainerConfig(true);
		containerProps.setClientId("client");
		containerProps.setAckMode(ContainerProperties.AckMode.RECORD);

		final CountDownLatch secondRunLatch = new CountDownLatch(5);
		final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
		// Written from both consumer threads, so use a thread-safe list.
		final List<String> payloads = Collections.synchronizedList(new ArrayList<>());
		final CountDownLatch processingLatch = new CountDownLatch(1);
		final CountDownLatch firstLatch = new CountDownLatch(1);

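		// The first record blocks until firstLatch is released and then throws, so the
		// child container that received it is still busy after container.stop().
		AtomicBoolean first = new AtomicBoolean(true);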

		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
			if (first.getAndSet(false)) {
				try {
					firstLatch.await(100, TimeUnit.SECONDS);
					throw new NullPointerException();
				}
				catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
			ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
			listenerThreadNames.add(Thread.currentThread().getName());
			payloads.add(message.value());
			secondRunLatch.countDown();
			processingLatch.countDown();
		});

		ConcurrentMessageListenerContainer<Integer, String> container =
				new ConcurrentMessageListenerContainer<>(cf, containerProps);
		container.setConcurrency(2);
		container.setBeanName("testAuto");
		container.setChangeConsumerThreadName(true);
		container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());

		BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
		CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
		CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1);

		container.setApplicationEventPublisher(e -> {
			events.add((KafkaEvent) e);
			if (e instanceof ConcurrentContainerStoppedEvent) {
				concurrentContainerStopLatch.countDown();
			}
			if (e instanceof ConsumerStoppedEvent) {
				consumerStoppedEventLatch.countDown();
			}
		});

		CountDownLatch interceptedSecondRun = new CountDownLatch(5);
		container.setRecordInterceptor((record, consumer) -> {
			interceptedSecondRun.countDown();
			return record;
		});

		container.start();

		MessageListenerContainer childContainer0 = container.getContainers().get(0);
		MessageListenerContainer childContainer1 = container.getContainers().get(1);

		ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
		assertThat(container.getAssignedPartitions()).hasSize(2);
		Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
		assertThat(assignments).hasSize(2);
		assertThat(assignments.get("client-0")).isNotNull();
		assertThat(assignments.get("client-1")).isNotNull();

		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
		ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
		template.setDefaultTopic(topic1);
		template.sendDefault(0, 0, "foo");
		template.sendDefault(1, 2, "bar");
		template.sendDefault(0, 0, "baz");
		template.sendDefault(1, 2, "qux");
		template.flush();

		assertThat(container.metrics()).isNotNull();
		assertThat(container.isInExpectedState()).isTrue();
		assertThat(childContainer0.isRunning()).isTrue();
		assertThat(childContainer1.isRunning()).isTrue();
		assertThat(container.isChildRunning()).isTrue();

		assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue();

		container.stop();

		assertThat(container.isChildRunning()).isTrue();
		assertThat(container.isRunning()).isFalse();
		assertThat(childContainer0.isRunning()).isFalse();
		assertThat(childContainer1.isRunning()).isFalse();

		assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue();

		assertThat(container.isChildRunning()).isTrue();

		assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1");

		assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse();


		template.sendDefault(0, 0, "FOO");
		template.sendDefault(1, 2, "BAR");
		template.sendDefault(0, 0, "BAZ");
		template.sendDefault(1, 2, "QUX");
		template.flush();

		// Permitted because stop() was called earlier.
		container.start();

		assertThat(container.isRunning()).isTrue();
		assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning()))
				.isTrue();

		firstLatch.countDown();

		// The running container is stopped by C0, a child of the previous run!
		assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isTrue();

		assertThat(container.isRunning()).isFalse();
		assertThat(container.getContainers().stream().anyMatch(containerL -> containerL.isRunning()))
				.isFalse();

		this.logger.info("Stop testFencedContainerFailed");
	}

Please let me know whether this is a valid scenario.

@artembilan
Member

What are your 1, 2, 3, 4?
Different scenarios, or steps, or states of the same scenario?
Isn't this a result of all the changes you have introduced recently?

C0 has thrown error while processing. This would stop the running ConcurrentContainer !!!!

This indeed must not happen.
A failure in one child container must not affect all the others.

Does it also happen in previous versions, before your recent changes?

@LokeshAlamuri
Contributor Author

Isn't this a result of all the changes you have introduced recently?

I have not introduced any bugs. This bug is a different one; it also exists in previous versions. I have provided a JUnit test to reproduce this scenario. Please review it and let me know whether this is a valid scenario that needs to be fixed.

@LokeshAlamuri
Contributor Author

What are your 1, 2, 3, 4?
Different scenarios, or steps, or states of the same scenario?
Isn't this a result of all the changes you have introduced recently?

These are the steps of a single scenario that reproduce the bug; they are not different scenarios. Please let me know if I should provide more information about this issue.

This bug is not caused by my changes.

@artembilan
Member

No problem!

I will need more time to investigate this.
We may ask if @sobychacko has some cycles to look into this quicker.
Thanks

@LokeshAlamuri
Contributor Author

If you are OK with it, I will try to fix the issue and provide a PR. I have some ideas on how to fix it.

LokeshAlamuri added a commit to LokeshAlamuri/spring-kafka that referenced this issue Oct 5, 2024
[DRAFT]
Fixes #spring-projectsGH-3448
spring-projects#3448

Issue: Fenced Child Container could stop the running ConcurrentContainer
Fix: Configure KafkaMessageListenerContainer (KMLC) to use ConcurrentMessageListenerContainerRef instead of ConcurrentContainer.
Internally, ConcurrentContainerRef checks whether the KMLC is fenced when stop operations are called on the ConcurrentContainer. If the KMLC is fenced, suppress the `stop`-related operations.
If the KMLC is not fenced, delegate the stop call to the ConcurrentContainer.
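The approach described in the draft commit can be sketched with the same kind of simplified model. This is an illustration under assumptions, not the actual patch: `Parent`, `ParentRef` and `Child` are hypothetical stand-ins for the ConcurrentContainer, ConcurrentContainerRef and KMLC, and each child is assumed to be fenced at the moment its parent is stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a child reaches its parent only through a reference that
// suppresses stop() when the child is fenced.
// Parent, ParentRef and Child are illustrative names, NOT spring-kafka classes.
class FencedChildFixDemo {

    static class Parent {
        volatile boolean running;
        final List<Child> children = new ArrayList<>();
        private int nextId;

        synchronized void start() {
            running = true;
            for (int i = 0; i < 2; i++) {
                children.add(new Child("C" + nextId++, new ParentRef(this)));
            }
        }

        synchronized void stop() {
            running = false;
            // Fence every child of the run being stopped before delinking it.
            for (Child child : children) {
                child.fenced.set(true);
            }
            children.clear();
        }
    }

    // Plays the role the commit message gives ConcurrentContainerRef.
    static class ParentRef {
        private final Parent parent;

        ParentRef(Parent parent) {
            this.parent = parent;
        }

        void stop(Child caller) {
            // Check and delegate under the parent's lock, so the child cannot be
            // fenced (and the parent restarted) between the check and the stop call.
            synchronized (parent) {
                if (caller.fenced.get()) {
                    return; // child from an earlier run: suppress the stop
                }
                parent.stop(); // child of the current run: delegate
            }
        }
    }

    static class Child {
        final String name;
        final ParentRef parentRef;
        final AtomicBoolean fenced = new AtomicBoolean(false);

        Child(String name, ParentRef parentRef) {
            this.name = name;
            this.parentRef = parentRef;
        }

        // Models a container-stopping error handler, now routed through the reference.
        void failWhileProcessing() {
            parentRef.stop(this);
        }
    }

    // Runs steps 1-4 from the issue and returns whether CMain is still running.
    static boolean runScenario() {
        Parent cMain = new Parent();
        cMain.start();                 // C0, C1 running
        Child c0 = cMain.children.get(0);
        cMain.stop();                  // C0, C1 fenced and delinked
        cMain.start();                 // C2, C3 running
        c0.failWhileProcessing();      // suppressed because C0 is fenced
        return cMain.running;
    }

    public static void main(String[] args) {
        // Prints "CMain running after step 4: true".
        System.out.println("CMain running after step 4: " + runScenario());
    }
}
```

Holding the parent's lock across the fenced check and the delegated `stop()` is a design choice of this sketch; without it, a child could pass the check, be fenced by a concurrent stop and restart, and still stop the new run. A child of the current run is not fenced, so its errors still stop the parent as before.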