-
Notifications
You must be signed in to change notification settings - Fork 0
/
nask_esa.py
208 lines (187 loc) · 9.88 KB
/
nask_esa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python3
import argparse
import requests
import json
import re
import time
import pandas as pd
from typing import List
from requests_cache import CachedSession
def get_json(url, ttl):
    """Fetch *url* as JSON through a cached HTTP session.

    Parameters:
        url: Endpoint expected to return JSON.
        ttl: Cache expiry in seconds, passed to requests-cache.

    Returns:
        The decoded JSON payload with a "request_stats" key added
        (request duration and HTTP status code of this call).
    """
    headers = {'Accept': 'application/json'}
    # stale_if_error lets us serve a cached copy when the API is down;
    # only 200 responses are cached.
    session = CachedSession('esa_cache', expire_after=ttl,
                            stale_if_error=True, allowable_codes=[200])
    # requests has no default timeout - a finite one prevents the script
    # from hanging forever on a stalled connection.
    r = session.get(url, headers=headers, timeout=30)
    json_data = r.json()
    json_data["request_stats"] = add_api_stats(r, json_data)
    return json_data
def build_elevation_url(lat, lng):
    """Return the Open Topo Data EU-DEM 25m lookup URL for one coordinate pair."""
    # Endpoint shape: https://api.opentopodata.org/v1/eudem25m?locations=<lat>,<lng>
    return "https://api.opentopodata.org/v1/eudem25m?locations={},{}".format(lat, lng)
def elevation_get(lat, lng, ttl):
    """Look up terrain elevation (metres) for a coordinate via Open Topo Data."""
    payload = get_json(build_elevation_url(lat, lng), ttl)
    return payload["results"][0]["elevation"]
def pressure_sea_level(elevation, pressure, temp):
    """Reduce a station pressure reading to sea level.

    Uses the standard barometric reduction formula with a 0.0065 K/m
    lapse rate and 273.15 K offset.

    Parameters:
        elevation: Station elevation in metres, or None when the elevation
            lookup failed (the API can return null outside its coverage).
        pressure: Measured station pressure, or None when the sensor has no data.
        temp: Measured temperature in degrees Celsius, or None.

    Returns:
        The sea-level-compensated pressure, or 0 when any input is missing.
    """
    # Guard elevation as well as the sensor readings: pow() on None would raise.
    if elevation is None or pressure is None or temp is None:
        return 0
    return pressure / pow(
        1 - (0.0065 * elevation) / (temp + 0.0065 * elevation + 273.15),
        5.257,
    )
def time_epoch(timestamp):
    """Convert a 'YYYY-MM-DD HH:MM:SS' local-time string to a Unix epoch int."""
    parsed = time.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return int(time.mktime(parsed))
def add_measurement(global_tags, tags, fields, timestamp):
    """Enrich one station record with elevation and sea-level pressure.

    `tags` is the school/station metadata (must contain latitude/longitude),
    `fields` the raw measurements (pressure_avg, temperature_avg, ...).
    Both are mutated in place; the enriched record is returned as a dict
    with "details", "measurements" and "timestamp" keys.
    """
    elevation = elevation_get(tags["latitude"], tags["longitude"], 0)
    fields['elevation'] = elevation
    # Derive a sea-level-compensated pressure from the raw station reading.
    fields['pressure_sea_avg'] = pressure_sea_level(
        elevation, fields['pressure_avg'], fields['temperature_avg'])
    # TODO add global tags to item tags from school
    return {
        "details": tags,
        "measurements": fields,
        "timestamp": timestamp,
    }
def dedup_dicts(items: List[dict]):
    """Return *items* with exact duplicates removed (order not preserved)."""
    # Serialise each dict with sorted keys so equal dicts compare equal as
    # strings, dedupe via a set, then decode the survivors back to dicts.
    unique_blobs = {json.dumps(entry, sort_keys=True) for entry in items}
    return [json.loads(blob) for blob in unique_blobs]
def add_api_stats(requests_data, struct_list=None):
    """Build a small stats dict for an HTTP response.

    Parameters:
        requests_data: A requests.Response (or compatible object exposing
            `elapsed` and `status_code`).
        struct_list: Unused; kept optional for backward compatibility with
            existing callers that still pass the decoded payload.

    Returns:
        {"request_time": <seconds as float>, "status_code": <int>}
    """
    return {
        "request_time": requests_data.elapsed.total_seconds(),
        "status_code": requests_data.status_code,
    }
