"""
this file define some standard workflow,
combining functions from the other files
"""
import StateFinder.Processing as pr
import Utils.DataSource as ds
import Utils.Converter as cv
import StateFinder.LookupTable as lt
import StateFinder.Symbolizer as sbz
import Utils.FileUtils as fu
from subprocess import call
import pickle
def convert_median_filter(src, dst, win):
"""
Apply median filtering to the time series
"""
dat = ds.FileDataSource(src, dst)
pmf = pr.MedianFilteringProcess(win)
dat.load()
dat.data = pmf.batch_process(dat.data)
dat.save()
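# Hedged usage sketch (the file names and the window size are hypothetical,
# not taken from this repository):
#   convert_median_filter("sensor.csv", "sensor_filtered.csv", 5)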
def split_file_by(filename, folder, offset=0, duration=86400):
"""
    Split the file into periodic chunks for applying the forecasting algorithm.
"""
src = ds.FileDataSource(filename, None)
cut = fu.PeriodicCutProcess(int(duration), int(offset))
src.load()
src.data = cut.batch_process(src.data)
spl = fu.Splitter(src.data)
spl.splitFiles(folder, int(duration), int(offset))
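# Hedged usage sketch: split a trace into daily files written to a
# hypothetical "out/" folder (86400 seconds = one day, the default duration):
#   split_file_by("sensor.csv", "out/", offset=0, duration=86400)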
def convert_rle(src, dst):
"""
    Apply RLE compression to the time series.
"""
dat = ds.FileDataSource(src, dst)
prle = pr.RLEProcess()
dat.load()
dat.data = prle.batch_process(dat.data)
dat.save()
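# Hedged usage sketch with hypothetical file names:
#   convert_rle("sensor-symbol.csv", "sensor-rle.csv")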
def convert_symbols_to_raw(src, dst, model, level, tolevel=0):
"""
    Convert a symbolic time series to a lower abstraction level or to raw data.
"""
if level == "raw":
return
if level == "symbol":
level = 0
if level == "rle":
level = 1
if level == "statecluster":
level = 2
prle = pr.RLEProcess()
dat = ds.FileDataSource(src, src)
dat.load()
dat.data = prle.batch_process(dat.data)
dat.save()
if int(tolevel) == 0:
ctr = cv.ToRaw()
ctr.convert(src, dst, model, int(level))
else:
cts = cv.ToSymbols()
cts.convert(src, dst, model, int(level))
if int(tolevel) == 2:
prle = pr.RLEProcess()
dat = ds.FileDataSource(dst, dst)
dat.load()
dat.data = prle.batch_process(dat.data)
dat.save()
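# Hedged usage sketch: expand an RLE-level series back to raw data using a
# model produced by find_states(); all file names below are hypothetical.
#   convert_symbols_to_raw("sensor-rle.csv", "sensor-raw.csv",
#                          "sensor-model.mdl", "rle", tolevel=0)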
def convert_from_csv(src, dst):
"""
    Read a CSV file and generate a one-dimensional time series.
    Various aggregation functions can be defined here.
"""
cfc = cv.FromCSV()
    # Alternative aggregation functions (commented out; the first one would
    # need "import math" and presumably reads the three axes of the row):
    #fun = lambda tab: int(1000*math.sqrt(float(tab[0])**2+float(tab[1])**2+float(tab[2])**2))
    #fun = lambda tab: int(float(tab[0])*1000)
    fun = lambda tab: int(float(tab[0]))  # default: first column, cast to int
cfc.convert(src, dst, fun)
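# Hedged usage sketch: extract the first CSV column of a hypothetical export
# into a one-dimensional integer time series:
#   convert_from_csv("export.csv", "sensor.csv")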
def get_distance(file0, file1, rate):
    """
    Compute and print the Levenshtein distance between two time series files.
    """
    dat0 = ds.FileDataSource(file0, None)
    dat1 = ds.FileDataSource(file1, None)
    dat0.load()
    dat1.load()
    gdt = fu.Get_Distance()
    dist = gdt.levenshtein(dat0.data, dat1.data, int(rate))
    print("total-distance (d) = %d" % dist[0])
    print("total-time-length (l) = %d" % dist[1])
    print("normalized-distance (d/l) = %f" % (dist[0]*1.0/dist[1]))
def find_states(inputfile, outputname, rate, smethod, snbr, ememory, ethreshold, mindist):
"""
batch process using standard symbolization
- input csv file with triple (time start, time end, value)
- base name for output files (can include a folder)
- sampling rate of the input time series
- symbolization method (0:unifom, 1:median, 2:distinct median)
- number of symbols to generates
- StateFinder fading factor
- StateFinder prediciton error threshold
- StateFinder min distance for clustering segments (0-1)
"""
src = ds.FileDataSource(inputfile, outputname+"-statefinder.csv")
src.load()
enc = sbz.UniformSymbolizer()
if smethod == "1":
enc = sbz.MedianSymbolizer()
if smethod == "2":
enc = sbz.DistinctMedianSymbolizer()
enc.load(src.data)
(sep, mini, maxi) = enc.get_separators(int(snbr))
sym = pr.SymbolizeProcess(1, sep)
rel = pr.RLEProcess()
sem = pr.SegmentSparseProcess(rate, ethreshold, ememory)
clu = pr.ClusterSparseProcess(mindist, int(snbr)+1)
src.data = sym.batch_process(src.data)
src.save_to(outputname+"-symbol.csv")
src.data = rel.batch_process(src.data)
src.save_to(outputname+"-rle.csv")
segments = sem.batch_process(src.data)
(src.data, lookup) = clu.batch_process(segments, src.data)
src.save()
lookups = {0:lt.SymbolLookupTable(sep, mini, maxi),
1:lt.ExpandLookupTable(rate),
2:(lt.ClusterSparseLookupTable(lookup, rate))}
lkf = open(outputname+"-model.mdl", 'w')
pickle.dump(lookups, lkf)
lkf.close()
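# Hedged usage sketch: every file name and parameter value below is
# hypothetical; see the docstring above for the meaning of each argument.
# Note that smethod is compared against the strings "1" and "2" in the code.
#   find_states("sensor.csv", "out/sensor", rate=1, smethod="1", snbr=10,
#               ememory=0.99, ethreshold=0.1, mindist=0.5)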
def find_states_spclust(inputfile, outputname, rate, dimensions, wgrid, wnbr, ememory, ethreshold, mindist):
"""
batch process using spclust symbolization
- input csv file with triple (time start, time end, values)
- base name for output files (can include a folder)
- sampling rate of the input time series
- number of dimensions of the input time series
- Spclust grid size
- Spclust count threshold
- StateFinder fading factor
- StateFinder prediciton error threshold
- StateFinder min distance for clustering segments (0-1)
"""
call(["java", "-jar", "./Spclust/SpComputeModel.jar", inputfile,
dimensions, wgrid, wnbr, outputname+"-model.spc"])
call(["java", "-jar", "./Spclust/SpComputeSymbols.jar",
outputname+"-model.spc", inputfile, outputname+"-symbol.csv"])
nbclusters = int(open(outputname+"-model.spcn", 'r').readline())
src = ds.FileDataSource(outputname+"-symbol.csv", outputname+"-statefinder.csv")
rel = pr.RLEProcess()
sem = pr.SegmentSparseProcess(rate, ethreshold, ememory)
clu = pr.ClusterSparseProcess(mindist, nbclusters)
src.load()
src.data = rel.batch_process(src.data)
src.save_to(outputname+"-rle.csv")
src.data = rel.batch_process(src.data)
segments = sem.batch_process(src.data)
(src.data, lookup) = clu.batch_process(segments, src.data)
src.save()
lookups = {0:lt.SpclustSymbolLookupTable(outputname+"-model.spc"),
1:lt.ExpandLookupTable(rate),
2:(lt.ClusterSparseLookupTable(lookup, rate))}
lkf = open(outputname+"-model.mdl", 'w')
pickle.dump(lookups, lkf)
lkf.close()
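# Hedged usage sketch: all values are hypothetical. dimensions, wgrid and wnbr
# are given as strings because they are forwarded directly to the Spclust
# command-line calls above.
#   find_states_spclust("sensor.csv", "out/sensor", rate=1, dimensions="3",
#                       wgrid="10", wnbr="5", ememory=0.99, ethreshold=0.1,
#                       mindist=0.5)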