Skip to content

Commit

Permalink
Merge branch 'rl-3.0.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
amenezes committed Mar 9, 2023
2 parents c7c98a7 + bc501aa commit ff8abee
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 49 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ tests:
docs:
@echo "> generate project documentation..."
@cp README.md docs/index.md
mkdocs serve
mkdocs serve -a 0.0.0.0:8000

install-deps:
@echo "> installing dependencies..."
Expand Down
1 change: 0 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[![ci](https://github.com/amenezes/rabbit-client/workflows/ci/badge.svg)](https://github.com/amenezes/rabbit-client/actions)
[![Maintainability](https://api.codeclimate.com/v1/badges/f24caeb9d85f17de93e2/maintainability)](https://codeclimate.com/github/amenezes/rabbit-client/maintainability)
[![codecov](https://codecov.io/gh/amenezes/rabbit-client/branch/master/graph/badge.svg)](https://codecov.io/gh/amenezes/rabbit-client)
[![PyPI version](https://badge.fury.io/py/rabbit-client.svg)](https://badge.fury.io/py/rabbit-client)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/rabbit-client)
Expand Down
5 changes: 3 additions & 2 deletions docs/polling-publisher.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ logging.getLogger().setLevel(logging.DEBUG)
client = AioRabbitClient()
asyncio.create_task(client.persistent_connect())

publish = Publish(client=client)
loop.create_task(publish.configure())
publish = Publish()
await client.register(publish)


class MyRepo:
def __init__(self, publish, db):
Expand Down
18 changes: 9 additions & 9 deletions docs/publish-subscribe.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@ All code examples can be used in the python asyncio REPL: `python -m asyncio` av
```python
import logging

from rabbit.client import AioRabbitClient
from rabbit.subscribe import Subscribe
from rabbit import AioRabbitClient, Subscribe
from rabbit.job import async_chaos_job


logging.basicConfig(level=logging.INFO)


client = AioRabbitClient()
asyncio.create_task(client.persistent_connect())
asyncio.create_task(client.persistent_connect(host='localhost', port=5672))

subscribe = Subscribe(client, concurrent=5, task=async_chaos_job)
asyncio.create_task(subscribe.configure())
subscribe = Subscribe(concurrent=5, task=async_chaos_job)
await client.register(subscribe)
```

### Publisher
Expand All @@ -41,8 +40,7 @@ python -m rabbit send-event data.json
```python
import logging

from rabbit.client import AioRabbitClient
from rabbit.publish import Publish
from rabbit import AioRabbitClient, Publish


logging.basicConfig(level=logging.INFO)
Expand All @@ -51,8 +49,10 @@ logging.basicConfig(level=logging.INFO)
client = AioRabbitClient()
asyncio.create_task(client.persistent_connect())

publish = Publish(client)
await publish.configure()
publish = Publish()
# publish = Publish(True) # for enable publish_confirms

await client.register(publish)
await publish.send_event('{"document": 1, "description": "123", "pages": ["abc", "def", "ghi"]}'.encode('utf8'))
```

2 changes: 1 addition & 1 deletion rabbit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .queue import Queue
from .subscribe import Subscribe

__version__ = "2.9.1b3"
__version__ = "3.0.1"
__all__ = [
"__version__",
"AioRabbitClient",
Expand Down
16 changes: 10 additions & 6 deletions rabbit/cli/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ def __init__(
queue_name: str,
concurrent: int,
):
self.loop = asyncio.get_event_loop()
self.subscribe_client = AioRabbitClient()
self.loop.create_task(

self._loop = asyncio.get_event_loop()
self._loop.create_task(
self.subscribe_client.persistent_connect(
host=host, port=port, login=login, password=password
)
),
name="rabbit-client-cli-connection",
)

self.exchange_type = exchange_type
self.exchange_topic = exchange_topic
self.exchange_name = exchange_name
Expand All @@ -38,11 +41,12 @@ def run(self, chaos_mode: bool = False, verbose: bool = True):
task = async_echo_job
if chaos_mode:
task = async_chaos_job
self.loop.run_until_complete(self.init(task, verbose))
self.loop.run_forever()

self._loop.run_until_complete(self.init(task, verbose))
self._loop.run_forever()

async def init(self, task, verbose: bool = False):
logger.info(f"Using {task.__doc__}")
logger.info(f"Using '{task.__doc__}'")
subscribe = Subscribe(
task=task,
exchange=Exchange(
Expand Down
2 changes: 2 additions & 0 deletions rabbit/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import random
from typing import List, Optional
from uuid import uuid4

Expand Down Expand Up @@ -106,6 +107,7 @@ async def watch_channel_state(self, item) -> None:
pass

async def register(self, item) -> None:
await asyncio.sleep(random.uniform(1.0, 1.5))
self._items.append(item)
task_id = uuid4().hex
item.channel = await self.get_channel()
Expand Down
5 changes: 4 additions & 1 deletion rabbit/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ class OperationError(Exception):


class ExchangeNotFound(Exception):
pass
def __init__(
self, exchange_name: str, message: str = "Exchange '{name}' not found"
):
super().__init__(message.format(name=exchange_name))


class ClientNotConnectedError(Exception):
Expand Down
4 changes: 1 addition & 3 deletions rabbit/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from .logger import logger


async def async_echo_job(
data: bytes, skip_wait: bool = False, *args, **kwargs
) -> bytes:
async def async_echo_job(data: bytes, skip_wait: bool = True, *args, **kwargs) -> bytes:
"""async simple job."""
logger.warning("Using the standard callable to process subscribe events.")
data_response = json.loads(data)
Expand Down
23 changes: 9 additions & 14 deletions rabbit/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from aioamqp.channel import Channel
from aioamqp.exceptions import ChannelClosed
from attrs import field, mutable
from attrs import field, mutable, validators

from rabbit.exceptions import ClientNotConnectedError

Expand All @@ -12,23 +12,18 @@

@mutable(repr=False)
class Publish:
publish_confirms: bool = field(
default=False, validator=validators.instance_of(bool)
)
_channel: Channel = field(init=False)

def __repr__(self) -> str:
return f"Publish(channel_id={self.channel_id}, publisher_confirms={self.publisher_confirms})"
return f"Publish(channel_id={self.channel_id}, publisher_confirms={self.publish_confirms})"

@property
def name(self) -> str:
return "Publish"

@property
def publisher_confirms(self) -> bool:
"""Check if publisher_confirms was enable on the channel."""
try:
return self.channel.publisher_confirms # type: ignore
except AttributeError:
return False

@property
def channel_id(self) -> int:
"""Check the current channel ID."""
Expand All @@ -48,9 +43,9 @@ def channel(self) -> Channel:
def channel(self, channel: Channel) -> None:
self._channel = channel

async def configure(self, enable_publish_confirms: bool = False) -> None:
async def configure(self) -> None:
"""Configure publisher channel."""
if enable_publish_confirms:
if self.publish_confirms:
await self.enable_publish_confirms()

async def enable_publish_confirms(self) -> None:
Expand Down Expand Up @@ -79,6 +74,6 @@ async def send_event(
**kwargs,
)
except ChannelClosed as err:
await self.configure(enable_publish_confirms=self.publisher_confirms)
await self.configure()
if err.message.find("no exchange") > 0:
raise ExchangeNotFound(f"Exchange '{exchange_name}' not found")
raise ExchangeNotFound(exchange_name) # type: ignore
19 changes: 8 additions & 11 deletions tests/unit/test_publish.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
import pytest

from rabbit import Publish
from rabbit.exceptions import ClientNotConnectedError


async def test_send_event(publish_mock):
await publish_mock.send_event(123)


async def test_register_publish_without_client_connected(publish):
def test_register_publish_without_client_connected(publish):
with pytest.raises(ClientNotConnectedError):
await publish.configure(True)
hasattr(publish, "channel")


def test_publish_repr(publish_mock):
assert repr(publish_mock) == "Publish(channel_id=0, publisher_confirms=False)"


async def test_enable_publish_confirms(publish_mock):
assert publish_mock.publisher_confirms is False
await publish_mock.enable_publish_confirms()
assert publish_mock.publisher_confirms is True
async def test_publish_confirms_disabled(publish):
assert publish.publish_confirms is False


async def test_enable_publish_confirms_twice(publish_mock):
assert publish_mock.publisher_confirms is False
await publish_mock.enable_publish_confirms()
await publish_mock.enable_publish_confirms()
assert publish_mock.publisher_confirms is True
async def test_publish_confirms_enabled():
publish = Publish(True)
assert publish.publish_confirms is True

0 comments on commit ff8abee

Please sign in to comment.