-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.py
258 lines (223 loc) · 8.96 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import numpy as np
import os, sys, pickle, random
import task_generator as task_gen
"""
Returns the utilization of a task
"""
def task_util(task):
return float(task['budget']) / task['period']
# GEKKO = 0
model = None
# """
# returns the end-to-end delay of a pipeline
# """
# def end_to_end_delay(pipeline):
# # Throw Error
# a = pipeline['dummy']
# return sum([x[1] for x in pipeline])
"""
Davare et al.[2019] - End-to-end delay upper bound:
response-time replaced by period
"""
def end_to_end_delay_davare(pipeline):
return 2 * sum([x[1] for x in pipeline])
"""
Durr et al.[2019] - End-to-end delay upper bound:
response-time replaced by period
"""
def end_to_end_delay_durr(pipeline):
e2e_ub = 0
N = len(pipeline)
for i in range(0, N - 1):
# If next task is of lower priority (higher or equal period), then P = 0
I = (0 if pipeline[i + 1][1] >= pipeline[i][1] else 1)
e2e_ub += max(pipeline[i][1], pipeline[i + 1][1] + pipeline[i][1] * I)
print (i, e2e_ub)
# add the period of first and last tasks
return (e2e_ub + pipeline[0][1] + pipeline[N - 1][1])
"""
An old E2E Delay implementation for GEKKO - Not used anymore
"""
def end_to_end_delay_durr_periods_gekko(periods):
e2e_ub = 0
N = len(periods)
print ("called", periods)
for i in range(0, N - 1):
# If next task is of lower priority (higher or equal period), then P = 0
if periods[i + 1].value >= periods[i].value:
e2e_ub += periods[i + 1].value
else:
e2e_ub += periods[i + 1].value + periods[i].value
# print (i, e2e_ub)
# add the period of first and last tasks
# print (periods, "E2E: ", int(e2e_ub + periods[0] + periods[N - 1]))
return int(e2e_ub + periods[0].value + periods[N - 1].value)
"""
A GEKKO model is provided as a second argument
"""
def end_to_end_delay_durr_GEKKO(periods, m):
N = len(periods)
e2e_ub = periods[0] + periods[N - 1]
# print ("called2", periods)
for i in range(0, N - 1):
# If next task is of lower priority (higher or equal period), then P = 0
e2e_ub += m.if3(periods[i + 1] - periods[i], periods[i + 1] + periods[i], periods[i + 1])
# print (i, e2e_ub)
# add the period of first and last tasks
# print (periods, "E2E: ", int(e2e_ub + periods[0] + periods[N - 1]))
# print ("c2", e2e_ub + periods[0] + periods[N - 1])
return e2e_ub
"""
This is the function used by most solvers to calculate end-to-end delay upper bound.
"""
def end_to_end_delay_durr_periods_orig(periods):
e2e_ub = 0
N = len(periods)
for i in range(0, N - 1):
# If next task is of lower priority (higher or equal period), then I = 0
if periods[i + 1] >= periods[i]:
e2e_ub += periods[i + 1]
else:
e2e_ub += periods[i + 1] + periods[i]
# print (i, e2e_ub)
# add the period of first and last tasks
# print (periods, "E2E: ", int(e2e_ub + periods[0] + periods[N - 1]))
return int(e2e_ub + periods[0] + periods[N - 1])
"""
A trial end-to-end delay calculation for pyomo
"""
def end_to_end_delay_durr_periods_pyomo(periods, extra_periods):
N = len(periods)
e2e_ub = periods[0] + periods[N - 1]
for i in range(0, N - 1):
# If next task is of lower priority (higher or equal period), then P = 0
e2e_ub += periods[i + 1] + extra_periods[i]
# print (i, e2e_ub)
# add the period of first and last tasks
# print (periods, "E2E: ", int(e2e_ub + periods[0] + periods[N - 1]))
return e2e_ub
def sample_rate_lb_helper(budgets, periods, initial_budgets, GEKKO=0):
fx = 1 # Sampling Rate
N = len(budgets)
if N == 1:
if GEKKO:
return GEKKO.Const(1)
return 1
elif N == 2:
if GEKKO:
budget_multiplier_producer = budgets[0] / initial_budgets[0]
budget_multiplier_consumer = budgets[1] / initial_budgets[1]
else:
budget_multiplier_producer = budgets[0] // initial_budgets[0]
budget_multiplier_consumer = budgets[1] // initial_budgets[1]
fx = (1.0 * periods[0]/periods[1]) * (budget_multiplier_consumer / budget_multiplier_producer)
# print ("L: ", N, fx)
return fx
else:
prev_pipeline_fx = sample_rate_lb_helper(budgets[:N - 1], periods[:N - 1], initial_budgets[:N - 1], GEKKO)
second_last_period = periods[N - 2]
last_period = periods[N - 1]
second_last_budget = budgets[N - 2]
last_budget = budgets[N - 1]
second_last_initial_budget = initial_budgets[N - 2]
last_initial_budget = initial_budgets[N - 1]
if GEKKO:
second_last_initial_budget_multipl = second_last_budget / second_last_initial_budget
last_initial_budget_multipl = last_budget / last_initial_budget
else:
second_last_initial_budget_multipl = second_last_budget // second_last_initial_budget
last_initial_budget_multipl = last_budget // last_initial_budget
if GEKKO:
# is_oversample = GEKKO.if2(second_last_period - last_period, GEKKO.Const(0), GEKKO.Const(1))
a = 1
else:
is_oversample = True if last_period <= second_last_period else False
last_sampling_rate = ((1.0 * second_last_period) / last_period) * (last_initial_budget_multipl / second_last_initial_budget_multipl)
if GEKKO:
# print ("Prev:", prev_pipeline_fx)
# temp = GEKKO.if2(prev_pipeline_fx - 1.0, GEKKO.Const(1), GEKKO.Const(0))
fx = GEKKO.if3((1 - prev_pipeline_fx) / 100.0 * (second_last_period - last_period) + (second_last_period - last_period), prev_pipeline_fx * last_sampling_rate, prev_pipeline_fx)
else:
if (prev_pipeline_fx < 1 and is_oversample): # Case 1
fx = prev_pipeline_fx
else: # Other cases
fx = prev_pipeline_fx * last_sampling_rate
# elif prev_pipeline_fx < 1 and not is_oversample: # Case 2
# fx = prev_pipeline_fx * last_sampling_rate
# elif prev_pipeline_fx >= 1 and is_oversample: # Case 3
# fx = prev_pipeline_fx * last_sampling_rate
# else: # Case 4
# fx = (1 / prev_pipeline_fx) * last_sampling_rate
# print ("Loss: ", N, fx)
return fx
"""
The following function calculates sampling rate LB of an asynchronous
pipeline.
"""
def sample_rate_lb(pipeline, initial_budgets, GEKKO=0):
budgets = [t[0] for t in pipeline]
periods = [t[1] for t in pipeline]
return sample_rate_lb_helper(budgets, periods, initial_budgets, GEKKO)
"""
The following function calculates loss rate UB of an asynchronous
pipeline.
"""
def loss_rate_ub(pipeline, initial_budgets):
sr = sample_rate_lb(pipeline, initial_budgets)
if sr >= 1:
return 0
else:
return (1 - sr)
def loss_rate_ub_GEKKO(pipeline, initial_budgets, m):
model = m
sr = sample_rate_lb(pipeline, initial_budgets, GEKKO=m)
lr = model.if3(1 - sr, 0, (1 - sr))
return lr, sr
"""
This calculates the minimum throughput of a Pipeline
"""
def throughput_lb_helper(budgets, periods, initial_budgets):
return min((float(budgets[i]) / initial_budgets[i]) for i in range(len(periods)))
def throughput_lb(pipeline, initial_budgets):
budgets = [t[0] for t in pipeline]
periods = [t[1] for t in pipeline]
return throughput_lb_helper(budgets, periods, initial_budgets)
"""
Get existing Pipelines Set
"""
def GetPipelineBudgets(no_pipelines, no_tasks, seed):
random.seed(seed)
# MIN_BUDGET = 5
# MAX_BUDGET = 100
pipeline_budgets = []
setfile_string = "pipelines_" + str(no_pipelines) + "_" + str(no_tasks) + "_" + str(seed) + ".pickle"
if not os.path.isfile(setfile_string):
min_period = 100
max_period = 1000
# Generate utilization
utils_sets = task_gen.gen_uunifastdiscard(no_pipelines, 0.75, no_tasks)
# Choose random a random period value
period_sets = task_gen.gen_periods_uniform(no_tasks, no_pipelines, min_period, max_period, True)
# Generate the initial task budgets
current_sets = task_gen.gen_tasksets(utils_sets, period_sets, True)
for i in range(no_pipelines):
budgets = []
for j in range(no_tasks):
budgets.append(current_sets[i][j][0])
pipeline_budgets.append(budgets)
with open(setfile_string, "wb") as setfile:
pickle.dump(pipeline_budgets, setfile)
print ("Saved new dataset: ", setfile_string)
else:
print ("Opening existing dataset: ", setfile_string)
with open(setfile_string, "rb") as setfile:
# return None
pipeline_budgets = pickle.load(setfile)
# temp = []
# for i in range(no_pipelines):
# budgets = []
# for j in range(no_tasks):
# budgets.append(pipeline_budgets[i][j][0])
# temp.append(budgets)
# return temp
return pipeline_budgets