Weird issue with distributed #7

Open · wcwitt opened this issue Feb 24, 2023 · 21 comments

@wcwitt (Collaborator) commented Feb 24, 2023

Posting so we don't lose track of this:

using Distributed
using ACE1pack

addprocs(2, exeflags="--project=$(Base.active_project())")

@everywhere using Distributed
@everywhere using ACE1pack

elements = [:Si]
basis = ACE1x.ace_basis(elements = elements,
                        order = 2,
                        totaldegree = 8,
                        rcut = 5.5)

# pass basis to workers
@everywhere basis = $basis

Causes errors like:

ERROR: LoadError: On worker 2:
UndefVarError: #7#8 not defined

From discussions with @cortner, it seems likely the problem relates to serializing an anonymous function.

@cortner (Member) commented Feb 24, 2023

These two fail:

t = basis.BB[1].J[1].trans
@everywhere t1 = $t
f = t.f
@everywhere f1 = $(t.f)

But weirdly enough this works:

g = x -> x^2
@everywhere g1 = $g

Maybe something to do with scope.
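A plausible mechanism, though only an assumption at this point: a closure created at the top level of Main has its method definition shipped to the worker along with the value, whereas a closure stored inside a package object is serialized only by its compiler-generated type name (the #7#8 in the error), which must already exist on the worker. A minimal sketch of the contrast:

using Distributed
addprocs(1)

# Defined in Main: the closure's code travels with the value, so this works
# (matching the g = x -> x^2 observation above).
g = x -> x^2
@everywhere g1 = $g
@everywhere @show g1(2.0)

# A closure baked into a package type is instead serialized by name,
# e.g. SomePackage.var"#7#8" (hypothetical); if that generated name is absent
# on the worker, deserialization throws UndefVarError: #7#8 not defined.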

@wcwitt (Collaborator, Author) commented Feb 24, 2023

For broader context, oversimplifying just a little, here are two lines from ACEfit:

f = i -> linear_fill!(A, Y, W, data[i], basis; row_start=row_start[i])
pmap(f, 1:length(data))

These lines have worked in the past, but they don't work now, presumably because of difficulty sending and receiving the new basis. This approach is suboptimal in some ways, and I can think of a few possible refactorings, but I do like the simplicity.
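One way to keep the pmap simplicity while sending the captured state only once per worker would be a CachingPool (a sketch, not the ACEfit implementation; note it would not by itself fix the serialization failure above):

using Distributed

# Assumes A, Y, W, data, basis, row_start and linear_fill! are defined as in
# the snippet above. The CachingPool ships the closure (and the basis it
# captures) to each worker once and reuses it across tasks.
pool = CachingPool(workers())
f = i -> linear_fill!(A, Y, W, data[i], basis; row_start=row_start[i])
pmap(f, pool, 1:length(data))
Distributed.clear!(pool)  # release the cached closure on the workers when done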

@cortner (Member) commented Feb 24, 2023

I'm not sure I want to send the basis every time. I think it should be transferred or created just once?
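The "created just once" option could look like this (a sketch; each process builds its own copy, which avoids serialization entirely but duplicates the construction work):

# Assumes ACE1pack is already loaded everywhere, as in the scripts above.
@everywhere basis = ACE1x.ace_basis(elements = [:Si],
                                    order = 2,
                                    totaldegree = 8,
                                    rcut = 5.5)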

@cortner (Member) commented Feb 24, 2023

Here is another workaround btw:

using Distributed
using ACE1pack
using ParallelDataTransfer

addprocs(2)

@everywhere begin 
   using Pkg 
   Pkg.activate(@__DIR__)
   using Distributed
   using ACE1pack
end

##

elements = [:Si]
basis = ACE1x.ace_basis(elements = elements,
                        order = 2,
                        totaldegree = 8,
                        rcut = 5.5)
@show length(basis)                        

##

sendto([2,], elements=elements)
@spawnat 2 display(elements)

##

sendto([2,], basis=basis)
@spawnat 2 display(basis)
sleep(4)

@wcwitt (Collaborator, Author) commented Feb 24, 2023

Hm, I tried ParallelDataTransfer earlier and it failed in exactly the same way as basis = $basis.

@wcwitt (Collaborator, Author) commented Feb 25, 2023

I'll try again. And more generally, if there is a way to send the basis once before the pmap, I'm fine with that. In fact, I'm pretty sure an earlier version of ACEfit did it that way and I removed it because I didn't love the global variables.

What I don't want is to construct the basis separately on individual processes with @everywhere clauses, because the basis construction will likely happen somewhere else far away.

@cortner (Member) commented Feb 25, 2023

I don't really understand what happens behind the scenes... but for sure, on my system the $ approach fails while sendto works ok:

t = basis.BB[1].J[1].trans
@everywhere t1 = $t   # fails
sendto([2,], t2 = t)  # ok

@wcwitt (Collaborator, Author) commented Feb 27, 2023

Some progress, but still weird. I'll go with the sendto. Should we close this, or do you want to keep it open until we understand?

# fails
@everywhere basis = $basis

# fails, this is what I tried initially from ParallelDataTransfer
@passobj 1 workers() basis

# works, your solution
sendto(workers(), basis=basis)

@cortner (Member) commented Feb 27, 2023

My understanding was that the $ thing is not really a general tool to transfer data, but more of a hack? Do you have evidence otherwise? If that is actually the intended way to transfer data to another process, then we should explore how to fix it. But if I'm right, then we should look into a workflow that uses sendto.

@wcwitt (Collaborator, Author) commented Feb 27, 2023

Oh, I've already started converting to sendto and I'm perfectly happy with that.

I still think the others should work (@passobj doesn’t seem like a hack, at least), but the practical question is resolved. Thanks!

@cortner (Member) commented Feb 27, 2023

Then maybe this can be closed?

wcwitt closed this as completed Feb 27, 2023
wcwitt reopened this Mar 6, 2023
@wcwitt (Collaborator, Author) commented Mar 6, 2023

We need to revisit this unfortunately. The sendto solution only "works" in the sense that it doesn't error.

using Distributed

addprocs(2, exeflags="--project=$(Base.active_project())")

@everywhere using Distributed
@everywhere using ACE1pack
@everywhere using ParallelDataTransfer

elements = [:Si]
basis = ACE1x.ace_basis(elements = elements,
                        order = 2,
                        totaldegree = 8,
                        rcut = 5.5)

# fails
#@everywhere basis = $basis

# fails, this is what I tried initially from ParallelDataTransfer
#@passobj 1 workers() basis

# appears to succeed ...
sendto(workers(), basis=basis)
# ... but this fails with "UndefVarError: basis not defined"
@everywhere println(myid(), "  ", length(basis))
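A per-worker check (a diagnostic sketch, not part of the original script) can show on which processes the binding is actually missing, instead of failing at the first @everywhere:

# Relies only on Distributed, already loaded above.
for p in workers()
    try
        n = remotecall_fetch(() -> length(Main.basis), p)
        println("worker ", p, ": basis has length ", n)
    catch err
        println("worker ", p, ": ", sprint(showerror, err))
    end
end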

@wcwitt (Collaborator, Author) commented Mar 6, 2023

Possibly related? ChrisRackauckas/ParallelDataTransfer.jl#17

@cortner (Member) commented Mar 7, 2023

OK, maybe for now I'll produce a fix at the ACE end. I'll let you know.

@cortner (Member) commented Mar 7, 2023

Fix as discussed on Slack; also documenting it here: convert the object to a Dict, then transfer, then convert back.
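A sketch of that round trip, assuming the write_dict/read_dict converters that JuLIP/ACE1 provide for their types (the exact calls here are assumptions):

# Requires @everywhere using ACE1pack, as in the setup above, so that
# read_dict and the basis types are available on every process.
basis_dict = write_dict(basis)              # convert to a Dict of plain data
@everywhere basis_dict = $basis_dict        # plain data serializes reliably
@everywhere basis = read_dict(basis_dict)   # rebuild the basis on each process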

@cortner (Member) commented Jun 27, 2023

@wcwitt -- I'm guessing I ran into this the other day (when I posted at ACEfit about it). It doesn't seem to be resolved. What's the status from your perspective?

@wcwitt (Collaborator, Author) commented Jun 27, 2023

It hasn't been a problem for me recently, but I think that's only because you tweaked ACE1 or ACE1x to circumvent it. So I'm not surprised it's back if you have some new experimental features.

@CheukHinHoJerry (Collaborator) commented Jul 14, 2023

I have gotten this error multiple times from the ACEfit.assemble function when using multiple workers on a large lsq system, and I remember there was an issue about this, so I think it's better to post it here. It happens in the middle of assembling the design matrix. This is the full error log:

Worker 18 terminated.
Unhandled Task ERROR: EOFError: read end of file
Stacktrace:
 [1] (::Base.var"#wait_locked#715")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64)
   @ Base ./stream.jl:947
 [2] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64)
   @ Base ./stream.jl:955
 [3] unsafe_read
   @ ./io.jl:761 [inlined]
 [4] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64)
   @ Base ./io.jl:760
 [5] read!
   @ ./io.jl:762 [inlined]
 [6] deserialize_hdr_raw
   @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/messages.jl:167 [inlined]
 [7] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
   @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:172
 [8] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
   @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:133
 [9] (::Distributed.var"#103#104"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})()
   @ Distributed ./task.jl:514
Progress:  21%|████████████████████████▌              |  ETA: 0:52:08
ERROR: LoadError: [remainder of the error message interleaved with a second progress render; garbled in the original log]
Stacktrace:
  [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
    @ Base ./task.jl:920
  [2] wait()
    @ Base ./task.jl:984
  [3] wait(c::Base.GenericCondition{ReentrantLock}; first::Bool)
    @ Base ./condition.jl:130
  [4] wait
    @ ./condition.jl:125 [inlined]
  [5] take_buffered(c::Channel{Any})
    @ Base ./channels.jl:456
  [6] take!(c::Channel{Any})
    @ Base ./channels.jl:450
  [7] take!(::Distributed.RemoteValue)
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:726
  [8] remotecall_fetch(f::Function, w::Distributed.Worker, args::ACEfit.DataPacket{AtomsData}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:461
  [9] remotecall_fetch(f::Function, w::Distributed.Worker, args::ACEfit.DataPacket{AtomsData})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [10] #remotecall_fetch#162
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [11] remotecall_fetch(f::Function, id::Int64, args::ACEfit.DataPacket{AtomsData})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [12] remotecall_pool(rc_f::Function, f::Function, pool::WorkerPool, args::ACEfit.DataPacket{AtomsData}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:126
 [13] remotecall_pool
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:123 [inlined]
 [14] #remotecall_fetch#200
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:232 [inlined]
 [15] remotecall_fetch
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:232 [inlined]
 [16] #208#209
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:288 [inlined]
 [17] #208
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:288 [inlined]
 [18] (::Base.var"#978#983"{Distributed.var"#208#210"{Distributed.var"#208#209#211"{WorkerPool, ProgressMeter.var"#56#59"{RemoteChannel{Channel{Bool}}, ACEfit.var"#3#4"{JuLIP.MLIPs.IPSuperBasis{JuLIP.MLIPs.IPBasis}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedMatrix{Float64}}}}}})(r::Base.RefValue{Any}, args::Tuple{ACEfit.DataPacket{AtomsData}})
    @ Base ./asyncmap.jl:100
 [19] macro expansion
    @ ./asyncmap.jl:234 [inlined]
 [20] (::Base.var"#994#995"{Base.var"#978#983"{Distributed.var"#208#210"{Distributed.var"#208#209#211"{WorkerPool, ProgressMeter.var"#56#59"{RemoteChannel{Channel{Bool}}, ACEfit.var"#3#4"{JuLIP.MLIPs.IPSuperBasis{JuLIP.MLIPs.IPBasis}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedMatrix{Float64}}}}}}, Channel{Any}, Nothing})()
    @ Base ./task.jl:514
Stacktrace:
  [1] (::Base.var"#988#990")(x::Task)
    @ Base ./asyncmap.jl:177
  [2] foreach(f::Base.var"#988#990", itr::Vector{Any})
    @ Base ./abstractarray.jl:3073
  [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::Vector{ACEfit.DataPacket{AtomsData}})
    @ Base ./asyncmap.jl:177
  [4] wrap_n_exec_twice
    @ ./asyncmap.jl:153 [inlined]
  [5] #async_usemap#973
    @ ./asyncmap.jl:103 [inlined]
  [6] async_usemap
    @ ./asyncmap.jl:84 [inlined]
  [7] #asyncmap#972
    @ ./asyncmap.jl:81 [inlined]
  [8] asyncmap
    @ ./asyncmap.jl:80 [inlined]
  [9] pmap(f::Function, p::WorkerPool, c::Vector{ACEfit.DataPacket{AtomsData}}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Vector{Any}, retry_check::Nothing)
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:126
 [10] pmap(f::Function, p::WorkerPool, c::Vector{ACEfit.DataPacket{AtomsData}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:99
 [11] pmap(f::Function, c::Vector{ACEfit.DataPacket{AtomsData}}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:156
 [12] pmap(f::Function, c::Vector{ACEfit.DataPacket{AtomsData}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:156
 [13] macro expansion
    @ ~/.julia/packages/ProgressMeter/sN2xr/src/ProgressMeter.jl:1015 [inlined]
 [14] macro expansion
    @ ./task.jl:476 [inlined]
 [15] macro expansion
    @ ~/.julia/packages/ProgressMeter/sN2xr/src/ProgressMeter.jl:1014 [inlined]
 [16] macro expansion
    @ ./task.jl:476 [inlined]
 [17] progress_map(::Function, ::Vararg{Any}; mapfun::Function, progress::ProgressMeter.Progress, channel_bufflen::Int64, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ ProgressMeter ~/.julia/packages/ProgressMeter/sN2xr/src/ProgressMeter.jl:1007
 [18] assemble(data::Vector{AtomsData}, basis::JuLIP.MLIPs.IPSuperBasis{JuLIP.MLIPs.IPBasis})
    @ ACEfit ~/.julia/packages/ACEfit/ID48n/src/assemble.jl:31
 [19] make_train(model::ACE1x.ACE1Model)
    @ Main ~/julia_ws/ACEworkflows/Fe_pure_jerry/asm_all_lsq.jl:54
 [20] top-level scope
    @ ~/julia_ws/ACEworkflows/Fe_pure_jerry/asm_all_lsq.jl:91
 [21] include(fname::String)
    @ Base.MainInclude ./client.jl:478
 [22] top-level scope
    @ REPL[2]:1
in expression starting at /zfs/users/jerryho528/jerryho528/julia_ws/ACEworkflows/Fe_pure_jerry/asm_all_lsq.jl:71

Package environment:

  [e3f9bc04] ACE1 v0.11.12
  [8c4e8d19] ACE1pack v0.4.1
  [5cc4c08c] ACE1x v0.1.4
  [ad31a8ef] ACEfit v0.1.1
  [f67ccb44] HDF5 v0.16.15
  [682c06a0] JSON v0.21.4
  [898213cb] LowRankApprox v0.5.3
  [91a5bcdd] Plots v1.38.16
  [08abe8d2] PrettyTables v2.2.5
  [de0858da] Printf

Is there a quick workaround for this?
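For what it's worth: an EOFError in message_handler_loop on the master means the TCP connection to the worker closed, i.e. worker 18 itself died mid-task; on large assemblies a common cause is the worker running out of memory (an assumption about this case, not a diagnosis). A quick thing to try is a smaller worker pool, so each worker has more memory:

using Distributed

# Hypothetical mitigation sketch: restart with fewer workers.
rmprocs(workers())
addprocs(4, exeflags="--project=$(Base.active_project())")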

@wcwitt (Collaborator, Author) commented Jul 14, 2023 via email

@CheukHinHoJerry (Collaborator) commented Jul 14, 2023

Thank you for your reply. It happens every time, so it stops me from assembling a large lsq system.

@wcwitt (Collaborator, Author) commented Jul 14, 2023

Thanks - I'm happy to help resolve it. But I think this is a different problem from the current issue, so let's use the new issue I just made in ACEfit.
