-
Notifications
You must be signed in to change notification settings - Fork 2
/
djoin.jl
284 lines (248 loc) · 7.74 KB
/
djoin.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
@everywhere using DataFrames
# TODO: Assuming the key pool is within 10 MB. The key pool generation is not
# out of core. However, loading the keys from CSV would mean loading rows from
# CSV and this would have to be out of core, unless we write a custom readcsv
# that only reads the key column.
"""
    readkeys(filename, keycol::Symbol, delim=',')

Read only the key column `keycol` from the CSV file `filename` and return its
values as an array of strings. Throws an error when `keycol` is not one of the
header names.
"""
function readkeys(filename, keycol::Symbol, delim=',')
    # NOTE(review): assumes a plain CSV — quoted fields containing `delim`
    # are still not handled; confirm inputs before relying on this.
    # `open … do` guarantees the file is closed even when we error out below.
    open(filename, "r") do f
        headers = split(strip(readline(f)), delim)
        keyidx = findfirst(isequal(string(keycol)), headers)
        if keyidx === nothing
            error("Invalid `keycol` argument. The specified column name `$keycol` does not exist in this table.")
        end
        keyvals = AbstractString[]
        for ln in eachline(f)
            vals = split(strip(ln), delim)
            # Keep the raw string. The old `parse(vals[keyidx])` interpreted
            # the key as a Julia expression, whose result (e.g. a Symbol for
            # "x1") is not an AbstractString and broke the push! below.
            push!(keyvals, vals[keyidx])
        end
        return keyvals
    end
end
"""
Return the union of `pool` with the distinct values found in `keycol`.
With no `pool` argument, simply the distinct values of `keycol` as a Set.
"""
function get_uniquekeys(keycol, pool=Set())
    distinct = Set(keycol)
    return union(pool, distinct)
end
"""
Get the pool of keys: the union of the distinct values in `leftkeys` and
`rightkeys`, the key-column arrays of the left and right tables
(as produced by `readkeys`). Returns a Set.
"""
function get_keypool(leftkeys, rightkeys)
    pool = get_uniquekeys(leftkeys)
    pool = get_uniquekeys(rightkeys, pool)
    return pool
end
"""
Get a hash of {key => processor_id} from `keypool` and `parts` number of parts.
Keys are split into `parts` equal-size buckets; leftover keys (when the pool
size is not divisible by `parts`) are dealt one each to the first processors.
"""
function get_keyhash(keypool, parts)
    mapping = Dict()
    items = collect(keypool)
    chunk = div(length(items), parts)
    evenly = chunk * parts
    for (pos, key) in enumerate(items)
        if pos <= evenly
            # Position within the evenly-divisible prefix → bucket number.
            mapping[key] = div(pos - 1, chunk) + 1
        else
            # Remainder keys go one apiece to processors 1, 2, ...
            mapping[key] = pos - evenly
        end
    end
    return mapping
end
"""
Partition a row-indexable table `df` (e.g. a DataFrame) into `n` consecutive
row blocks. The first `n-1` blocks get `div(size(df, 1), n)` rows each; the
last block picks up the remainder so no row is dropped.
Returns an array of the same table type as `df`.
"""
function partition_df(df, n)
    # `Array(DataFrame, n)` is removed constructor syntax and hard-coded the
    # element type; `Vector{typeof(df)}` works for any sliceable table.
    parts = Vector{typeof(df)}(undef, n)
    psize = div(size(df, 1), n)
    lo = 1
    for i = 1:n-1
        parts[i] = df[lo:lo+psize-1, :]
        lo += psize
    end
    parts[n] = df[lo:end, :]
    return parts
end
# @everywhere definitions start here
# Per-process global state for the distributed join (stored in `g_ctx`).
@everywhere type ProcessCtx
    # Partitions of this process's chunk, one sub-frame per destination
    # process (index == destination process id).
    dfarr_left::Array{DataFrame, 1}
    dfarr_right::Array{DataFrame, 1}
    # Accumulators: all rows of the left/right table whose keys belong to
    # this process, grown as other processes send their shares over.
    accdf_left::DataFrame
    accdf_right::DataFrame
end
# Pretty-print a ProcessCtx for debugging (used by `logdata`).
@everywhere function Base.show(io::IO, ctx::ProcessCtx)
    procid = myid()
    print(io, "\n=================\n\nProcess: $procid\n\nDF array left: $(ctx.dfarr_left)\n\nDF array right: $(ctx.dfarr_right)\n\nAcc DF left: $(ctx.accdf_left)\n\nAcc DF right: $(ctx.accdf_right)\n===================\n")
end
# Determine which of the keys in `keyarr` belongs to which process from the
# hash of keys and process id's `keyhash`. `parts` is the number of processors.
#
# Returns an array of index vectors; entry `i` holds the row indexes of the
# rows destined for process `i`.
@everywhere function arrangement_idxs(keyarr, keyhash, parts)
    # `Array(Array, parts)` is removed syntax and left the slots untyped and
    # undefined; a comprehension gives concrete, initialized Vector{Int}s.
    idxs = [Int[] for _ in 1:parts]
    for (i, key) in enumerate(keyarr)
        push!(idxs[keyhash[key]], i)
    end
    return idxs
end
# Get an array of sub-dataframes from the dataframe `df` and array of indexes
# `idxs`: result[i] contains the rows of `df` selected by idxs[i].
@everywhere function idxs_to_dfarray(df, idxs)
    # A comprehension infers a concrete element type instead of the removed
    # `Array(DataFrame, np)` constructor, and works for any row-sliceable df.
    return [df[ix, :] for ix in idxs]
