Skip to content

Commit

Permalink
fixup! feat: sqlconnect library
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Feb 16, 2024
1 parent 55718a9 commit 9f9bd6a
Show file tree
Hide file tree
Showing 18 changed files with 211 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
run: |
go install github.com/wadey/gocovmerge@latest
gocovmerge */profile.out > profile.out
- uses: codecov/codecov-action@v3
- uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
files: ./profile.out
Expand Down
21 changes: 2 additions & 19 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: help default test test-run test-teardown generate lint fmt
.PHONY: help default test test-run generate lint fmt

GO=go
LDFLAGS?=-s -w
Expand All @@ -9,7 +9,7 @@ default: lint
generate: install-tools
$(GO) generate ./...

test: install-tools test-run test-teardown
test: install-tools test-run

test-run: ## Run all unit tests
ifeq ($(filter 1,$(debug) $(RUNNER_DEBUG)),)
Expand All @@ -33,23 +33,6 @@ else
$(TEST_CMD) -count=1 $(TEST_OPTIONS) ./... && touch $(TESTFILE) || true
endif

test-teardown:
@if [ -f "$(TESTFILE)" ]; then \
echo "Tests passed, tearing down..." ;\
rm -f $(TESTFILE) ;\
echo "mode: atomic" > coverage.txt ;\
find . -name "profile.out" | while read file; do grep -v 'mode: atomic' $${file} >> coverage.txt; rm -f $${file}; done ;\
else \
rm -f coverage.txt coverage.html ; find . -name "profile.out" | xargs rm -f ;\
echo "Tests failed :-(" ;\
exit 1 ;\
fi

coverage:
go tool cover -html=coverage.txt -o coverage.html

test-with-coverage: test coverage

help: ## Show the available commands
@grep -E '^[0-9a-zA-Z_-]+:.*?## .*$$' ./Makefile | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

