Skip to content

Commit

Permalink
refactor: Added run_in_executor function.
Browse files Browse the repository at this point in the history
  • Loading branch information
DABND19 committed Dec 29, 2024
1 parent 914490c commit c55626d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 33 deletions.
10 changes: 9 additions & 1 deletion faststream/_internal/utils/functions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
from collections.abc import AsyncIterator, Awaitable, Iterator
from concurrent.futures import Executor
from contextlib import asynccontextmanager, contextmanager
from functools import wraps
from functools import partial, wraps
from typing import (
Any,
Callable,
Optional,
TypeVar,
Union,
cast,
Expand Down Expand Up @@ -80,3 +83,8 @@ def drop_response_type(model: CallModel) -> CallModel:

async def return_input(x: Any) -> Any:
return x


async def run_in_executor(executor: Optional[Executor], func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, partial(func, *args, **kwargs))
48 changes: 16 additions & 32 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from faststream._internal.constants import EMPTY
from faststream._internal.log import logger as faststream_logger
from faststream._internal.utils.functions import call_or_await
from faststream._internal.utils.functions import call_or_await, run_in_executor
from faststream.confluent import config as config_module
from faststream.confluent.schemas import TopicPartition
from faststream.exceptions import SetupError
Expand Down Expand Up @@ -324,15 +324,13 @@ def topics_to_create(self) -> list[str]:

async def start(self) -> None:
"""Starts the Kafka consumer and subscribes to the specified topics."""
loop = asyncio.get_running_loop()

if self.allow_auto_create_topics:
await loop.run_in_executor(
await run_in_executor(
self._thread_pool,
create_topics,
self.topics_to_create,
self.config,
self.logger_state.logger.logger,
topics=self.topics_to_create,
config=self.config,
logger_=self.logger_state.logger.logger,
)

else:
Expand All @@ -342,15 +340,15 @@ async def start(self) -> None:
)

if self.topics:
await loop.run_in_executor(
self._thread_pool, self.consumer.subscribe, self.topics
await run_in_executor(
self._thread_pool, self.consumer.subscribe, topics=self.topics
)

elif self.partitions:
await loop.run_in_executor(
await run_in_executor(
self._thread_pool,
self.consumer.assign,
[p.to_confluent() for p in self.partitions],
partitions=[p.to_confluent() for p in self.partitions],
)

else:
Expand All @@ -359,19 +357,10 @@ async def start(self) -> None:

async def commit(self, asynchronous: bool = True) -> None:
"""Commits the offsets of all messages returned by the last poll operation."""
loop = asyncio.get_running_loop()
await loop.run_in_executor(
self._thread_pool,
self.consumer.commit,
None,
None,
asynchronous,
)
await run_in_executor(self._thread_pool, self.consumer.commit, asynchronous=asynchronous)

async def stop(self) -> None:
"""Stops the Kafka consumer and releases all resources."""
loop = asyncio.get_running_loop()

# NOTE: If we don't explicitly call commit and then close the consumer, the confluent consumer gets stuck.
# We are doing this to avoid the issue.
enable_auto_commit = self.config["enable.auto.commit"]
Expand All @@ -391,14 +380,13 @@ async def stop(self) -> None:
)

# Wrap calls to async to make method cancelable by timeout
await loop.run_in_executor(self._thread_pool, self.consumer.close)
await run_in_executor(self._thread_pool, self.consumer.close)

self._thread_pool.shutdown(wait=False)

async def getone(self, timeout: float = 0.1) -> Optional[Message]:
"""Consumes a single message from Kafka."""
loop = asyncio.get_running_loop()
msg = await loop.run_in_executor(self._thread_pool, self.consumer.poll, timeout)
msg = await run_in_executor(self._thread_pool, self.consumer.poll, timeout)
return check_msg_error(msg)

async def getmany(
Expand All @@ -407,26 +395,22 @@ async def getmany(
max_records: Optional[int] = 10,
) -> tuple[Message, ...]:
"""Consumes a batch of messages from Kafka and groups them by topic and partition."""
loop = asyncio.get_running_loop()
raw_messages: list[Optional[Message]] = await loop.run_in_executor(
raw_messages: list[Optional[Message]] = await run_in_executor(
self._thread_pool,
self.consumer.consume, # type: ignore[arg-type]
max_records or 10,
timeout,
num_messages=max_records or 10,
timeout=timeout,
)
return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)

async def seek(self, topic: str, partition: int, offset: int) -> None:
"""Seeks to the specified offset in the specified topic and partition."""
loop = asyncio.get_running_loop()
topic_partition = TopicPartition(
topic=topic,
partition=partition,
offset=offset,
)
await loop.run_in_executor(
self._thread_pool, self.consumer.seek, topic_partition.to_confluent()
)
await run_in_executor(self._thread_pool, self.consumer.seek, topic_partition.to_confluent())


def check_msg_error(msg: Optional[Message]) -> Optional[Message]:
Expand Down

0 comments on commit c55626d

Please sign in to comment.