Skip to content

Commit

Permalink
feat(sql): add Snowflake support (#2707)
Browse files Browse the repository at this point in the history
* feat: add Snowflake support

* test: change comment to match db being tested

* refactor: move static mapping out of function
  • Loading branch information
alespour authored Apr 30, 2020
1 parent b42d417 commit aff8331
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 3 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ require (
github.com/c-bata/go-prompt v0.2.2
github.com/cespare/xxhash v1.1.0
github.com/dave/jennifer v1.2.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/flatbuffers v1.11.0
github.com/google/go-cmp v0.3.0
github.com/google/uuid v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e
github.com/influxdata/pkg-config v0.2.0
Expand All @@ -30,15 +32,18 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/opentracing/opentracing-go v1.0.2
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/common v0.6.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/segmentio/kafka-go v0.1.0
github.com/sergi/go-diff v1.0.0 // indirect
github.com/snowflakedb/gosnowflake v1.3.4
github.com/spf13/cobra v0.0.3
go.uber.org/zap v1.9.1
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 // indirect
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhr
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand Down Expand Up @@ -71,6 +73,8 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down Expand Up @@ -119,6 +123,8 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down Expand Up @@ -147,6 +153,8 @@ github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/snowflakedb/gosnowflake v1.3.4 h1:Gyoi6g4lMHsilEwW9+KV+bgYkJTgf5pVfvL7Utus920=
github.com/snowflakedb/gosnowflake v1.3.4/go.mod h1:NsRq2QeiMUuoNUJhp5Q6xGC4uBrsS9g6LwZVEkTWgsE=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
Expand Down Expand Up @@ -180,6 +188,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de h1:xSjD6HQTqT0H/k60N5yYBtnN1OEkVy7WIo/DYyxKRO0=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
2 changes: 2 additions & 0 deletions stdlib/sql/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
newRowReader = NewSqliteRowReader
case "postgres", "sqlmock":
newRowReader = NewPostgresRowReader
case "snowflake":
newRowReader = NewSnowflakeRowReader
default:
return nil, errors.Newf(codes.Invalid, "sql driver %s not supported", spec.DriverName)
}
Expand Down
8 changes: 8 additions & 0 deletions stdlib/sql/from_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
Query: "",
},
ErrMsg: "",
}, {
Name: "ok snowflake",
Spec: &FromSQLProcedureSpec{
DriverName: "snowflake",
DataSourceName: "username:password@accountname.us-east-1/dbname",
Query: "",
},
ErrMsg: "",
}, {
Name: "invalid driver",
Spec: &FromSQLProcedureSpec{
Expand Down
202 changes: 202 additions & 0 deletions stdlib/sql/snowflake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package sql

import (
"database/sql"
"strconv"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/values"
)

// Snowflake DB support.
// Notes:
// * type mapping
// - see https://pkg.go.dev/github.com/snowflakedb/gosnowflake
// - current mappings are valid for v1.3.4

type SnowflakeRowReader struct {
Cursor *sql.Rows
columns []interface{}
columnTypes []flux.ColType
columnNames []string
sqlTypes []*sql.ColumnType
NextFunc func() bool
CloseFunc func() error
}

const (
layoutDate = "2006-01-02"
layoutTime = "15:04:05"
layoutTimeStampNtz = "2006-01-02T15:04:05.0000000000"
)

// Next prepares SnowflakeRowReader to return rows
func (m *SnowflakeRowReader) Next() bool {
if m.NextFunc != nil {
return m.NextFunc()
}
next := m.Cursor.Next()
if next {
columnNames, err := m.Cursor.Columns()
if err != nil {
return false
}
m.columns = make([]interface{}, len(columnNames))
columnPointers := make([]interface{}, len(columnNames))
for i := 0; i < len(columnNames); i++ {
columnPointers[i] = &m.columns[i]
}
if err := m.Cursor.Scan(columnPointers...); err != nil {
return false
}
}
return next
}

func (m *SnowflakeRowReader) GetNextRow() ([]values.Value, error) {
row := make([]values.Value, len(m.columns))
for i, column := range m.columns {
switch value := column.(type) {
case bool, int64, float64: // never happens with scan into []*interface{}
row[i] = values.New(value)
case string:
switch m.columnTypes[i] {
case flux.TFloat:
f, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, err
}
row[i] = values.NewFloat(f)
case flux.TInt:
d, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return nil, err
}
row[i] = values.NewInt(d)
case flux.TBool:
b, err := strconv.ParseBool(value)
if err != nil {
return nil, err
}
row[i] = values.NewBool(b)
default:
row[i] = values.New(value)
}
case time.Time:
// DATE, TIME and TIMESTAMP_NTZ types get scanned to time.Time by the driver,
// but they have no counterpart in Flux therefore will be represented as string
switch m.sqlTypes[i].DatabaseTypeName() {
case "DATE":
row[i] = values.NewString(value.Format(layoutDate))
case "TIME":
row[i] = values.NewString(value.Format(layoutTime))
case "TIMESTAMP_NTZ":
row[i] = values.NewString(value.Format(layoutTimeStampNtz))
default:
row[i] = values.NewTime(values.ConvertTime(value))
}
case nil:
row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
default:
execute.PanicUnknownType(flux.TInvalid)
}
}
return row, nil
}