def _school_matches(school, city, post_code, street, name, longitude, latitude):
    """Return True when the school record matches ANY of the given filters.

    Text filters compare case-insensitively against the corresponding school
    field; coordinate filters compare for plain equality. A None filter is
    ignored; a None school field never matches.
    """
    text_filters = (
        (city, school["city"]),
        (post_code, school["post_code"]),
        (street, school["street"]),
        (name, school["name"]),
    )
    for wanted, actual in text_filters:
        if wanted is not None and actual is not None and actual.lower() == wanted.lower():
            return True
    # NOTE(review): longitude/latitude arrive from argparse as strings while
    # the API may deliver numbers - confirm the types actually compare equal.
    coord_filters = (
        (longitude, school["longitude"]),
        (latitude, school["latitude"]),
    )
    for wanted, actual in coord_filters:
        if wanted is not None and actual is not None and actual == wanted:
            return True
    return False

def get_struct(json_data, mode, global_tags, city, post_code, street, name, longitude, latitude, ttl):
    """Filter the raw smog feed and build the formatted measurement list.

    Records matching any provided filter (city/post_code/street/name/
    longitude/latitude) are kept; with no filters at all, every record is
    kept. The result is deduplicated before returning.

    Bug fix: street and name were previously compared against the post_code
    filter value (copy-paste error), so those filters never worked.

    Parameters:
        json_data: Decoded ESA API payload containing "smog_data".
        mode, ttl: Unused here; kept for interface compatibility.
        global_tags: Passed through to add_measurement.
        city, post_code, street, name: Case-insensitive text filters or None.
        longitude, latitude: Equality filters or None.

    Returns:
        List of formatted records (dicts with details/measurements/timestamp).
    """
    no_filters = all(
        f is None for f in (city, post_code, street, name, longitude, latitude)
    )
    formated_list = []
    for item in json_data["smog_data"]:
        if no_filters or _school_matches(
            item["school"], city, post_code, street, name, longitude, latitude
        ):
            formated_list.append(
                add_measurement(global_tags, item["school"], item["data"], item["timestamp"])
            )
    # uniq items
    return dedup_dicts(formated_list)  # json with fields + tags
def data_output(measurement_name, measurement_name_stats, formated_struct, url, mode, debug, json_data):
    """Emit the formatted measurements in the selected output *mode*.

    Modes:
        telegraf-exec  - print InfluxDB line-protocol records to stdout
        telegraf-http  - POST line-protocol records to *url*, and also send
                         write-status statistics under *measurement_name_stats*
        json           - pretty-print the structure as JSON
        table          - print one pandas DataFrame per record

    Parameters:
        measurement_name: Influx measurement name for the metric lines.
        measurement_name_stats: Influx measurement name for write statistics.
        formated_struct: List of dicts with "details"/"measurements"/"timestamp".
        url: Telegraf/InfluxDB HTTP listener endpoint (telegraf-http mode only).
        mode: One of the modes above.
        debug: When True, print diagnostics for failed writes and stats lines.
        json_data: Raw API payload; its "request_stats" feed the stats lines.
    """
    if mode == "telegraf-exec" or mode == "telegraf-http":
        epo = int(time.time())
        # Right-pad the epoch seconds with zeros to 19 digits (reverse,
        # zfill, reverse) - effectively seconds * 10**9, the nanosecond
        # timestamp InfluxDB line protocol expects.
        curr_epoch = str(epo).split(".")[0][::-1].zfill(19)[::-1]
        stats_status = {}  # HTTP status code -> count of writes that got it
        for item in formated_struct:
            fields = item["measurements"]
            tags = item["details"]
            fields_list = []
            for kfield, vfield in fields.items():
                # Numeric values are fixed to 4 decimals; a None value is
                # left as-is and renders as the literal string "None".
                if vfield is not None:
                    vfield = "%.4f" % vfield
                field = "{k}={v}".format(k=kfield, v=vfield)
                fields_list.append(field)
            tags_list = []
            for ktag, vtag in tags.items():
                tag = "{k}={v}".format(k=ktag, v=vtag)
                tags_list.append(tag)
            tags = ",".join(tags_list)
            # escape chars that are not allowed in influx line protocol
            # https://archive.docs.influxdata.com/influxdb/v0.9/write_protocols/write_syntax/#escaping-characters
            # NOTE(review): only spaces are escaped here; commas or equals
            # signs inside tag values would still break the line - confirm
            # the upstream data never contains them.
            ntags=tags.replace(" ","\ ")
            fields = ",".join(fields_list)
            data_string = '{measurement},{tag} {field} {epoch}'.format(measurement=measurement_name, field=fields, tag=ntags, epoch=curr_epoch)
            if mode == "telegraf-http":
                r = requests.post(url, data=data_string.encode('utf-8'))
                # 204 No Content is the listener's success response.
                if r.status_code != 204 and debug:
                    print(data_string)
                    print(r.json())
                if r.status_code in stats_status:
                    stats_status[r.status_code] += 1
                else:
                    stats_status[r.status_code] = 1
                # Send internal stats summarising the return codes of writes to
                # the telegraf influxdb listener - easy to monitor how many
                # metrics we send and which ones end with proper codes.
                # NOTE(review): this runs inside the per-item loop, so the
                # cumulative counts are re-sent after every item, and the
                # `count` kwarg below has no matching placeholder (the format
                # string hard-codes count=1) - confirm this is intended.
                for code, count in stats_status.items():
                    http_write_request_time = r.elapsed.total_seconds()
                    esa_api_request_time = json_data["request_stats"]["request_time"]
                    esa_api_status_code = json_data["request_stats"]["status_code"]
                    data_stats = '{measurement},write_status_code={code},esa_api_status_code={api_status_code} count=1,write_request_time={http_write_request_time},esa_api_request_time={api_request_time} {epoch}'.format(measurement=measurement_name_stats, code=code, api_request_time=esa_api_request_time, api_status_code=esa_api_status_code, http_write_request_time=http_write_request_time, count=count, epoch=curr_epoch)
                    # same url used as used in metrics sending
                    requests.post(url, data_stats.encode('utf-8'))
                    if debug:
                        print(data_stats)
            if mode == "telegraf-exec":
                print(data_string)
    if mode == "json":
        # list of JSON output with indent for better reading
        print(json.dumps(formated_struct, indent=4, ensure_ascii=False))
    if mode == "table":
        # human readable table output
        for item in formated_struct:
            data_frame = pd.DataFrame.from_dict(item)
            print(data_frame)
    if mode == "telegraf-http" and debug:
        print(stats_status)
