diff --git a/README.md b/README.md
index 9aacfec..9bfcb2e 100755
--- a/README.md
+++ b/README.md
@@ -15,7 +15,6 @@ Implemented in this package (the `ClusterManagers.jl` package):
 
 | Job queue system | Command to add processors |
 | ---------------- | ------------------------- |
-| Load Sharing Facility (LSF) | `addprocs_lsf(np::Integer; bsub_flags=``, ssh_cmd=``)` or `addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle))` |
 | Sun Grid Engine (SGE) via `qsub` | `addprocs_sge(np::Integer; qsub_flags=``)` or `addprocs(SGEManager(np, qsub_flags))` |
 | Sun Grid Engine (SGE) via `qrsh` | `addprocs_qrsh(np::Integer; qsub_flags=``)` or `addprocs(QRSHManager(np, qsub_flags))` |
 | PBS (Portable Batch System) | `addprocs_pbs(np::Integer; qsub_flags=``)` or `addprocs(PBSManager(np, qsub_flags))` |
@@ -32,6 +31,7 @@ Implemented in external packages:
 | ---------------- | ------------------------- |
 | Kubernetes (K8s) via [K8sClusterManagers.jl](https://github.com/beacon-biosignals/K8sClusterManagers.jl) | `addprocs(K8sClusterManager(np; kwargs...))` |
 | Azure scale-sets via [AzManagers.jl](https://github.com/ChevronETC/AzManagers.jl) | `addprocs(vmtemplate, n; kwargs...)` |
+| Load Sharing Facility (LSF) via [LSFClusterManagers.jl](https://github.com/JuliaParallel/LSFClusterManagers.jl) | `addprocs_lsf(np::Integer; bsub_flags=``, ssh_cmd=``)` or `addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle))` |
 
 You can also write your own custom cluster manager; see the instructions in the [Julia manual](https://docs.julialang.org/en/v1/manual/distributed-computing/#ClusterManagers).
 
@@ -93,7 +93,7 @@ julia>
        From worker 2:	compute-6
        From worker 3:	compute-6
 ```
-Some clusters require the user to specify a list of required resources. 
+Some clusters require the user to specify a list of required resources.
 For example, it may be necessary to specify how much memory will be needed by the job - see this [issue](https://github.com/JuliaLang/julia/issues/10390).
 
 The keyword `qsub_flags` can be used to specify these and other options. Additionally the keyword `wd` can be used to specify the working directory (which defaults to `ENV["HOME"]`).
@@ -132,11 +132,6 @@ Julia workers can frequently timeout waiting for the standard output files to ap
 In this case, it's better to use the `QRSHManager`, which uses SGE's `qrsh` command to bypass the filesystem and captures STDOUT directly.
 
-### Load Sharing Facility (LSF)
-
-`LSFManager` supports IBM's scheduler. See the `addprocs_lsf` docstring
-for more information.
-
 ### Using `LocalAffinityManager` (for pinning local workers to specific cores) - Linux only feature.
@@ -176,10 +171,10 @@ ElasticManager:
   Active workers : []
   Number of workers to be added  : 0
   Terminated workers : []
-  Worker connect command : 
+  Worker connect command :
    /home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ClusterManagers; ClusterManagers.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)'
 ```
 
-By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false))` to the first form of the `ElasticManager` constructor. 
+By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false))` to the first form of the `ElasticManager` constructor.
 
-Once workers are connected, you can print the `em` object again to see them added to the list of active workers. 
+Once workers are connected, you can print the `em` object again to see them added to the list of active workers.
diff --git a/src/ClusterManagers.jl b/src/ClusterManagers.jl
index 43b4860..ce91285 100755
--- a/src/ClusterManagers.jl
+++ b/src/ClusterManagers.jl
@@ -19,6 +19,5 @@ include("condor.jl")
 include("slurm.jl")
 include("affinity.jl")
 include("elastic.jl")
-include("lsf.jl")
 
 end
diff --git a/src/lsf.jl b/src/lsf.jl
deleted file mode 100755
index a82fbf7..0000000
--- a/src/lsf.jl
+++ /dev/null
@@ -1,164 +0,0 @@
-export LSFManager, addprocs_lsf
-
-struct LSFManager <: ClusterManager
-    np::Integer
-    bsub_flags::Cmd
-    bpeek_flags::Cmd
-    ssh_cmd::Cmd
-    bsub_cmd::Cmd
-    bpeek_cmd::Cmd
-    retry_delays
-    throttle::Integer
-end
-
-struct LSFException <: Exception
-    msg
-end
-
-function parse_host_port(stream, port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)")
-    bytestr = readline(stream)
-    conn_info_match = match(port_host_regex, bytestr)
-    if !isnothing(conn_info_match)
-        host = conn_info_match.captures[2]
-        port = parse(Int, conn_info_match.captures[1])
-        @debug("lsf worker listening", connect_info=bytestr, host, port)
-
-        return true, bytestr, host, port
-    end
-    return false, bytestr, nothing, nothing
-end
-
-function lsf_bpeek(manager::LSFManager, jobid, iarray)
-    stream = Base.BufferStream()
-    mark(stream) # so that we can reset to beginning after ensuring process started
-
-    streamer_cmd = `$(manager.ssh_cmd) $(manager.bpeek_cmd) $(manager.bpeek_flags) $(jobid)\[$iarray\]`
-    retry_delays = manager.retry_delays
-    streamer_proc = run(pipeline(streamer_cmd; stdout=stream, stderr=stream); wait=false)
-
-    # Try once before retry loop in case user supplied an empty retry_delays iterator
-    worker_started, bytestr, host, port = parse_host_port(stream)
-    worker_started && return stream, host, port
-
-    for retry_delay in retry_delays
-        # isempty is for the case when -f flag is not used to handle the case when
-        # the << output from ... >> message is printed but the julia worker has not
-        # yet printed the ip and port nr
-        if isempty(bytestr) || occursin("Not yet started", bytestr)
-            # bpeek process would have stopped
-            # stream starts spewing out empty strings after this (in julia != 1.6)
-            # instead of trying to handle that we just close it and open a new stream
-            wait(streamer_proc)
-            close(stream)
-            stream = Base.BufferStream()
-
-            # Try bpeeking again after the retry delay
-            sleep(retry_delay)
-            streamer_proc = run(pipeline(streamer_cmd; stdout=stream, stderr=stream); wait=false)
-        elseif occursin("<< output from stdout >>", bytestr) || occursin("<< output from stderr >>", bytestr)
-            # ignore this bpeek output decoration and continue to read the next line
-            mark(stream)
-        else
-            # unknown response from worker process
-            close(stream)
-            throw(LSFException(bytestr))
-        end
-
-        worker_started, bytestr, host, port = parse_host_port(stream)
-        worker_started && break
-    end
-
-    if !worker_started
-        close(stream)
-        throw(LSFException(bytestr))
-    end
-
-    # process started, reset to marked position and hand over to Distributed module
-    reset(stream)
-
-    return stream, host, port
-end
-
-function lsf_launch_and_monitor(manager::LSFManager, launched, c, jobid, iarray)
-    config = WorkerConfig()
-    io, host, port = lsf_bpeek(manager, jobid, iarray)
-    config.io = io
-    config.host = host
-    config.port = port
-    config.userdata = `$jobid\[$iarray\]`
-
-    push!(launched, config)
-    notify(c)
-end
-
-function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition)
-    try
-        dir = params[:dir]
-        exename = params[:exename]
-        exeflags = params[:exeflags]
-
-        np = manager.np
-
-        jobname = `julia-$(getpid())`
-
-        cmd = `$exename $exeflags $(worker_arg())`
-        bsub_cmd = `$(manager.ssh_cmd) $(manager.bsub_cmd) $(manager.bsub_flags) -cwd $dir -J $(jobname)\[1-$np\] "$cmd"`
-
-        line = open(readline, bsub_cmd)
-        m = match(r"Job <([0-9]+)> is submitted", line)
-        jobid = m.captures[1]
-
-        asyncmap((i)->lsf_launch_and_monitor(manager, launched, c, jobid, i),
-                 1:np;
-                 ntasks=manager.throttle)
-
-    catch e
-        println("Error launching workers")
-        println(e)
-    end
-end
-
-manage(manager::LSFManager, id::Int64, config::WorkerConfig, op::Symbol) = nothing
-
-kill(manager::LSFManager, id::Int64, config::WorkerConfig) = remote_do(exit, id)
-
-"""
-    addprocs_lsf(np::Integer;
-                 bsub_flags::Cmd=``,
-                 bpeek_flags::Cmd=`-f`,
-                 ssh_cmd::Cmd=``,
-                 bsub_cmd::Cmd=`bsub`,
-                 bpeek_cmd::Cmd=`bpeek`,
-                 retry_delays=ExponentialBackOff(n=10,
-                                                 first_delay=1, max_delay=512,
-                                                 factor=2),
-                 throttle::Integer=np,
-                 params...) =
-
-Launch `np` workers on a cluster managed by IBM's Platform Load Sharing
-Facility. `bsub_flags` can be used to pass flags to `bsub` that are specific
-to your cluster or workflow needs. `ssh_cmd` can be used to launch workers
-from other than the cluster head node (e.g. your personal workstation).
-`retry_delays` is a vector of numbers specifying in seconds how long to
-repeatedly wait for a worker to start. `throttle` specifies how many workers
-to launch at once. Having `-f` in bpeek flags (which is the default) will let
-stdout of workers to be displayed on master too.
-
-# Examples
-
-```
-addprocs_lsf(1000; ssh_cmd=`ssh login`, throttle=10)
-```
-"""
-addprocs_lsf(np::Integer;
-             bsub_flags::Cmd=``,
-             bpeek_flags::Cmd=`-f`,
-             ssh_cmd::Cmd=``,
-             bsub_cmd::Cmd=`bsub`,
-             bpeek_cmd::Cmd=`bpeek`,
-             retry_delays=ExponentialBackOff(n=10,
-                                             first_delay=1, max_delay=512,
-                                             factor=2),
-             throttle::Integer=np,
-             params...) =
-    addprocs(LSFManager(np, bsub_flags, bpeek_flags, ssh_cmd, bsub_cmd, bpeek_cmd, retry_delays, throttle); params...)
diff --git a/test/lsf.jl b/test/lsf.jl
deleted file mode 100644
index b44d46f..0000000
--- a/test/lsf.jl
+++ /dev/null
@@ -1,10 +0,0 @@
-@testset "LSFManager" begin
-    p = addprocs_lsf(1, bsub_flags=`-P scicompsoft`)
-    @test nprocs() == 2
-    @test workers() == p
-    @test fetch(@spawnat :any myid()) == p[1]
-    @test remotecall_fetch(+,p[1],1,1) == 2
-    rmprocs(p)
-    @test nprocs() == 1
-    @test workers() == [1]
-end
diff --git a/test/runtests.jl b/test/runtests.jl
index 8a7e906..370b6ef 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -13,8 +13,6 @@ using Test: @testset, @test, @test_skip
 using ClusterManagers: ElasticManager
 # Slurm:
 using ClusterManagers: addprocs_slurm, SlurmManager
-# LSF:
-using ClusterManagers: addprocs_lsf, LSFManager
 # SGE:
 using ClusterManagers: addprocs_sge, SGEManager
 
@@ -23,12 +21,11 @@ const test_args = lowercase.(strip.(ARGS))
 @info "" test_args
 
 slurm_is_installed() = !isnothing(Sys.which("sbatch"))
-lsf_is_installed() = !isnothing(Sys.which("bsub"))
 qsub_is_installed() = !isnothing(Sys.which("qsub"))
 
 @testset "ClusterManagers.jl" begin
     include("elastic.jl")
-    
+
     if slurm_is_installed()
         @info "Running the Slurm tests..." Sys.which("sbatch")
         include("slurm.jl")
@@ -41,20 +38,7 @@ qsub_is_installed() = !isnothing(Sys.which("qsub"))
             @test_skip false
         end
     end
-
-    if lsf_is_installed()
-        @info "Running the LSF tests..." Sys.which("bsub")
-        include("lsf.jl")
-    else
-        if "lsf" in test_args
-            @error "ERROR: The LSF tests were explicitly requested in ARGS, but bsub was not found, so the LSF tests cannot be run" Sys.which("bsub") test_args
-            @test false
-        else
-            @warn "bsub was not found - LSF tests will be skipped" Sys.which("bsub")
-            @test_skip false
-        end
-    end
-    
+
     if qsub_is_installed()
        @info "Running the SGE (via qsub) tests..." Sys.which("qsub")
        include("slurm.jl")
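For anyone reviewing the migration path this patch implies, here is a minimal usage sketch of the external route now listed in the README table. It assumes `LSFClusterManagers.jl` exports the same `addprocs_lsf` entry point shown in that table; the worker count and the `bsub` project flag `-P myproject` are placeholders, not values taken from this patch.

```julia
# Minimal migration sketch (not part of this patch): callers switch from
# `using ClusterManagers: addprocs_lsf` to the external package.
using Distributed
using LSFClusterManagers   # assumed to provide addprocs_lsf, as the README table states

# Submit 4 workers as an LSF array job via `bsub`; extra flags pass straight through.
p = addprocs_lsf(4; bsub_flags=`-P myproject`)

# Confirm the workers are up, then remove them.
@everywhere println("worker ", myid(), " running on ", gethostname())
rmprocs(p)
```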