Skip to content

Commit

Permalink
fixing queue arguments management
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Jan 9, 2025
1 parent a031ce4 commit 37570f2
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 20 deletions.
2 changes: 1 addition & 1 deletion examples/getting_started/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def main() -> None:
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

print("binding queue to exchange")
Expand Down
14 changes: 9 additions & 5 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Optional
from typing import Any, Optional

from .common import ExchangeType, QueueType

Expand All @@ -17,20 +17,24 @@ class ExchangeSpecification:
@dataclass
class QueueSpecification:
name: str
arguments: dict[str, str]
queue_type: QueueType = QueueType.quorum
dead_letter_routing_key: str = ""
dead_letter_routing_key: Optional[str] = None
is_exclusive: Optional[bool] = None
max_len: Optional[int] = None
max_len_bytes: Optional[int] = None
dead_letter_exchange: str = ""
message_ttl: Optional[int] = None
expires: Optional[int] = None
dead_letter_exchange: Optional[str] = ""
is_auto_delete: bool = False
is_durable: bool = True
overflow: Optional[str] = None
single_active_consumer: Optional[bool] = None


@dataclass
class QueueInfo:
name: str
arguments: dict[str, str]
arguments: dict[str, Any]
queue_type: QueueType = QueueType.quorum
is_exclusive: Optional[bool] = None
is_auto_delete: bool = False
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq_amqp_python_client/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class ValidationCodeException(Exception):
class ValidationCodeException(BaseException):
# Constructor or Initializer
def __init__(self, msg: str):
self.msg = msg
Expand Down
33 changes: 26 additions & 7 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def declare_exchange(
body["durable"] = exchange_specification.is_durable
body["type"] = exchange_specification.exchange_type.value # type: ignore
body["internal"] = exchange_specification.is_internal
body["arguments"] = {} # type: ignore
body["arguments"] = exchange_specification.arguments # type: ignore

path = exchange_address(exchange_specification.name)

Expand All @@ -129,14 +129,33 @@ def declare_queue(
) -> QueueSpecification:
logger.debug("declare_queue operation called")
body = {}
args: dict[str, Any] = {}

body["auto_delete"] = queue_specification.is_auto_delete
body["durable"] = queue_specification.is_durable
body["arguments"] = { # type: ignore
"x-queue-type": queue_specification.queue_type.value,
"x-dead-letter-exchange": queue_specification.dead_letter_exchange,
"x-dead-letter-routing-key": queue_specification.dead_letter_routing_key,
"max-length-bytes": queue_specification.max_len_bytes,
}
args["x-queue-type"] = queue_specification.queue_type.value
if queue_specification.dead_letter_exchange is not None:
args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange
if queue_specification.dead_letter_routing_key is not None:
args["x-dead-letter-routing-key"] = (
queue_specification.dead_letter_routing_key
)
if queue_specification.overflow is not None:
args["x-overflow"] = queue_specification.overflow
if queue_specification.max_len is not None:
args["x-max-length"] = queue_specification.max_len
if queue_specification.max_len_bytes is not None:
args["x-max-length-bytes"] = queue_specification.max_len_bytes
if queue_specification.message_ttl is not None:
args["x-message-ttl"] = queue_specification.message_ttl
if queue_specification.expires is not None:
args["x-expires"] = queue_specification.expires
if queue_specification.single_active_consumer is not None:
args["x-single-active-consumer"] = (
queue_specification.single_active_consumer
)

body["arguments"] = args # type: ignore

path = queue_address(queue_specification.name)

Expand Down
106 changes: 101 additions & 5 deletions tests/test_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
QueueSpecification,
QueueType,
)
from rabbitmq_amqp_python_client.exceptions import (
ValidationCodeException,
)


def test_declare_delete_exchange() -> None:
Expand Down Expand Up @@ -33,7 +36,7 @@ def test_declare_purge_delete_queue() -> None:
management = connection.management()

queue_info = management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

assert queue_info.name == queue_name
Expand All @@ -57,7 +60,7 @@ def test_bind_exchange_to_queue() -> None:
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

binding_exchange_queue_path = management.bind(
Expand Down Expand Up @@ -88,20 +91,113 @@ def test_bind_exchange_to_queue() -> None:
management.unbind(binding_exchange_queue_path)


def test_queue_info() -> None:
def test_queue_info_with_validations() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-bind-exchange-to-queue-queue"
queue_name = "test_queue_info_with_validation"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name, queue_type=QueueType.quorum, arguments={}
name=queue_name,
queue_type=QueueType.quorum,
)
management.declare_queue(queue_specification)

queue_info = management.queue_info(queue_name=queue_name)

management.delete_queue(queue_name)

assert queue_info.name == queue_name
assert queue_info.queue_type == queue_specification.queue_type
assert queue_info.is_durable == queue_specification.is_durable
assert queue_info.message_count == 0


def test_queue_precondition_fail() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()
test_failure = True

queue_name = "test-queue_precondition_fail"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name, queue_type=QueueType.quorum, is_auto_delete=False
)
management.declare_queue(queue_specification)

management.declare_queue(queue_specification)

queue_specification = QueueSpecification(
name=queue_name,
queue_type=QueueType.quorum,
is_auto_delete=True,
)

management.delete_queue(queue_name)

try:
management.declare_queue(queue_specification)
except ValidationCodeException:
test_failure = False

assert test_failure is False


def test_declare_classic_queue() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-declare_classic_queue"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
)
queue_info = management.declare_queue(queue_specification)

assert queue_info.name == queue_specification.name
assert queue_info.queue_type == queue_specification.queue_type

management.delete_queue(queue_name)


def test_declare_queue_with_args() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-queue_with_args"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
dead_letter_exchange="my_exchange",
dead_letter_routing_key="my_key",
max_len=50000000,
max_len_bytes=1000000000,
expires=2000,
single_active_consumer=True,
)

queue_info = management.declare_queue(queue_specification)

assert queue_specification.name == queue_info.name
assert queue_specification.is_auto_delete == queue_info.is_auto_delete
assert queue_specification.dead_letter_exchange == queue_info.dead_letter_exchange
assert (
queue_specification.dead_letter_routing_key
== queue_info.dead_letter_routing_key
)
assert queue_specification.max_len == queue_info.max_len
assert queue_specification.max_len_bytes == queue_info.max_len_bytes
assert queue_specification.expires == queue_info.expires
assert (
queue_specification.single_active_consumer == queue_info.single_active_consumer
)

management.delete_queue(queue_name)
2 changes: 1 addition & 1 deletion tests/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_bind_exchange_to_queue() -> None:
management = connection.management()

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

raised = False
Expand Down

0 comments on commit 37570f2

Please sign in to comment.