-
Notifications
You must be signed in to change notification settings - Fork 0
/
bridge_grpc_service.py
111 lines (85 loc) · 3.17 KB
/
bridge_grpc_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
Bridge gRPC Service.
This program is free software: you can redistribute it under the terms
of the GNU General Public License, v. 3.0. If a copy of the GNU General
Public License was not distributed with this file, see <https://www.gnu.org/licenses/>.
"""
import grpc
import bridge_pb2
import bridge_pb2_grpc
from utils import get_logger
logger = get_logger(__name__)
class BridgeService(bridge_pb2_grpc.EntityServiceServicer):
"""Bridge Service Descriptor"""
def handle_create_grpc_error_response(
self, context, response, sys_msg, status_code, **kwargs
):
"""
Handles the creation of a gRPC error response.
Args:
context: gRPC context.
response: gRPC response object.
sys_msg (str or tuple): System message.
status_code: gRPC status code.
user_msg (str or tuple): User-friendly message.
error_type (str): Type of error.
Returns:
An instance of the specified response with the error set.
"""
user_msg = kwargs.get("user_msg")
error_type = kwargs.get("error_type")
if not user_msg:
user_msg = sys_msg
if error_type == "UNKNOWN":
logger.exception(sys_msg)
context.set_details(user_msg)
context.set_code(status_code)
return response()
def handle_request_field_validation(
self, context, request, response, required_fields
):
"""
Validates the fields in the gRPC request.
Args:
context: gRPC context.
request: gRPC request object.
response: gRPC response object.
required_fields (list): List of required fields, can include tuples.
Returns:
None or response: None if no missing fields,
error response otherwise.
"""
def validate_field(field):
if not getattr(request, field, None):
return self.handle_create_grpc_error_response(
context,
response,
f"Missing required field: {field}",
grpc.StatusCode.INVALID_ARGUMENT,
)
return None
for field in required_fields:
validation_error = validate_field(field)
if validation_error:
return validation_error
return None
def PublishContent(self, request, context):
"""Handles publishing bridge payload"""
response = bridge_pb2.PublishContentResponse
def validate_fields():
return self.handle_request_field_validation(
context, request, response, ["content"]
)
try:
invalid_fields_response = validate_fields()
if invalid_fields_response:
return invalid_fields_response
except Exception as exc:
return self.handle_create_grpc_error_response(
context,
response,
exc,
grpc.StatusCode.INTERNAL,
user_msg="Oops! Something went wrong. Please try again later.",
error_type="UNKNOWN",
)