Skip to content

Commit

Permalink
fixup! feat: sqlconnect library
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Feb 20, 2024
1 parent 863c52a commit b17d903
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 22 deletions.
20 changes: 19 additions & 1 deletion sqlconnect/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,28 @@ import (
"github.com/rudderlabs/rudder-go-kit/async"
)

// QueryJSONMapAsync executes a query and returns a channel that will receive the results as a map or an error, along with a function that the caller can use to leave the channel early.
// The channel will be closed when the query is done or when the context is canceled.
func QueryJSONMapAsync(ctx context.Context, db DB, query string, params ...any) (ch <-chan ValueOrError[map[string]any], leave func()) {
return QueryAsync[map[string]any](ctx, db, db.JSONRowMapper(), query, params...)
}

// QueryJSONAsync executes a query and returns a channel that will receive the results as json or an error, along with a function that the caller can use to leave the channel early.
// The channel will be closed when the query is done or when the context is canceled.
func QueryJSONAsync(ctx context.Context, db DB, query string, params ...any) (ch <-chan ValueOrError[json.RawMessage], leave func()) {
return QueryAsync[json.RawMessage](ctx, db, db.JSONRowMapper(), query, params...)
jsonRowMapper := db.JSONRowMapper()
mapper := func(cols []*sql.ColumnType, row RowScan) (json.RawMessage, error) {
m, err := jsonRowMapper(cols, row)
if err != nil {
return nil, err
}
b, err := json.Marshal(m)
if err != nil {
return nil, fmt.Errorf("marshalling rows to json: %w", err)
}
return b, nil
}
return QueryAsync[json.RawMessage](ctx, db, mapper, query, params...)
}

// QueryAsync executes a query and returns a channel that will receive the results or an error, along with a function that the caller can use to leave the channel early.
Expand Down
5 changes: 2 additions & 3 deletions sqlconnect/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"time"
)

Expand Down Expand Up @@ -78,8 +77,8 @@ type TableAdmin interface {
}

type JsonRowMapper interface {
// JSONRowMapper returns a row mapper that maps rows to json.RawMessage
JSONRowMapper() RowMapper[json.RawMessage]
// JSONRowMapper returns a row mapper that maps rows to map[string]any
JSONRowMapper() RowMapper[map[string]any]
}

