Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1769605 Add timeout to cloud storage #1272

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions azure_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

type snowflakeAzureClient struct {
cfg *Config
}

type azureLocation struct {
Expand Down Expand Up @@ -85,9 +86,11 @@ func (util *snowflakeAzureClient) getFileHeader(meta *fileMetadata, filename str
if meta.mockAzureClient != nil {
blobClient = meta.mockAzureClient
}
resp, err := blobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{
AccessConditions: &blob.AccessConditions{},
CPKInfo: &blob.CPKInfo{},
resp, err := withCloudStorageTimeout(util.cfg, func(ctx context.Context) (blob.GetPropertiesResponse, error) {
return blobClient.GetProperties(ctx, &blob.GetPropertiesOptions{
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved
AccessConditions: &blob.AccessConditions{},
CPKInfo: &blob.CPKInfo{},
})
})
if err != nil {
var se *azcore.ResponseError
Expand Down Expand Up @@ -203,9 +206,11 @@ func (util *snowflakeAzureClient) uploadFile(
if meta.realSrcStream != nil {
uploadSrc = meta.realSrcStream
}
_, err = blobClient.UploadStream(context.Background(), uploadSrc, &azblob.UploadStreamOptions{
BlockSize: int64(uploadSrc.Len()),
Metadata: azureMeta,
_, err = withCloudStorageTimeout(util.cfg, func(ctx context.Context) (azblob.UploadStreamResponse, error) {
return blobClient.UploadStream(ctx, uploadSrc, &azblob.UploadStreamOptions{
BlockSize: int64(uploadSrc.Len()),
Metadata: azureMeta,
})
})
} else {
var f *os.File
Expand All @@ -228,7 +233,9 @@ func (util *snowflakeAzureClient) uploadFile(
if meta.options.putAzureCallback != nil {
blobOptions.Progress = meta.options.putAzureCallback.call
}
_, err = blobClient.UploadFile(context.Background(), f, blobOptions)
_, err = withCloudStorageTimeout(util.cfg, func(ctx context.Context) (azblob.UploadFileResponse, error) {
return blobClient.UploadFile(ctx, f, blobOptions)
})
}
if err != nil {
var se *azcore.ResponseError
Expand Down Expand Up @@ -279,7 +286,9 @@ func (util *snowflakeAzureClient) nativeDownloadFile(
blobClient = meta.mockAzureClient
}
if meta.options.GetFileToStream {
blobDownloadResponse, err := blobClient.DownloadStream(context.Background(), &azblob.DownloadStreamOptions{})
blobDownloadResponse, err := withCloudStorageTimeout(util.cfg, func(ctx context.Context) (azblob.DownloadStreamResponse, error) {
return blobClient.DownloadStream(ctx, &azblob.DownloadStreamOptions{})
})
if err != nil {
return err
}
Expand All @@ -295,9 +304,11 @@ func (util *snowflakeAzureClient) nativeDownloadFile(
return err
}
defer f.Close()
_, err = blobClient.DownloadFile(
context.Background(), f, &azblob.DownloadFileOptions{
Concurrency: uint16(maxConcurrency)})
_, err = withCloudStorageTimeout(util.cfg, func(ctx context.Context) (any, error) {
return blobClient.DownloadFile(
ctx, f, &azblob.DownloadFileOptions{
Concurrency: uint16(maxConcurrency)})
})
if err != nil {
return err
}
Expand Down
51 changes: 48 additions & 3 deletions azure_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ func TestUploadFileWithAzureUploadFailedError(t *testing.T) {
return azblob.UploadFileResponse{}, errors.New("unexpected error uploading file")
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

uploadMeta.realSrcFileName = uploadMeta.srcFileName
Expand Down Expand Up @@ -230,6 +235,11 @@ func TestUploadStreamWithAzureUploadFailedError(t *testing.T) {
return azblob.UploadStreamResponse{}, errors.New("unexpected error uploading file")
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

uploadMeta.realSrcStream = uploadMeta.srcStream
Expand Down Expand Up @@ -291,6 +301,11 @@ func TestUploadFileWithAzureUploadTokenExpired(t *testing.T) {
}
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

uploadMeta.realSrcFileName = uploadMeta.srcFileName
Expand Down Expand Up @@ -362,6 +377,11 @@ func TestUploadFileWithAzureUploadNeedsRetry(t *testing.T) {
}
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

uploadMeta.realSrcFileName = uploadMeta.srcFileName
Expand Down Expand Up @@ -418,6 +438,11 @@ func TestDownloadOneFileToAzureFailed(t *testing.T) {
return blob.GetPropertiesResponse{}, nil
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}
err = new(remoteStorageUtil).downloadOneFile(&downloadMeta)
if err == nil {
Expand All @@ -444,9 +469,14 @@ func TestGetFileHeaderErrorStatus(t *testing.T) {
return blob.GetPropertiesResponse{}, errors.New("failed to retrieve headers")
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

if header, err := new(snowflakeAzureClient).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
if header, err := (&snowflakeAzureClient{cfg: &Config{}}).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
t.Fatalf("expected null header, got: %v", header)
}
if meta.resStatus != errStatus {
Expand Down Expand Up @@ -477,9 +507,14 @@ func TestGetFileHeaderErrorStatus(t *testing.T) {
}
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

if header, err := new(snowflakeAzureClient).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
if header, err := (&snowflakeAzureClient{cfg: &Config{}}).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
t.Fatalf("expected null header, got: %v", header)
}
if meta.resStatus != notFoundFile {
Expand All @@ -505,7 +540,7 @@ func TestGetFileHeaderErrorStatus(t *testing.T) {
},
}

if header, err := new(snowflakeAzureClient).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
if header, err := (&snowflakeAzureClient{cfg: &Config{}}).getFileHeader(&meta, "file.txt"); header != nil || err == nil {
t.Fatalf("expected null header, got: %v", header)
}
if meta.resStatus != renewToken {
Expand Down Expand Up @@ -540,6 +575,11 @@ func TestUploadFileToAzureClientCastFail(t *testing.T) {
options: &SnowflakeFileTransferOptions{
MultiPartThreshold: dataSizeThreshold,
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

uploadMeta.realSrcFileName = uploadMeta.srcFileName
Expand Down Expand Up @@ -573,6 +613,11 @@ func TestAzureGetHeaderClientCastFail(t *testing.T) {
return blob.GetPropertiesResponse{}, nil
},
},
sfa: &snowflakeFileTransferAgent{
sc: &snowflakeConn{
cfg: &Config{},
},
},
}

_, err = new(snowflakeAzureClient).getFileHeader(&meta, "file.txt")
Expand Down
13 changes: 13 additions & 0 deletions dsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
defaultRequestTimeout = 0 * time.Second // Timeout for retry for request EXCLUDING clientTimeout
defaultJWTTimeout = 60 * time.Second
defaultExternalBrowserTimeout = 120 * time.Second // Timeout for external browser login
defaultCloudStorageTimeout = -1 // Timeout for calling cloud storage.
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved
defaultMaxRetryCount = 7 // specifies maximum number of subsequent retries
defaultDomain = ".snowflakecomputing.com"
cnDomain = ".snowflakecomputing.cn"
Expand Down Expand Up @@ -77,6 +78,7 @@ type Config struct {
ClientTimeout time.Duration // Timeout for network round trip + read out http response
JWTClientTimeout time.Duration // Timeout for network round trip + read out http response used when JWT token auth is taking place
ExternalBrowserTimeout time.Duration // Timeout for external browser login
CloudStorageTimeout time.Duration // Timeout for a single call to a cloud storage provider
MaxRetryCount int // Specifies how many times non-periodic HTTP request can be retried

Application string // application name.
Expand Down Expand Up @@ -215,6 +217,9 @@ func DSN(cfg *Config) (dsn string, err error) {
if cfg.ExternalBrowserTimeout != defaultExternalBrowserTimeout {
params.Add("externalBrowserTimeout", strconv.FormatInt(int64(cfg.ExternalBrowserTimeout/time.Second), 10))
}
if cfg.CloudStorageTimeout != defaultCloudStorageTimeout {
params.Add("cloudStorageTimeout", strconv.FormatInt(int64(cfg.CloudStorageTimeout/time.Second), 10))
}
if cfg.MaxRetryCount != defaultMaxRetryCount {
params.Add("maxRetryCount", strconv.Itoa(cfg.MaxRetryCount))
}
Expand Down Expand Up @@ -498,6 +503,9 @@ func fillMissingConfigParameters(cfg *Config) error {
if cfg.ExternalBrowserTimeout == 0 {
cfg.ExternalBrowserTimeout = defaultExternalBrowserTimeout
}
if cfg.CloudStorageTimeout == 0 {
cfg.CloudStorageTimeout = defaultCloudStorageTimeout
}
if cfg.MaxRetryCount == 0 {
cfg.MaxRetryCount = defaultMaxRetryCount
}
Expand Down Expand Up @@ -714,6 +722,11 @@ func parseDSNParams(cfg *Config, params string) (err error) {
if err != nil {
return err
}
case "cloudStorageTimeout":
cfg.CloudStorageTimeout, err = parseTimeout(value)
if err != nil {
return err
}
case "maxRetryCount":
cfg.MaxRetryCount, err = strconv.Atoi(value)
if err != nil {
Expand Down
Loading
Loading