Skip to content

Commit

Permalink
Merge pull request #2318 from buildkite/pdp-1504-artifacts-in-azure
Browse files Browse the repository at this point in the history
Artifact up/download to/from Azure Blob Storage
  • Loading branch information
DrJosh9000 authored Aug 29, 2023
2 parents add7e26 + 764384c commit 585e4b2
Show file tree
Hide file tree
Showing 15 changed files with 581 additions and 82 deletions.
108 changes: 63 additions & 45 deletions agent/artifact_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/aws/aws-sdk-go/service/s3"
"github.com/buildkite/agent/v3/api"
iartifact "github.com/buildkite/agent/v3/internal/artifact"
"github.com/buildkite/agent/v3/logger"
"github.com/buildkite/agent/v3/pool"
)
Expand Down Expand Up @@ -57,14 +58,14 @@ func NewArtifactDownloader(l logger.Logger, ac APIClient, c ArtifactDownloaderCo

func (a *ArtifactDownloader) Download(ctx context.Context) error {
// Turn the download destination into an absolute path and confirm it exists
downloadDestination, _ := filepath.Abs(a.conf.Destination)
fileInfo, err := os.Stat(downloadDestination)
destination, _ := filepath.Abs(a.conf.Destination)
fileInfo, err := os.Stat(destination)
if err != nil {
return fmt.Errorf("Could not find information about destination: %s %v",
downloadDestination, err)
destination, err)
}
if !fileInfo.IsDir() {
return fmt.Errorf("%s is not a directory", downloadDestination)
return fmt.Errorf("%s is not a directory", destination)
}

artifacts, err := NewArtifactSearcher(a.logger, a.apiClient, a.conf.BuildID).
Expand All @@ -79,7 +80,7 @@ func (a *ArtifactDownloader) Download(ctx context.Context) error {
return errors.New("No artifacts found for downloading")
}

a.logger.Info("Found %d artifacts. Starting to download to: %s", artifactCount, downloadDestination)
a.logger.Info("Found %d artifacts. Starting to download to: %s", artifactCount, destination)

p := pool.New(pool.MaxConcurrencyLimit)
errors := []error{}
Expand All @@ -101,46 +102,7 @@ func (a *ArtifactDownloader) Download(ctx context.Context) error {
path = strings.Replace(path, `\`, `/`, -1)
}

// Handle downloading from S3, GS, or RT
var dler interface {
Start(context.Context) error
}
switch {
case strings.HasPrefix(artifact.UploadDestination, "s3://"):
bucketName, _ := ParseS3Destination(artifact.UploadDestination)
dler = NewS3Downloader(a.logger, S3DownloaderConfig{
S3Client: s3Clients[bucketName],
Path: path,
S3Path: artifact.UploadDestination,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
case strings.HasPrefix(artifact.UploadDestination, "gs://"):
dler = NewGSDownloader(a.logger, GSDownloaderConfig{
Path: path,
Bucket: artifact.UploadDestination,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
case strings.HasPrefix(artifact.UploadDestination, "rt://"):
dler = NewArtifactoryDownloader(a.logger, ArtifactoryDownloaderConfig{
Path: path,
Repository: artifact.UploadDestination,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
default:
dler = NewDownload(a.logger, http.DefaultClient, DownloadConfig{
URL: artifact.URL,
Path: path,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
}
dler := a.createDownloader(artifact, path, destination, s3Clients)

// If the downloaded encountered an error, lock
// the pool, collect it, then unlock the pool
Expand Down Expand Up @@ -188,3 +150,59 @@ func (a *ArtifactDownloader) generateS3Clients(artifacts []*api.Artifact) (map[s

return s3Clients, nil
}

type downloader interface {
Start(context.Context) error
}

func (a *ArtifactDownloader) createDownloader(artifact *api.Artifact, path, destination string, s3Clients map[string]*s3.S3) downloader {
// Handle downloading from S3, GS, RT, or Azure
switch {
case strings.HasPrefix(artifact.UploadDestination, "s3://"):
bucketName, _ := ParseS3Destination(artifact.UploadDestination)
return NewS3Downloader(a.logger, S3DownloaderConfig{
S3Client: s3Clients[bucketName],
Path: path,
S3Path: artifact.UploadDestination,
Destination: destination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})

case strings.HasPrefix(artifact.UploadDestination, "gs://"):
return NewGSDownloader(a.logger, GSDownloaderConfig{
Path: path,
Bucket: artifact.UploadDestination,
Destination: destination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})

case strings.HasPrefix(artifact.UploadDestination, "rt://"):
return NewArtifactoryDownloader(a.logger, ArtifactoryDownloaderConfig{
Path: path,
Repository: artifact.UploadDestination,
Destination: destination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})

case iartifact.IsAzureBlobPath(artifact.UploadDestination):
return iartifact.NewAzureBlobDownloader(a.logger, iartifact.AzureBlobDownloaderConfig{
Path: path,
Repository: artifact.UploadDestination,
Destination: destination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})

default:
return NewDownload(a.logger, http.DefaultClient, DownloadConfig{
URL: artifact.URL,
Path: path,
Destination: destination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
}
}
77 changes: 48 additions & 29 deletions agent/artifact_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/internal/artifact"
"github.com/buildkite/agent/v3/internal/experiments"
"github.com/buildkite/agent/v3/internal/mime"
"github.com/buildkite/agent/v3/logger"
Expand Down Expand Up @@ -246,41 +247,59 @@ func (a *ArtifactUploader) build(path string, absolutePath string, globPath stri
return artifact, nil
}

func (a *ArtifactUploader) upload(ctx context.Context, artifacts []*api.Artifact) error {
var uploader Uploader
var err error

// Determine what uploader to use
if a.conf.Destination != "" {
if strings.HasPrefix(a.conf.Destination, "s3://") {
uploader, err = NewS3Uploader(a.logger, S3UploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})
} else if strings.HasPrefix(a.conf.Destination, "gs://") {
uploader, err = NewGSUploader(a.logger, GSUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})
} else if strings.HasPrefix(a.conf.Destination, "rt://") {
uploader, err = NewArtifactoryUploader(a.logger, ArtifactoryUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})
} else {
return fmt.Errorf("invalid upload destination: '%v'. Only s3://, gs:// or rt:// upload schemes are allowed. Did you forget to surround your artifact upload pattern in double quotes?", a.conf.Destination)
// createUploader applies some heuristics to the destination to infer which
// uploader to use.
func (a *ArtifactUploader) createUploader() (uploader Uploader, err error) {
var dest string
defer func() {
if err != nil || dest == "" {
return
}
a.logger.Info("Uploading to %s (%q), using your agent configuration", dest, a.conf.Destination)
}()

a.logger.Info("Uploading to %q, using your agent configuration", a.conf.Destination)
} else {
uploader = NewFormUploader(a.logger, FormUploaderConfig{
switch {
case a.conf.Destination == "":
a.logger.Info("Uploading to default Buildkite artifact storage")
return NewFormUploader(a.logger, FormUploaderConfig{
DebugHTTP: a.conf.DebugHTTP,
}), nil

case strings.HasPrefix(a.conf.Destination, "s3://"):
dest = "Amazon S3"
return NewS3Uploader(a.logger, S3UploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})

a.logger.Info("Uploading to default Buildkite artifact storage")
case strings.HasPrefix(a.conf.Destination, "gs://"):
dest = "Google Cloud Storage"
return NewGSUploader(a.logger, GSUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})

case strings.HasPrefix(a.conf.Destination, "rt://"):
dest = "Artifactory"
return NewArtifactoryUploader(a.logger, ArtifactoryUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})

case artifact.IsAzureBlobPath(a.conf.Destination):
dest = "Azure Blob storage"
return artifact.NewAzureBlobUploader(a.logger, artifact.AzureBlobUploaderConfig{
Destination: a.conf.Destination,
})

default:
return nil, fmt.Errorf("invalid upload destination: '%v'. Only s3://*, gs://*, rt://*, or https://*.blob.core.windows.net destinations are allowed. Did you forget to surround your artifact upload pattern in double quotes?", a.conf.Destination)
}
}

// Check if creation caused an error
func (a *ArtifactUploader) upload(ctx context.Context, artifacts []*api.Artifact) error {
// Determine what uploader to use
uploader, err := a.createUploader()
if err != nil {
return fmt.Errorf("creating uploader: %v", err)
}
Expand Down Expand Up @@ -413,7 +432,7 @@ func (a *ArtifactUploader) upload(ctx context.Context, artifacts []*api.Artifact
roko.WithMaxAttempts(10),
roko.WithStrategy(roko.Constant(5*time.Second)),
).DoWithContext(ctx, func(r *roko.Retrier) error {
if err := uploader.Upload(artifact); err != nil {
if err := uploader.Upload(ctx, artifact); err != nil {
a.logger.Warn("%s (%s)", err, r)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion agent/artifactory_uploader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"context"
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
Expand Down Expand Up @@ -98,7 +99,7 @@ func (u *ArtifactoryUploader) URL(artifact *api.Artifact) string {
return url.String()
}

func (u *ArtifactoryUploader) Upload(artifact *api.Artifact) error {
func (u *ArtifactoryUploader) Upload(_ context.Context, artifact *api.Artifact) error {
// Open file from filesystem
u.logger.Debug("Reading file \"%s\"", artifact.AbsolutePath)
f, err := os.Open(artifact.AbsolutePath)
Expand Down
3 changes: 2 additions & 1 deletion agent/form_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"bytes"
"context"
_ "crypto/sha512" // import sha512 to make sha512 ssl certs work
"fmt"
"io"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (u *FormUploader) URL(artifact *api.Artifact) string {
return ""
}

func (u *FormUploader) Upload(artifact *api.Artifact) error {
func (u *FormUploader) Upload(_ context.Context, artifact *api.Artifact) error {
if artifact.FileSize > maxFormUploadedArtifactSize {
return errArtifactTooLarge{Size: artifact.FileSize}
}
Expand Down
11 changes: 8 additions & 3 deletions agent/form_uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -16,6 +17,8 @@ import (
)

func TestFormUploading(t *testing.T) {
ctx := context.Background()

server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/buildkiteartifacts.com":
Expand Down Expand Up @@ -106,7 +109,7 @@ func TestFormUploading(t *testing.T) {
}},
}

if err := uploader.Upload(artifact); err != nil {
if err := uploader.Upload(ctx, artifact); err != nil {
t.Errorf("uploader.Upload(artifact) = %v", err)
}
}
Expand All @@ -117,6 +120,7 @@ func TestFormUploading(t *testing.T) {
}

func TestFormUploadFileMissing(t *testing.T) {
ctx := context.Background()
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
http.Error(rw, "Not found", http.StatusNotFound)
}))
Expand Down Expand Up @@ -154,12 +158,13 @@ func TestFormUploadFileMissing(t *testing.T) {
}},
}

if err := uploader.Upload(artifact); !os.IsNotExist(err) {
if err := uploader.Upload(ctx, artifact); !os.IsNotExist(err) {
t.Errorf("uploader.Upload(artifact) = %v, want os.ErrNotExist", err)
}
}

func TestFormUploadTooBig(t *testing.T) {
ctx := context.Background()
uploader := NewFormUploader(logger.Discard, FormUploaderConfig{})
const size = int64(6442450944) // 6Gb
artifact := &api.Artifact{
Expand All @@ -172,7 +177,7 @@ func TestFormUploadTooBig(t *testing.T) {
UploadInstructions: &api.ArtifactUploadInstructions{},
}

if err := uploader.Upload(artifact); !errors.Is(err, errArtifactTooLarge{Size: size}) {
if err := uploader.Upload(ctx, artifact); !errors.Is(err, errArtifactTooLarge{Size: size}) {
t.Errorf("uploader.Upload(artifact) = %v, want errArtifactTooLarge", err)
}
}
2 changes: 1 addition & 1 deletion agent/gs_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (u *GSUploader) URL(artifact *api.Artifact) string {
return artifactURL.String()
}

func (u *GSUploader) Upload(artifact *api.Artifact) error {
func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error {
permission := os.Getenv("BUILDKITE_GS_ACL")

// The dirtiest validation method ever...
Expand Down
3 changes: 2 additions & 1 deletion agent/s3_uploader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"context"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (u *S3Uploader) URL(artifact *api.Artifact) string {
return url.String()
}

func (u *S3Uploader) Upload(artifact *api.Artifact) error {
func (u *S3Uploader) Upload(_ context.Context, artifact *api.Artifact) error {

permission, err := u.resolvePermission()
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion agent/uploader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package agent

import (
"context"

"github.com/buildkite/agent/v3/api"
)

Expand All @@ -10,5 +12,5 @@ type Uploader interface {
URL(*api.Artifact) string

// The actual uploading of the file
Upload(*api.Artifact) error
Upload(context.Context, *api.Artifact) error
}
3 changes: 3 additions & 0 deletions clicommand/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ var RedactedVars = cli.StringSliceFlag{
"*_PRIVATE_KEY",
"*_ACCESS_KEY",
"*_SECRET_KEY",
// Connection strings frequently contain passwords, e.g.
// https://user:pass@host/ or Server=foo;Database=my-db;User Id=user;Password=pass;
"*_CONNECTION_STRING",
},
}

Expand Down
Loading

0 comments on commit 585e4b2

Please sign in to comment.