Skip to content

Commit

Permalink
Better error messages #14
Browse files Browse the repository at this point in the history
  • Loading branch information
heming-langrenn committed Jul 30, 2023
1 parent 38b5218 commit bb8cbd1
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 126 deletions.
85 changes: 51 additions & 34 deletions photo_service/services/azure_servicebus_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@
from photo_service.adapters import VideoEventsAdapter
from photo_service.models.video_event_model import VideoEvent

NAMESPACE_CONNECTION_STR = os.getenv("SERVICEBUS_NAMESPACE_CONNECTION_STR", "")


class AzureServiceBusService:
"""Class for Azure Service Bus Service."""

@classmethod
async def send_message(cls: Any, message: str, queue_name: str) -> None:
"""Send message to Azure Service Bus."""
NAMESPACE_CONNECTION_STR = os.getenv("SERVICEBUS_NAMESPACE_CONNECTION_STR", "")
if not NAMESPACE_CONNECTION_STR:
raise HTTPUnprocessableEntity(
reason="Missing property SERVICEBUS_NAMESPACE_CONNECTION_STR."
)
async with ServiceBusClient.from_connection_string(
conn_str=NAMESPACE_CONNECTION_STR, logging_enable=True
) as client:
Expand All @@ -29,43 +32,57 @@ async def send_message(cls: Any, message: str, queue_name: str) -> None:

@classmethod
async def receive_messages(
cls: Any, db: Any, event_id: str, queue_name: str
cls: Any, db: Any, event_id: str, my_queue_name: str
) -> list:
"""Receive messages from Azure Service Bus - return list of message ids."""
message_id_list = []
NAMESPACE_CONNECTION_STR = os.getenv("SERVICEBUS_NAMESPACE_CONNECTION_STR", "")
if not NAMESPACE_CONNECTION_STR:
raise HTTPUnprocessableEntity(
reason="Missing property SERVICEBUS_NAMESPACE_CONNECTION_STR."
)

# create a Service Bus client using the connection string
async with ServiceBusClient.from_connection_string(
conn_str=NAMESPACE_CONNECTION_STR, logging_enable=False
) as servicebus_client:
async with servicebus_client:
# get the Queue Receiver object for the queue
receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
async with receiver:
received_msgs = await receiver.receive_messages(
max_wait_time=5, max_message_count=20
try:
async with ServiceBusClient.from_connection_string(
conn_str=NAMESPACE_CONNECTION_STR, logging_enable=True
) as servicebus_client:
async with servicebus_client:
# get the Queue Receiver object for the queue
receiver = servicebus_client.get_queue_receiver(
queue_name=my_queue_name
)
for message in received_msgs:
logging.debug(f"Received message: {message}")
# store message in database - create video_event, id from remote id
try:
message_dict = json.loads(str(message))
message_dict["id"] = str(message.message_id)
message_dict["event_id"] = event_id
video_event = VideoEvent.from_dict(message_dict)
async with receiver:
received_msgs = await receiver.receive_messages(
max_wait_time=5, max_message_count=20
)
for message in received_msgs:
logging.debug(f"Received message: {message}")
# store message in database - create video_event, id from remote id
try:
message_dict = json.loads(str(message))
message_dict["id"] = str(message.message_id)
message_dict["event_id"] = event_id
message_dict["queue_name"] = my_queue_name
video_event = VideoEvent.from_dict(message_dict)

# insert new video_event
result = await VideoEventsAdapter.create_video_event(
db, video_event.to_dict()
)
logging.debug(f"Inserted video_event with id: {id}")
if result:
message_id_list.append(id)
# complete the message so that the message is removed from the queue
await receiver.complete_message(message)
except KeyError as e:
raise HTTPUnprocessableEntity(
reason=f"Mandatory property {e.args[0]} is missing."
) from e
# insert new video_event
result = await VideoEventsAdapter.create_video_event(
db, video_event.to_dict()
)
logging.debug(f"Inserted video_event with id: {id}")
if result:
message_id_list.append(id)
# complete the message so that the message is removed from the queue
await receiver.complete_message(message)
except KeyError as e:
raise HTTPUnprocessableEntity(
reason=f"Mandatory property {e.args[0]} is missing."
) from e

logging.debug("End for message async")
logging.debug("End for message async")
except Exception as e:
raise HTTPUnprocessableEntity(
reason=f"Error receiving messages from Azure Service Bus: {e}"
) from e
return message_id_list
4 changes: 1 addition & 3 deletions photo_service/views/video_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ async def get(self) -> Response:
try:
event_id = self.request.rel_url.query["eventId"]
except Exception as e:
raise HTTPBadRequest(
reason="Mandatory param is missing - eventId."
) from e
raise HTTPBadRequest(reason="Mandatory param is missing - eventId.") from e

# get all video_events
video_events = await VideoEventsAdapter.get_all_video_events(db, event_id)
Expand Down
Loading

0 comments on commit bb8cbd1

Please sign in to comment.