Skip to content

Commit

Permalink
Merge pull request #3 from metrico/clickhouse-driver
Browse files Browse the repository at this point in the history
feat: Add Native Clickhouse driver
  • Loading branch information
lmangani authored Sep 12, 2023
2 parents 952968f + 643e7ba commit 8e5b144
Show file tree
Hide file tree
Showing 11 changed files with 323 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ on:

jobs:
semantic:
with:
CHECK_PR_TITLE_OR_ONE_COMMIT: true
uses: influxdata/validate-semantic-github-messages/.github/workflows/semantic.yml@main

13 changes: 7 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/gofrs/uuid v3.3.0+incompatible
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/google/flatbuffers v22.9.30-0.20221019131441-5792623df42e+incompatible
github.com/google/go-cmp v0.5.7
github.com/google/go-cmp v0.5.8
github.com/influxdata/gosnowflake v1.6.9
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839
Expand All @@ -51,7 +51,7 @@ require (
github.com/uber/jaeger-client-go v2.28.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/vertica/vertica-sql-go v1.1.1
go.uber.org/zap v1.16.0
go.uber.org/zap v1.22.0
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756
golang.org/x/net v0.7.0
golang.org/x/tools v0.1.12
Expand Down Expand Up @@ -88,6 +88,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0 // indirect
github.com/aws/smithy-go v1.9.0 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.6.0 // indirect
github.com/dimchansky/utfbom v1.1.0 // indirect
Expand All @@ -102,21 +103,21 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/miekg/dns v1.1.25 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.12 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/uber-go/tally v3.3.15+incompatible // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
Expand Down
26 changes: 16 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+Z
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM=
Expand Down Expand Up @@ -144,6 +143,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.10.0 h1:1jh8J+JjYRp+QWKOsaZt7rGUgoyr
github.com/aws/aws-sdk-go-v2/service/sts v1.10.0/go.mod h1:jLKCFqS+1T4i7HDqCP9GM4Uk75YW1cS0o82LdxpMyOE=
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/immutable v0.3.0 h1:TVRhuZx2wG9SZ0LRdqlbs9S5BZ6Y24hJEHTCgWHZEIw=
github.com/benbjohnson/immutable v0.3.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -328,8 +329,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -435,8 +436,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/asmfmt v1.3.1/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand Down Expand Up @@ -537,8 +538,9 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.12 h1:44l88ehTZAUGW4VlO1QC4zkilL99M6Y9MXNwEs0uzP8=
github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -662,21 +664,25 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.22.0 h1:Zcye5DUgBloQ9BaT4qc9BnjOFog5TvBSAGkJ3Nf70c0=
go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down Expand Up @@ -1153,6 +1159,7 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand All @@ -1163,7 +1170,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
177 changes: 177 additions & 0 deletions stdlib/sql/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package sql

import (
"database/sql"
"fmt"
"time"

"github.com/InfluxCommunity/flux"
"github.com/InfluxCommunity/flux/codes"
"github.com/InfluxCommunity/flux/execute"
"github.com/InfluxCommunity/flux/internal/errors"
"github.com/InfluxCommunity/flux/values"
"github.com/lib/pq"
)

type ClickhouseRowReader struct {
Cursor *sql.Rows
columns []interface{}
columnTypes []flux.ColType
columnNames []string
}

func (m *ClickhouseRowReader) Next() bool {
next := m.Cursor.Next()
if next {
columnNames, err := m.Cursor.Columns()
if err != nil {
return false
}
m.columns = make([]interface{}, len(columnNames))
columnPointers := make([]interface{}, len(columnNames))
for i := 0; i < len(columnNames); i++ {
columnPointers[i] = &m.columns[i]
}
if err := m.Cursor.Scan(columnPointers...); err != nil {
return false
}
}
return next
}

func (m *ClickhouseRowReader) GetNextRow() ([]values.Value, error) {
row := make([]values.Value, len(m.columns))
for i, col := range m.columns {
switch col := col.(type) {
case bool, int64, uint64, float64, string:
row[i] = values.New(col)
case *bool:
row[i] = values.NewBool(*col)
case *string:
row[i] = values.NewString(*col)
case *int64:
row[i] = values.NewInt(*col)
case *uint64:
row[i] = values.NewUInt(*col)
case *float64:
row[i] = values.NewFloat(*col)
case time.Time:
row[i] = values.NewTime(values.ConvertTime(col))
case []uint8:
switch m.columnTypes[i] {
case flux.TInt:
newInt, err := UInt8ToInt64(col)
if err != nil {
return nil, err
}
row[i] = values.NewInt(newInt)
case flux.TFloat:
newFloat, err := UInt8ToFloat(col)
if err != nil {
return nil, err
}
row[i] = values.NewFloat(newFloat)
case flux.TTime:
t, err := time.Parse(layout, string(col))
if err != nil {
fmt.Print(err)
}
row[i] = values.NewTime(values.ConvertTime(t))
default:
row[i] = values.NewString(string(col))
}
case nil:
row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
default:
execute.PanicUnknownType(flux.TInvalid)
}
}
return row, nil
}

func (m *ClickhouseRowReader) InitColumnNames(n []string) {
m.columnNames = n
}

func (m *ClickhouseRowReader) InitColumnTypes(types []*sql.ColumnType) {
stringTypes := make([]flux.ColType, len(types))
for i := 0; i < len(types); i++ {
switch types[i].DatabaseTypeName() {
case "INT", "BIGINT", "SMALLINT", "TINYINT", "INT2", "INT4", "INT8", "SERIAL2", "SERIAL4", "SERIAL8":
stringTypes[i] = flux.TInt
case "FLOAT4", "FLOAT8":
stringTypes[i] = flux.TFloat
case "DATE", "TIME", "TIMESTAMP":
stringTypes[i] = flux.TTime
case "BOOL":
stringTypes[i] = flux.TBool
case "TEXT":
stringTypes[i] = flux.TString
case "Nullable(UInt64)":
stringTypes[i] = flux.TUInt
default:
stringTypes[i] = flux.TString
}
}
m.columnTypes = stringTypes
}

func (m *ClickhouseRowReader) ColumnNames() []string {
return m.columnNames
}

func (m *ClickhouseRowReader) ColumnTypes() []flux.ColType {
return m.columnTypes
}

func (m *ClickhouseRowReader) SetColumns(i []interface{}) {
m.columns = i
}

func (m *ClickhouseRowReader) Close() error {
if err := m.Cursor.Err(); err != nil {
return err
}
return m.Cursor.Close()
}

func NewClickhouseRowReader(r *sql.Rows) (execute.RowReader, error) {
reader := &ClickhouseRowReader{
Cursor: r,
}
cols, err := r.Columns()
if err != nil {
return nil, err
}
reader.InitColumnNames(cols)

types, err := r.ColumnTypes()
if err != nil {
return nil, err
}
reader.InitColumnTypes(types)
return reader, nil
}

// ClickhouseTranslateColumn translates flux colTypes into their corresponding Clickhouse column type
func ClickhouseColumnTranslateFunc() translationFunc {
c := map[string]string{
flux.TFloat.String(): "FLOAT",
flux.TInt.String(): "BIGINT",
flux.TUInt.String(): "BIGINT",
flux.TString.String(): "TEXT",
flux.TTime.String(): "TIMESTAMP",
flux.TBool.String(): "BOOL",
}
return func(f flux.ColType, colName string) (string, error) {
s, found := c[f.String()]
if !found {
return "", errors.Newf(codes.Internal, "ClickHouseQL does not support column type %s", f.String())
}
return clickhouseQuoteIdent(colName) + " " + s, nil
}
}

func clickhouseQuoteIdent(name string) string {
return pq.QuoteIdentifier(name)
}
2 changes: 2 additions & 0 deletions stdlib/sql/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
newRowReader = NewSnowflakeRowReader
case "mssql", "sqlserver":
newRowReader = NewMssqlRowReader
case "clickhouse":
newRowReader = NewClickhouseRowReader
case "awsathena":
newRowReader = NewAwsAthenaRowReader
case "bigquery":
Expand Down
8 changes: 8 additions & 0 deletions stdlib/sql/from_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
Query: "",
},
ErrMsg: "",
}, {
Name: "ok clickhouse",
Spec: &FromSQLProcedureSpec{
DriverName: "clickhouse",
DataSourceName: "clickhouse://username:password@localhost:12345/default?secure=true",
Query: "",
},
ErrMsg: "",
}, {
Name: "ok vertica",
Spec: &FromSQLProcedureSpec{
Expand Down
7 changes: 7 additions & 0 deletions stdlib/sql/source_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "clickhouse":
// an example for clickhouse data source is: clickhouse://username:password@host1:9000/database?dial_timeout=200ms&max_execution_time=60
// this follows the URI semantics
u, err = neturl.Parse(dataSourceName)
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "vertica", "vertigo":
// an example for vertica data source is: vertica://dbadmin:password@localhost:5433/VMart
// this follows the URI semantics
Expand Down
1 change: 1 addition & 0 deletions stdlib/sql/sql_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mysqlDsn = "flux:flux@tcp(127.0.0.1:3306)/flux"
mariaDbDsn = "flux:flux@tcp(127.0.0.1:3307)/flux"
pgDsn = "postgresql://postgres@127.0.0.1:5432/postgres?sslmode=disable"
verticaDsn = "vertica://dbadmin@localhost:5433/flux"
clickhouseDsn = "clickhouse://clickhouse@127.0.0.1:9000/clickhouse?secure=false"
sqliteDsn = "file:///tmp/flux-integ-tests-sqlite.db"

// Some db engines will UPPERCASE table/column names when the identifiers are unquoted.
Expand Down
Loading

0 comments on commit 8e5b144

Please sign in to comment.