diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c1de80c..6236071 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -82,7 +82,7 @@ jobs: run: | go install github.com/wadey/gocovmerge@latest gocovmerge */profile.out > profile.out - - uses: codecov/codecov-action@v3 + - uses: codecov/codecov-action@v4 with: fail_ci_if_error: true files: ./profile.out diff --git a/Makefile b/Makefile index 19e20bf..248f7c5 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help default test test-run test-teardown generate lint fmt +.PHONY: help default test test-run generate lint fmt GO=go LDFLAGS?=-s -w @@ -9,7 +9,7 @@ default: lint generate: install-tools $(GO) generate ./... -test: install-tools test-run test-teardown +test: install-tools test-run test-run: ## Run all unit tests ifeq ($(filter 1,$(debug) $(RUNNER_DEBUG)),) @@ -33,23 +33,6 @@ else $(TEST_CMD) -count=1 $(TEST_OPTIONS) ./... && touch $(TESTFILE) || true endif -test-teardown: - @if [ -f "$(TESTFILE)" ]; then \ - echo "Tests passed, tearing down..." ;\ - rm -f $(TESTFILE) ;\ - echo "mode: atomic" > coverage.txt ;\ - find . -name "profile.out" | while read file; do grep -v 'mode: atomic' $${file} >> coverage.txt; rm -f $${file}; done ;\ - else \ - rm -f coverage.txt coverage.html ; find . -name "profile.out" | xargs rm -f ;\ - echo "Tests failed :-(" ;\ - exit 1 ;\ - fi - -coverage: - go tool cover -html=coverage.txt -o coverage.html - -test-with-coverage: test coverage - help: ## Show the available commands @grep -E '^[0-9a-zA-Z_-]+:.*?## .*$$' ./Makefile | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' diff --git a/sqlconnect/internal/base/db.go b/sqlconnect/internal/base/db.go index 04542d2..1e85337 100644 --- a/sqlconnect/internal/base/db.go +++ b/sqlconnect/internal/base/db.go @@ -26,7 +26,7 @@ func NewDB(db *sql.DB, rudderSchema string, opts ...Option) *DB { return "SELECT schema_name FROM information_schema.schemata", "schema_name" }, SchemaExists: func(schema string) string { - return fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata where schema_name = '%[1]s')", schema) + return fmt.Sprintf("SELECT schema_name FROM information_schema.schemata where schema_name = '%[1]s'", schema) }, DropSchema: func(schema string) string { return fmt.Sprintf("DROP SCHEMA %[1]s CASCADE", schema) }, CreateTestTable: func(table string) string { diff --git a/sqlconnect/internal/base/schemaadmin.go b/sqlconnect/internal/base/schemaadmin.go index 91432df..64c561d 100644 --- a/sqlconnect/internal/base/schemaadmin.go +++ b/sqlconnect/internal/base/schemaadmin.go @@ -68,10 +68,15 @@ func (db *DB) ListSchemas(ctx context.Context) ([]sqlconnect.SchemaRef, error) { // SchemaExists returns true if the schema exists func (db *DB) SchemaExists(ctx context.Context, schemaRef sqlconnect.SchemaRef) (bool, error) { - var exists bool - if err := db.QueryRowContext(ctx, db.sqlCommands.SchemaExists(schemaRef.Name)).Scan(&exists); err != nil { + rows, err := db.QueryContext(ctx, db.sqlCommands.SchemaExists(schemaRef.Name)) + if err != nil { return false, fmt.Errorf("querying schema exists: %w", err) } + defer func() { _ = rows.Close() }() + exists := rows.Next() + if err := rows.Err(); err != nil { + return false, fmt.Errorf("iterating schema exists: %w", err) + } return exists, nil } diff --git a/sqlconnect/internal/bigquery/db.go b/sqlconnect/internal/bigquery/db.go index 2d585b5..b9eb7bd 100644 --- a/sqlconnect/internal/bigquery/db.go +++ b/sqlconnect/internal/bigquery/db.go @@ -1,10 +1,12 @@ package bigquery import ( + "context" "database/sql" "encoding/json" "fmt" + "cloud.google.com/go/bigquery" "github.com/samber/lo" "google.golang.org/api/option" @@ -49,7 +51,7 @@ func NewDB(configJSON json.RawMessage) (*DB, error) { } } cmds.TableExists = func(schema, table string) string { - return fmt.Sprintf("SELECT EXISTS (SELECT table_name FROM `%[1]s`.INFORMATION_SCHEMA.TABLES WHERE table_name = '%[1]s'", schema, table) + return fmt.Sprintf("SELECT EXISTS (SELECT table_name FROM `%[1]s`.INFORMATION_SCHEMA.TABLES WHERE table_name = '%[2]s'", schema, table) } return cmds @@ -67,3 +69,21 @@ func init() { type DB struct { *base.DB } + +func (db *DB) WithBigqueryClient(ctx context.Context, f func(*bigquery.Client) error) error { + sqlconn, err := db.Conn(ctx) + if err != nil { + return err + } + defer func() { _ = sqlconn.Close() }() + return sqlconn.Raw(func(driverConn any) error { + if c, ok := driverConn.(bqclient); ok { + return f(c.BigqueryClient()) + } + return fmt.Errorf("invalid driver connection") + }) +} + +type bqclient interface { + BigqueryClient() *bigquery.Client +} diff --git a/sqlconnect/internal/bigquery/integration_test.go b/sqlconnect/internal/bigquery/integration_test.go new file mode 100644 index 0000000..5ec2ada --- /dev/null +++ b/sqlconnect/internal/bigquery/integration_test.go @@ -0,0 +1,18 @@ +package bigquery_test + +import ( + "os" + "strings" + "testing" + + "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/bigquery" + integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test" +) + +func TestBigqueryDB(t *testing.T) { + configJSON, ok := os.LookupEnv("BIGQUERY_TEST_ENVIRONMENT_CREDENTIALS") + if !ok { + t.Skip("skipping bigquery integration test due to lack of a test environment") + } + integrationtest.TestDatabaseScenarios(t, bigquery.DatabaseType, []byte(configJSON), strings.ToLower) +} diff --git a/sqlconnect/internal/bigquery/schemaadmin.go b/sqlconnect/internal/bigquery/schemaadmin.go new file mode 100644 index 0000000..b10b2b8 --- /dev/null +++ b/sqlconnect/internal/bigquery/schemaadmin.go @@ -0,0 +1,57 @@ +package bigquery + +import ( + "context" + "errors" + + "cloud.google.com/go/bigquery" + "google.golang.org/api/googleapi" + "google.golang.org/api/iterator" + + "github.com/rudderlabs/sqlconnect-go/sqlconnect" +) + +// SchemaExists uses the bigquery client instead of [INFORMATION_SCHEMA.SCHEMATA] due to absence of a region qualifier +// https://cloud.google.com/bigquery/docs/information-schema-datasets-schemata#scope_and_syntax +func (db *DB) SchemaExists(ctx context.Context, schemaRef sqlconnect.SchemaRef) (bool, error) { + var exists bool + if err := db.WithBigqueryClient(ctx, func(c *bigquery.Client) error { + if _, err := c.Dataset(schemaRef.Name).Metadata(ctx); err != nil { + var e *googleapi.Error + if ok := errors.As(err, &e); ok { + if e.Code == 404 { // not found + return nil + } + } + return err + } + exists = true + return nil + }); err != nil { + return false, err + } + return exists, nil +} + +// ListSchemas uses the bigquery client instead of [INFORMATION_SCHEMA.SCHEMATA] due to absence of a region qualifier +// https://cloud.google.com/bigquery/docs/information-schema-datasets-schemata#scope_and_syntax +func (db *DB) ListSchemas(ctx context.Context) ([]sqlconnect.SchemaRef, error) { + var schemas []sqlconnect.SchemaRef + if err := db.WithBigqueryClient(ctx, func(c *bigquery.Client) error { + datasets := c.Datasets(ctx) + for { + var dataset *bigquery.Dataset + dataset, err := datasets.Next() + if err != nil { + if err == iterator.Done { + return nil + } + return err + } + schemas = append(schemas, sqlconnect.SchemaRef{Name: dataset.DatasetID}) + } + }); err != nil { + return nil, err + } + return schemas, nil +} diff --git a/sqlconnect/internal/databricks/db.go b/sqlconnect/internal/databricks/db.go index 374673c..2b38c97 100644 --- a/sqlconnect/internal/databricks/db.go +++ b/sqlconnect/internal/databricks/db.go @@ -3,6 +3,7 @@ package databricks import ( "database/sql" "encoding/json" + "fmt" databricks "github.com/databricks/databricks-sql-go" "github.com/samber/lo" @@ -55,6 +56,7 @@ func NewDB(configJson json.RawMessage) (*DB, error) { base.WithJsonRowMapper(jsonRowMapper), base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands { cmds.ListSchemas = func() (string, string) { return "SHOW SCHEMAS", "schema_name" } + cmds.SchemaExists = func(schema string) string { return fmt.Sprintf(`SHOW SCHEMAS LIKE '%s'`, schema) } cmds.ListTables = func(schema string) []lo.Tuple2[string, string] { return []lo.Tuple2[string, string]{ {A: "SHOW TABLES IN " + schema, B: "tableName"}, diff --git a/sqlconnect/internal/databricks/integration_test.go b/sqlconnect/internal/databricks/integration_test.go new file mode 100644 index 0000000..317e255 --- /dev/null +++ b/sqlconnect/internal/databricks/integration_test.go @@ -0,0 +1,30 @@ +package databricks_test + +import ( + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tidwall/sjson" + + "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/databricks" + integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test" +) + +func TestDatabricksDB(t *testing.T) { + configJSON, ok := os.LookupEnv("DATABRICKS_TEST_ENVIRONMENT_CREDENTIALS") + if !ok { + t.Skip("skipping databricks integration test due to lack of a test environment") + } + + configJSON, err := sjson.Set(configJSON, "retryAttempts", 1) + require.NoError(t, err, "failed to set retryAttempts") + configJSON, err = sjson.Set(configJSON, "minRetryWaitTime", time.Second) + require.NoError(t, err, "failed to set minRetryWaitTime") + configJSON, err = sjson.Set(configJSON, "maxRetryWaitTime", time.Minute) + require.NoError(t, err, "failed to set maxRetryWaitTime") + + integrationtest.TestDatabaseScenarios(t, databricks.DatabaseType, []byte(configJSON), strings.ToLower) +} diff --git a/sqlconnect/internal/integration_test/db_integration_test_scenario.go b/sqlconnect/internal/integration_test/db_integration_test_scenario.go index 9a1469d..69a6c3e 100644 --- a/sqlconnect/internal/integration_test/db_integration_test_scenario.go +++ b/sqlconnect/internal/integration_test/db_integration_test_scenario.go @@ -14,8 +14,8 @@ import ( "github.com/rudderlabs/sqlconnect-go/sqlconnect" ) -func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMessage) { - schema := sqlconnect.SchemaRef{Name: GenerateTestSchema()} +func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMessage, formatfn func(string) string) { + schema := sqlconnect.SchemaRef{Name: GenerateTestSchema(formatfn)} configJSON, err := sjson.SetBytes(configJSON, "rudderSchema", schema.Name) require.NoError(t, err, "it should be able to set the rudder schema") db, err := sqlconnect.NewDB(warehouse, configJSON) @@ -97,7 +97,7 @@ func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMe }) t.Run("normal operation", func(t *testing.T) { - otherSchema := sqlconnect.SchemaRef{Name: GenerateTestSchema()} + otherSchema := sqlconnect.SchemaRef{Name: GenerateTestSchema(formatfn)} err := db.CreateSchema(ctx, otherSchema) require.NoError(t, err, "it should be able to create a schema") err = db.DropSchema(ctx, otherSchema) @@ -112,6 +112,6 @@ func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMe }) } -func GenerateTestSchema() string { - return fmt.Sprintf("tsqlcon_%s_%d", rand.String(12), time.Now().Unix()) +func GenerateTestSchema(formatfn func(string) string) string { + return formatfn(fmt.Sprintf("tsqlcon_%s_%d", rand.String(12), time.Now().Unix())) } diff --git a/sqlconnect/internal/mysql/integration_test.go b/sqlconnect/internal/mysql/integration_test.go index 770987e..066d12f 100644 --- a/sqlconnect/internal/mysql/integration_test.go +++ b/sqlconnect/internal/mysql/integration_test.go @@ -3,6 +3,7 @@ package mysql_test import ( "encoding/json" "strconv" + "strings" "testing" "github.com/ory/dockertest/v3" @@ -33,5 +34,5 @@ func TestMysqlDB(t *testing.T) { configJSON, err := json.Marshal(config) require.NoError(t, err, "it should be able to marshal config to json") - integrationtest.TestDatabaseScenarios(t, mysql.DatabaseType, configJSON) + integrationtest.TestDatabaseScenarios(t, mysql.DatabaseType, configJSON, strings.ToLower) } diff --git a/sqlconnect/internal/postgres/integration_test.go b/sqlconnect/internal/postgres/integration_test.go index e916510..75ce8ab 100644 --- a/sqlconnect/internal/postgres/integration_test.go +++ b/sqlconnect/internal/postgres/integration_test.go @@ -3,6 +3,7 @@ package postgres_test import ( "encoding/json" "strconv" + "strings" "testing" "github.com/ory/dockertest/v3" @@ -33,5 +34,5 @@ func TestPostgresDB(t *testing.T) { configJSON, err := json.Marshal(config) require.NoError(t, err, "it should be able to marshal config to json") - integrationtest.TestDatabaseScenarios(t, postgres.DatabaseType, configJSON) + integrationtest.TestDatabaseScenarios(t, postgres.DatabaseType, configJSON, strings.ToLower) } diff --git a/sqlconnect/internal/redshift/db.go b/sqlconnect/internal/redshift/db.go index 042284f..1bf3ab7 100644 --- a/sqlconnect/internal/redshift/db.go +++ b/sqlconnect/internal/redshift/db.go @@ -35,8 +35,11 @@ func NewDB(credentialsJSON json.RawMessage) (*DB, error) { lo.Ternary(config.RudderSchema != "", config.RudderSchema, defaultRudderSchema), base.WithColumnTypeMappings(columnTypeMappings), base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands { + cmds.ListSchemas = func() (string, string) { + return "SELECT schema_name FROM svv_redshift_schemas", "schema_name" + } cmds.SchemaExists = func(schema string) string { - return fmt.Sprintf("SELECT has_schema_privilege((SELECT current_user), '%[1]s', 'usage')", schema) + return fmt.Sprintf("SELECT schema_name FROM svv_redshift_schemas WHERE schema_name = '%[1]s'", schema) } return cmds }), diff --git a/sqlconnect/internal/redshift/integration_test.go b/sqlconnect/internal/redshift/integration_test.go new file mode 100644 index 0000000..3ad3ea6 --- /dev/null +++ b/sqlconnect/internal/redshift/integration_test.go @@ -0,0 +1,19 @@ +package redshift_test + +import ( + "os" + "strings" + "testing" + + integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test" + "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/redshift" +) + +func TestRedshiftDB(t *testing.T) { + configJSON, ok := os.LookupEnv("REDSHIFT_TEST_ENVIRONMENT_CREDENTIALS") + if !ok { + t.Skip("skipping redshift integration test due to lack of a test environment") + } + + integrationtest.TestDatabaseScenarios(t, redshift.DatabaseType, []byte(configJSON), strings.ToLower) +} diff --git a/sqlconnect/internal/snowflake/db.go b/sqlconnect/internal/snowflake/db.go index 6662e8b..8bc26a8 100644 --- a/sqlconnect/internal/snowflake/db.go +++ b/sqlconnect/internal/snowflake/db.go @@ -41,10 +41,10 @@ func NewDB(configJSON json.RawMessage) (*DB, error) { base.WithColumnTypeMappings(columnTypeMappings), base.WithJsonRowMapper(jsonRowMapper), base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands { + cmds.ListSchemas = func() (string, string) { return "SHOW TERSE SCHEMAS", "name" } cmds.SchemaExists = func(schema string) string { return fmt.Sprintf("SHOW TERSE SCHEMAS LIKE '%[1]s'", schema) } - cmds.ListSchemas = func() (string, string) { return "SHOW TERSE SCHEMAS", "schema_name" } cmds.ListTables = func(schema string) []lo.Tuple2[string, string] { return []lo.Tuple2[string, string]{ {A: fmt.Sprintf("SHOW TERSE TABLES IN SCHEMA %[1]s", schema), B: "name"}, diff --git a/sqlconnect/internal/snowflake/integration_test.go b/sqlconnect/internal/snowflake/integration_test.go new file mode 100644 index 0000000..8c45744 --- /dev/null +++ b/sqlconnect/internal/snowflake/integration_test.go @@ -0,0 +1,19 @@ +package snowflake_test + +import ( + "os" + "strings" + "testing" + + integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test" + "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/snowflake" +) + +func TestSnowflakeDB(t *testing.T) { + configJSON, ok := os.LookupEnv("SNOWFLAKE_TEST_ENVIRONMENT_CREDENTIALS") + if !ok { + t.Skip("skipping snowflake integration test due to lack of a test environment") + } + + integrationtest.TestDatabaseScenarios(t, snowflake.DatabaseType, []byte(configJSON), strings.ToUpper) +} diff --git a/sqlconnect/internal/trino/db.go b/sqlconnect/internal/trino/db.go index 1b72c31..9cdf78a 100644 --- a/sqlconnect/internal/trino/db.go +++ b/sqlconnect/internal/trino/db.go @@ -38,7 +38,7 @@ func NewDB(configJSON json.RawMessage) (*DB, error) { base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands { cmds.ListTables = func(schema string) []lo.Tuple2[string, string] { return []lo.Tuple2[string, string]{ - {A: fmt.Sprintf("SHOW TABLES FROM %s", schema), B: "tableName"}, + {A: fmt.Sprintf("SHOW TABLES FROM %[1]s", schema), B: "tableName"}, } } cmds.ListTablesWithPrefix = func(schema, prefix string) []lo.Tuple2[string, string] { diff --git a/sqlconnect/internal/trino/integration_test.go b/sqlconnect/internal/trino/integration_test.go new file mode 100644 index 0000000..69c65b8 --- /dev/null +++ b/sqlconnect/internal/trino/integration_test.go @@ -0,0 +1,19 @@ +package trino_test + +import ( + "os" + "strings" + "testing" + + integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test" + "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/trino" +) + +func TestTrinoDB(t *testing.T) { + configJSON, ok := os.LookupEnv("TRINO_TEST_ENVIRONMENT_CREDENTIALS") + if !ok { + t.Skip("skipping trino integration test due to lack of a test environment") + } + + integrationtest.TestDatabaseScenarios(t, trino.DatabaseType, []byte(configJSON), strings.ToLower) +}