Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separated thread for confluent kafka consumer client. #2014

Conversation

DABND19
Copy link

@DABND19 DABND19 commented Dec 29, 2024

Description

Fixes #1904

All consumer operations are guaranteed to run in a separate thread. This allows us to no longer use asyncio.Lock, and also does not block threads from the shared pool.

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh
  • I have ensured that static analysis tests are passing by running scripts/static-analysis.sh
  • I have included code examples to illustrate the modifications

@DABND19 DABND19 force-pushed the feat/separate-thread-for-confluent-kafka-consumer-0.6.0 branch 2 times, most recently from c55626d to 0067c92 Compare December 30, 2024 00:14
@DABND19 DABND19 force-pushed the feat/separate-thread-for-confluent-kafka-consumer-0.6.0 branch from 0067c92 to 882f69e Compare December 30, 2024 00:27
@@ -131,12 +131,12 @@ async def start(self) -> None:
self.add_task(self._consume())

async def close(self) -> None:
await super().close()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lancetnik Lancetnik merged commit 18c166f into airtai:0.6.0 Dec 30, 2024
25 of 28 checks passed
doublehomixide pushed a commit to doublehomixide/faststream that referenced this pull request Jan 3, 2025
* fix: Disabled excessive throttling for BatchSubscriber.

* fix: Use separate thread for confluent kafka consumer.

* refactor: Added run_in_executor function.

* fix: Stop consumer client after consumer tasks are stopped.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants