Skip to content

Commit

Permalink
improving url helper
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Jan 8, 2025
1 parent b01834d commit a4a15c8
Showing 1 changed file with 59 additions and 7 deletions.
66 changes: 59 additions & 7 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,57 @@
from .entities import BindingSpecification


def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
binding_path_wth_exchange_queue_key = (
"/bindings"
+ "/"
+ "src="
+ bind_specification.source_exchange
+ ";"
+ "dstq="
+ bind_specification.destination_queue
+ ";key="
+ bind_specification.binding_key
+ ";args="
)
return binding_path_wth_exchange_queue_key


def encode_path_segment(input: str) -> str:
encoded_input = ""
for character in input:
if is_unreserved(character):
encoded_input += character

else:
# encoded_input += "%%%02X" % character
encoded_input += hex(ord(character))

return encoded_input


def exchange_address(exchange_name: str, routing_key: str = "") -> str:
if routing_key == "":
path = "/exchanges/" + exchange_name
path = "/exchanges/" + encode_path_segment(exchange_name)
else:
path = "/exchanges/" + exchange_name + "/" + routing_key
path = (
"/exchanges/"
+ encode_path_segment(exchange_name)
+ "/"
+ encode_path_segment(routing_key)
)

return path


def queue_address(name: str) -> str:
path = "/queues/" + name
def queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name)

return path


def purge_queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name) + "/messages"

return path

Expand All @@ -28,17 +68,29 @@ def path_address() -> str:
return path


def is_unreserved(input: str) -> bool:
return (
(input >= "A" and input <= "Z")
or (input >= "a" and input <= "z")
or (input >= "0" and input <= "9")
or input == "-"
or input == "."
or input == "_"
or input == "~"
)


def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
binding_path_wth_exchange_queue_key = (
"/bindings"
+ "/"
+ "src="
+ bind_specification.source_exchange
+ encode_path_segment(bind_specification.source_exchange)
+ ";"
+ "dstq="
+ bind_specification.destination_queue
+ encode_path_segment(bind_specification.destination_queue)
+ ";key="
+ bind_specification.binding_key
+ encode_path_segment(bind_specification.binding_key)
+ ";args="
)
return binding_path_wth_exchange_queue_key

0 comments on commit a4a15c8

Please sign in to comment.