diff --git a/go.mod b/go.mod index 8c9a892..3ebaab4 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,13 @@ toolchain go1.22.2 require ( cloud.google.com/go v0.112.2 - cloud.google.com/go/bigquery v1.61.0 + cloud.google.com/go/bigquery v1.59.1 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/aws/aws-sdk-go-v2 v1.26.1 github.com/aws/aws-sdk-go-v2/config v1.27.11 github.com/aws/aws-sdk-go-v2/credentials v1.17.11 github.com/aws/aws-sdk-go-v2/service/redshiftdata v1.25.4 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/databricks/databricks-sql-go v1.5.5 github.com/dlclark/regexp2 v1.11.0 github.com/gliderlabs/ssh v0.3.7 @@ -49,6 +50,7 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect + github.com/apache/arrow/go/v14 v14.0.2 // indirect github.com/apache/arrow/go/v15 v15.0.2 // indirect github.com/apache/arrow/go/v16 v16.0.0 // indirect github.com/apache/thrift v0.19.0 // indirect @@ -68,7 +70,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect github.com/aws/smithy-go v1.20.2 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/containerd/continuity v0.4.3 // indirect github.com/coreos/go-oidc/v3 v3.5.0 // indirect github.com/danieljoos/wincred v1.1.2 // indirect @@ -142,7 +143,7 @@ require ( golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.20.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 // indirect google.golang.org/grpc v1.63.2 // indirect diff --git a/go.sum b/go.sum index bbf0207..b4212ef 100644 --- a/go.sum +++ b/go.sum @@ -5,17 +5,17 @@ cloud.google.com/go/auth v0.4.1 h1:Z7YNIhlWRtrnKlZke7z3GMqzvuYzdc2z98F9D1NV5Hg= cloud.google.com/go/auth v0.4.1/go.mod h1:QVBuVEKpCn4Zp58hzRGvL0tjRGU0YqdRTdCHM1IHnro= cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= -cloud.google.com/go/bigquery v1.61.0 h1:w2Goy9n6gh91LVi6B2Sc+HpBl8WbWhIyzdvVvrAuEIw= -cloud.google.com/go/bigquery v1.61.0/go.mod h1:PjZUje0IocbuTOdq4DBOJLNYB0WF3pAKBHzAYyxCwFo= +cloud.google.com/go/bigquery v1.59.1 h1:CpT+/njKuKT3CEmswm6IbhNu9u35zt5dO4yPDLW+nG4= +cloud.google.com/go/bigquery v1.59.1/go.mod h1:VP1UJYgevyTwsV7desjzNzDND5p6hZB+Z8gZJN1GQUc= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= -cloud.google.com/go/datacatalog v1.20.0 h1:BGDsEjqpAo0Ka+b9yDLXnE5k+jU3lXGMh//NsEeDMIg= -cloud.google.com/go/datacatalog v1.20.0/go.mod h1:fSHaKjIroFpmRrYlwz9XBB2gJBpXufpnxyAKaT4w6L0= +cloud.google.com/go/datacatalog v1.19.3 h1:A0vKYCQdxQuV4Pi0LL9p39Vwvg4jH5yYveMv50gU5Tw= +cloud.google.com/go/datacatalog v1.19.3/go.mod h1:ra8V3UAsciBpJKQ+z9Whkxzxv7jmQg1hfODr3N3YPJ4= cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM= cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA= -cloud.google.com/go/longrunning v0.5.6 h1:xAe8+0YaWoCKr9t1+aWe+OeQgN/iJK1fEgZSXmjuEaE= -cloud.google.com/go/longrunning v0.5.6/go.mod h1:vUaDrWYOMKRuhiv6JBnn49YxCPz2Ayn9GqyjaBT8/mA= +cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg= +cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s= cloud.google.com/go/storage v1.40.0 h1:VEpDQV5CJxFmJ6ueWNsKxcr1QAYOXEgxDa+sBbJahPw= cloud.google.com/go/storage v1.40.0/go.mod h1:Rrj7/hKlG87BLqDJYtwR0fbPld8uJPbQ2ucUMY7Ir0g= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= @@ -49,6 +49,8 @@ github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26 h1:3YVZUqkoev4mL+aCwVO github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26/go.mod h1:ymXt5bw5uSNu4jveerFxE0vNYxF8ncqbptntMaFMg3k= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= +github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= +github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= github.com/apache/arrow/go/v16 v16.0.0 h1:qRLbJRPj4zaseZrjbDHa7mUoZDDIU+4pu+mE2Lucs5g= @@ -202,8 +204,8 @@ github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= -github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= +github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= +github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= @@ -493,8 +495,8 @@ google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCID google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda h1:wu/KJm9KJwpfHWhkkZGohVC6KRrc1oJNr4jwtQMOQXw= -google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda/go.mod h1:g2LLCvCeCSir/JJSWosk19BR4NVxGqHUC6rxIRsd7Aw= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be h1:Zz7rLWqp0ApfsR/l7+zSHhY3PMiH2xqgxlfYfAfNpoU= google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be/go.mod h1:dvdCTIoAGbkWbcIKBniID56/7XHTt6WfxXNMxuziJ+w= google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 h1:DujSIu+2tC9Ht0aPNA7jgj23Iq8Ewi5sgkQ++wdvonE= diff --git a/sqlconnect/internal/bigquery/db.go b/sqlconnect/internal/bigquery/db.go index 2231201..61acd02 100644 --- a/sqlconnect/internal/bigquery/db.go +++ b/sqlconnect/internal/bigquery/db.go @@ -30,6 +30,9 @@ func NewDB(configJSON json.RawMessage) (*DB, error) { db := sql.OpenDB(driver.NewConnector( config.ProjectID, + driver.Config{ + JobRateLimitExceededRetryEnabled: true, + }, option.WithCredentialsJSON([]byte(config.CredentialsJSON))), ) diff --git a/sqlconnect/internal/bigquery/driver/connection.go b/sqlconnect/internal/bigquery/driver/connection.go index 932a5c4..24d502a 100644 --- a/sqlconnect/internal/bigquery/driver/connection.go +++ b/sqlconnect/internal/bigquery/driver/connection.go @@ -5,12 +5,16 @@ import ( "database/sql/driver" "errors" "fmt" + "strings" + "time" "cloud.google.com/go/bigquery" + "github.com/cenkalti/backoff/v4" "google.golang.org/api/iterator" ) type bigQueryConnection struct { + config Config ctx context.Context client *bigquery.Client closed bool @@ -76,3 +80,28 @@ func (bigQueryConnection) CheckNamedValue(*driver.NamedValue) error { func (connection *bigQueryConnection) BigqueryClient() *bigquery.Client { return connection.client } + +// readWithBackoff will retry the read operation if the error is [jobRateLimitExceeded] and the config is set to retry rate limit errors +// +// TODO: this should no longer be needed once this fix is released: +// https://github.com/googleapis/google-cloud-go/pull/9726 +func (connection *bigQueryConnection) readWithBackoff(ctx context.Context, query *bigquery.Query) (it *bigquery.RowIterator, err error) { + if !connection.config.JobRateLimitExceededRetryEnabled { + return query.Read(ctx) + } + // mimicking google's own retry backoff settings + // https://github.com/googleapis/google-cloud-go/blob/b2e704d9d287445304d2b6030b6e35a4eb8be80a/bigquery/bigquery.go#L236 + retry := backoff.WithContext(backoff.NewExponentialBackOff( + backoff.WithInitialInterval(1*time.Second), + backoff.WithMaxInterval(32*time.Second), + backoff.WithMultiplier(2), + ), ctx) + _ = backoff.Retry(func() error { + it, err = query.Read(ctx) + if err != nil && (strings.Contains(err.Error(), "jobRateLimitExceeded")) { + return err + } + return nil + }, retry) + return it, err +} diff --git a/sqlconnect/internal/bigquery/driver/connector.go b/sqlconnect/internal/bigquery/driver/connector.go index a45b7ba..01a8dc6 100644 --- a/sqlconnect/internal/bigquery/driver/connector.go +++ b/sqlconnect/internal/bigquery/driver/connector.go @@ -8,15 +8,21 @@ import ( "google.golang.org/api/option" ) -func NewConnector(projectID string, opts ...option.ClientOption) driver.Connector { +type Config struct { + JobRateLimitExceededRetryEnabled bool // Enable jobRateLimitExceeded retries: default false +} + +func NewConnector(projectID string, config Config, opts ...option.ClientOption) driver.Connector { return &bigQueryConnector{ projectID: projectID, + config: config, opts: opts, } } type bigQueryConnector struct { projectID string + config Config opts []option.ClientOption } @@ -27,6 +33,7 @@ func (c *bigQueryConnector) Connect(ctx context.Context) (driver.Conn, error) { } return &bigQueryConnection{ + config: c.config, ctx: ctx, client: client, }, nil diff --git a/sqlconnect/internal/bigquery/driver/driver_test.go b/sqlconnect/internal/bigquery/driver/driver_test.go index 4926f01..9470c13 100644 --- a/sqlconnect/internal/bigquery/driver/driver_test.go +++ b/sqlconnect/internal/bigquery/driver/driver_test.go @@ -34,7 +34,7 @@ func TestBigqueryDriver(t *testing.T) { require.NoError(t, json.Unmarshal([]byte(configJSON), &c)) t.Run("OpenDB", func(t *testing.T) { - db := sql.OpenDB(driver.NewConnector(c.ProjectID, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) + db := sql.OpenDB(driver.NewConnector(c.ProjectID, driver.Config{}, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) t.Cleanup(func() { require.NoError(t, db.Close(), "it should be able to close the database connection") }) @@ -225,9 +225,32 @@ func TestBigqueryDriver(t *testing.T) { }) }) + // TODO: this test should start failing once this fix is released: + // https://github.com/googleapis/google-cloud-go/pull/9726 t.Run("jobRateLimitExceeded", func(t *testing.T) { + t.Run("selecting from information schema from multiple go-routines should lead to a jobRateLimitExceeded error", func(t *testing.T) { + db := sql.OpenDB(driver.NewConnector(c.ProjectID, driver.Config{}, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) + t.Cleanup(func() { + require.NoError(t, db.Close(), "it should be able to close the database connection") + }) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(10) + for i := 0; i < 200; i++ { + g.Go(func() error { + _, err := db.ExecContext(ctx, "SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = 'invalid'") + return err + }) + } + err := g.Wait() + require.Error(t, err, "it should return an error") + require.Contains(t, err.Error(), "jobRateLimitExceeded", "it should return a jobRateLimitExceeded error") + }) + t.Run("selecting from information schema from multiple go-routines and having rate limit retries enabled, shoudln't lead to a jobRateLimitExceeded error", func(t *testing.T) { - db := sql.OpenDB(driver.NewConnector(c.ProjectID, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) + db := sql.OpenDB(driver.NewConnector(c.ProjectID, driver.Config{JobRateLimitExceededRetryEnabled: true}, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) t.Cleanup(func() { require.NoError(t, db.Close(), "it should be able to close the database connection") }) diff --git a/sqlconnect/internal/bigquery/driver/statement.go b/sqlconnect/internal/bigquery/driver/statement.go index 5bfde9e..ae5c18e 100644 --- a/sqlconnect/internal/bigquery/driver/statement.go +++ b/sqlconnect/internal/bigquery/driver/statement.go @@ -41,7 +41,7 @@ func (statement *bigQueryStatement) ExecContext(ctx context.Context, args []driv return nil, err } - rowIterator, err := query.Read(ctx) + rowIterator, err := statement.connection.readWithBackoff(ctx, query) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (statement *bigQueryStatement) QueryContext(ctx context.Context, args []dri return nil, err } - rowIterator, err := query.Read(ctx) + rowIterator, err := statement.connection.readWithBackoff(ctx, query) if err != nil { return nil, err }