-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.py
254 lines (193 loc) · 10.9 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 3 09:18:45 2022
@author: carlos
"""
import sys
import experimentationConfig
import optimizationConfig
"""
Subscribed to:
command/nodeX/solutionTemplate {solutionQuantity: <int>, solutionConfig: <dict>, infrastructure, simulatedNode, cloudNode, randomseed} coordinator --> worker(nodeX)
command/nodeX/removeSolutions {solIds:[<int>...<int>]} coordinator --> worker(nodeX)
command/nodeY/sendSolution {solId:<int>, originNodeId:<str>} master --> worker(nodeY)
solution/nodeX {chromosome: <str>, senderWorkerId: <str>, senderSimulatedNode} worker(nodeY) --> worker(nodeX)
command/stopOptimization {}
"""
"""
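Publishes:
command/join {workerId: <str>} worker --> coordinator
fitness/nodeX/newpopulation {workerId: <str>, solutions: <fitness entries {idSol: <int>, fitness:{obj1:<float>...,objn:<float>}}>} worker(nodeX) --> coordinator
fitness/nodeX/newchildren {workerId: <str>, solutions: <fitness entries>, pathLengthIslands: <float>, pathLengthCentralizedFront: <float>} worker(nodeX) --> coordinator
solution/nodeX {chromosome: <str>, senderWorkerId: <str>, senderSimulatedNode} worker(nodeY) --> worker(nodeX)
solution/ack, solution/nak {solId:<int>, originNodeId:<str>, targetNodeId:<str>} worker(nodeY) --> coordinator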
"""
import paho.mqtt.client as paho
import paho.mqtt.publish as publish
import time
import threading
import json
from solution import Solution
from optimizationproblem import OptimizationProblem
from log import Log
def nodesDistance(a: int, b: int) -> float:
    """Return the distance between infrastructure nodes ``a`` and ``b``.

    The value comes from the ``'Gdistances'`` table of
    ``optProblem.infrastructure4mqttJson``; ``optProblem`` is the module-level
    problem instance created in ``initSolutionTemplate``. The table is keyed by
    the string form of the node ids, so both ids are passed through ``str``.
    """
    distanceTable = optProblem.infrastructure4mqttJson['Gdistances']
    sourceKey, targetKey = str(a), str(b)
    return distanceTable[sourceKey][targetKey]
# Handler for command/nodeX/solutionTemplate (coordinator --> worker(nodeX)).
def initSolutionTemplate(jConf: bytes) -> None:
    """Build this worker's sub-population from the coordinator's template.

    The coordinator sends this message in answer to the worker's
    ``command/join`` announcement. The worker creates its population through
    ``OptimizationProblem`` and then reports the fitness of the new solutions
    on ``fitness/<nodeId>/newpopulation``.

    Args:
        jConf: JSON payload with the keys ``solutionQuantity``,
            ``solutionConfig``, ``infrastructure``, ``simulatedNode``,
            ``cloudNode`` and ``randomseed``.

    Side effects:
        Sets the module globals ``optProblem``, ``simulatedNode`` and
        ``cloudNode``, which the other message handlers read.
    """
    global optProblem
    global simulatedNode
    global cloudNode
    log.print(nodeIdStr+":::defining the solution template",'operation')
    # retrieve the population size, the solution template and the node placement
    jsonContent = json.loads(jConf)
    numberOfSolutionsInWorkers = jsonContent['solutionQuantity']
    solutionConfig = jsonContent['solutionConfig']
    infrastructure = jsonContent['infrastructure']
    simulatedNode = jsonContent['simulatedNode']
    cloudNode = jsonContent['cloudNode']
    randomseed = jsonContent['randomseed']
    optProblem = OptimizationProblem(log, randomseed, numberOfSolutionsInWorkers, solutionConfig, infrastructure)
    dictPayload = {
        'workerId': nodeIdStr,
        'solutions': optProblem.getPopulationFitnessInJson(),
    }
    topic = "fitness/"+nodeIdStr+"/newpopulation"
    # serialize once and reuse the same text for the log and the publish
    payload = json.dumps(dictPayload)
    log.print(nodeIdStr+":::sending set of solution's fitnesses",'operation')
    log.print(nodeIdStr+":::with topic "+topic,'operation-details')
    log.print(nodeIdStr+":::with payload "+payload,'operation-details')
    publish.single(topic=topic, payload=payload, hostname=mqtt_host)
# Handler for command/nodeX/removeSolutions (coordinator --> worker(nodeX)).
def removeSolutions(jConf: bytes) -> None:
    """Remove the solutions the coordinator selected from this sub-population.

    Args:
        jConf: JSON payload ``{"solIds": [<int>, ...]}``.

    An id that cannot be removed is logged as an error; the remaining ids are
    still processed.
    """
    log.print(nodeIdStr+":::removing solutions",'operation')
    jsonContent = json.loads(jConf)
    for idSol in jsonContent['solIds']:
        removed = optProblem.removeOneWorstSolutionById(idSol, optimizationConfig.selfId4Worker)
        # only an explicit False return signals "not found"; keep the == comparison
        if removed == False:  # noqa: E712
            # pass the log category like every other log call in this module
            log.print(nodeIdStr+":::ERROR removing solution, solution not found: "+str(idSol),'worker')
# Handler for command/nodeY/sendSolution {solId:<int>, originNodeId:<str>} master --> worker(nodeY).
def sendSolution(jConf: bytes) -> None:
    """Send a stored solution's chromosome to the worker that requested it.

    On success, the chromosome is published on ``solution/<originNodeId>``
    and the coordinator is notified on ``solution/ack``. When the id is
    unknown, only ``solution/nak`` is published. The ack/nak payload is the
    received request with ``targetNodeId`` set to this worker's id.

    Args:
        jConf: JSON payload ``{"solId": <int>, "originNodeId": <str>}``.
    """
    log.print(nodeIdStr+":::sending the solution to the origin worker",'operation')
    jsonContent = json.loads(jConf)
    solId = jsonContent['solId']
    originId = jsonContent['originNodeId']
    solChromosome = optProblem.getSolutionChromosomeById(solId, optimizationConfig.selfId4Worker)
    # both the ack and the nak echo the request, tagged with this worker's id
    jsonContent['targetNodeId'] = nodeIdStr
    if solChromosome is None:
        log.print(nodeIdStr+":::ERROR solution id not found: "+str(solId),'worker')
        # Do not forward a null chromosome: the origin worker would pass it to
        # evolveWithRemoteSolution. Only the coordinator is told, via the nak.
        publish.single(topic="solution/nak", payload=json.dumps(jsonContent), hostname=mqtt_host)
        return
    # solution/nodeX {chromosome, senderWorkerId, senderSimulatedNode} worker(nodeY) --> worker(nodeX)
    dictPayload = {
        'chromosome': solChromosome,
        'senderWorkerId': nodeIdStr,
        'senderSimulatedNode': simulatedNode,
    }
    publish.single(topic="solution/"+originId, payload=json.dumps(dictPayload), hostname=mqtt_host)
    publish.single(topic="solution/ack", payload=json.dumps(jsonContent), hostname=mqtt_host)
# Handler for solution/nodeX {chromosome, senderWorkerId, senderSimulatedNode} worker(nodeY) --> worker(nodeX).
def solutionRecieved(jConf: bytes) -> None:
    """Evolve the local sub-population with a chromosome sent by another worker.

    The offspring produced by ``optProblem.evolveWithRemoteSolution`` are
    appended to the sub-population. Their fitness is published on
    ``fitness/<nodeId>/newchildren``, together with two path-length metrics
    computed with ``nodesDistance``.

    Args:
        jConf: JSON payload with ``chromosome``, ``senderWorkerId`` and
            ``senderSimulatedNode``.
    """
    log.print(nodeIdStr+":::we receive the solution from other worker to crossover it",'operation')
    jsonContent = json.loads(jConf)
    solChrom = jsonContent['chromosome']
    senderWorkerID = jsonContent['senderWorkerId']
    senderSimulatedNode = jsonContent['senderSimulatedNode']
    if solChrom is None:
        # The sender publishes a null chromosome when it could not find the
        # requested solution (it reports that on solution/nak); there is
        # nothing to evolve with.
        log.print(nodeIdStr+":::ERROR empty chromosome received from "+str(senderWorkerID),'worker')
        return
    childrenOffspring = optProblem.evolveWithRemoteSolution(solChrom)
    # There are two options for the worker populations: append the new
    # children to the current sub-population (growing it), or replace its two
    # worst solutions. We append, and leave the choice of which solutions to
    # remove to the coordinator. The worst solutions of the whole population
    # are then removed, instead of the worst two of this sub-population. The
    # drawback is that sub-population sizes can differ between workers.
    optProblem.joinTwoPopulations(childrenOffspring)
    dictPayload = {
        'workerId': nodeIdStr,
        'solutions': optProblem.getSolutionsFitnessInJson(childrenOffspring),
        # path length when the whole solution space (objective and decision)
        # is stored in the coordinator
        'pathLengthIslands': 2*nodesDistance(cloudNode, simulatedNode),
        # path length for the proposed semi-distributed scheme: distributed
        # decision space, centralized objective space
        'pathLengthCentralizedFront': (nodesDistance(cloudNode, senderSimulatedNode)
                                       + nodesDistance(senderSimulatedNode, simulatedNode)
                                       + nodesDistance(simulatedNode, cloudNode)),
    }
    publish.single(topic="fitness/"+nodeIdStr+"/newchildren", payload=json.dumps(dictPayload), hostname=mqtt_host)
def stopOptimization(jConf: bytes) -> None:
    """Handle ``command/stopOptimization``.

    Raises the module-level ``finishCondition`` flag that the main
    ``client.loop()`` cycle checks before each iteration. The payload carries
    no data and is ignored.
    """
    global finishCondition
    finishCondition = True
def process_message(mosq: paho.Client, obj, msg: paho.MQTTMessage) -> None:
    """Log an incoming MQTT message and route its payload to the matching handler.

    Runs on the thread started by ``on_message``. The module-level counter
    ``i`` numbers the received messages in the log output.
    """
    global i
    log.print(f"{nodeIdStr}:::getting message number {i}", 'operation-details')
    log.print(f"{nodeIdStr}:::with topic {msg.topic}", 'operation-details')
    log.print(f"{nodeIdStr}:::with qos {msg.qos}", 'operation-details')
    log.print(f"{nodeIdStr}:::with payload {msg.payload}", 'operation-details')
    i += 1
    # the command is decoded from the topic alone
    handlersByTopic = {
        f"command/{nodeIdStr}/solutionTemplate": initSolutionTemplate,
        f"command/{nodeIdStr}/removeSolutions": removeSolutions,
        f"command/{nodeIdStr}/sendSolution": sendSolution,
        f"solution/{nodeIdStr}": solutionRecieved,
        "command/stopOptimization": stopOptimization,
    }
    handler = handlersByTopic.get(msg.topic)
    if handler is None:
        log.print(f"{nodeIdStr}:::ERROR-message not understood", 'worker')
        return
    handler(msg.payload)
# paho-mqtt callback for every received message.
def on_message(mosq: paho.Client, obj, msg: paho.MQTTMessage) -> None:
    """Hand the message to ``process_message`` on a new thread.

    After starting the thread, the callback sleeps for
    ``experimentationConfig.time2SleepInWorker`` seconds when that value is
    positive; otherwise it returns immediately.
    """
    handlerThread = threading.Thread(target=process_message, args=(mosq, obj, msg))
    handlerThread.start()
    pause = experimentationConfig.time2SleepInWorker
    if pause > 0.0:
        time.sleep(pause)
# paho-mqtt callback registered as client.on_publish.
def on_publish(mosq, obj, mid):
    """Ignore publish confirmations; nothing needs to happen after a send."""
if __name__ == '__main__':
    import uuid  # only needed to build this worker's id

    # Command line: worker.py [executionScenario [repetition]]
    # Both values are only used to name the log file and in the start-up log line.
    executionScenario = sys.argv[1] if len(sys.argv) > 1 else 'manually'
    repetition = sys.argv[2] if len(sys.argv) > 2 else 0

    i = 0  # counter of received messages, shown in the logs
    finishCondition = False  # set to True by stopOptimization to end the main loop

    # The worker id is generated here and handed to paho as the client id,
    # instead of being read back from paho's private ``_client_id``. Depending
    # on the paho-mqtt version, that attribute may be bytes (so ``.replace``
    # with str arguments fails) or empty when no id is given (so every worker
    # would share the same topics). 18 hex characters keep the id within the
    # 23-character limit of MQTT 3.1.
    nodeIdStr = uuid.uuid4().hex[:18]

    # establish the connection to the mosquitto server
    mqtt_host = "127.0.0.1"
    client = paho.Client(client_id=nodeIdStr)
    client.on_message = on_message
    client.on_publish = on_publish
    client.connect(mqtt_host, 1883, 60)

    log = Log(nodeIdStr, nodeIdStr+":::", executionScenario+'-'+str(repetition)+'-workers')
    log.print("worker node started for repetition: "+str(repetition)+" and scenario "+executionScenario,'worker')

    # topics this worker is subscribed to (all with QoS 0)
    for topic in ("command/"+nodeIdStr+"/solutionTemplate",
                  "command/"+nodeIdStr+"/removeSolutions",
                  "command/"+nodeIdStr+"/sendSolution",
                  "solution/"+nodeIdStr,
                  "command/stopOptimization"):
        client.subscribe(topic, 0)

    # command/join {workerId: <str>} worker --> coordinator
    # announces this worker; the coordinator answers with the solution template
    publish.single(topic="command/join", payload=json.dumps({'workerId': nodeIdStr}), hostname=mqtt_host)

    # run the network loop until it reports an error or a stop command arrives
    while client.loop() == 0 and not finishCondition:
        pass
    client.disconnect()  # disconnect gracefully
    log.print("worker finished", 'worker')