func (m *SnowflakeRowReader) InitColumnNames(names []string) {
m.columnNames = names
}

func (m *SnowflakeRowReader) InitColumnTypes(types []*sql.ColumnType) {
fluxTypes := make([]flux.ColType, len(types))
for i := 0; i < len(types); i++ {
switch types[i].DatabaseTypeName() {
case "FIXED", "NUMBER": // FIXED is reported by Snowflake driver
_, scale, ok := types[i].DecimalSize()
if ok && scale > 0 {
fluxTypes[i] = flux.TFloat
} else {
fluxTypes[i] = flux.TInt
}
case "REAL", "FLOAT": // REAL is reported by Snowflake driver
fluxTypes[i] = flux.TFloat
case "BOOLEAN":
fluxTypes[i] = flux.TBool
case "TIMESTAMP_TZ", "TIMESTAMP_LTZ": // "TIMESTAMP_NTZ", "DATE" and "TIME" will be represented as string
fluxTypes[i] = flux.TTime
default:
fluxTypes[i] = flux.TString
}
}
m.columnTypes = fluxTypes
m.sqlTypes = types
}

func (m *SnowflakeRowReader) ColumnNames() []string {
return m.columnNames
}

func (m *SnowflakeRowReader) ColumnTypes() []flux.ColType {
return m.columnTypes
}

func (m *SnowflakeRowReader) SetColumnTypes(types []flux.ColType) {
m.columnTypes = types
}

func (m *SnowflakeRowReader) SetColumns(i []interface{}) {
m.columns = i
}

func (m *SnowflakeRowReader) Close() error {
if m.CloseFunc != nil {
return m.CloseFunc()
}
if err := m.Cursor.Err(); err != nil {
return err
}
return m.Cursor.Close()
}

func NewSnowflakeRowReader(r *sql.Rows) (execute.RowReader, error) {
reader := &SnowflakeRowReader{
Cursor: r,
}
cols, err := r.Columns()
if err != nil {
return nil, err
}
reader.InitColumnNames(cols)

types, err := r.ColumnTypes()
if err != nil {
return nil, err
}
reader.InitColumnTypes(types)

return reader, nil
}

var fluxToSnowflake = map[flux.ColType]string{
flux.TFloat: "FLOAT",
flux.TInt: "NUMBER",
flux.TString: "TEXT",
flux.TBool: "BOOLEAN",
flux.TTime: "TIMESTAMP_LTZ",
}

// SnowflakeTranslateColumn translates flux colTypes into their corresponding Snowflake column type
func SnowflakeColumnTranslateFunc() translationFunc {
return func(f flux.ColType, colName string) (string, error) {
s, found := fluxToSnowflake[f]
if !found {
return "", errors.Newf(codes.Internal, "Snowflake does not support column type %s", f.String())
}
return colName + " " + s, nil
}
}
14 changes: 13 additions & 1 deletion stdlib/sql/source_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependencies/url"
"github.com/influxdata/flux/internal/errors"
"github.com/snowflakedb/gosnowflake"
)

// helper function to validate the data source url (postgres, sqlmock) / dsn (mysql) using the URLValidator.
// helper function to validate the data source url (postgres, sqlmock) / dsn (mysql, snowflake) using the URLValidator.
func validateDataSource(validator url.Validator, driverName string, dataSourceName string) error {

/*
Expand Down Expand Up @@ -60,6 +61,17 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "snowflake":
// an example is: username:password@accountname/dbname/testschema?warehouse=mywh
cfg, err := gosnowflake.ParseDSN(dataSourceName)
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source dsn: %v", err)
}
u = &neturl.URL{
Scheme: cfg.Protocol,
User: neturl.UserPassword(cfg.User, cfg.Password),
Host: cfg.Host,
}
default:
return errors.Newf(codes.Invalid, "sql driver %s not supported", driverName)
}
Expand Down
3 changes: 2 additions & 1 deletion stdlib/sql/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,11 @@ func getTranslationFunc(driverName string) (func() translationFunc, error) {
return PostgresColumnTranslateFunc, nil
case "mysql":
return MysqlColumnTranslateFunc, nil
case "snowflake":
return SnowflakeColumnTranslateFunc, nil
default:
return nil, errors.Newf(codes.Internal, "invalid driverName: %s", driverName)
}

}

func CreateInsertComponents(t *ToSQLTransformation, tbl flux.Table) (colNames []string, valStringArray [][]string, valArgsArray [][]interface{}, err error) {
Expand Down
Loading

0 comments on commit aff8331

Please sign in to comment.