From 77e3b6a4d1d23ba6ec93ebff0137df59b3d56337 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Sat, 13 Jul 2024 17:46:02 +0200 Subject: [PATCH] parallelize artifact downloads --- src/API.jl | 2 +- src/Artifacts.jl | 127 ++++++++++++++++++++++---------------- src/MiniProgressBars.jl | 14 +++-- src/Operations.jl | 132 ++++++++++++++++++++++++++++++++++++---- src/PlatformEngines.jl | 14 +++-- 5 files changed, 211 insertions(+), 78 deletions(-) diff --git a/src/API.jl b/src/API.jl index d8853d02dd..0677e163ef 100644 --- a/src/API.jl +++ b/src/API.jl @@ -1268,7 +1268,7 @@ function instantiate(ctx::Context; manifest::Union{Bool, Nothing}=nothing, # Install all packages new_apply = Operations.download_source(ctx) # Install all artifacts - Operations.download_artifacts(ctx.env; platform, verbose, io=ctx.io) + Operations.download_artifacts(ctx; platform, verbose) # Run build scripts allow_build && Operations.build_versions(ctx, union(new_apply, new_git); verbose=verbose) diff --git a/src/Artifacts.jl b/src/Artifacts.jl index ae47811e06..192e22843a 100644 --- a/src/Artifacts.jl +++ b/src/Artifacts.jl @@ -301,6 +301,7 @@ function download_artifact( verbose::Bool = false, quiet_download::Bool = false, io::IO=stderr_f(), + progress::Union{Function, Nothing} = nothing, ) if artifact_exists(tree_hash) return true @@ -323,8 +324,8 @@ function download_artifact( temp_dir = mktempdir(artifacts_dir) try - download_verify_unpack(tarball_url, tarball_hash, temp_dir, ignore_existence=true, verbose=verbose, - quiet_download=quiet_download, io=io) + download_verify_unpack(tarball_url, tarball_hash, temp_dir; + ignore_existence=true, verbose, quiet_download, io, progress) calc_hash = SHA1(GitTools.tree_hash(temp_dir)) # Did we get what we expected? If not, freak out. @@ -394,82 +395,100 @@ function ensure_artifact_installed(name::String, artifacts_toml::String; pkg_uuid::Union{Base.UUID,Nothing}=nothing, verbose::Bool = false, quiet_download::Bool = false, + progress::Union{Function,Nothing} = nothing, io::IO=stderr_f()) meta = artifact_meta(name, artifacts_toml; pkg_uuid=pkg_uuid, platform=platform) if meta === nothing error("Cannot locate artifact '$(name)' in '$(artifacts_toml)'") end - return ensure_artifact_installed(name, meta, artifacts_toml; platform=platform, - verbose=verbose, quiet_download=quiet_download, io=io) + return ensure_artifact_installed(name, meta, artifacts_toml; + platform, verbose, quiet_download, progress, io) end function ensure_artifact_installed(name::String, meta::Dict, artifacts_toml::String; platform::AbstractPlatform = HostPlatform(), verbose::Bool = false, quiet_download::Bool = false, + progress::Union{Function,Nothing} = nothing, io::IO=stderr_f()) - hash = SHA1(meta["git-tree-sha1"]) + hash = SHA1(meta["git-tree-sha1"]) if !artifact_exists(hash) - errors = Any[] - # first try downloading from Pkg server - # TODO: only do this if Pkg server knows about this package - if (server = pkg_server()) !== nothing - url = "$server/artifact/$hash" - download_success = let url=url - @debug "Downloading artifact from Pkg server" name artifacts_toml platform url - with_show_download_info(io, name, quiet_download) do - download_artifact(hash, url; verbose=verbose, quiet_download=quiet_download, io=io) - end - end - # download_success is either `true` or an error object - if download_success === true - return artifact_path(hash) - else - @debug "Failed to download artifact from Pkg server" download_success - push!(errors, (url, download_success)) - end + if isnothing(progress) || verbose == true + return try_artifact_download_sources(name, hash, meta, artifacts_toml; platform, verbose, quiet_download, io) + else + return () -> try_artifact_download_sources(name, hash, meta, artifacts_toml; platform, quiet_download=true, io, progress) end + else + return artifact_path(hash) + end +end - # If this artifact does not exist on-disk already, ensure it has download - # information, then download it! - if !haskey(meta, "download") - error("Cannot automatically install '$(name)'; no download section in '$(artifacts_toml)'") +function try_artifact_download_sources( + name::String, hash::SHA1, meta::Dict, artifacts_toml::String; + platform::AbstractPlatform=HostPlatform(), + verbose::Bool=false, + quiet_download::Bool=false, + io::IO=stderr_f(), + progress::Union{Function,Nothing}=nothing) + + errors = Any[] + # first try downloading from Pkg server + # TODO: only do this if Pkg server knows about this package + if (server = pkg_server()) !== nothing + url = "$server/artifact/$hash" + download_success = let url = url + @debug "Downloading artifact from Pkg server" name artifacts_toml platform url + with_show_download_info(io, name, quiet_download) do + download_artifact(hash, url; verbose, quiet_download, io, progress) + end end + # download_success is either `true` or an error object + if download_success === true + return artifact_path(hash) + else + @debug "Failed to download artifact from Pkg server" download_success + push!(errors, (url, download_success)) + end + end - # Attempt to download from all sources - for entry in meta["download"] - url = entry["url"] - tarball_hash = entry["sha256"] - download_success = let url=url - @debug "Downloading artifact" name artifacts_toml platform url - with_show_download_info(io, name, quiet_download) do - download_artifact(hash, url, tarball_hash; verbose=verbose, quiet_download=quiet_download, io=io) - end - end - # download_success is either `true` or an error object - if download_success === true - return artifact_path(hash) - else - @debug "Failed to download artifact" download_success - push!(errors, (url, download_success)) + # If this artifact does not exist on-disk already, ensure it has download + # information, then download it! + if !haskey(meta, "download") + error("Cannot automatically install '$(name)'; no download section in '$(artifacts_toml)'") + end + + # Attempt to download from all sources + for entry in meta["download"] + url = entry["url"] + tarball_hash = entry["sha256"] + download_success = let url = url + @debug "Downloading artifact" name artifacts_toml platform url + with_show_download_info(io, name, quiet_download) do + download_artifact(hash, url, tarball_hash; verbose, quiet_download, io, progress) end end - errmsg = """ - Unable to automatically download/install artifact '$(name)' from sources listed in '$(artifacts_toml)'. - Sources attempted: - """ - for (url, err) in errors - errmsg *= "- $(url)\n" - errmsg *= " Error: $(sprint(showerror, err))\n" + # download_success is either `true` or an error object + if download_success === true + return artifact_path(hash) + else + @debug "Failed to download artifact" download_success + push!(errors, (url, download_success)) end - error(errmsg) - else - return artifact_path(hash) end + errmsg = """ + Unable to automatically download/install artifact '$(name)' from sources listed in '$(artifacts_toml)'. + Sources attempted: + """ + for (url, err) in errors + errmsg *= "- $(url)\n" + errmsg *= " Error: $(sprint(showerror, err))\n" + end + error(errmsg) end + function with_show_download_info(f, io, name, quiet_download) fancyprint = can_fancyprint(io) if !quiet_download @@ -485,7 +504,7 @@ function with_show_download_info(f, io, name, quiet_download) if !quiet_download fancyprint && print(io, "\033[1A") # move cursor up one line fancyprint && print(io, "\033[2K") # clear line - if success + if success fancyprint && printpkgstyle(io, :Downloaded, "artifact: $name") else printpkgstyle(io, :Failure, "artifact: $name", color = :red) diff --git a/src/MiniProgressBars.jl b/src/MiniProgressBars.jl index 564a803917..5862e78318 100644 --- a/src/MiniProgressBars.jl +++ b/src/MiniProgressBars.jl @@ -9,11 +9,11 @@ Base.@kwdef mutable struct MiniProgressBar header::String = "" color::Symbol = :nothing width::Int = 40 - current::Int = 0.0 - prev::Int = 0.0 + current::Int = 0 + prev::Int = 0 has_shown::Bool = false time_shown::Float64 = 0.0 - percentage::Bool = true + mode::Symbol = :percentage # :percentage :int :data always_reprint::Bool = false indent::Int = 4 end @@ -47,10 +47,14 @@ function show_progress(io::IO, p::MiniProgressBar; termwidth=nothing, carriagere p.prev = p.current p.has_shown = true - progress_text = if p.percentage + progress_text = if p.mode == :percentage @sprintf "%2.1f %%" perc - else + elseif p.mode == :int string(p.current, "/", p.max) + elseif p.mode == :data + string(Base.format_bytes(p.current), "/", Base.format_bytes(p.max)) + else + error("Unknown mode $(p.mode)") end termwidth = @something termwidth displaysize(io)[2] max_progress_width = max(0, min(termwidth - textwidth(p.header) - textwidth(progress_text) - 10 , p.width)) diff --git a/src/Operations.jl b/src/Operations.jl index 3e0429cecb..c25f0591d4 100644 --- a/src/Operations.jl +++ b/src/Operations.jl @@ -816,11 +816,12 @@ function collect_artifacts(pkg_root::String; platform::AbstractPlatform=HostPlat return artifacts_tomls end -function download_artifacts(env::EnvCache; +function download_artifacts(ctx::Context; platform::AbstractPlatform=HostPlatform(), julia_version = VERSION, - verbose::Bool=false, - io::IO=stderr_f()) + verbose::Bool=false) + env = ctx.env + io = ctx.io pkg_roots = String[] for (uuid, pkg) in env.manifest pkg = manifest_info(env.manifest, uuid) @@ -828,16 +829,121 @@ function download_artifacts(env::EnvCache; pkg_root === nothing || push!(pkg_roots, pkg_root) end push!(pkg_roots, dirname(env.project_file)) + used_artifact_tomls = Set{String}() + download_jobs = Channel{Function}(Inf) # ctx.num_concurrent_downloads + + download_states = Dict{String, Tuple{Bool,MiniProgressBar}}() + is_done = false + ansi_moveup(n::Int) = string("\e[", n, "A") + ansi_movecol1 = "\e[1G" + ansi_cleartoend = "\e[0J" + ansi_cleartoendofline = "\e[0K" + ansi_enablecursor = "\e[?25h" + ansi_disablecursor = "\e[?25l" + termwidth = displaysize(io)[2] + + longest_name = 0 + for pkg_root in pkg_roots + for (artifacts_toml, artifacts) in collect_artifacts(pkg_root; platform) + for name in keys(artifacts) + longest_name = max(longest_name, textwidth(name)) + end + end + end + for pkg_root in pkg_roots for (artifacts_toml, artifacts) in collect_artifacts(pkg_root; platform) # For each Artifacts.toml, install each artifact we've collected from it for name in keys(artifacts) - ensure_artifact_installed(name, artifacts[name], artifacts_toml; - verbose, quiet_download=!(usable_io(io)), io=io) + is_done && break + bar = MiniProgressBar(; indent=2, color = Base.info_color(), mode=:data, always_reprint=true) + progress = (total, current) -> (bar.max = total; bar.current = current) + # returns a string if exists, or function that downloads the artifact if not + ret = ensure_artifact_installed(name, artifacts[name], artifacts_toml; + verbose, quiet_download=!(usable_io(io)), io, progress) + if ret isa Function + download_states[name] = (true, bar) + push!(download_jobs, + () -> begin + ret() + download_states[name] = (false, bar) + end + ) + end end - write_env_usage(artifacts_toml, "artifact_usage.toml") + push!(used_artifact_tomls, artifacts_toml) + end + end + close(download_jobs) + + if !isempty(download_states) + + longest_name = maximum(textwidth, keys(download_states)) + for (name, (running, bar)) in download_states + bar.header = rpad(name, longest_name) + end + + @sync begin + t_print = Threads.@spawn begin + try + print(io, ansi_disablecursor) + first = true + timer = Timer(0, interval=1/24) + main_bar = MiniProgressBar(; indent=0, header = "Downloading artifacts", color = :green, mode = :int, always_reprint=true) + main_bar.max = length(download_states) + while !is_done + main_bar.current = length(filter(x -> !x[2][1], download_states)) + str = sprint(context=io) do iostr + first || print(iostr, ansi_cleartoend) + n_printed = 1 + show_progress(iostr, main_bar; termwidth, carriagereturn=false) + println(iostr) + for (name, (running, bar)) in download_states + running && bar.max > 1000 && bar.current > 0 || continue + show_progress(iostr, bar; termwidth, carriagereturn=false) + println(iostr) + n_printed += 1 + end + is_done || print(iostr, ansi_moveup(n_printed), ansi_movecol1) + first = false + end + print(io, str) + wait(timer) + end + print(io, ansi_cleartoend) + main_bar.current = length(filter(x -> !x[2][1], download_states)) + show_progress(io, main_bar; termwidth, carriagereturn=false) + println(io) + catch e + e isa InterruptException || rethrow() + is_done = true + finally + print(io, ansi_enablecursor) + end + end + Base.errormonitor(t_print) + + # TODO: figure out error handling/reporting + sema = Base.Semaphore(ctx.num_concurrent_downloads) + @sync for f in download_jobs + is_done && break + t = Threads.@spawn begin + try + Base.acquire(f, sema) + catch e + e isa InterruptException || rethrow() + is_done = true + end + end + Base.errormonitor(t) + end + is_done = true end end + + for f in used_artifact_tomls + write_env_usage(f, "artifact_usage.toml") + end end function check_artifacts_downloaded(pkg_root::String; platform::AbstractPlatform=HostPlatform()) @@ -949,7 +1055,7 @@ function download_source(ctx::Context; readonly=true) end bar = MiniProgressBar(; indent=2, header = "Progress", color = Base.info_color(), - percentage=false, always_reprint=true) + mode=:int, always_reprint=true) bar.max = length(pkgs_to_install) fancyprint = can_fancyprint(ctx.io) try @@ -1180,7 +1286,7 @@ function build_versions(ctx::Context, uuids::Set{UUID}; verbose=false) max_name = maximum(build->textwidth(build[2]), builds; init=0) bar = MiniProgressBar(; indent=2, header = "Progress", color = Base.info_color(), - percentage=false, always_reprint=true) + mode=:int, always_reprint=true) bar.max = length(builds) fancyprint = can_fancyprint(ctx.io) fancyprint && start_progress(ctx.io, bar) @@ -1509,7 +1615,7 @@ function add(ctx::Context, pkgs::Vector{PackageSpec}, new_git=Set{UUID}(); # After downloading resolutionary packages, search for (Julia)Artifacts.toml files # and ensure they are all downloaded and unpacked as well: - download_artifacts(ctx.env, platform=platform, julia_version=ctx.julia_version, io=ctx.io) + download_artifacts(ctx, platform=platform, julia_version=ctx.julia_version) # if env is a package add compat entries if ctx.env.project.name !== nothing && ctx.env.project.uuid !== nothing @@ -1553,7 +1659,7 @@ function develop(ctx::Context, pkgs::Vector{PackageSpec}, new_git::Set{UUID}; update_manifest!(ctx.env, pkgs, deps_map, ctx.julia_version) new_apply = download_source(ctx) fixups_from_projectfile!(ctx.env) - download_artifacts(ctx.env; platform=platform, julia_version=ctx.julia_version, io=ctx.io) + download_artifacts(ctx; platform=platform, julia_version=ctx.julia_version) write_env(ctx.env) # write env before building show_update(ctx.env, ctx.registries; io=ctx.io) build_versions(ctx, union(new_apply, new_git)) @@ -1694,7 +1800,7 @@ function up(ctx::Context, pkgs::Vector{PackageSpec}, level::UpgradeLevel; update_manifest!(ctx.env, pkgs, deps_map, ctx.julia_version) new_apply = download_source(ctx) fixups_from_projectfile!(ctx.env) - download_artifacts(ctx.env, julia_version=ctx.julia_version, io=ctx.io) + download_artifacts(ctx, julia_version=ctx.julia_version) write_env(ctx.env; skip_writing_project) # write env before building show_update(ctx.env, ctx.registries; io=ctx.io, hidden_upgrades_info = true) build_versions(ctx, union(new_apply, new_git)) @@ -1740,7 +1846,7 @@ function pin(ctx::Context, pkgs::Vector{PackageSpec}) update_manifest!(ctx.env, pkgs, deps_map, ctx.julia_version) new = download_source(ctx) fixups_from_projectfile!(ctx.env) - download_artifacts(ctx.env; julia_version=ctx.julia_version, io=ctx.io) + download_artifacts(ctx; julia_version=ctx.julia_version) write_env(ctx.env) # write env before building show_update(ctx.env, ctx.registries; io=ctx.io) build_versions(ctx, new) @@ -1788,7 +1894,7 @@ function free(ctx::Context, pkgs::Vector{PackageSpec}; err_if_free=true) update_manifest!(ctx.env, pkgs, deps_map, ctx.julia_version) new = download_source(ctx) fixups_from_projectfile!(ctx.env) - download_artifacts(ctx.env, io=ctx.io) + download_artifacts(ctx) write_env(ctx.env) # write env before building show_update(ctx.env, ctx.registries; io=ctx.io) build_versions(ctx, new) diff --git a/src/PlatformEngines.jl b/src/PlatformEngines.jl index f3f405ced9..287572c0be 100644 --- a/src/PlatformEngines.jl +++ b/src/PlatformEngines.jl @@ -255,7 +255,8 @@ function download( verbose::Bool = false, headers::Vector{Pair{String,String}} = Pair{String,String}[], auth_header::Union{Pair{String,String}, Nothing} = nothing, - io::IO=stderr_f() + io::IO=stderr_f(), + progress::Union{Nothing,Function} = nothing, # (total, now) -> nothing ) if auth_header === nothing auth_header = get_auth_header(url, verbose=verbose) @@ -268,7 +269,9 @@ function download( end do_fancy = verbose && can_fancyprint(io) - progress = if do_fancy + progress = if !isnothing(progress) + progress + elseif do_fancy bar = MiniProgressBar(header="Downloading", color=Base.info_color()) start_progress(io, bar) let bar=bar @@ -326,6 +329,7 @@ function download_verify( verbose::Bool = false, force::Bool = false, quiet_download::Bool = false, + progress::Union{Nothing,Function} = nothing, # (total, now) -> nothing ) # Whether the file existed in the first place file_existed = false @@ -352,7 +356,7 @@ function download_verify( attempts = 3 for i in 1:attempts try - download(url, dest; verbose=verbose || !quiet_download) + download(url, dest; verbose=verbose || !quiet_download, progress) break catch err @debug "download and verify failed on attempt $i/$attempts" url dest err @@ -466,6 +470,7 @@ function download_verify_unpack( verbose::Bool = false, quiet_download::Bool = false, io::IO=stderr_f(), + progress::Union{Nothing,Function} = nothing, # (total, now) -> nothing ) # First, determine whether we should keep this tarball around remove_tarball = false @@ -510,8 +515,7 @@ function download_verify_unpack( # Download the tarball; if it already existed and we needed to remove it # then we should remove the unpacked path as well - should_delete = !download_verify(url, hash, tarball_path; - force=force, verbose=verbose, quiet_download=quiet_download) + should_delete = !download_verify(url, hash, tarball_path; force, verbose, quiet_download, progress) if should_delete if verbose @info("Removing dest directory $(dest) as source tarball changed")