-
Notifications
You must be signed in to change notification settings - Fork 0
/
middleware.py
112 lines (104 loc) · 3.93 KB
/
middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
import logging
import os
import time
from io import BytesIO
from flask import request, g
from confluent_kafka import Producer
# Maps an HTTP status code (as a string) to its human-readable reason phrase.
# log_response() looks codes up here to fill the "status" field of each record.
# Keys are strings because the lookup is done with str(response.status_code).
friendlyHttpStatus = {
    # 2xx
    '200': 'OK',
    '201': 'Created',
    '202': 'Accepted',
    '203': 'Non-Authoritative Information',
    '204': 'No Content',
    '205': 'Reset Content',
    '206': 'Partial Content',
    # 3xx
    '300': 'Multiple Choices',
    '301': 'Moved Permanently',
    '302': 'Found',
    '303': 'See Other',
    '304': 'Not Modified',
    '305': 'Use Proxy',
    '306': 'Unused',
    '307': 'Temporary Redirect',
    '308': 'Permanent Redirect',
    # 4xx
    '400': 'Bad Request',
    '401': 'Unauthorized',
    '402': 'Payment Required',
    '403': 'Forbidden',
    '404': 'Not Found',
    '405': 'Method Not Allowed',
    '406': 'Not Acceptable',
    '407': 'Proxy Authentication Required',
    '408': 'Request Timeout',
    '409': 'Conflict',
    '410': 'Gone',
    '411': 'Length Required',
    # 412 was previously mislabelled "Precondition Required" (that is 428).
    '412': 'Precondition Failed',
    # Previously misspelled as "Request Entry Too Large".
    '413': 'Request Entity Too Large',
    '414': 'Request-URI Too Long',
    '415': 'Unsupported Media Type',
    '416': 'Requested Range Not Satisfiable',
    '417': 'Expectation Failed',
    '418': "I'm a teapot",
    '422': 'Unprocessable Entity',
    '428': 'Precondition Required',
    '429': 'Too Many Requests',
    '431': 'Request Header Fields Too Large',
    '451': 'Unavailable For Legal Reasons',
    # 5xx
    '500': 'Internal Server Error',
    '501': 'Not Implemented',
    '502': 'Bad Gateway',
    '503': 'Service Unavailable',
    '504': 'Gateway Timeout',
    '505': 'HTTP Version Not Supported',
}
# Configure root logging at import time: INFO level, timestamped lines such as
# "[2024-01-01 12:00:00] INFO: message".
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)

# Module-scoped logger used by the middleware hooks below.
logger = logging.getLogger(__name__)
# Kafka producer settings, driven by environment variables:
#   KAFKA_URL            -> bootstrap servers (no default; None when unset)
#   KAFKA_BATCH_SIZE     -> batch.num.messages (default 100)
#   KAFKA_BATCH_TIMEOUT  -> used for BOTH linger.ms and request.timeout.ms
#                           (default 10000)
conf = {
    'bootstrap.servers': os.environ.get('KAFKA_URL'),
    'batch.num.messages': int(os.environ.get('KAFKA_BATCH_SIZE', '100')),
    'linger.ms': int(os.environ.get('KAFKA_BATCH_TIMEOUT', '10000')),
    'request.timeout.ms': int(os.environ.get('KAFKA_BATCH_TIMEOUT', '10000')),
    'compression.type': 'zstd',
    'acks': 0,
}

# Single module-level producer shared by every request handled in this process.
producer = Producer(**conf)

# Destination topic for the traffic records (KAFKA_TOPIC, default below).
topic = os.environ.get('KAFKA_TOPIC', 'akto.api.logs')

