This repository has been archived by the owner on Sep 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
intensity.py
188 lines (164 loc) · 7.48 KB
/
intensity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import json
from typing import List, Optional, Tuple
import pandas as pd
import numpy as np
from scipy.special import softmax
from utils.time import TimestampAgg
from utils.ts import TSTransform, CompoundTransform
from utils.logger import setupLogging
from utils.dataloader import HuaweiDataset
from model.similarity import DTW, Aggregator
class AID:
    """Rank candidate calls by the dependency intensity of their services.

    For every candidate call the DSW distance between the KPI series of its
    child ('c') and parent ('p') service is computed, normalized per KPI and
    aggregated; a smaller distance yields a higher intensity.
    """

    def __init__(self):
        # initialize logger
        loggerName = "AID"
        self._logger = setupLogging('logs', loggerName)
        # initialize data loader
        self._loader = HuaweiDataset()

    def _filterCandidate(self, candidateList):
        """Only return candidate calls whose parents appear as others' children

        Args:
            candidateList: a list of dicts indicating calls; each dict is read
                through its 'c' (child), 'p' (parent) and 'cnt' keys
        Returns:
            filteredCand: a new list of the retained calls, sorted by 'cnt'
                in descending order
        """
        childSet = {call['c'] for call in candidateList}
        parentSet = {call['p'] for call in candidateList}
        self._logger.info(f"No. of child services: {len(childSet)}")
        self._logger.info(f"No. of parent services: {len(parentSet)}")
        self._logger.info(
            f"No. of services in both child and parent: {len(childSet & parentSet)}")
        self._logger.info(
            f"No. of parent not in child: {len(parentSet-childSet)}")
        self._logger.info(
            f"No. of child not in parent: {len(childSet-parentSet)}")
        filteredCand = sorted(
            (call for call in candidateList if call['p'] in childSet),
            key=lambda call: call['cnt'],
            reverse=True)
        return filteredCand

    def _calculateKPIDistance(self,
                              filteredCand,
                              TSDict,
                              kpiList,
                              rowIdx,
                              transformOperations,
                              mpw: int,
                              metricAggFunc=Aggregator.mean_agg,
                              kpiNorm: str = "minmax"):
        """Calculate the intensity of dependency

        Args:
            filteredCand: a list of filtered candidates, see self.eval()
            TSDict: kpi series, looked up as TSDict.loc[cmdbId][kpi],
                see self.eval()
            kpiList: name of kpis to use, see self.eval()
            rowIdx: the time index (bin index), see self.eval()
            transformOperations: the normalization of time series, see self.eval()
            mpw: max propagation window, check the DSW algorithm for details
            metricAggFunc: how to aggregate the normalized distances of all
                kpis of one candidate, default is mean aggregation
            kpiNorm: normalize the distances of the same kpi, can be "minmax" or "softmax"
        Returns:
            filteredCand: the same list object, sorted in place by 'intensity'
                in descending order. Every dict gains the keys 'dsw-<kpi>',
                'normalized-dsw-<kpi>' (one pair per kpi) and 'intensity'.
        Raises:
            NotImplementedError: if kpiNorm is neither "minmax" nor "softmax"
        """
        # Reject an unsupported normalization before any distance is computed.
        if kpiNorm not in ("softmax", "minmax"):
            raise NotImplementedError(
                f"Unsupported kpiNorm {kpiNorm!r}, expected 'minmax' or 'softmax'")
        # Nothing to score: np.max / np.min / softmax below would raise on
        # an empty sequence.
        if not filteredCand:
            return filteredCand

        def transform(cmdbId, kpi):
            # Align the series on the requested bins; missing bins become 0.
            srs = pd.Series(TSDict.loc[cmdbId][kpi], index=rowIdx).fillna(0)
            return CompoundTransform(srs, transformOperations)

        for item in filteredCand:
            for kpi in kpiList:
                # TODO
                # if the input array is constant (usually because we cannot detect any error)
                # then we should mark it as UNKNOWN
                item[f'dsw-{kpi}'] = DTW.dsw_distance(
                    transform(item['c'], kpi),
                    transform(item['p'], kpi),
                    mpw=mpw)

        if kpiNorm == "softmax":
            for kpi in kpiList:
                allValues = np.array(
                    [candidate[f'dsw-{kpi}'] for candidate in filteredCand])
                x = softmax(allValues)
                assert len(x) == len(filteredCand)
                for idx, candidate in enumerate(filteredCand):
                    candidate[f'normalized-dsw-{kpi}'] = x[idx]
        else:  # "minmax"
            for kpi in kpiList:
                allValues = [candidate[f'dsw-{kpi}']
                             for candidate in filteredCand]
                maxValue = np.max(allValues)
                minValue = np.min(allValues)
                valueRange = maxValue - minValue
                for candidate in filteredCand:
                    candidate[f'normalized-dsw-{kpi}'] = candidate[f'dsw-{kpi}'] - minValue
                    # When all distances are equal the shifted value (0) is
                    # kept as is to avoid dividing by zero.
                    if valueRange > 0:
                        candidate[f'normalized-dsw-{kpi}'] /= valueRange

        # calculate intensity
        for candidate in filteredCand:
            sims_dsw = [candidate[f'normalized-dsw-{kpi}'] for kpi in kpiList]
            # distance = 0 -> most similar, so need to use 1-agg
            candidate['intensity'] = 1 - metricAggFunc(sims_dsw)
        filteredCand.sort(key=lambda x: x['intensity'], reverse=True)
        return filteredCand

    def eval(self,
             path: str,
             start: str,
             end: str,
             interval: int = 1,
             transformOperations: List[Tuple] = [('ZN',), ("MA", 15)],
             mpw: int = 5):
        """interface for evaluating dependency intensity

        Args:
            path: csv file name
            start: start date or time, eight-digit date YYYYMMDD (only the
                first eight characters are used; the range starts at 00:00:00)
            end: end date or time, eight-digit date YYYYMMDD (only the first
                eight characters are used; the range ends at 23:59:00)
            interval: aggregation interval in minutes. 1 minute is recommended.
            transformOperations: operations applied to every kpi series before
                the distance is computed, forwarded to CompoundTransform
            mpw: max propagation window passed to the DSW distance
        Returns:
            intensity: a list of dicts with the keys 'c', 'p' and 'intensity',
                sorted by intensity value, higher value indicates higher
                dependency intensity
        """
        # Normalize once so the loader and the time index use the same value.
        interval = int(interval)

        # 1. load file
        self._logger.info(f"File name: {path}")
        candidateList, TSDict, cmdbList, kpiList = self._loader.load(
            path,
            tsAggFunc=TimestampAgg.toFreqMinute,
            tsAggFreq=interval)
        self._logger.info("Finish loading dataset")

        # 2. preprocess
        # filter candidate
        self._logger.info(
            f"No. of candidates before filter: {len(candidateList)}")
        candidateList = self._filterCandidate(candidateList)
        self._logger.info(
            f"No. of candidates after filter: {len(candidateList)}")

        # filter data point
        def genDate(datestr):
            # YYYYMMDD -> YYYY-MM-DD
            return f"{datestr[:4]}-{datestr[4:6]}-{datestr[6:8]}"

        # 'min' is the minute offset alias; the older 'T' spelling is
        # deprecated in recent pandas releases.
        rowIdx = pd.date_range(f"{genDate(start)} 00:00:00",
                               f"{genDate(end)} 23:59:00",
                               freq=f'{interval}min')
        self._logger.info(f"Time start: {rowIdx[0]}")
        self._logger.info(f"Time end: {rowIdx[-1]}")

        # 3. Calculate intensity
        self._logger.info("Calculate inensity")
        self._logger.info(f"Applied Transformations: {transformOperations}")
        self._logger.info(f"DSW Max Propagation Window: {mpw}")
        # A copy is handed downstream so the shared default list of this
        # method cannot be modified by the code that consumes it.
        intensityList = self._calculateKPIDistance(
            candidateList, TSDict, kpiList, rowIdx,
            transformOperations=list(transformOperations),
            mpw=mpw,
            metricAggFunc=Aggregator.mean_agg)
        self._logger.info("Finish calculating intensity")

        # remove unnecessary attributes
        return [{"c": item["c"],
                 "p": item["p"],
                 "intensity": item["intensity"]}
                for item in intensityList]
if __name__ == "__main__":
    # Usage example: evaluate a single day of data and store the sorted
    # intensity list as JSON in the current working directory.
    aid = AID()
    intensity = aid.eval(
        "data/industry/status_1min_20210411.csv.xz",
        start="20210411",
        end='20210411',
    )
    serialized = json.dumps(intensity, indent=4)
    with open("intensity.json", 'w') as f:
        f.write(serialized)