Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reformat with black #6

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Gunicorn configuration (config.py), black-formatted.
# The scrape duplicated pre- and post-reformat lines; this is the single
# deduplicated post-black version.
import os

# Worker/thread counts are tunable via environment variables at deploy time.
workers = int(os.environ.get("GUNICORN_PROCESSES", "2"))
threads = int(os.environ.get("GUNICORN_THREADS", "1"))
# bind = "0.0.0.0:8080"

# Trust X-Forwarded-* headers from any upstream proxy address and treat
# X-Forwarded-Proto: https as a secure request.
forwarded_allow_ips = "*"
secure_scheme_headers = {"X-Forwarded-Proto": "https"}
24 changes: 13 additions & 11 deletions data/data_prep.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def calculatePerCentDiffs(xDataVals, yDataVals, regSlope, regIntersept):
def calculatePerCentDiffs(xDataVals, yDataVals, regSlope, regIntersept):
    """Percent difference between each y value and the regression line.

    NOTE(review): the def line and the accumulator initialization sit above
    the visible diff hunk; the signature is reconstructed from the hunk's
    @@ context line — confirm against the full file.

    :param xDataVals: x values of the data points
    :param yDataVals: y values of the data points
    :param regSlope: slope of the fitted regression line
    :param regIntersept: intercept of the fitted regression line
    :return: numpy array of percent diffs; y is offset by +0.01 to avoid
             division by zero when a data value is exactly 0
    """
    yErrors = []
    for point in zip(xDataVals, yDataVals):
        regY = regSlope * point[0] + regIntersept
        yErrors.append(100 * (point[1] - regY) / (point[1] + 0.01))

    return np.array(yErrors)


Expand All @@ -28,13 +28,13 @@ def find_anomalies(df):
# Define datetime window size
datetimeWindowSize = 43
# Get data as two numpy arrays: xvals and yvals
xStrDates = df.iloc[:,0]
yVals = df.iloc[:,1]
xStrDates = df.iloc[:, 0]
yVals = df.iloc[:, 1]

# Convert dates as strings to dates as datetime.datetime values
#dates_list = [dt.datetime.strptime(date, '%Y-%m-%d %H:%M:%S') for date in xStrDates]
# dates_list = [dt.datetime.strptime(date, '%Y-%m-%d %H:%M:%S') for date in xStrDates]
# Convert list of datetime.datetime values to ndarray of
xVals = mdates.date2num(df.iloc[:,0])
xVals = mdates.date2num(df.iloc[:, 0])

fit = np.polyfit(np.array(xVals), np.array(yVals), deg=1)

Expand All @@ -45,17 +45,19 @@ def find_anomalies(df):

anomalies = list()
for diff, index in zip(yPerCentDiffs, df.index.values):
if (diff < mean - anomalyStdDevFactor * std) or (diff > mean + anomalyStdDevFactor * std):
if (diff < mean - anomalyStdDevFactor * std) or (
diff > mean + anomalyStdDevFactor * std
):
anomalies.append(index)

print('**********************************')
print("**********************************")
print(len(anomalies))
print('**********************************')
print("**********************************")
return anomalies


def load_sensor(sensor="sensor_25"):
    """Load one sensor's timeseries and run anomaly detection on it.

    :param sensor: column name of the sensor to load (default "sensor_25")
    :return: tuple of (two-column DataFrame [sensortimestamp, sensor value],
             list of row indices that find_anomalies flagged)
    """
    # NOTE(review): get_all_data is invoked on the class object itself, not
    # an instance — confirm it is a class/static method on AnomalyDataService.
    query = AnomalyDataService
    df_data = query.get_all_data()
    df_sensor = df_data[["sensortimestamp", sensor]]
    return df_sensor, find_anomalies(df_sensor)
170 changes: 119 additions & 51 deletions managers/preprocess_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@
from managers.synthesize_data_manager import SynthesizeDataManager



class PreprocessDataManager:
"""Prepares a timeseries data point for graphing
The class uses a generator that it receives from a data source. A timeseries data point is received
and the method, process_point() is used to process the point. When the point has been processed, it
yields a message that contains json data that will be consumed.

"""
def __init__(self, data_source, regress_group_size, points_group_size, col_name, anomaly_std_factor, max_window_size, speed_up, csv_file_name=None):

