-
Notifications
You must be signed in to change notification settings - Fork 0
/
project2Model2.fsx
256 lines (236 loc) · 9.89 KB
/
project2Model2.fsx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
#time "on"
#r "nuget: Akka.FSharp"
#r "nuget: Akka.TestKit"
open System
open Akka.Actor
open Akka.FSharp
open Akka.Configuration
// Configuration
let configuration =
ConfigurationFactory.ParseString(
@"akka {
stdout-loglevel : DEBUG
loglevel : ERROR
log-dead-letters = 0
log-dead-letters-during-shutdown = off
}")
let system = ActorSystem.Create("Gossip", configuration)
type Information =
| GossipObj of (list<IActorRef>*IActorRef)
| GossipObjSelf of (list<IActorRef>*IActorRef)
| PushsumObj of (float*float*list<IActorRef>*IActorRef)
| PushsumObjSelf of (list<IActorRef>*IActorRef)
| Initialize of (list<IActorRef>*string*IActorRef)
| Terminate of (IActorRef*IActorRef)
type BossMessage =
| Start of (string)
| Received of (string)
// Round off to proper squares for 2D and imperfect 2D
let timer = System.Diagnostics.Stopwatch()
let roundOffNodes (numNode:int) =
let mutable sqrtVal = numNode |> float |> sqrt |> int
if sqrtVal*sqrtVal <> numNode then
sqrtVal <- sqrtVal + 1
sqrtVal*sqrtVal
// Input from Command Line
let mutable nodes = fsi.CommandLineArgs.[1] |> int
let topology = fsi.CommandLineArgs.[2]
let algo = fsi.CommandLineArgs.[3]
let rand = Random(nodes)
if topology = "imp2D" || topology = "2D" then
nodes <- roundOffNodes nodes
// Builds the neighbours according to the respective network from the nodes
let form2DNeighbours (actorName:string) (mypool:Information) =
match mypool with
| Initialize(pool, topo, boss) ->
let myId = (actorName.Split '_').[1] |> int
let mutable neighbourList = []
let mutable pickId = 0
let size = sqrt (nodes |> float) |> int
if myId%size <> 1 then
neighbourList <- pool.[myId-2] :: neighbourList
if myId%size <> 0 then
neighbourList <- pool.[myId] :: neighbourList
if myId > size then
neighbourList <- pool.[myId-size-1] :: neighbourList
if myId <= (nodes-size) then
neighbourList <- pool.[myId+size-1] :: neighbourList
if topo = "imp2D" then
let mutable temp = pool.[rand.Next()%nodes]
while temp.Path.Name = actorName do
temp <- pool.[rand.Next()%nodes]
neighbourList <- temp :: neighbourList
neighbourList
// Build Line Topology - select it's Left and Right neighbours
let formLineNeighbours (actorName:string) (pool:list<IActorRef>) =
let mutable neighbourList = []
let myId = (actorName.Split '_').[1] |> int
if myId = 1 then
neighbourList <- pool.[myId] :: neighbourList
else if myId = nodes then
neighbourList <- pool.[myId-2] :: neighbourList
else
neighbourList <- pool.[myId-2] :: neighbourList
neighbourList <- pool.[myId] :: neighbourList
neighbourList
// Build Full Topology - select all other nodes except for self
let formFullNeighbours (actorName:string) (pool:list<IActorRef>) =
let myId = (actorName.Split '_').[1] |> int
let neighbourList = pool |> List.indexed |> List.filter (fun (i, _) -> i <> myId-1) |> List.map snd
neighbourList
// Push-sum: for aggregation calculation. State pair (s, w) is used to calculate the
// convergence. The s/w ratio is added when recceived and halfed when sent to external neighbor from its
// list but remains same when for self message
let PushSumActors (mailbox:Actor<_>) =
let mutable neighbourList = []
let mutable s = 0.0
let mutable w = 1.0
let mutable endThis = 0
let mutable prevValue = s/w
let mutable pushSumTopo = ""
let mutable bossRef = mailbox.Self
let rec loop () = actor {
let! message = mailbox.Receive()
let mutable pushmsg : Information = message
let mutable actorPool = []
match pushmsg with
| PushsumObjSelf(pool, bRef) ->
actorPool <- pool
| PushsumObj(is,iw,pool,boss) ->
actorPool <- pool
if endThis < 3 then
s <- is + s
w <- iw + w
if abs ((s/w) - prevValue) <= (pown 10.0 -10) then
endThis <- endThis + 1
if endThis = 3 then
bossRef <! Received("Terminated")
neighbourList |> List.iter (fun item ->
item <! Terminate(mailbox.Self, bossRef))
else
endThis <- 0
| Initialize(pool, topo, boss) ->
pushSumTopo <- topo
bossRef <- boss
s <- (mailbox.Self.Path.Name.Split '_').[1] |> float
prevValue <- s/w
if topo = "2D" || topo = "imp2D" then
neighbourList <- form2DNeighbours mailbox.Self.Path.Name pushmsg
else if topo = "line" then
neighbourList <- formLineNeighbours mailbox.Self.Path.Name pool
else
neighbourList <- formFullNeighbours mailbox.Self.Path.Name pool
return! loop()
| Terminate(killedActorRef, boss) ->
let myId = (killedActorRef.Path.Name.Split '_').[1] |> int
let before = neighbourList.Length
neighbourList <- neighbourList |> List.indexed |> List.filter (fun (i, v) -> ((v.Path.Name.Split '_').[1] |> int) <> myId) |> List.map snd
if neighbourList.Length = 0 then
endThis <- 100
bossRef <! Received("Terminated")
| _ -> ignore()
if endThis <= 3 then
prevValue <- s/w
s <- s/2.0
w <- w/2.0
if endThis = 3 then
endThis <- 100
let randPushSum = System.Random()
if neighbourList.Length > 0 then
let neighbour = neighbourList.[randPushSum.Next(neighbourList.Length)]
neighbour <! PushsumObj(s,w,actorPool,bossRef)
system.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1.0), mailbox.Self, PushsumObjSelf(actorPool, bossRef))
return! loop()
}
loop()
// Gossip Actor - receives rumors and sends rumours to one of it's neighbours and to self
// periodically since it first receives until it terminates
let GossipActors (mailbox:Actor<_>) =
let mutable first = true
let mutable exhausted = 100
let mutable neighbourList = []
let mutable gossipTopo = ""
let rec loop () = actor {
let! message = mailbox.Receive()
let mutable gossipmsg : Information = message
let mutable actorpool = []
let mutable refBoss = mailbox.Self
let mutable init = false
match gossipmsg with
| GossipObj(pool, bossRef) ->
if first then
first <- false
bossRef <! Received("Received")
exhausted <- exhausted - 1
actorpool <- pool
refBoss <- bossRef
| GossipObjSelf(pool,bossRef) ->
gossipmsg <- GossipObj(pool,bossRef)
actorpool <- pool
refBoss <- bossRef
| Initialize(pool, topo, boss) ->
init <- true
gossipTopo <- topo
if topo = "2D" || topo = "imp2D" then
neighbourList <- form2DNeighbours mailbox.Self.Path.Name gossipmsg
else if topo = "line" then
neighbourList <- formLineNeighbours mailbox.Self.Path.Name pool
else
neighbourList <- formFullNeighbours mailbox.Self.Path.Name pool
return! loop()
| _ -> ignore()
if not init then
if exhausted >= 0 then
let randImp2D = System.Random()
if exhausted = 0 then
exhausted <- -1
if neighbourList.Length > 0 then
let neighbour = neighbourList.[randImp2D.Next(neighbourList.Length)]
neighbour <! gossipmsg
system.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1.0), mailbox.Self, GossipObjSelf(actorpool, refBoss))
else
exhausted <- -1
mailbox.Context.Stop(mailbox.Self)
return! loop()
}
loop()
// Initializes the pool of actors, builds the topology and randomly select one actor to start with
// Terminates when all the actors receive rumour 100 times for gossip
// For push-sum when s/w ratio is approx same for 3 rounds
let BossActor (mailbox:Actor<_>) =
let mutable reached = 0
let rec loop () = actor {
let! message = mailbox.Receive()
match message with
| Start(_) ->
if algo = "gossip" then
let actorsPool =
[1 .. nodes]
|> List.map(fun id -> spawn system (sprintf "Actor_%d" id) GossipActors)
actorsPool |> List.iter (fun item ->
item <! Initialize(actorsPool, topology, mailbox.Self))
timer.Start()
actorsPool.[(rand.Next()) % nodes] <! GossipObj(actorsPool, mailbox.Self)
else if algo = "push-sum" then
let actorsPool =
[1 .. nodes]
|> List.map(fun id -> spawn system (sprintf "Actor_%d" id) PushSumActors)
actorsPool |> List.iter (fun item ->
item <! Initialize(actorsPool, topology, mailbox.Self))
timer.Start()
actorsPool.[(rand.Next()) % nodes] <! PushsumObjSelf(actorsPool, mailbox.Self)
| Received(_) ->
reached <- reached + 1
if reached = nodes then
printfn "Time taken = %i\n" timer.ElapsedMilliseconds
mailbox.Context.Stop(mailbox.Self)
mailbox.Context.System.Terminate() |> ignore
| _ -> ()
return! loop()
}
loop()
// Start of the algorithm - spawn Boss, the delgator
let boss = spawn system "boss" BossActor
boss <! Start("start")
// Wait until all the actors has finished processing
system.WhenTerminated.Wait()