type Dialect interface {
Expand Down
11 changes: 3 additions & 8 deletions sqlconnect/internal/base/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package base

import (
"database/sql"
"encoding/json"
"fmt"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

// JSONRowMapper returns a row mapper that maps scanned rows to [json.RawMessage]
func (db *DB) JSONRowMapper() sqlconnect.RowMapper[json.RawMessage] {
return func(cols []*sql.ColumnType, row sqlconnect.RowScan) (json.RawMessage, error) {
func (db *DB) JSONRowMapper() sqlconnect.RowMapper[map[string]any] {
return func(cols []*sql.ColumnType, row sqlconnect.RowScan) (map[string]any, error) {
// scan all values in nullable strings
values := make([]any, len(cols))
for i := range values {
Expand All @@ -29,11 +28,7 @@ func (db *DB) JSONRowMapper() sqlconnect.RowMapper[json.RawMessage] {
col := cols[i]
o[col.Name()] = db.jsonRowMapper(col.DatabaseTypeName(), val)
}
b, err := json.Marshal(o)
if err != nil {
return nil, fmt.Errorf("marshalling row to json: %w", err)
}
return json.RawMessage(b), nil
return o, nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
"_date": "2020-12-31T00:00:00Z",
"_timestamp": "2021-07-01T05:43:28Z",
"_timestampntz": "2021-07-01T08:43:28.123456Z",
"_array": [1,2,3],
"_map": {"key":"value"},
"_array": [1,2,3,null],
"_map": {"key":"value", "key1":null},
"_struct": {"col1": "val1", "col2": 1}
},
{
Expand All @@ -49,7 +49,7 @@
"_timestampntz": "2021-07-01T08:43:28.123456Z",
"_array": [],
"_map": {"": ""},
"_struct": {"col1": "", "col2": 0}
"_struct": {"col1": "val1", "col2": null}
},
{
"_order": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ CREATE TABLE `{{.schema}}`.`column_mappings_test` (
INSERT INTO `{{.schema}}`.`column_mappings_test`
(_order, _decimal, _numeric, _dec, _int, _integer, _bigint, _long, _smallint, _short, _tinyint, _byte, _float, _real, _double, _boolean, _string, _binary, _date, _timestamp, _timestampntz, _array, _map, _struct)
VALUES
(1, 1.1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.1, 1.1, 1.1, true, 's', X'1', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(1,2,3), map('key', 'value'), struct('val1', 1)),
(2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, '', X'', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(), map('',''), struct('', 0) ),
(3, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL );
(1, 1.1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.1, 1.1, 1.1, true, 's', X'1', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(1,2,3,NULL), map('key', 'value', 'key1', NULL), struct('val1', 1) ),
(2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, '', X'', CAST('2020-12-31' AS DATE), '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28.123456', ARRAY(), map('',''), struct('val1', NULL) ),
(3, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL );
11 changes: 10 additions & 1 deletion sqlconnect/internal/snowflake/mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,16 @@ func jsonRowMapper(databaseTypeName string, value interface{}) interface{} {
case "OBJECT":
return json.RawMessage(value.(string))
case "VARIANT":
return json.RawMessage(value.(string))
rawValue := value.(string)
if strings.HasPrefix(rawValue, "[") { // An ARRAY can contain undefined values in place of nulls which would cause json.Unmarshal to fail
var jsonValue any
if err := json.Unmarshal([]byte(rawValue), &jsonValue); err != nil {
sanitizedJSON := strings.ReplaceAll(rawValue, "undefined", "null")
return json.RawMessage(sanitizedJSON)
}
}

return json.RawMessage(rawValue)
case "DATE", "TIME", "TIMESTAMP", "TIMESTAMP_LTZ", "TIMESTAMP_NTZ", "TIMESTAMP_TZ":
return value.(time.Time)
case "BINARY", "VARBINARY":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
"_TIMESTAMPNTZ": "2014-01-01T16:00:00Z",
"_TIMESTAMPLTZ": "2014-01-01T16:00:00-08:00",
"_TIMESTAMPTZ": "2014-01-01T16:00:00-08:00",
"_VARIANT": {"key": "value"},
"_VARIANT": {"key": "value", "key1": null},
"_OBJECT": { "key": "value"},
"_ARRAY": [1,2,3]
"_ARRAY": [1,2,3,null]
},
{
"_ORDER": 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ CREATE TABLE "{{.schema}}"."COLUMN_MAPPINGS_TEST" (
INSERT INTO "{{.schema}}"."COLUMN_MAPPINGS_TEST"
(_order, _int, _number, _decimal, _numeric, _integer, _bigint, _smallint, _tinyint, _float, _float4, _float8, _double, _real, _double_precision, _boolean, _text, _varchar, _char, _character, _string, _binary, _varbinary, _date, _datetime, _time, _timestamp, _timestampntz, _timestampltz, _timestamptz, _variant, _object, _array)
SELECT
1, 1, 1.1, 1.1, 1.1, 1, 1, 1, 1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, true, 't', 'vc', 'c', 'c', 's', TO_BINARY('bin', 'UTF-8'), TO_BINARY('vbin', 'UTF-8'), '2021-7-1', '2017-01-01 12:00:00', '12:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', object_construct('key', 'value'), object_construct('key', 'value'), array_construct(1,2,3);
1, 1, 1.1, 1.1, 1.1, 1, 1, 1, 1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, true, 't', 'vc', 'c', 'c', 's', TO_BINARY('bin', 'UTF-8'), TO_BINARY('vbin', 'UTF-8'), '2021-7-1', '2017-01-01 12:00:00', '12:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', '2014-01-01 16:00:00', TO_VARIANT(PARSE_JSON('{"key": "value", "key1": null}')), object_construct('key', 'value', 'key1', null), array_construct(1,2,3,null);

INSERT INTO "{{.schema}}"."COLUMN_MAPPINGS_TEST"
(_order, _int, _number, _decimal, _numeric, _integer, _bigint, _smallint, _tinyint, _float, _float4, _float8, _double, _real, _double_precision, _boolean, _text, _varchar, _char, _character, _string, _binary, _varbinary, _date, _datetime, _time, _timestamp, _timestampntz, _timestampltz, _timestamptz, _variant, _object, _array)
Expand Down

0 comments on commit b17d903

Please sign in to comment.