def __init__(
self,
data_source,
regress_group_size,
points_group_size,
col_name,
anomaly_std_factor,
max_window_size,
speed_up,
csv_file_name=None,
):
"""
:param regress_group_size: Size in data points of how many points will be included in the linear regression
calculation.
Expand All @@ -32,13 +42,16 @@ def __init__(self, data_source, regress_group_size, points_group_size, col_name,
self.points_group_size = points_group_size # Not used in this first version
self.col_name = col_name # col name of feature to plot
self.file_name = csv_file_name
self.regress_buffX = [] # Fixed size buffer. Size is limited to value of self.regress_plot_size
self.regress_buffX = (
[]
) # Fixed size buffer. Size is limited to value of self.regress_plot_size
self.regress_buffY = []
self.anomaly_std_factor = anomaly_std_factor # Defines how many STD that determine an anomaly
self.anomaly_std_factor = (
anomaly_std_factor # Defines how many STD that determine an anomaly
)
self.speed_up = speed_up
self.max_window_size = max_window_size


def process_point(self):
"""Process one point and put calculated data into a json format.
This function uses a generator that yields a timeseries data point from a data source. When the data source
Expand Down Expand Up @@ -67,32 +80,35 @@ def process_point(self):
"""
x_old_idx = 0
y_percent_diff_old = 0
plot_color = 'green'
graphRange = [0,1]
plot_color = "green"
graphRange = [0, 1]

sdm = SynthesizeDataManager()
# This generator yields when one point is available from the data source
if (self.data_source == 'csv'):
gen = sdm.csv_line_reader(self.file_name, self.col_name, self.speed_up) # this is a generator
elif (self.data_source == 'postgres'):
if self.data_source == "csv":
gen = sdm.csv_line_reader(
self.file_name, self.col_name, self.speed_up
) # this is a generator
elif self.data_source == "postgres":
gen = sdm.load_sensor(self.col_name, self.speed_up)
else:
gen = sdm.synthesize_data(self.col_name, self.speed_up)


row = next(gen, None) # list of two strings
graphRange = sdm.return_range()
json_data = {'range':graphRange}
json_data = {"range": graphRange}

yield "event: initialize\ndata: " + json.dumps(json_data) + "\n\n" # Initialize plot
yield "event: initialize\ndata: " + json.dumps(
json_data
) + "\n\n" # Initialize plot

while True:
# print("rowcounter: {}".format(self.row_counter))
# Use the generator's next() with a param of None. If the generator is out of data, next() will
# return None.
row = next(gen, None) # list of two strings
if row is None:
print('row is none')
print("row is none")
# print("Job Finished")
# Generator is exhausted, yield message "jobfinished"
yield "event: jobfinished\ndata: " + "none" + "\n\n"
Expand All @@ -110,30 +126,58 @@ def process_point(self):

regress_start_idx = len(self.regress_buffX) - self.regress_plot_size
# Get endpoints of regression line for plotting
x_start_p, x_end_p, y_start_p, y_end_p = self.__get_endpoints_for_regr_line( fit, regress_start_idx)

y_percent_diffs = self.__calc_percent_diffs(fit) # Get all percent diffs for all points in regress buffer
std_for_buffer = np.std(y_percent_diffs) # STD of percent diffs in regress buffer
mean_for_buffer = np.mean(y_percent_diffs) # mean of percent diffs in regress buffer
(
x_start_p,
x_end_p,
y_start_p,
y_end_p,
) = self.__get_endpoints_for_regr_line(fit, regress_start_idx)

y_percent_diffs = self.__calc_percent_diffs(
fit
) # Get all percent diffs for all points in regress buffer
std_for_buffer = np.std(
y_percent_diffs
) # STD of percent diffs in regress buffer
mean_for_buffer = np.mean(
y_percent_diffs
) # mean of percent diffs in regress buffer

# Get percent diff for the current point, which is at the end of the regress_buffX
y_percent_diff = self.calculate_percent_diff_for_curr_point((len(self.regress_buffX) - 1), fit)
if y_percent_diff < (mean_for_buffer - self.anomaly_std_factor * std_for_buffer) or \
(y_percent_diff > (mean_for_buffer + self.anomaly_std_factor * std_for_buffer)):
plot_color = 'red'
y_percent_diff = self.calculate_percent_diff_for_curr_point(
(len(self.regress_buffX) - 1), fit
)
if y_percent_diff < (
mean_for_buffer - self.anomaly_std_factor * std_for_buffer
) or (
y_percent_diff
> (mean_for_buffer + self.anomaly_std_factor * std_for_buffer)
):
plot_color = "red"
y_percent_diff_new = y_percent_diff

# add [x_old_point, x_new_point] and [y_percent_diff_old, y_percent_diff_new] and plot_color to json
x_old_p = self.regress_buffX[len(self.regress_buffX) - 2]
x_new_p = self.regress_buffX[len(self.regress_buffX) - 1]
json_data = self.create_json(timestamp, sensor_val,
x_start_p, x_end_p, y_start_p, y_end_p,
x_old_p, x_new_p, y_percent_diff_old, y_percent_diff,
plot_color, self.row_counter, self.max_window_size)
json_data = self.create_json(
timestamp,
sensor_val,
x_start_p,
x_end_p,
y_start_p,
y_end_p,
x_old_p,
x_new_p,
y_percent_diff_old,
y_percent_diff,
plot_color,
self.row_counter,
self.max_window_size,
)
# print("Regress slope: {} Regress intspt: {}".format(fit[0],fit[1]))
print("Server json data: {} ".format(json_data) )
print("Server json data: {} ".format(json_data))
y_percent_diff_old = y_percent_diff_new
plot_color = 'green'
plot_color = "green"
self.row_counter = self.row_counter + self.points_group_size
yield "event: update\ndata: " + json.dumps(json_data) + "\n\n"
else:
Expand All @@ -143,8 +187,21 @@ def process_point(self):
# regression line. So while we are waiting for the buffer window to fill, we still plot
# the sensor data as points.
self.row_counter = self.row_counter + self.points_group_size
json_data = self.create_json(timestamp, sensor_val, None, None, None, None, None,
None, None, None, None, None, max_window_size=self.max_window_size)
json_data = self.create_json(
timestamp,
sensor_val,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
max_window_size=self.max_window_size,
)
yield "event: update\ndata: " + json.dumps(json_data) + "\n\n"
next(gen)

Expand All @@ -165,7 +222,6 @@ def __get_endpoints_for_regr_line(self, regr_fit, regress_start_index):
y_end_p = x_end_idx * regr_fit[0] + regr_fit[1]
return x_start_p, x_end_p, y_start_p, y_end_p


def __calc_percent_diffs(self, regr_fit):
"""Calculate y diffs between data points and regression line for all points in the buffer