Expand Down
2 changes: 1 addition & 1 deletion sqlconnect/internal/base/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewDB(db *sql.DB, rudderSchema string, opts ...Option) *DB {
return "SELECT schema_name FROM information_schema.schemata", "schema_name"
},
SchemaExists: func(schema string) string {
return fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata where schema_name = '%[1]s')", schema)
return fmt.Sprintf("SELECT schema_name FROM information_schema.schemata where schema_name = '%[1]s'", schema)
},
DropSchema: func(schema string) string { return fmt.Sprintf("DROP SCHEMA %[1]s CASCADE", schema) },
CreateTestTable: func(table string) string {
Expand Down
9 changes: 7 additions & 2 deletions sqlconnect/internal/base/schemaadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ func (db *DB) ListSchemas(ctx context.Context) ([]sqlconnect.SchemaRef, error) {

// SchemaExists returns true if the schema exists
func (db *DB) SchemaExists(ctx context.Context, schemaRef sqlconnect.SchemaRef) (bool, error) {
var exists bool
if err := db.QueryRowContext(ctx, db.sqlCommands.SchemaExists(schemaRef.Name)).Scan(&exists); err != nil {
rows, err := db.QueryContext(ctx, db.sqlCommands.SchemaExists(schemaRef.Name))
if err != nil {
return false, fmt.Errorf("querying schema exists: %w", err)
}
defer func() { _ = rows.Close() }()
exists := rows.Next()
if err := rows.Err(); err != nil {
return false, fmt.Errorf("iterating schema exists: %w", err)

Check warning on line 78 in sqlconnect/internal/base/schemaadmin.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/base/schemaadmin.go#L78

Added line #L78 was not covered by tests
}
return exists, nil
}

Expand Down
22 changes: 21 additions & 1 deletion sqlconnect/internal/bigquery/db.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package bigquery

import (
"context"
"database/sql"
"encoding/json"
"fmt"

"cloud.google.com/go/bigquery"
"github.com/samber/lo"
"google.golang.org/api/option"

Expand Down Expand Up @@ -49,7 +51,7 @@ func NewDB(configJSON json.RawMessage) (*DB, error) {
}
}
cmds.TableExists = func(schema, table string) string {
return fmt.Sprintf("SELECT EXISTS (SELECT table_name FROM `%[1]s`.INFORMATION_SCHEMA.TABLES WHERE table_name = '%[1]s'", schema, table)
return fmt.Sprintf("SELECT EXISTS (SELECT table_name FROM `%[1]s`.INFORMATION_SCHEMA.TABLES WHERE table_name = '%[2]s'", schema, table)

Check warning on line 54 in sqlconnect/internal/bigquery/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/db.go#L54

Added line #L54 was not covered by tests
}

return cmds
Expand All @@ -67,3 +69,21 @@ func init() {
type DB struct {
*base.DB
}

func (db *DB) WithBigqueryClient(ctx context.Context, f func(*bigquery.Client) error) error {
sqlconn, err := db.Conn(ctx)
if err != nil {
return err
}
defer func() { _ = sqlconn.Close() }()
return sqlconn.Raw(func(driverConn any) error {
if c, ok := driverConn.(bqclient); ok {
return f(c.BigqueryClient())
}
return fmt.Errorf("invalid driver connection")

Check warning on line 83 in sqlconnect/internal/bigquery/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/db.go#L83

Added line #L83 was not covered by tests
})
}

type bqclient interface {
BigqueryClient() *bigquery.Client
}
18 changes: 18 additions & 0 deletions sqlconnect/internal/bigquery/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package bigquery_test

import (
"os"
"strings"
"testing"

"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/bigquery"
integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test"
)

func TestBigqueryDB(t *testing.T) {
configJSON, ok := os.LookupEnv("BIGQUERY_TEST_ENVIRONMENT_CREDENTIALS")
if !ok {
t.Skip("skipping bigquery integration test due to lack of a test environment")
}
integrationtest.TestDatabaseScenarios(t, bigquery.DatabaseType, []byte(configJSON), strings.ToLower)
}
57 changes: 57 additions & 0 deletions sqlconnect/internal/bigquery/schemaadmin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package bigquery

import (
"context"
"errors"

"cloud.google.com/go/bigquery"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

// SchemaExists uses the bigquery client instead of [INFORMATION_SCHEMA.SCHEMATA] due to absence of a region qualifier
// https://cloud.google.com/bigquery/docs/information-schema-datasets-schemata#scope_and_syntax
func (db *DB) SchemaExists(ctx context.Context, schemaRef sqlconnect.SchemaRef) (bool, error) {
var exists bool
if err := db.WithBigqueryClient(ctx, func(c *bigquery.Client) error {
if _, err := c.Dataset(schemaRef.Name).Metadata(ctx); err != nil {
var e *googleapi.Error
if ok := errors.As(err, &e); ok {
if e.Code == 404 { // not found
return nil
}
}
return err

Check warning on line 26 in sqlconnect/internal/bigquery/schemaadmin.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/schemaadmin.go#L26

Added line #L26 was not covered by tests
}
exists = true
return nil
}); err != nil {
return false, err
}
return exists, nil
}

// ListSchemas uses the bigquery client instead of [INFORMATION_SCHEMA.SCHEMATA] due to absence of a region qualifier
// https://cloud.google.com/bigquery/docs/information-schema-datasets-schemata#scope_and_syntax
func (db *DB) ListSchemas(ctx context.Context) ([]sqlconnect.SchemaRef, error) {
var schemas []sqlconnect.SchemaRef
if err := db.WithBigqueryClient(ctx, func(c *bigquery.Client) error {
datasets := c.Datasets(ctx)
for {
var dataset *bigquery.Dataset
dataset, err := datasets.Next()
if err != nil {
if err == iterator.Done {
return nil
}
return err

Check warning on line 49 in sqlconnect/internal/bigquery/schemaadmin.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/bigquery/schemaadmin.go#L49

Added line #L49 was not covered by tests
}
schemas = append(schemas, sqlconnect.SchemaRef{Name: dataset.DatasetID})
}
}); err != nil {
return nil, err
}
return schemas, nil
}
2 changes: 2 additions & 0 deletions sqlconnect/internal/databricks/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package databricks
import (
"database/sql"
"encoding/json"
"fmt"

databricks "github.com/databricks/databricks-sql-go"
"github.com/samber/lo"
Expand Down Expand Up @@ -55,6 +56,7 @@ func NewDB(configJson json.RawMessage) (*DB, error) {
base.WithJsonRowMapper(jsonRowMapper),
base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands {
cmds.ListSchemas = func() (string, string) { return "SHOW SCHEMAS", "schema_name" }
cmds.SchemaExists = func(schema string) string { return fmt.Sprintf(`SHOW SCHEMAS LIKE '%s'`, schema) }
cmds.ListTables = func(schema string) []lo.Tuple2[string, string] {
return []lo.Tuple2[string, string]{
{A: "SHOW TABLES IN " + schema, B: "tableName"},

Check warning on line 62 in sqlconnect/internal/databricks/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/databricks/db.go#L61-L62

Added lines #L61 - L62 were not covered by tests
Expand Down
30 changes: 30 additions & 0 deletions sqlconnect/internal/databricks/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package databricks_test

import (
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tidwall/sjson"

"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/databricks"
integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test"
)

func TestDatabricksDB(t *testing.T) {
configJSON, ok := os.LookupEnv("DATABRICKS_TEST_ENVIRONMENT_CREDENTIALS")
if !ok {
t.Skip("skipping databricks integration test due to lack of a test environment")
}

configJSON, err := sjson.Set(configJSON, "retryAttempts", 1)
require.NoError(t, err, "failed to set retryAttempts")
configJSON, err = sjson.Set(configJSON, "minRetryWaitTime", time.Second)
require.NoError(t, err, "failed to set minRetryWaitTime")
configJSON, err = sjson.Set(configJSON, "maxRetryWaitTime", time.Minute)
require.NoError(t, err, "failed to set maxRetryWaitTime")

integrationtest.TestDatabaseScenarios(t, databricks.DatabaseType, []byte(configJSON), strings.ToLower)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMessage) {
schema := sqlconnect.SchemaRef{Name: GenerateTestSchema()}
func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMessage, formatfn func(string) string) {
schema := sqlconnect.SchemaRef{Name: GenerateTestSchema(formatfn)}
configJSON, err := sjson.SetBytes(configJSON, "rudderSchema", schema.Name)
require.NoError(t, err, "it should be able to set the rudder schema")
db, err := sqlconnect.NewDB(warehouse, configJSON)
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMe
})

t.Run("normal operation", func(t *testing.T) {
otherSchema := sqlconnect.SchemaRef{Name: GenerateTestSchema()}
otherSchema := sqlconnect.SchemaRef{Name: GenerateTestSchema(formatfn)}
err := db.CreateSchema(ctx, otherSchema)
require.NoError(t, err, "it should be able to create a schema")
err = db.DropSchema(ctx, otherSchema)
Expand All @@ -112,6 +112,6 @@ func TestDatabaseScenarios(t *testing.T, warehouse string, configJSON json.RawMe
})
}

func GenerateTestSchema() string {
return fmt.Sprintf("tsqlcon_%s_%d", rand.String(12), time.Now().Unix())
func GenerateTestSchema(formatfn func(string) string) string {
return formatfn(fmt.Sprintf("tsqlcon_%s_%d", rand.String(12), time.Now().Unix()))
}
3 changes: 2 additions & 1 deletion sqlconnect/internal/mysql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mysql_test
import (
"encoding/json"
"strconv"
"strings"
"testing"

"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -33,5 +34,5 @@ func TestMysqlDB(t *testing.T) {
configJSON, err := json.Marshal(config)
require.NoError(t, err, "it should be able to marshal config to json")

integrationtest.TestDatabaseScenarios(t, mysql.DatabaseType, configJSON)
integrationtest.TestDatabaseScenarios(t, mysql.DatabaseType, configJSON, strings.ToLower)
}
3 changes: 2 additions & 1 deletion sqlconnect/internal/postgres/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package postgres_test
import (
"encoding/json"
"strconv"
"strings"
"testing"

"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -33,5 +34,5 @@ func TestPostgresDB(t *testing.T) {
configJSON, err := json.Marshal(config)
require.NoError(t, err, "it should be able to marshal config to json")

integrationtest.TestDatabaseScenarios(t, postgres.DatabaseType, configJSON)
integrationtest.TestDatabaseScenarios(t, postgres.DatabaseType, configJSON, strings.ToLower)
}
5 changes: 4 additions & 1 deletion sqlconnect/internal/redshift/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ func NewDB(credentialsJSON json.RawMessage) (*DB, error) {
lo.Ternary(config.RudderSchema != "", config.RudderSchema, defaultRudderSchema),
base.WithColumnTypeMappings(columnTypeMappings),
base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands {
cmds.ListSchemas = func() (string, string) {
return "SELECT schema_name FROM svv_redshift_schemas", "schema_name"
}
cmds.SchemaExists = func(schema string) string {
return fmt.Sprintf("SELECT has_schema_privilege((SELECT current_user), '%[1]s', 'usage')", schema)
return fmt.Sprintf("SELECT schema_name FROM svv_redshift_schemas WHERE schema_name = '%[1]s'", schema)
}
return cmds
}),
Expand Down
19 changes: 19 additions & 0 deletions sqlconnect/internal/redshift/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package redshift_test

import (
"os"
"strings"
"testing"

integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/redshift"
)

func TestRedshiftDB(t *testing.T) {
configJSON, ok := os.LookupEnv("REDSHIFT_TEST_ENVIRONMENT_CREDENTIALS")
if !ok {
t.Skip("skipping redshift integration test due to lack of a test environment")
}

integrationtest.TestDatabaseScenarios(t, redshift.DatabaseType, []byte(configJSON), strings.ToLower)
}
2 changes: 1 addition & 1 deletion sqlconnect/internal/snowflake/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func NewDB(configJSON json.RawMessage) (*DB, error) {
base.WithColumnTypeMappings(columnTypeMappings),
base.WithJsonRowMapper(jsonRowMapper),
base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands {
cmds.ListSchemas = func() (string, string) { return "SHOW TERSE SCHEMAS", "name" }
cmds.SchemaExists = func(schema string) string {
return fmt.Sprintf("SHOW TERSE SCHEMAS LIKE '%[1]s'", schema)
}
cmds.ListSchemas = func() (string, string) { return "SHOW TERSE SCHEMAS", "schema_name" }
cmds.ListTables = func(schema string) []lo.Tuple2[string, string] {
return []lo.Tuple2[string, string]{
{A: fmt.Sprintf("SHOW TERSE TABLES IN SCHEMA %[1]s", schema), B: "name"},

Check warning on line 50 in sqlconnect/internal/snowflake/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/snowflake/db.go#L49-L50

Added lines #L49 - L50 were not covered by tests
Expand Down
19 changes: 19 additions & 0 deletions sqlconnect/internal/snowflake/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package snowflake_test

import (
"os"
"strings"
"testing"

integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/snowflake"
)

func TestSnowflakeDB(t *testing.T) {
configJSON, ok := os.LookupEnv("SNOWFLAKE_TEST_ENVIRONMENT_CREDENTIALS")
if !ok {
t.Skip("skipping snowflake integration test due to lack of a test environment")
}

integrationtest.TestDatabaseScenarios(t, snowflake.DatabaseType, []byte(configJSON), strings.ToUpper)
}
2 changes: 1 addition & 1 deletion sqlconnect/internal/trino/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewDB(configJSON json.RawMessage) (*DB, error) {
base.WithSQLCommandsOverride(func(cmds base.SQLCommands) base.SQLCommands {
cmds.ListTables = func(schema string) []lo.Tuple2[string, string] {
return []lo.Tuple2[string, string]{
{A: fmt.Sprintf("SHOW TABLES FROM %s", schema), B: "tableName"},
{A: fmt.Sprintf("SHOW TABLES FROM %[1]s", schema), B: "tableName"},

Check warning on line 41 in sqlconnect/internal/trino/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/trino/db.go#L32-L41

Added lines #L32 - L41 were not covered by tests
}
}
cmds.ListTablesWithPrefix = func(schema, prefix string) []lo.Tuple2[string, string] {
Expand Down
19 changes: 19 additions & 0 deletions sqlconnect/internal/trino/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package trino_test

import (
"os"
"strings"
"testing"

integrationtest "github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/integration_test"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/trino"
)

func TestTrinoDB(t *testing.T) {
configJSON, ok := os.LookupEnv("TRINO_TEST_ENVIRONMENT_CREDENTIALS")
if !ok {
t.Skip("skipping trino integration test due to lack of a test environment")
}

integrationtest.TestDatabaseScenarios(t, trino.DatabaseType, []byte(configJSON), strings.ToLower)
}

0 comments on commit 9f9bd6a

Please sign in to comment.