def main():
    """CLI entry point: fetch ESA smog data, filter it, and emit it.

    Pipeline: source (cached HTTP GET of the ESA API) -> transform
    (filter/enrich into the formatted structure) -> send (print or POST,
    depending on --mode).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--debug', action="store_true", default=False, dest='debug')
    parser.add_argument('-c', '--city', action="store", default=None, dest='city')
    parser.add_argument('-p', '--post-code', action="store", default=None, dest='post_code')
    parser.add_argument('-s', '--street', action="store", default=None, dest='street')
    parser.add_argument('-n', '--school-name', action="store", default=None, dest='school_name')
    parser.add_argument('-m', '--mode', action="store", default="json", dest='mode', choices=['table', 'json', 'telegraf-exec', 'telegraf-http'])
    parser.add_argument('-o', '--longitude', action="store", default=None, dest='longitude')
    parser.add_argument('-a', '--latitude', action="store", default=None, dest='latitude')
    parser.add_argument('-t', '--telegraf-url', action="store", default="http://localhost:8186/write", dest='telegraf_http_url')
    # type=int so a command-line value is numeric like the default: the cache
    # layer expects a number of seconds, not the string argparse would pass.
    parser.add_argument('-l', '--ttl', action="store", type=int, default=1800, dest='ttl', help="Cache TTL for HTTP GET data ESA API in calls")
    args = parser.parse_args()
    debug = args.debug
    mode = args.mode  # table, json, telegraf-exec or telegraf-http output
    measurement_name = "nask_esa"  # for metrics generation and sending
    measurement_name_stats = "nask_esa_stats"  # measurement used for stats generated in telegraf-http sending
    esa_api_url = "https://public-esa.ose.gov.pl/api/v1/smog"
    # telegraf url for listener in telegraf used to accept HTTP data in InfluxData format
    # https://github.com/influxdata/telegraf/blob/master/plugins/inputs/influxdb_listener/README.md
    telegraf_http_url = args.telegraf_http_url
    # static global tags (currently unused downstream - see TODO in add_measurement)
    global_tags = {}
    # argument values
    city = args.city
    post_code = args.post_code
    street = args.street
    school_name = args.school_name
    # NOTE(review): longitude/latitude stay strings here and are compared for
    # equality against API values in get_struct - confirm the types match.
    longitude = args.longitude
    latitude = args.latitude
    ttl = args.ttl
    # source: get JSON data via API call
    json_data = get_json(esa_api_url, ttl)
    # transform: prepare the formatted structure
    formated_struct = get_struct(json_data, mode, global_tags, city, post_code, street, school_name, longitude, latitude, ttl)
    # send: output data based on mode
    data_output(measurement_name, measurement_name_stats, formated_struct, telegraf_http_url, mode, debug, json_data)

if __name__ == '__main__':
    main()