diff --git a/lightning/mydump/parquet_parser.go b/lightning/mydump/parquet_parser.go index 64470e54c..c2de55c08 100644 --- a/lightning/mydump/parquet_parser.go +++ b/lightning/mydump/parquet_parser.go @@ -2,8 +2,14 @@ package mydump import ( "context" + "fmt" "io" "reflect" + "time" + + "github.com/pingcap/parser/mysql" + + "github.com/xitongsys/parquet-go/parquet" "go.uber.org/zap" @@ -21,14 +27,15 @@ const ( ) type ParquetParser struct { - Reader *preader.ParquetReader - columns []string - rows []interface{} - readRows int64 - curStart int64 - curIndex int - lastRow Row - logger log.Logger + Reader *preader.ParquetReader + columns []string + columnMetas []*parquet.SchemaElement + rows []interface{} + readRows int64 + curStart int64 + curIndex int + lastRow Row + logger log.Logger } // readerWrapper is a used for implement `source.ParquetFile` @@ -82,18 +89,21 @@ func NewParquetParser( return nil, errors.Trace(err) } - columns := make([]string, 0, len(reader.Footer.Schema)) - for i, c := range reader.Footer.Schema { + columns := make([]string, 0, len(reader.Footer.Schema)-1) + columnMetas := make([]*parquet.SchemaElement, 0, len(reader.Footer.Schema)-1) + for i, c := range reader.SchemaHandler.SchemaElements { if c.GetNumChildren() == 0 { // the SchemaElement.Name is capitalized, we should use the original name columns = append(columns, reader.SchemaHandler.Infos[i].ExName) + columnMetas = append(columnMetas, c) } } return &ParquetParser{ - Reader: reader, - columns: columns, - logger: log.L(), + Reader: reader, + columns: columns, + columnMetas: columnMetas, + logger: log.L(), }, nil } @@ -166,26 +176,31 @@ func (pp *ParquetParser) ReadRow() error { pp.lastRow.Row = pp.lastRow.Row[:length] } for i := 0; i < length; i++ { - setDatumValue(&pp.lastRow.Row[i], v.Field(i)) + setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]) } return nil } -func setDatumValue(d *types.Datum, v reflect.Value) { +// convert a parquet value to Datum +// +// See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md +func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) { switch v.Kind() { case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: d.SetUint64(v.Uint()) - case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + case reflect.Int8, reflect.Int16: d.SetInt64(v.Int()) + case reflect.Int32, reflect.Int64: + setDatumByInt(d, v.Int(), meta) case reflect.String: - d.SetString(v.String(), "") + d.SetString(v.String(), mysql.DefaultCollationName) case reflect.Float32, reflect.Float64: d.SetFloat64(v.Float()) case reflect.Ptr: if v.IsNil() { d.SetNull() } else { - setDatumValue(d, v.Elem()) + setDatumValue(d, v.Elem(), meta) } default: log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()), @@ -193,6 +208,60 @@ func setDatumValue(d *types.Datum, v reflect.Value) { } } +func abs(v int64) int64 { + if v >= 0 { + return v + } + return -v +} + +// when the value type is int32/int64, convert the value to the target logical type in tidb +func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) { + if meta.ConvertedType == nil { + d.SetInt64(v) + return + } + switch *meta.ConvertedType { + // decimal + case parquet.ConvertedType_DECIMAL: + scale := int64(1) + for i := 0; i < int(*meta.Scale); i++ { + scale *= 10 + } + fmtStr := fmt.Sprintf("%%d.%%0%dd", *meta.Scale) + val := fmt.Sprintf(fmtStr, abs(v/scale), abs(v%scale)) + // %d drops the minus sign when -1 < value < 0 (integer part is 0), so re-apply it from the raw value + if v < 0 { + val = "-" + val + } + d.SetString(val, mysql.DefaultCollationName) + case parquet.ConvertedType_DATE: + dateStr := time.Unix(v*86400, 0).Format("2006-01-02") + d.SetString(dateStr, mysql.DefaultCollationName) + // convert all timestamp types (datetime/timestamp) to string + case parquet.ConvertedType_TIMESTAMP_MICROS: + dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999") + d.SetString(dateStr, mysql.DefaultCollationName) + case parquet.ConvertedType_TIMESTAMP_MILLIS: + dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 
15:04:05.999") + d.SetString(dateStr, mysql.DefaultCollationName) + // convert time types to string + case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS: + if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS { + v /= 1e3 + } + millis := v % 1e3 + v /= 1e3 + sec := v % 60 + v /= 60 + min := v % 60 + v /= 60 + d.SetString(fmt.Sprintf("%02d:%02d:%02d.%03d", v, min, sec, millis), mysql.DefaultCollationName) + default: + d.SetInt64(v) + } +} + func (pp *ParquetParser) LastRow() Row { return pp.lastRow } diff --git a/lightning/mydump/parquet_parser_test.go b/lightning/mydump/parquet_parser_test.go index 58bca7367..b05b83c23 100644 --- a/lightning/mydump/parquet_parser_test.go +++ b/lightning/mydump/parquet_parser_test.go @@ -81,3 +81,69 @@ func (s testParquetParserSuite) TestParquetParser(c *C) { c.Assert(reader.ReadRow(), Equals, io.EOF) } + +func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { + type Test struct { + Date int32 `parquet:"name=date, type=DATE"` + TimeMillis int32 `parquet:"name=timemillis, type=TIME_MILLIS"` + TimeMicros int64 `parquet:"name=timemicros, type=TIME_MICROS"` + TimestampMillis int64 `parquet:"name=timestampmillis, type=TIMESTAMP_MILLIS"` + TimestampMicros int64 `parquet:"name=timestampmicros, type=TIMESTAMP_MICROS"` + + Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"` + Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"` + Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"` + Decimal4 string `parquet:"name=decimal4, type=DECIMAL, scale=2, precision=10, basetype=FIXED_LEN_BYTE_ARRAY, length=12"` + Decimal5 string `parquet:"name=decimal5, type=DECIMAL, scale=2, precision=20, basetype=BYTE_ARRAY"` + } + + dir := c.MkDir() + // prepare data + name := "test123.parquet" + testPath := filepath.Join(dir, name) + pf, err := local.NewLocalFileWriter(testPath) + c.Assert(err, IsNil) + test := 
&Test{} + writer, err := writer2.NewParquetWriter(pf, test, 2) + c.Assert(err, IsNil) + + v := &Test{ + Date: 18564, //2020-10-29 + TimeMillis: 62775123, // 17:26:15.123 + TimeMicros: 62775123000, // 17:26:15.123 + TimestampMillis: 1603963672356, // 2020-10-29T17:27:52.356 + TimestampMicros: 1603963672356956, //2020-10-29T17:27:52.356956 + Decimal1: -12345678, // -123456.78 + Decimal2: 456, // 0.0456 + Decimal3: 123456789012345678, //1234567890123456.78 + Decimal4: "-12345678.09", + Decimal5: "-1234567890123456.78", + } + c.Assert(writer.Write(v), IsNil) + c.Assert(writer.WriteStop(), IsNil) + c.Assert(pf.Close(), IsNil) + + store, err := storage.NewLocalStorage(dir) + c.Assert(err, IsNil) + r, err := store.Open(context.TODO(), name) + c.Assert(err, IsNil) + reader, err := NewParquetParser(context.TODO(), store, r, name) + c.Assert(err, IsNil) + defer reader.Close() + + c.Assert(len(reader.columns), Equals, 10) + + c.Assert(reader.ReadRow(), IsNil) + c.Assert(reader.lastRow.Row, DeepEquals, []types.Datum{ + types.NewStringDatum("2020-10-29"), + types.NewStringDatum("17:26:15.123"), + types.NewStringDatum("17:26:15.123"), + types.NewStringDatum("2020-10-29 17:27:52.356"), + types.NewStringDatum("2020-10-29 17:27:52.356"), + types.NewStringDatum("-123456.78"), + types.NewStringDatum("0.0456"), + types.NewStringDatum("1234567890123456.78"), + types.NewStringDatum("-12345678.09"), + types.NewStringDatum("-1234567890123456.78"), + }) +} diff --git a/tests/parquet/run.sh b/tests/parquet/run.sh index 6809612e9..542ddd5e2 100755 --- a/tests/parquet/run.sh +++ b/tests/parquet/run.sh @@ -45,4 +45,8 @@ for BACKEND in local importer tidb; do run_sql 'select w_name from test.warehouse;' check_contains "w_name: eLNEDIW" + + run_sql 'select c_since, c_discount from test.customer where c_id = 20;' + check_contains "c_since: 2020-09-10 20:17:16" + check_contains "c_discount: 0.0585" done