# GpsFileReplayer.py
import io
import glob
import time
import json
import random
import avro
import avro.schema
import avro.io
from kafka import KafkaProducer
from time import sleep
from StringIO import StringIO
class GpsFileReplayer(object):
    """Replay GPS log files to Kafka as Avro-encoded messages.

    Every ``*.log`` file in a directory is read in lockstep, one line per
    file per second.  For each line a GPS ("TPV") datum is published to the
    configured topic and a synthetic heartbeat datum is published to
    ``HB_TOPIC``.  Each file is assigned an id of the form ``ibt0<N>`` which
    is used to build the Kafka message keys.
    """

    # Topic the heartbeat messages are published to.
    HB_TOPIC = 'debug'

    def __init__(self, server, hbSchemaPath, gpsSchemaPath, topic):
        """Load both Avro schemas and create the Kafka producer.

        Args:
            server: value passed to ``KafkaProducer(bootstrap_servers=...)``.
            hbSchemaPath: path of the heartbeat Avro schema file.
            gpsSchemaPath: path of the GPS Avro schema file.
            topic: Kafka topic the GPS messages are sent to.

        Raises:
            IOError: if a schema file cannot be read.
        """
        self.server = server
        self.gpsSchemaPath = gpsSchemaPath
        self.hbSchemaPath = hbSchemaPath
        self.topic = topic
        self.gpsSchema = self._readSchema(gpsSchemaPath)
        self.hbSchema = self._readSchema(hbSchemaPath)
        self.producer = self._kafkaProducerPreps(server)

    def _readSchema(self, schemaPath):
        """Parse and return the Avro schema stored at ``schemaPath``.

        Raises:
            IOError: re-raised after printing a short message.
        """
        try:
            # ``with`` closes the file even when parsing fails.
            with open(schemaPath) as f:
                return avro.schema.parse(f.read())
        except IOError:
            print('cannot open schema file.')
            raise

    def _kafkaProducerPreps(self, server):
        """Return a ``KafkaProducer`` connected to ``server``."""
        return KafkaProducer(bootstrap_servers=server)

    @staticmethod
    def _parseLine(line):
        """Extract ``(lat, lon)`` from one space-separated log line.

        The second and third fields are read as floats.  Returns ``None``
        when the line has too few fields or they are not numeric.
        """
        fields = line.rstrip('\n').split(' ')
        try:
            return float(fields[1]), float(fields[2])
        except (IndexError, ValueError):
            return None

    @staticmethod
    def _gpsDatum(lat, lon, ts):
        """Build the GPS record; only lat, lon and time are populated."""
        return {
            'object_name': 'TPV',
            'object': {
                'lat': lat,
                'lon': lon,
                'time': ts,
                'status': None,
                'alt': None,
                'epx': None,
                'epy': None,
                'epv': None,
                'track': None,
                'speed': None,
                'climb': None,
                'epd': None,
                'eps': None,
                'epc': None,
            },
        }

    @staticmethod
    def _hbDatum(ts):
        """Build a heartbeat record with randomised signal strengths."""
        return {
            'timestamp': ts,
            'wifins': random.randint(-80, -70),
            'cellns': random.randint(-90, -80),
            'netled': True,
            'statled': True,
        }

    @staticmethod
    def _encode(writer, datum):
        """Serialise ``datum`` with an Avro ``DatumWriter``; return bytes."""
        buf = io.BytesIO()
        writer.write(datum, avro.io.BinaryEncoder(buf))
        return buf.getvalue()

    def replay(self, gpsFilesDir):
        """Publish the contents of every ``*.log`` file in ``gpsFilesDir``.

        All files are advanced together: once per second one line is taken
        from each file that still has data.  Lines that cannot be parsed are
        reported and skipped.  The method returns when every file is
        exhausted and the producer has been flushed.

        Args:
            gpsFilesDir: directory containing the ``*.log`` files.
        """
        # Sorted so the file -> id mapping is the same on every run.
        files = sorted(glob.glob(gpsFilesDir + '/*.log'))
        ibIds = ['ibt0{}'.format(i) for i in range(1, len(files) + 1)]
        ibIdDict = dict(zip(files, ibIds))
        print(ibIdDict)

        # The writers depend only on the schema, so build them once.
        gpsWriter = avro.io.DatumWriter(self.gpsSchema)
        hbWriter = avro.io.DatumWriter(self.hbSchema)

        file_handles = {}
        try:
            for filename in files:
                file_handles[filename] = open(filename, 'r')

            while file_handles:
                sleep(1)
                print(' ')
                # Iterate over a snapshot: entries are removed in the loop.
                for filename, f in list(file_handles.items()):
                    line = next(f, None)
                    if line is None:
                        f.close()
                        del file_handles[filename]
                        continue

                    position = self._parseLine(line)
                    if position is None:
                        print('{} skipping malformed line: {!r}'.format(
                            filename, line))
                        continue

                    ibId = ibIdDict[filename]
                    ts = int(time.time())

                    # The JSON round trip is kept from the original code; on
                    # Python 2 it turns str keys/values into unicode.
                    # NOTE(review): presumably needed by avro - confirm.
                    gps = json.loads(json.dumps(
                        self._gpsDatum(position[0], position[1], ts)))
                    print('{} {}'.format(filename, gps))
                    self.producer.send(
                        self.topic,
                        key=('gps:' + ibId).encode('utf-8'),
                        value=self._encode(gpsWriter, gps))

                    hb = json.loads(json.dumps(self._hbDatum(ts)))
                    self.producer.send(
                        self.HB_TOPIC,
                        key=('hb:' + ibId).encode('utf-8'),
                        value=self._encode(hbWriter, hb))
                    print('{} {}'.format(filename, hb))

            # send() is asynchronous; wait for buffered messages to go out.
            self.producer.flush()
        finally:
            # Close whatever is still open if the replay was interrupted.
            for f in file_handles.values():
                f.close()