From b17d903fb66f89cbdc5c1220f10a03e4b5d77d3c Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Tue, 20 Feb 2024 21:53:47 +0200 Subject: [PATCH] fixup! feat: sqlconnect library --- sqlconnect/async.go | 20 ++++++++++++++++++- sqlconnect/db.go | 5 ++--- sqlconnect/internal/base/mapper.go | 11 +++------- .../testdata/column-mapping-test-rows.json | 6 +++--- .../testdata/column-mapping-test-seed.sql | 6 +++--- sqlconnect/internal/snowflake/mappings.go | 11 +++++++++- .../testdata/column-mapping-test-rows.json | 4 ++-- .../testdata/column-mapping-test-seed.sql | 2 +- 8 files changed, 43 insertions(+), 22 deletions(-) diff --git a/sqlconnect/async.go b/sqlconnect/async.go index 51187c9..a2bef64 100644 --- a/sqlconnect/async.go +++ b/sqlconnect/async.go @@ -9,10 +9,28 @@ import ( "github.com/rudderlabs/rudder-go-kit/async" ) +// QueryJSONMapAsync executes a query and returns a channel that will receive the results as a map or an error, along with a function that the caller can use to leave the channel early. +// The channel will be closed when the query is done or when the context is canceled. +func QueryJSONMapAsync(ctx context.Context, db DB, query string, params ...any) (ch <-chan ValueOrError[map[string]any], leave func()) { + return QueryAsync[map[string]any](ctx, db, db.JSONRowMapper(), query, params...) +} + // QueryJSONAsync executes a query and returns a channel that will receive the results as json or an error, along with a function that the caller can use to leave the channel early. // The channel will be closed when the query is done or when the context is canceled. func QueryJSONAsync(ctx context.Context, db DB, query string, params ...any) (ch <-chan ValueOrError[json.RawMessage], leave func()) { - return QueryAsync[json.RawMessage](ctx, db, db.JSONRowMapper(), query, params...) + jsonRowMapper := db.JSONRowMapper() + mapper := func(cols []*sql.ColumnType, row RowScan) (json.RawMessage, error) { + m, err := jsonRowMapper(cols, row) + if err != nil { + return nil, err + } + b, err := json.Marshal(m) + if err != nil { + return nil, fmt.Errorf("marshalling rows to json: %w", err) + } + return b, nil + } + return QueryAsync[json.RawMessage](ctx, db, mapper, query, params...) } // QueryAsync executes a query and returns a channel that will receive the results or an error, along with a function that the caller can use to leave the channel early. diff --git a/sqlconnect/db.go b/sqlconnect/db.go index 3aa7623..5ae06d4 100644 --- a/sqlconnect/db.go +++ b/sqlconnect/db.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "database/sql/driver" - "encoding/json" "time" ) @@ -78,8 +77,8 @@ type TableAdmin interface { } type JsonRowMapper interface { - // JSONRowMapper returns a row mapper that maps rows to json.RawMessage - JSONRowMapper() RowMapper[json.RawMessage] + // JSONRowMapper returns a row mapper that maps rows to map[string]any + JSONRowMapper() RowMapper[map[string]any] } type Dialect interface { diff --git a/sqlconnect/internal/base/mapper.go b/sqlconnect/internal/base/mapper.go index d88f139..7a3dafc 100644 --- a/sqlconnect/internal/base/mapper.go +++ b/sqlconnect/internal/base/mapper.go @@ -2,15 +2,14 @@ package base import ( "database/sql" - "encoding/json" "fmt" "github.com/rudderlabs/sqlconnect-go/sqlconnect" ) // JSONRowMapper returns a row mapper that maps scanned rows to [json.RawMessage] -func (db *DB) JSONRowMapper() sqlconnect.RowMapper[json.RawMessage] { - return func(cols []*sql.ColumnType, row sqlconnect.RowScan) (json.RawMessage, error) { +func (db *DB) JSONRowMapper() sqlconnect.RowMapper[map[string]any] { + return func(cols []*sql.ColumnType, row sqlconnect.RowScan) (map[string]any, error) { // scan all values in nullable strings values := make([]any, len(cols)) for i := range values { @@ -29,11 +28,7 @@ func (db *DB) JSONRowMapper() sqlconnect.RowMapper[json.RawMessage] { col := cols[i] o[col.Name()] = db.jsonRowMapper(col.DatabaseTypeName(), val) } - b, err := json.Marshal(o) - if err != nil { - return nil, fmt.Errorf("marshalling row to json: %w", err) - } - return json.RawMessage(b), nil + return o, nil } } diff --git a/sqlconnect/internal/databricks/testdata/column-mapping-test-rows.json b/sqlconnect/internal/databricks/testdata/column-mapping-test-rows.json index 308489e..6dc0166 100644 --- a/sqlconnect/internal/databricks/testdata/column-mapping-test-rows.json +++ b/sqlconnect/internal/databricks/testdata/column-mapping-test-rows.json @@ -21,8 +21,8 @@ "_date": "2020-12-31T00:00:00Z", "_timestamp": "2021-07-01T05:43:28Z", "_timestampntz": "2021-07-01T08:43:28.123456Z", - "_array": [1,2,3], - "_map": {"key":"value"}, + "_array": [1,2,3,null], + "_map": {"key":"value", "key1":null}, "_struct": {"col1": "val1", "col2": 1} }, { @@ -49,7 +49,7 @@ "_timestampntz": "2021-07-01T08:43:28.123456Z", "_array": [], "_map": {"": ""}, - "_struct": {"col1": "", "col2": 0} + "_struct": {"col1": "val1", "col2": null} }, { "_order": 3, diff --git a/sqlconnect/internal/databricks/testdata/column-mapping-test-seed.sql b/sqlconnect/internal/databricks/testdata/column-mapping-test-seed.sql index 9635a70..d8d1ebb 100644 --- a/sqlconnect/internal/databricks/testdata/column-mapping-test-seed.sql +++ b/sqlconnect/internal/databricks/testdata/column-mapping-test-seed.sql @@ -28,6 +28,6 @@ CREATE TABLE `{{.schema}}`.`column_mappings_test` ( INSERT INTO `{{.schema}}`.`column_mappings_test` (_order, _decimal, _numeric, _dec, _int, _integer, _bigint, _long, _smallint, _short, _tinyint, _byte, _float, _real, _double, _boolean, _string, _binary, _date, _timestamp, _timestampntz, _array, _map, _struct) VALUES - (1, 1.1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.1, 1.1, 1.1, true, 's', X'1', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(1,2,3), map('key', 'value'), struct('val1', 1)), - (2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, '', X'', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(), map('',''), struct('', 0) ), - (3, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ); \ No newline at end of file + (1, 1.1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.1, 1.1, 1.1, true, 's', X'1', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(1,2,3,NULL), map('key', 'value', 'key1', NULL), struct('val1', 1) ), + (2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, '', X'', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(), map('',''), struct('val1', NULL) ), + (3, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ); \ No newline at end of file diff --git a/sqlconnect/internal/snowflake/mappings.go b/sqlconnect/internal/snowflake/mappings.go index 7f6fd73..29954dd 100644 --- a/sqlconnect/internal/snowflake/mappings.go +++ b/sqlconnect/internal/snowflake/mappings.go @@ -106,7 +106,16 @@ func jsonRowMapper(databaseTypeName string, value interface{}) interface{} { case "OBJECT": return json.RawMessage(value.(string)) case "VARIANT": - return json.RawMessage(value.(string)) + rawValue := value.(string) + if strings.HasPrefix(rawValue, "[") { // An ARRAY can contain undefined values in place of nulls which would cause json.Unmarshal to fail + var jsonValue any + if err := json.Unmarshal([]byte(rawValue), &jsonValue); err != nil { + sanitizedJSON := strings.ReplaceAll(rawValue, "undefined", "null") + return json.RawMessage(sanitizedJSON) + } + } + + return json.RawMessage(rawValue) case "DATE", "TIME", "TIMESTAMP", "TIMESTAMP_LTZ", "TIMESTAMP_NTZ", "TIMESTAMP_TZ": return value.(time.Time) case "BINARY", "VARBINARY": diff --git a/sqlconnect/internal/snowflake/testdata/column-mapping-test-rows.json b/sqlconnect/internal/snowflake/testdata/column-mapping-test-rows.json index 5ad04f5..23a90cd 100644 --- a/sqlconnect/internal/snowflake/testdata/column-mapping-test-rows.json +++ b/sqlconnect/internal/snowflake/testdata/column-mapping-test-rows.json @@ -30,9 +30,9 @@ "_TIMESTAMPNTZ": "2014-01-01T16:00:00Z", "_TIMESTAMPLTZ": "2014-01-01T16:00:00-08:00", "_TIMESTAMPTZ": "2014-01-01T16:00:00-08:00", - "_VARIANT": {"key": "value"}, + "_VARIANT": {"key": "value", "key1": null}, "_OBJECT": { "key": "value"}, - "_ARRAY": [1,2,3] + "_ARRAY": [1,2,3,null] }, { "_ORDER": 2, diff --git a/sqlconnect/internal/snowflake/testdata/column-mapping-test-seed.sql b/sqlconnect/internal/snowflake/testdata/column-mapping-test-seed.sql index f2634ea..332087e 100644 --- a/sqlconnect/internal/snowflake/testdata/column-mapping-test-seed.sql +++ b/sqlconnect/internal/snowflake/testdata/column-mapping-test-seed.sql @@ -37,7 +37,7 @@ CREATE TABLE "{{.schema}}"."COLUMN_MAPPINGS_TEST" ( INSERT INTO "{{.schema}}"."COLUMN_MAPPINGS_TEST" (_order, _int, _number, _decimal, _numeric, _integer, _bigint, _smallint, _tinyint, _float, _float4, _float8, _double, _real, _double_precision, _boolean, _text, _varchar, _char, _character, _string, _binary, _varbinary, _date, _datetime, _time, _timestamp, _timestampntz, _timestampltz, _timestamptz, _variant, _object, _array) SELECT - 1, 1, 1.1, 1.1, 1.1, 1, 1, 1, 1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, true, 't', 'vc', 'c', 'c', 's', TO_BINARY('bin', 'UTF-8'), TO_BINARY('vbin', 'UTF-8'), '2021-7-1', '2017-01-01 12:00:00', '12:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', object_construct('key', 'value'), object_construct('key', 'value'), array_construct(1,2,3); + 1, 1, 1.1, 1.1, 1.1, 1, 1, 1, 1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, true, 't', 'vc', 'c', 'c', 's', TO_BINARY('bin', 'UTF-8'), TO_BINARY('vbin', 'UTF-8'), '2021-7-1', '2017-01-01 12:00:00', '12:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', TO_VARIANT(PARSE_JSON('{"key": "value", "key1": null}')), object_construct('key', 'value', 'key1', null), array_construct(1,2,3,null); INSERT INTO "{{.schema}}"."COLUMN_MAPPINGS_TEST" (_order, _int, _number, _decimal, _numeric, _integer, _bigint, _smallint, _tinyint, _float, _float4, _float8, _double, _real, _double_precision, _boolean, _text, _varchar, _char, _character, _string, _binary, _varbinary, _date, _datetime, _time, _timestamp, _timestampntz, _timestampltz, _timestamptz, _variant, _object, _array)