# Requests whose declared Content-Length exceeds this many bytes are not
# captured (MAX_PAYLOAD_SIZE, default 100000).
MAX_PAYLOAD_SIZE = int(os.environ.get('MAX_PAYLOAD_SIZE', '100000'))
def log_request():
    """Capture the incoming request body so ``log_response`` can log it.

    Registered as a Flask ``before_request`` hook. Stores two values on
    ``flask.g``:

    * ``g.request_payload_too_big`` -- True when the declared Content-Length
      exceeds ``MAX_PAYLOAD_SIZE``; in that case the body is not read.
    * ``g.req_payload`` -- the request body as text, or ``'{}'`` when the
      body was skipped for being too big.

    Returns:
        None, so Flask continues on to the view function.
    """
    declared_length = request.content_length
    # content_length is None when the header is absent; treat that as "not too big".
    g.request_payload_too_big = bool(
        declared_length and declared_length > MAX_PAYLOAD_SIZE
    )
    g.req_payload = '{}'
    if g.request_payload_too_big:
        return

    # Read (and cache) the raw bytes first, then ask for the text form, which
    # is decoded from the cached bytes rather than re-reading the stream.
    raw_body = request.get_data()
    g.req_payload = request.get_data(as_text=True)

    # Put the ORIGINAL bytes back for anything that reads wsgi.input directly.
    # Re-encoding the decoded text would corrupt non-UTF-8/binary bodies and
    # could change their length relative to Content-Length.
    request.environ['wsgi.input'] = BytesIO(raw_body)
def log_response(response):
    """Publish a record of the request/response pair to Kafka.

    Registered as a Flask ``after_request`` hook. Only responses whose
    content type contains ``json`` are recorded, and recording is skipped
    when ``log_request`` flagged the request body as too big. Any failure
    while building or sending the record is logged and otherwise ignored,
    so the response is always handed back to Flask unchanged.

    Args:
        response: The Flask response object about to be sent.

    Returns:
        The same ``response`` object.
    """
    try:
        content_type = response.content_type
        if not content_type or 'json' not in content_type:
            return response

        # Use g.get so a request for which log_request never ran (e.g. an
        # earlier hook short-circuited) is still recorded with defaults.
        if g.get('request_payload_too_big', False):
            logger.info("Request payload too big, skipping logging.")
            return response

        resp_payload = response.get_data(as_text=True)
        status_code = str(response.status_code)

        # Fall back to the standard library's reason phrases for codes that
        # are missing from friendlyHttpStatus, instead of raising KeyError
        # and dropping the whole record.
        status_text = friendlyHttpStatus.get(status_code)
        if status_text is None:
            from http.client import responses as std_reason_phrases
            status_text = std_reason_phrases.get(response.status_code, '')

        log_data = {
            "akto_account_id": os.getenv("AKTO_ACCOUNT_ID"),
            "path": request.path,
            "requestHeaders": json.dumps(dict(request.headers)),
            "responseHeaders": json.dumps(dict(response.headers)),
            "method": request.method,
            "requestPayload": g.get('req_payload', '{}'),
            "responsePayload": resp_payload,
            "ip": request.remote_addr,
            "time": str(int(time.time())),
            "statusCode": status_code,
            "type": request.environ.get('SERVER_PROTOCOL'),
            "status": status_text,
            "source": "MIRRORING",
            "akto_vxlan_id": "0",
        }
        try:
            producer.produce(topic, value=json.dumps(log_data).encode('utf-8'))
            # Non-blocking poll to service the producer's callbacks.
            producer.poll(0)
        except Exception as e:
            logger.error(f"Kafka send error: {e}")
    except Exception as f:
        # Best-effort: mirroring must never break the real response.
        logger.error(f"Error in middleware : {f}")
    # Returned after (not inside a ``finally``) so that BaseException such as
    # KeyboardInterrupt/SystemExit is no longer silently swallowed.
    return response
def setup_middleware(app):
    """Attach the traffic-mirroring hooks to a Flask application.

    Args:
        app: Object exposing ``before_request`` and ``after_request``
            registration methods (a Flask app).
    """
    # Request body capture runs before the view...
    register_before = app.before_request
    register_before(log_request)
    # ...and the record is published once the response is available.
    register_after = app.after_request
    register_after(log_response)