From ecb5dc93d8f3fbac4f6e85f97b9fd2c7a9c8a545 Mon Sep 17 00:00:00 2001 From: Piotr Fus Date: Wed, 11 Dec 2024 12:16:20 +0100 Subject: [PATCH 1/4] Test GCS HEAD requests with retry --- gcs_storage_client.go | 11 +++++++++-- put_get_test.go | 9 +++++++++ retry.go | 5 +++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/gcs_storage_client.go b/gcs_storage_client.go index 0627f6122..aa43407de 100644 --- a/gcs_storage_client.go +++ b/gcs_storage_client.go @@ -3,6 +3,7 @@ package gosnowflake import ( + "context" "encoding/json" "fmt" "io" @@ -11,6 +12,7 @@ import ( "os" "strconv" "strings" + "time" ) const ( @@ -74,7 +76,12 @@ func (util *snowflakeGcsClient) getFileHeader(meta *fileMetadata, filename strin if meta.mockGcsClient != nil { client = meta.mockGcsClient } - resp, err := client.Do(req) + req.Close = true + r := newRetryHTTP(context.Background(), client, http.NewRequest, URL, gcsHeaders, time.Second, 3, defaultTimeProvider, nil) // TODO replace with timeout context + r.doHead() + fmt.Printf("Before calling HEAD to GCS\n") + resp, err := r.execute() + fmt.Printf("After calling HEAD to GCS, err: %v\n", err) if err != nil { return nil, err } @@ -394,6 +401,6 @@ func (util *snowflakeGcsClient) isTokenExpired(resp *http.Response) bool { func newGcsClient() gcsAPI { return &http.Client{ - Transport: SnowflakeTransport, + //Transport: SnowflakeTransport, } } diff --git a/put_get_test.go b/put_get_test.go index 68ca087d4..042b96a2c 100644 --- a/put_get_test.go +++ b/put_get_test.go @@ -253,6 +253,11 @@ func TestPutLocalFile(t *testing.T) { } func TestPutGetWithAutoCompressFalse(t *testing.T) { + level := logger.GetLogLevel() + _ = logger.SetLogLevel("debug") + defer func() { + _ = logger.SetLogLevel(level) + }() tmpDir := t.TempDir() testData := filepath.Join(tmpDir, "data.txt") f, err := os.Create(testData) @@ -267,14 +272,17 @@ func TestPutGetWithAutoCompressFalse(t *testing.T) { assertNilF(t, f.Close()) }() + fmt.Printf("Starting TestPutGetWithAutoCompressFalse at %v\n", time.Now()) runDBTest(t, func(dbt *DBTest) { stageDir := "test_put_uncompress_file_" + randomString(10) dbt.mustExec("rm @~/" + stageDir) // PUT test + fmt.Printf("Running PUT at %v\n", time.Now()) sqlText := fmt.Sprintf("put 'file://%v' @~/%v auto_compress=FALSE", testData, stageDir) sqlText = strings.ReplaceAll(sqlText, "\\", "\\\\") dbt.mustExec(sqlText) + fmt.Printf("Finished PUT at %v\n", time.Now()) defer dbt.mustExec("rm @~/" + stageDir) rows := dbt.mustQuery("ls @~/" + stageDir) defer func() { @@ -327,6 +335,7 @@ func TestPutGetWithAutoCompressFalse(t *testing.T) { } func TestPutOverwrite(t *testing.T) { + fmt.Printf("Starting TestPutOverwrite at %v\n", time.Now()) tmpDir := t.TempDir() testData := filepath.Join(tmpDir, "data.txt") f, err := os.Create(testData) diff --git a/retry.go b/retry.go index 59d2fdf60..cef2ac00e 100644 --- a/retry.go +++ b/retry.go @@ -282,6 +282,11 @@ func (r *retryHTTP) doPost() *retryHTTP { return r } +func (r *retryHTTP) doHead() *retryHTTP { + r.method = "HEAD" + return r +} + func (r *retryHTTP) setBody(body []byte) *retryHTTP { r.bodyCreator = func() ([]byte, error) { return body, nil From fd20716d7f37535e7e34d7a24614007c13daa78b Mon Sep 17 00:00:00 2001 From: Piotr Fus Date: Fri, 13 Dec 2024 07:21:09 +0100 Subject: [PATCH 2/4] v2 --- gcs_storage_client.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/gcs_storage_client.go b/gcs_storage_client.go index aa43407de..f2eb34859 100644 --- a/gcs_storage_client.go +++ b/gcs_storage_client.go @@ -4,9 +4,11 @@ package gosnowflake import ( "context" + "crypto/tls" "encoding/json" "fmt" "io" + "net" "net/http" "net/url" "os" @@ -401,6 +403,18 @@ func (util *snowflakeGcsClient) isTokenExpired(resp *http.Response) bool { func newGcsClient() gcsAPI { return &http.Client{ - //Transport: SnowflakeTransport, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: certPool, + VerifyPeerCertificate: verifyPeerCertificateSerial, + }, + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Minute, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + }, } } From a7186f545be2ab39332c151af371d418d51d2bce Mon Sep 17 00:00:00 2001 From: Piotr Fus Date: Fri, 13 Dec 2024 08:08:48 +0100 Subject: [PATCH 3/4] v3 --- gcs_storage_client.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/gcs_storage_client.go b/gcs_storage_client.go index f2eb34859..824ecd9eb 100644 --- a/gcs_storage_client.go +++ b/gcs_storage_client.go @@ -3,7 +3,6 @@ package gosnowflake import ( - "context" "crypto/tls" "encoding/json" "fmt" @@ -78,12 +77,7 @@ func (util *snowflakeGcsClient) getFileHeader(meta *fileMetadata, filename strin if meta.mockGcsClient != nil { client = meta.mockGcsClient } - req.Close = true - r := newRetryHTTP(context.Background(), client, http.NewRequest, URL, gcsHeaders, time.Second, 3, defaultTimeProvider, nil) // TODO replace with timeout context - r.doHead() - fmt.Printf("Before calling HEAD to GCS\n") - resp, err := r.execute() - fmt.Printf("After calling HEAD to GCS, err: %v\n", err) + resp, err := client.Do(req) if err != nil { return nil, err } @@ -403,18 +397,20 @@ func (util *snowflakeGcsClient) isTokenExpired(resp *http.Response) bool { func newGcsClient() gcsAPI { return &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: certPool, - VerifyPeerCertificate: verifyPeerCertificateSerial, - }, - MaxIdleConns: 10, - IdleConnTimeout: 30 * time.Minute, - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - }, + Transport: gcsTransport, } } + +var gcsTransport = &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: certPool, + VerifyPeerCertificate: verifyPeerCertificateSerial, + }, + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Minute, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, +} From c12650f01750714f2a1a6fc6ad08b1787c6165da Mon Sep 17 00:00:00 2001 From: Piotr Fus Date: Fri, 13 Dec 2024 09:13:18 +0100 Subject: [PATCH 4/4] v4 --- put_get_test.go | 9 --------- retry.go | 5 ----- 2 files changed, 14 deletions(-) diff --git a/put_get_test.go b/put_get_test.go index 042b96a2c..68ca087d4 100644 --- a/put_get_test.go +++ b/put_get_test.go @@ -253,11 +253,6 @@ func TestPutLocalFile(t *testing.T) { } func TestPutGetWithAutoCompressFalse(t *testing.T) { - level := logger.GetLogLevel() - _ = logger.SetLogLevel("debug") - defer func() { - _ = logger.SetLogLevel(level) - }() tmpDir := t.TempDir() testData := filepath.Join(tmpDir, "data.txt") f, err := os.Create(testData) @@ -272,17 +267,14 @@ func TestPutGetWithAutoCompressFalse(t *testing.T) { assertNilF(t, f.Close()) }() - fmt.Printf("Starting TestPutGetWithAutoCompressFalse at %v\n", time.Now()) runDBTest(t, func(dbt *DBTest) { stageDir := "test_put_uncompress_file_" + randomString(10) dbt.mustExec("rm @~/" + stageDir) // PUT test - fmt.Printf("Running PUT at %v\n", time.Now()) sqlText := fmt.Sprintf("put 'file://%v' @~/%v auto_compress=FALSE", testData, stageDir) sqlText = strings.ReplaceAll(sqlText, "\\", "\\\\") dbt.mustExec(sqlText) - fmt.Printf("Finished PUT at %v\n", time.Now()) defer dbt.mustExec("rm @~/" + stageDir) rows := dbt.mustQuery("ls @~/" + stageDir) defer func() { @@ -335,7 +327,6 @@ func TestPutGetWithAutoCompressFalse(t *testing.T) { } func TestPutOverwrite(t *testing.T) { - fmt.Printf("Starting TestPutOverwrite at %v\n", time.Now()) tmpDir := t.TempDir() testData := filepath.Join(tmpDir, "data.txt") f, err := os.Create(testData) diff --git a/retry.go b/retry.go index cef2ac00e..59d2fdf60 100644 --- a/retry.go +++ b/retry.go @@ -282,11 +282,6 @@ func (r *retryHTTP) doPost() *retryHTTP { return r } -func (r *retryHTTP) doHead() *retryHTTP { - r.method = "HEAD" - return r -} - func (r *retryHTTP) setBody(body []byte) *retryHTTP { r.bodyCreator = func() ([]byte, error) { return body, nil