end
# Figure out what parts of the received dataframe should be given to what
# process. Store this in a global context. Also initialize the accumulator
# dataframe in the global context.
#
# `dfl`/`dfr` are this process's chunks of the left/right table, `keyhash`
# maps key => owning process id, `keycol` names the key column and `np` is
# the number of processes.
@everywhere function init_process(dfl, dfr, keyhash, keycol, np)
    procid = myid()
    # Bucket each row's index by the process that owns its key.
    idxsl = arrangement_idxs(dfl[keycol], keyhash, np)
    idxsr = arrangement_idxs(dfr[keycol], keyhash, np)
    dfarr_left = idxs_to_dfarray(dfl, idxsl)
    dfarr_right = idxs_to_dfarray(dfr, idxsr)
    accdf_left = dfarr_left[procid] # df belonging to self
    accdf_right = dfarr_right[procid]
    # Global so later remotecalls (pull/push/join) can reach this state.
    global g_ctx = ProcessCtx(dfarr_left, dfarr_right, accdf_left, accdf_right)
end
# Get the `typ` (:left or :right) sub-dataframe that this process holds for
# process `pid`. Called remotely by `pullrows`.
@everywhere function getrows(pid, typ)
    return typ == :left ? g_ctx.dfarr_left[pid] : g_ctx.dfarr_right[pid]
end
# Pull rows of `typ` dataframe belonging to this process from the other processes.
# `typ` is :left or :right.
#
# Each remote process is asked (via `getrows`) for the sub-frame it holds for
# this process; the replies are fetched and appended to the matching
# accumulator in the global `g_ctx`. The `@sync` only waits for the
# remotecalls to be issued; the `fetch` loop below blocks on each reply.
@everywhere function pullrows(typ)
    np = nprocs()
    procid = myid()
    refs = RemoteRef[]
    @sync for j = 1:np
        if j == procid
            continue  # this process already holds its own slice
        end
        @async begin
            ref = remotecall(j, getrows, procid, typ)
            push!(refs, ref)
        end
    end
    for ref in refs
        df = fetch(ref)
        if size(df, 1) == 0
            continue  # nothing to merge from that process
        end
        if typ == :left
            g_ctx.accdf_left = vcat(g_ctx.accdf_left, df)
        else
            g_ctx.accdf_right = vcat(g_ctx.accdf_right, df)
        end
    end
end
# Send the sub-dataframes in `dfarr` to their respective processes
# (dfarr[i] goes to process i). `typ`, either `:left` or `:right`, tells the
# receiving process (`recvrows`) whether to accumulate into its left or right
# dataframe.
@everywhere function pushrows(dfarr, typ)
    me = myid()
    @sync for dest = 1:length(dfarr)
        chunk = dfarr[dest]
        # Skip our own slot and empty chunks — nothing useful to send.
        if dest == me || size(chunk, 1) == 0
            continue
        end
        @async remotecall(dest, recvrows, chunk, typ)
    end
end
# Receive rows (`df`) of the `typ` (:left or :right) table from another
# process and concatenate them onto this process's accumulator in `g_ctx`.
@everywhere function recvrows(df, typ)
    if typ == :left
        g_ctx.accdf_left = vcat(g_ctx.accdf_left, df)
    else
        g_ctx.accdf_right = vcat(g_ctx.accdf_right, df)
    end
end
# Push this process's sub-dataframes (both left and right table) to their
# owning processes.
@everywhere function communicate_push()
    pushrows(g_ctx.dfarr_left, :left)
    pushrows(g_ctx.dfarr_right, :right)
end
# Pull this process's rows (both left and right table) from the other
# processes. Counterpart of `communicate_push`.
@everywhere function communicate_pull()
    pullrows(:left)
    pullrows(:right)
end
# Dump this process's context to a per-process file ("log<pid>") for
# debugging.
@everywhere function logdata()
    open("log$(myid())", "w") do f
        print(f, g_ctx)
    end
end
# Join the left and right df belonging to you.
# Inner-joins this process's two accumulated tables on `keycol`; each process
# can join independently because rows with equal keys were routed to the same
# process.
@everywhere function joindf(keycol)
    return join(g_ctx.accdf_left, g_ctx.accdf_right, on=keycol, kind=:inner)
end
# Driver: read both tables, build the key → process mapping, scatter the
# data, exchange rows so matching keys co-locate, then join and gather.
function main()
    np = nprocs()
    keycol = :carid  # `const` has no effect in local scope; plain binding
    # The serial part: Get the keyhash
    leftkeys = readkeys("left.csv", keycol)
    rightkeys = readkeys("right.csv", keycol)
    keypool = get_keypool(leftkeys, rightkeys)
    keyhash = get_keyhash(keypool, np)
    # Partition dataframe
    dfl = readtable("left.csv")
    dflparts = partition_df(dfl, np)
    dfr = readtable("right.csv")
    dfrparts = partition_df(dfr, np)
    # The parallel parts:
    # For each process give a copy of the keyhash and a part of the dataframes.
    @sync for i = 1:np
        @async remotecall(i, init_process, dflparts[i], dfrparts[i], keyhash,
                          keycol, np)
    end
    # Rearrange the rows of the dataframe between processes. Waiting on the
    # remote refs replaces the old `sleep(2)` race hack: the join below must
    # not start before every process has finished pulling its rows.
    pullrefs = [remotecall(i, communicate_pull) for i = 1:np]
    foreach(wait, pullrefs)
    # call join and get an array of refs
    refs = RemoteRef[]
    @sync for i = 1:np
        @async begin
            ref = remotecall(i, joindf, keycol)
            push!(refs, ref)
        end
    end
    # fetch and concatenate
    df = DataFrame()
    for ref in refs
        df = vcat(df, fetch(ref))
    end
    println(df)
    # Uncomment to log the context state of each process
    # for i = 1:np
    #     remotecall(i, logdata)
    # end
end