Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
mofeing committed Jul 31, 2023
1 parent 51cf8a4 commit 5df0395
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 202 deletions.
13 changes: 7 additions & 6 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ authors = ["Sergio Sánchez Ramírez <sergio.sanchez.ramirez@bsc.es> and contrib
version = "0.1.0"

[deps]
Cassette = "7057c7e9-c182-5462-911a-8362d720325c"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Extrae_jll = "2b2c4be0-e38c-5918-b8b4-9a308845a1e9"
Requires = "ae029012-a4dd-5104-9daa-d747884805df"

[weakdeps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"

[extensions]
ExtraeDistributedExt = "Distributed"

[compat]
Cassette = "0.3"
Extrae_jll = "4.0.3"
Requires = "1.3"
julia = "1"
julia = "1.9"
134 changes: 134 additions & 0 deletions ext/ExtraeDistributedExt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using Distributed

"""
ExtraeLocalManager.jl
Implements a copy of the default `LocalClusterManager` that starts
workers with an overdubbed event loop.
"""

struct ExtraeLocalManager <: ClusterManager
np::Int
restrict::Bool # Restrict binding to 127.0.0.1 only
end

Base.show(io::IO, manager::ExtraeLocalManager) = print(io, "ExtraeLocalManager(#procs=$(manager.np), restrict=$(manager.restrict))")

function Distributed.launch(manager::ExtraeLocalManager, params::Dict, launched::Array, c::Condition)
dir = params[:dir]
exename = params[:exename]
exeflags = params[:exeflags]
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
cookie = cluster_cookie()

active_project = Base.active_project()
hookline = """
using Distributed
using Extrae
using Cassette
Cassette.overdub(Extrae.ExtraeCtx(), start_worker, $(repr(cookie)))
"""

# Bug: Instead of using julia_cmd(exename), I directly use exename because idk howto access Base.julia_cmd
for i in 1:manager.np
cmd = `env EXTRAE_SKIP_AUTO_LIBRARY_INITIALIZE=1 $exename $exeflags --project=$active_project --bind-to $bind_to -e "$hookline"`
io = open(detach(setenv(cmd, dir=dir)), "r+")

# Cluster cookie is not passed through IO. Instead, we set it
# as a parameter when starting worker, through the hookline
#Distributed.write_cookie(io)

wconfig = WorkerConfig()
wconfig.process = io
wconfig.io = io.out
wconfig.enable_threaded_blas = params[:enable_threaded_blas]
push!(launched, wconfig)
end

notify(c)
end

function Distributed.manage(manager::ExtraeLocalManager, id::Integer, config::WorkerConfig, op::Symbol)
if op === :interrupt
kill(config.process, 2)
end
end

export ExtraeLocalManager

const DistributedEvent{ValueCode} = Event{400002,ValueCode}
const DistributedUsefulWorkEvent{ValueCode} = Event{400001,ValueCode}
const DistributedMessageHandlingEvent{ValueCode} = Event{400004,ValueCode}

description(::Type{DistributedEvent}) = "Distributed runtime calls"
description(::Type{DistributedUsefulWorkEvent}) = "Distributed workload execution"
description(::Type{DistributedMessageHandlingEvent}) = "Distributed message handling functions"

const DistributedEnd = DistributedEvent{0}()
const DistributedAddProcs = DistributedEvent{1}()
const DistributedRmProcs = DistributedEvent{2}()
const DistributedInitWorker = DistributedEvent{3}()
const DistributedStartWorker = DistributedEvent{4}()
const DistributedRemoteCall = DistributedEvent{5}()
const DistributedRemoteCallFetch = DistributedEvent{6}()
const DistributedRemoteCallWait = DistributedEvent{7}()
const DistributedProcessMessages = DistributedEvent{8}()
const DistributedInterrupt = DistributedEvent{9}()

const DistributedUsefulWork = DistributedUsefulWorkEvent{1}()
const DistributedNotUsefulWork = DistributedUsefulWorkEvent{0}()

const DistributedHandleEnd = DistributedMessageHandlingEvent{0}()
const DistributedHandleCall = DistributedMessageHandlingEvent{1}()
const DistributedHandleCallFetch = DistributedMessageHandlingEvent{2}()
const DistributedHandleCallWait = DistributedMessageHandlingEvent{3}()
const DistributedHandleRemoteDo = DistributedMessageHandlingEvent{4}()
const DistributedHandleResult = DistributedMessageHandlingEvent{5}()
const DistributedHandleIdentifySocket = DistributedMessageHandlingEvent{6}()
const DistributedHandleIdentifySocketAck = DistributedMessageHandlingEvent{7}()
const DistributedHandleJoinPGRP = DistributedMessageHandlingEvent{8}()
const DistributedHandleJoinComplete = DistributedMessageHandlingEvent{9}()

description(::typeof(DistributedEnd)) = "end"
description(::typeof(DistributedAddProcs)) = "addprocs"
description(::typeof(DistributedRmProcs)) = "rmprocs"
description(::typeof(DistributedInitWorker)) = "init_worker"
description(::typeof(DistributedStartWorker)) = "start_worker"
description(::typeof(DistributedRemoteCall)) = "remotecall"
description(::typeof(DistributedRemoteCallFetch)) = "remotecall_fetch"
description(::typeof(DistributedRemoteCallWait)) = "remotecall_wait"
description(::typeof(DistributedProcessMessages)) = "process_messages"
description(::typeof(DistributedInterrupt)) = "interrupt"

description(::typeof(DistributedHandleEnd)) = "End"
description(::typeof(DistributedHandleCall)) = "CallMsg{:call}"
description(::typeof(DistributedHandleCallFetch)) = "CallMsg{:call_fetch}"
description(::typeof(DistributedHandleCallWait)) = "CallWaitMsg"
description(::typeof(DistributedHandleRemoteDo)) = "RemoteDoMsg"
description(::typeof(DistributedHandleResult)) = "ResultMsg"
description(::typeof(DistributedHandleIdentifySocket)) = "IdentifySocketMsg"
description(::typeof(DistributedHandleIdentifySocketAck)) = "IdentifySocketAckMsg"
description(::typeof(DistributedHandleJoinPGRP)) = "JoinPGRPMsg"
description(::typeof(DistributedHandleJoinComplete)) = "JoinCompletesg"

description(::typeof(DistributedUsefulWork)) = "Useful"
description(::typeof(DistributedNotUsefulWork)) = "Not Useful"

# resource identification
dist_taskid()::Cuint = Distributed.myid() - 1
dist_numtasks()::Cuint = Distributed.nworkers() + 1

# cluster manager addprocs
function addprocs_extrae(np::Integer; restrict=true, kwargs...)
manager = Extrae.ExtraeLocalManager(np, restrict)
#check_addprocs_args(manager, kwargs)
new_workers = addprocs(manager; kwargs...)

Extrae.init()
for pid in new_workers
@fetchfrom pid Extrae.init()
end
return new_workers
end
export addprocs_extrae
9 changes: 0 additions & 9 deletions src/Extrae.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ include("API.jl")
export Event, typecode, valuecode, description
export version, init, isinit, finish, flush, instrumentation, emit, register, previous_hwc_set, next_hwc_set, set_tracing_tasks, setoption, network_counters, network_routes, user_function

using Cassette
Cassette.@context ExtraeCtx

using Requires

function __init__()
@require Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" include("Instrumentation/Distributed.jl")
end

include("Instrumentation/Threads.jl")

end
131 changes: 0 additions & 131 deletions src/Instrumentation/Distributed.jl

This file was deleted.

56 changes: 0 additions & 56 deletions src/Instrumentation/ExtraeLocalManager.jl

This file was deleted.

0 comments on commit 5df0395

Please sign in to comment.