diff --git a/lightning/mydump/parquet_parser.go b/lightning/mydump/parquet_parser.go
index 64470e54c..b1808fc9c 100644
--- a/lightning/mydump/parquet_parser.go
+++ b/lightning/mydump/parquet_parser.go
@@ -2,18 +2,20 @@ package mydump
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"reflect"
-
-	"go.uber.org/zap"
+	"time"
 
 	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb-lightning/lightning/log"
 	"github.com/pingcap/tidb/types"
 
+	"github.com/xitongsys/parquet-go/parquet"
 	preader "github.com/xitongsys/parquet-go/reader"
 	"github.com/xitongsys/parquet-go/source"
+	"go.uber.org/zap"
 )
 
 const (
@@ -21,14 +23,15 @@ const (
 )
 
 type ParquetParser struct {
-	Reader   *preader.ParquetReader
-	columns  []string
-	rows     []interface{}
-	readRows int64
-	curStart int64
-	curIndex int
-	lastRow  Row
-	logger   log.Logger
+	Reader      *preader.ParquetReader
+	columns     []string
+	columnMetas []*parquet.SchemaElement
+	rows        []interface{}
+	readRows    int64
+	curStart    int64
+	curIndex    int
+	lastRow     Row
+	logger      log.Logger
 }
 
 // readerWrapper is a used for implement `source.ParquetFile`
@@ -82,18 +85,21 @@ func NewParquetParser(
 		return nil, errors.Trace(err)
 	}
 
-	columns := make([]string, 0, len(reader.Footer.Schema))
-	for i, c := range reader.Footer.Schema {
+	columns := make([]string, 0, len(reader.Footer.Schema)-1)
+	columnMetas := make([]*parquet.SchemaElement, 0, len(reader.Footer.Schema)-1)
+	for i, c := range reader.SchemaHandler.SchemaElements {
 		if c.GetNumChildren() == 0 {
 			// the SchemaElement.Name is capitalized, we should use the original name
 			columns = append(columns, reader.SchemaHandler.Infos[i].ExName)
+			columnMetas = append(columnMetas, c)
 		}
 	}
 
 	return &ParquetParser{
-		Reader:  reader,
-		columns: columns,
-		logger:  log.L(),
+		Reader:      reader,
+		columns:     columns,
+		columnMetas: columnMetas,
+		logger:      log.L(),
 	}, nil
 }
 
@@ -166,17 +172,22 @@ func (pp *ParquetParser) ReadRow() error {
 		pp.lastRow.Row = pp.lastRow.Row[:length]
 	}
 	for i := 0; i < length; i++ {
-		setDatumValue(&pp.lastRow.Row[i], v.Field(i))
+		setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i])
 	}
 	return nil
 }
 
-func setDatumValue(d *types.Datum, v reflect.Value) {
+// convert a parquet value to Datum
+//
+// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) {
 	switch v.Kind() {
 	case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
 		d.SetUint64(v.Uint())
-	case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+	case reflect.Int8, reflect.Int16:
 		d.SetInt64(v.Int())
+	case reflect.Int32, reflect.Int64:
+		setDatumByInt(d, v.Int(), meta)
 	case reflect.String:
 		d.SetString(v.String(), "")
 	case reflect.Float32, reflect.Float64:
@@ -185,7 +196,7 @@ func setDatumValue(d *types.Datum, v reflect.Value) {
 		if v.IsNil() {
 			d.SetNull()
 		} else {
-			setDatumValue(d, v.Elem())
+			setDatumValue(d, v.Elem(), meta)
 		}
 	default:
 		log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()),
@@ -193,6 +204,49 @@ func setDatumValue(d *types.Datum, v reflect.Value) {
 	}
 }
 
+// when the value type is int32/int64, convert the value to the target logical type in tidb
+func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) {
+	if meta.ConvertedType == nil {
+		d.SetInt64(v)
+		return
+	}
+	switch *meta.ConvertedType {
+	// decimal
+	case parquet.ConvertedType_DECIMAL:
+		minLen := *meta.Scale + 1
+		if v < 0 {
+			minLen++
+		}
+		val := fmt.Sprintf("%0*d", minLen, v)
+		dotIndex := len(val) - int(*meta.Scale)
+		d.SetString(val[:dotIndex]+"."+val[dotIndex:], "")
+	case parquet.ConvertedType_DATE:
+		dateStr := time.Unix(v*86400, 0).Format("2006-01-02")
+		d.SetString(dateStr, "")
+	// convert all timestamp types (datetime/timestamp) to string
+	case parquet.ConvertedType_TIMESTAMP_MICROS:
+		dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999")
+		d.SetString(dateStr, "")
+	case parquet.ConvertedType_TIMESTAMP_MILLIS:
+		dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999")
+		d.SetString(dateStr, "")
+	// convert time types to string
+	case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS:
+		if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS {
+			v /= 1e3
+		}
+		millis := v % 1e3
+		v /= 1e3
+		sec := v % 60
+		v /= 60
+		min := v % 60
+		v /= 60
+		d.SetString(fmt.Sprintf("%d:%02d:%02d.%03d", v, min, sec, millis), "")
+	default:
+		d.SetInt64(v)
+	}
+}
+
 func (pp *ParquetParser) LastRow() Row {
 	return pp.lastRow
 }
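The DECIMAL branch of `setDatumByInt` above relies on one formatting trick: `%0*d` zero-pads the unscaled integer to at least `scale+1` digits (one extra when the value is negative, to make room for the sign), so at least one digit always survives to the left of the inserted dot. A minimal standalone sketch of that logic follows; the `decimalIntToString` helper is hypothetical, not part of the patch:

```go
package main

import "fmt"

// decimalIntToString mirrors the parquet.ConvertedType_DECIMAL branch of
// setDatumByInt: zero-pad the unscaled integer, then split off the last
// `scale` digits as the fractional part. Like the patch, it assumes
// scale > 0.
func decimalIntToString(v int64, scale int) string {
	minLen := scale + 1
	if v < 0 {
		minLen++ // leave room for the '-' sign
	}
	val := fmt.Sprintf("%0*d", minLen, v)
	dotIndex := len(val) - scale
	return val[:dotIndex] + "." + val[dotIndex:]
}

func main() {
	fmt.Println(decimalIntToString(-12345678, 2)) // -123456.78
	fmt.Println(decimalIntToString(456, 4))       // 0.0456
	fmt.Println(decimalIntToString(-1, 4))        // -0.0001
}
```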
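The TIME_MILLIS/TIME_MICROS branch is plain modular arithmetic: micros are first normalized to millis, then milliseconds, seconds, and minutes are peeled off the low end, and whatever remains is hours (which, like MySQL's TIME type, may exceed 23). A self-contained sketch of the same peeling, assuming a milliseconds-since-midnight input:

```go
package main

import "fmt"

// timeMillisToString mirrors the TIME branch of setDatumByInt for a
// time-of-day given in milliseconds; TIME_MICROS values are divided by
// 1e3 first in the patch.
func timeMillisToString(v int64) string {
	millis := v % 1000
	v /= 1000
	sec := v % 60
	v /= 60
	min := v % 60
	v /= 60 // what is left is hours
	return fmt.Sprintf("%d:%02d:%02d.%03d", v, min, sec, millis)
}

func main() {
	fmt.Println(timeMillisToString(62775123)) // 17:26:15.123, the value used in the test below
}
```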
diff --git a/lightning/mydump/parquet_parser_test.go b/lightning/mydump/parquet_parser_test.go
index 58bca7367..de117d044 100644
--- a/lightning/mydump/parquet_parser_test.go
+++ b/lightning/mydump/parquet_parser_test.go
@@ -7,10 +7,8 @@ import (
 	"strconv"
 
 	"github.com/pingcap/br/pkg/storage"
-
-	"github.com/pingcap/tidb/types"
-
 	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb/types"
 	"github.com/xitongsys/parquet-go-source/local"
 	writer2 "github.com/xitongsys/parquet-go/writer"
 )
@@ -81,3 +79,128 @@ func (s testParquetParserSuite) TestParquetParser(c *C) {
 
 	c.Assert(reader.ReadRow(), Equals, io.EOF)
 }
+
+func (s testParquetParserSuite) TestParquetVariousTypes(c *C) {
+	type Test struct {
+		Date            int32 `parquet:"name=date, type=DATE"`
+		TimeMillis      int32 `parquet:"name=timemillis, type=TIME_MILLIS"`
+		TimeMicros      int64 `parquet:"name=timemicros, type=TIME_MICROS"`
+		TimestampMillis int64 `parquet:"name=timestampmillis, type=TIMESTAMP_MILLIS"`
+		TimestampMicros int64 `parquet:"name=timestampmicros, type=TIMESTAMP_MICROS"`
+
+		Decimal1 int32  `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"`
+		Decimal2 int32  `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"`
+		Decimal3 int64  `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"`
+		Decimal4 string `parquet:"name=decimal4, type=DECIMAL, scale=2, precision=10, basetype=FIXED_LEN_BYTE_ARRAY, length=12"`
+		Decimal5 string `parquet:"name=decimal5, type=DECIMAL, scale=2, precision=20, basetype=BYTE_ARRAY"`
+		Decimal6 int32  `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"`
+	}
+
+	dir := c.MkDir()
+	// prepare data
+	name := "test123.parquet"
+	testPath := filepath.Join(dir, name)
+	pf, err := local.NewLocalFileWriter(testPath)
+	c.Assert(err, IsNil)
+	test := &Test{}
+	writer, err := writer2.NewParquetWriter(pf, test, 2)
+	c.Assert(err, IsNil)
+
+	v := &Test{
+		Date:            18564,              // 2020-10-29
+		TimeMillis:      62775123,           // 17:26:15.123
+		TimeMicros:      62775123000,        // 17:26:15.123
+		TimestampMillis: 1603963672356,      // 2020-10-29T17:27:52.356
+		TimestampMicros: 1603963672356956,   // 2020-10-29T17:27:52.356956
+		Decimal1:        -12345678,          // -123456.78
+		Decimal2:        456,                // 0.0456
+		Decimal3:        123456789012345678, // 1234567890123456.78
+		Decimal4:        "-12345678.09",
+		Decimal5:        "-1234567890123456.78",
+		Decimal6:        -1, // -0.0001
+	}
+	c.Assert(writer.Write(v), IsNil)
+	c.Assert(writer.WriteStop(), IsNil)
+	c.Assert(pf.Close(), IsNil)
+
+	store, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+	r, err := store.Open(context.TODO(), name)
+	c.Assert(err, IsNil)
+	reader, err := NewParquetParser(context.TODO(), store, r, name)
+	c.Assert(err, IsNil)
+	defer reader.Close()
+
+	c.Assert(len(reader.columns), Equals, 11)
+
+	c.Assert(reader.ReadRow(), IsNil)
+	c.Assert(reader.lastRow.Row, DeepEquals, []types.Datum{
+		types.NewCollationStringDatum("2020-10-29", "", 0),
+		types.NewCollationStringDatum("17:26:15.123", "", 0),
+		types.NewCollationStringDatum("17:26:15.123", "", 0),
+		types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0),
+		types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0),
+		types.NewCollationStringDatum("-123456.78", "", 0),
+		types.NewCollationStringDatum("0.0456", "", 0),
+		types.NewCollationStringDatum("1234567890123456.78", "", 0),
+		types.NewCollationStringDatum("-12345678.09", "", 0),
+		types.NewCollationStringDatum("-1234567890123456.78", "", 0),
+		types.NewCollationStringDatum("-0.0001", "", 0),
+	})
+
+	type TestDecimal struct {
+		Decimal1   int32  `parquet:"name=decimal1, type=DECIMAL, scale=3, precision=5, basetype=INT32"`
+		DecimalRef *int32 `parquet:"name=decimal2, type=DECIMAL, scale=3, precision=5, basetype=INT32"`
+	}
+
+	cases := [][]interface{}{
+		{int32(0), "0.000"},
+		{int32(1000), "1.000"},
+		{int32(-1000), "-1.000"},
+		{int32(999), "0.999"},
+		{int32(-999), "-0.999"},
+		{int32(1), "0.001"},
+		{int32(-1), "-0.001"},
+	}
+
+	fileName := "test.02.parquet"
+	testPath = filepath.Join(dir, fileName)
+	pf, err = local.NewLocalFileWriter(testPath)
+	td := &TestDecimal{}
+	c.Assert(err, IsNil)
+	writer, err = writer2.NewParquetWriter(pf, td, 2)
+	c.Assert(err, IsNil)
+	for i, testCase := range cases {
+		val := testCase[0].(int32)
+		td.Decimal1 = val
+		if i%2 == 0 {
+			td.DecimalRef = &val
+		} else {
+			td.DecimalRef = nil
+		}
+		c.Assert(writer.Write(td), IsNil)
+	}
+	c.Assert(writer.WriteStop(), IsNil)
+	c.Assert(pf.Close(), IsNil)
+
+	r, err = store.Open(context.TODO(), fileName)
+	c.Assert(err, IsNil)
+	reader, err = NewParquetParser(context.TODO(), store, r, fileName)
+	c.Assert(err, IsNil)
+	defer reader.Close()
+
+	for i, testCase := range cases {
+		c.Assert(reader.ReadRow(), IsNil)
+		vals := []types.Datum{types.NewCollationStringDatum(testCase[1].(string), "", 0)}
+		if i%2 == 0 {
+			vals = append(vals, vals[0])
+		} else {
+			vals = append(vals, types.Datum{})
+		}
+		// because we always reuse the datums in reader.lastRow.Row, we can't
+		// directly compare them with `DeepEquals` here
+		eq, err := types.EqualDatums(nil, reader.lastRow.Row, vals)
+		c.Assert(err, IsNil)
+		c.Assert(eq, IsTrue)
+	}
+}
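A caveat when reading the expected strings in `TestParquetVariousTypes`: the DATE and TIMESTAMP branches format through `time.Unix`, which renders in the process-local time zone, so expectations such as `2020-10-29 17:27:52.356` for epoch-millis 1603963672356 hold when the tests run in UTC+8 (the same instant is 09:27:52 UTC). A small sketch of the conversion, pinned to UTC for reproducibility, whereas the patch itself formats in local time:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// The TIMESTAMP_MILLIS value written by the test above.
	v := int64(1603963672356)
	t := time.Unix(v/1e3, (v%1e3)*1e6)
	// Prints "2020-10-29 09:27:52.356"; formatting t without .UTC() in a
	// UTC+8 zone yields the test's expected "2020-10-29 17:27:52.356".
	fmt.Println(t.UTC().Format("2006-01-02 15:04:05.999"))
}
```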
diff --git a/tests/parquet/run.sh b/tests/parquet/run.sh
index 6809612e9..3d2fd56cc 100755
--- a/tests/parquet/run.sh
+++ b/tests/parquet/run.sh
@@ -45,4 +45,8 @@ for BACKEND in local importer tidb; do
 
   run_sql 'select w_name from test.warehouse;'
   check_contains "w_name: eLNEDIW"
+
+  run_sql 'select c_since, c_discount from test.customer where c_id = 20;'
+  check_contains "c_since: 2020-09-10 20:17:16"
+  check_contains "c_discount: 0.0585"
 done
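The run.sh checks above exercise the timestamp and decimal conversions end-to-end; for completeness, the remaining DATE branch follows the same shape: a parquet DATE arrives as an int32 count of days since 1970-01-01, and the parser scales it to seconds before formatting. A sketch, again pinned to UTC where the patch uses local time:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// The DATE value written by TestParquetVariousTypes.
	days := int64(18564)
	fmt.Println(time.Unix(days*86400, 0).UTC().Format("2006-01-02")) // 2020-10-29
}
```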