Expand Down Expand Up @@ -208,21 +264,34 @@ def get_fit_function(self, xarr, yarr): # pass regress_buffX and regress_buffY
def get_fit_function(self, xarr, yarr):  # pass regress_buffX and regress_buffY
    """Fit a degree-1 polynomial (a line) to the buffered points.

    The newest point is excluded from the fit because it may be an anomaly;
    x values are replaced by their integer positions 0..len-2 so the fit is
    over index space, not raw timestamps.

    :param xarr: buffer of x values (only its length is used here)
    :param yarr: buffer of y values as strings representing floats
    :return: numpy array [slope, intercept] as returned by np.polyfit
    """
    xarr_temp = xarr.copy()
    yarr_temp = yarr.copy()  # list of strings representing floats

    xarr_temp.pop()  # Remove current point since it may be an anomaly.
    yarr_temp.pop()
    # Convert the remaining y values to a numpy array of floats.
    yarr_converted = np.array([float(item) for item in yarr_temp])
    # Temp array of integers serving as x indices for points in the buffer.
    numericX = np.arange(len(xarr_temp))
    fit = np.polyfit(numericX, yarr_converted, 1)  # deg of 1

    return fit

def create_json(
    self,
    timestamp,
    sensor_val,
    x1,
    x2,
    y1,
    y2,
    x_old_p,
    x_new_p,
    y_percent_diff_old,
    y_percent_diff,
    plot_color,
    row_counter,
    max_window_size,
):
    """Pack one processed data point into a plain dict for JSON serialization.

    :param timestamp: x value of the new sensor point
    :param sensor_val: y value of the new sensor point
    :param x1: x of the regression segment's start point
    :param x2: x of the regression segment's end point
    :param y1: y of the regression segment's start point
    :param y2: y of the regression segment's end point
    :param x_old_p: x value of the previous point
    :param x_new_p: x value of the current point
    :param y_percent_diff_old: percent diff for the previous point
    :param y_percent_diff: percent diff for the current point
    :param plot_color: "green" for normal points, "red" for anomalies
    :param row_counter: running index of processed rows
    :param max_window_size: maximum plot window size in points
    :return: dict (not a JSON string — caller runs json.dumps on it)
    """
    plot_dict = {
        "plotpoint": [timestamp, sensor_val],
        "regress": {"xs": [x1, x2], "ys": [y1, y2]},
        "calc": {
            "x1": x_old_p,
            "x2": x_new_p,
            "y_diff1": y_percent_diff_old,
            "y_diff2": y_percent_diff,
            "plot_color": plot_color,
            "row_counter": row_counter,
            "max_window_size": max_window_size,
        },
    }
    return plot_dict

def init_plot(self, graphRange):
Expand All @@ -264,4 +333,3 @@ def stop_plot(self):
"""Currently not used"""
print("PreprocessDataManager.stop_plot()")
yield "event: stop\ndata: " + "none" + "\n\n"

Loading