
mydumper: fix parquet data parser #435

Merged Nov 9, 2020 (12 commits)
93 changes: 73 additions & 20 deletions lightning/mydump/parquet_parser.go
@@ -2,33 +2,35 @@ package mydump

import (
"context"
+ "fmt"
"io"
"reflect"
-
- "go.uber.org/zap"
+ "time"

"github.com/pingcap/br/pkg/storage"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb/types"
+ "github.com/xitongsys/parquet-go/parquet"
preader "github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source"
+ "go.uber.org/zap"
)

const (
batchReadRowSize = 32
)

type ParquetParser struct {
- Reader *preader.ParquetReader
- columns []string
- rows []interface{}
- readRows int64
- curStart int64
- curIndex int
- lastRow Row
- logger log.Logger
+ Reader *preader.ParquetReader
+ columns []string
+ columnMetas []*parquet.SchemaElement
+ rows []interface{}
+ readRows int64
+ curStart int64
+ curIndex int
+ lastRow Row
+ logger log.Logger
}

// readerWrapper is used to implement `source.ParquetFile`
@@ -82,18 +84,21 @@ func NewParquetParser(
return nil, errors.Trace(err)
}

- columns := make([]string, 0, len(reader.Footer.Schema))
- for i, c := range reader.Footer.Schema {
+ columns := make([]string, 0, len(reader.Footer.Schema)-1)
+ columnMetas := make([]*parquet.SchemaElement, 0, len(reader.Footer.Schema)-1)
+ for i, c := range reader.SchemaHandler.SchemaElements {
if c.GetNumChildren() == 0 {
// the SchemaElement.Name is capitalized, so use the original name instead
columns = append(columns, reader.SchemaHandler.Infos[i].ExName)
+ columnMetas = append(columnMetas, c)
}
}

return &ParquetParser{
- Reader: reader,
- columns: columns,
- logger: log.L(),
+ Reader: reader,
+ columns: columns,
+ columnMetas: columnMetas,
+ logger: log.L(),
}, nil
}
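For readers unfamiliar with parquet metadata, the loop above works because the footer schema is stored as a pre-order flattening of a tree: the root element merely counts its children, and column data lives only in leaf elements, hence the len(...)-1 capacities and the NumChildren check. A self-contained toy illustration of that shape (the schemaElement type and field names here are invented for the sketch, not part of this PR):

package main

import "fmt"

// schemaElement is a stand-in for parquet.SchemaElement: only leaves
// (NumChildren == 0) carry column data.
type schemaElement struct {
	Name        string
	NumChildren int32
}

func main() {
	// Pre-order flattening of: root{ id, address{ city, zip }, name }
	schema := []schemaElement{
		{Name: "root", NumChildren: 3},
		{Name: "id"},
		{Name: "address", NumChildren: 2},
		{Name: "city"},
		{Name: "zip"},
		{Name: "name"},
	}
	columns := make([]string, 0, len(schema)-1)
	for _, e := range schema {
		if e.NumChildren == 0 {
			columns = append(columns, e.Name)
		}
	}
	fmt.Println(columns) // [id city zip name]
}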

@@ -166,17 +171,22 @@ func (pp *ParquetParser) ReadRow() error {
pp.lastRow.Row = pp.lastRow.Row[:length]
}
for i := 0; i < length; i++ {
- setDatumValue(&pp.lastRow.Row[i], v.Field(i))
+ setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i])
}
return nil
}

- func setDatumValue(d *types.Datum, v reflect.Value) {
+ // convert a parquet value to Datum
+ //
+ // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) {
switch v.Kind() {
case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
d.SetUint64(v.Uint())
- case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ case reflect.Int8, reflect.Int16:
d.SetInt64(v.Int())
+ case reflect.Int32, reflect.Int64:
+ setDatumByInt(d, v.Int(), meta)
case reflect.String:
d.SetString(v.String(), "")
case reflect.Float32, reflect.Float64:
@@ -185,14 +195,57 @@ func setDatumValue(d *types.Datum, v reflect.Value) {
if v.IsNil() {
d.SetNull()
} else {
- setDatumValue(d, v.Elem())
+ setDatumValue(d, v.Elem(), meta)
}
default:
log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()),
zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface()))
}
}

// when the value type is int32/int64, convert the value to the target logical type in tidb
func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) {
if meta.ConvertedType == nil {
d.SetInt64(v)
return
}
switch *meta.ConvertedType {
// decimal
case parquet.ConvertedType_DECIMAL:
minLen := int(*meta.Scale) + 1
if v < 0 {
minLen++
}
val := fmt.Sprintf("%0*d", minLen, v)
dotIndex := len(val) - int(*meta.Scale)
d.SetString(val[:dotIndex]+"."+val[dotIndex:], "")
case parquet.ConvertedType_DATE:
dateStr := time.Unix(v*86400, 0).Format("2006-01-02")
d.SetString(dateStr, "")
// convert all timestamp types (datetime/timestamp) to string
case parquet.ConvertedType_TIMESTAMP_MICROS:
dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
case parquet.ConvertedType_TIMESTAMP_MILLIS:
dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
// convert time types to string
case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS:
if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS {
v /= 1e3
}
millis := v % 1e3
v /= 1e3
sec := v % 60
v /= 60
min := v % 60
v /= 60
d.SetString(fmt.Sprintf("%d:%02d:%02d.%03d", v, min, sec, millis), "")
default:
d.SetInt64(v)
}
}
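To make the DECIMAL and time conversions above concrete: the Sprintf("%0*d", ...) trick zero-pads the raw integer so there is always at least one digit before the decimal point, and the time branches simply re-base Unix epochs. A minimal, self-contained sketch of the same arithmetic (the helper name formatDecimal is ours, for illustration; note the parser itself formats in local time, so we pin UTC here for deterministic output):

package main

import (
	"fmt"
	"time"
)

// formatDecimal mirrors the ConvertedType_DECIMAL branch: zero-pad the raw
// integer to at least scale+1 digits (one more slot for a sign), then splice
// the decimal point in, scale digits from the right.
func formatDecimal(v int64, scale int) string {
	minLen := scale + 1
	if v < 0 {
		minLen++
	}
	val := fmt.Sprintf("%0*d", minLen, v)
	dot := len(val) - scale
	return val[:dot] + "." + val[dot:]
}

func main() {
	fmt.Println(formatDecimal(-12345678, 2)) // -123456.78
	fmt.Println(formatDecimal(456, 4))       // 0.0456
	fmt.Println(formatDecimal(-1, 4))        // -0.0001

	// DATE stores days since the Unix epoch, TIMESTAMP_MILLIS milliseconds.
	fmt.Println(time.Unix(18564*86400, 0).UTC().Format("2006-01-02")) // 2020-10-29
	ms := int64(1603963672356)
	fmt.Println(time.Unix(ms/1e3, (ms%1e3)*1e6).UTC().Format("2006-01-02 15:04:05.999")) // 2020-10-29 09:27:52.356
}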

func (pp *ParquetParser) LastRow() Row {
return pp.lastRow
}
129 changes: 126 additions & 3 deletions lightning/mydump/parquet_parser_test.go
@@ -7,10 +7,8 @@ import (
"strconv"

"github.com/pingcap/br/pkg/storage"

"github.com/pingcap/tidb/types"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/types"
"github.com/xitongsys/parquet-go-source/local"
writer2 "github.com/xitongsys/parquet-go/writer"
)
@@ -81,3 +79,128 @@ func (s testParquetParserSuite) TestParquetParser(c *C) {

c.Assert(reader.ReadRow(), Equals, io.EOF)
}

func (s testParquetParserSuite) TestParquetVariousTypes(c *C) {
type Test struct {
Date int32 `parquet:"name=date, type=DATE"`
TimeMillis int32 `parquet:"name=timemillis, type=TIME_MILLIS"`
TimeMicros int64 `parquet:"name=timemicros, type=TIME_MICROS"`
TimestampMillis int64 `parquet:"name=timestampmillis, type=TIMESTAMP_MILLIS"`
TimestampMicros int64 `parquet:"name=timestampmicros, type=TIMESTAMP_MICROS"`

Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"`
Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"`
Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"`
Decimal4 string `parquet:"name=decimal4, type=DECIMAL, scale=2, precision=10, basetype=FIXED_LEN_BYTE_ARRAY, length=12"`
Decimal5 string `parquet:"name=decimal5, type=DECIMAL, scale=2, precision=20, basetype=BYTE_ARRAY"`
Decimal6 int32 `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"`
}

dir := c.MkDir()
// prepare data
name := "test123.parquet"
testPath := filepath.Join(dir, name)
pf, err := local.NewLocalFileWriter(testPath)
c.Assert(err, IsNil)
test := &Test{}
writer, err := writer2.NewParquetWriter(pf, test, 2)
c.Assert(err, IsNil)

v := &Test{
Date: 18564, //2020-10-29
TimeMillis: 62775123, // 17:26:15.123
TimeMicros: 62775123000, // 17:26:15.123
TimestampMillis: 1603963672356, // 2020-10-29T17:27:52.356
TimestampMicros: 1603963672356956, //2020-10-29T17:27:52.356956
Decimal1: -12345678, // -123456.78
Decimal2: 456, // 0.0456
Decimal3: 123456789012345678, //1234567890123456.78
Decimal4: "-12345678.09",
Decimal5: "-1234567890123456.78",
Decimal6: -1, // -0.0001
}
c.Assert(writer.Write(v), IsNil)
c.Assert(writer.WriteStop(), IsNil)
c.Assert(pf.Close(), IsNil)

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)
r, err := store.Open(context.TODO(), name)
c.Assert(err, IsNil)
reader, err := NewParquetParser(context.TODO(), store, r, name)
c.Assert(err, IsNil)
defer reader.Close()

c.Assert(len(reader.columns), Equals, 11)

c.Assert(reader.ReadRow(), IsNil)
c.Assert(reader.lastRow.Row, DeepEquals, []types.Datum{
types.NewCollationStringDatum("2020-10-29", "", 0),
types.NewCollationStringDatum("17:26:15.123", "", 0),
types.NewCollationStringDatum("17:26:15.123", "", 0),
types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0),
types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0),
types.NewCollationStringDatum("-123456.78", "", 0),
types.NewCollationStringDatum("0.0456", "", 0),
types.NewCollationStringDatum("1234567890123456.78", "", 0),
types.NewCollationStringDatum("-12345678.09", "", 0),
types.NewCollationStringDatum("-1234567890123456.78", "", 0),
types.NewCollationStringDatum("-0.0001", "", 0),
})

type TestDecimal struct {
Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=3, precision=5, basetype=INT32"`
DecimalRef *int32 `parquet:"name=decimal2, type=DECIMAL, scale=3, precision=5, basetype=INT32"`
}

cases := [][]interface{}{
{int32(0), "0.000"},
{int32(1000), "1.000"},
{int32(-1000), "-1.000"},
{int32(999), "0.999"},
{int32(-999), "-0.999"},
{int32(1), "0.001"},
{int32(-1), "-0.001"},
}

fileName := "test.02.parquet"
testPath = filepath.Join(dir, fileName)
pf, err = local.NewLocalFileWriter(testPath)
td := &TestDecimal{}
c.Assert(err, IsNil)
writer, err = writer2.NewParquetWriter(pf, td, 2)
c.Assert(err, IsNil)
for i, testCase := range cases {
val := testCase[0].(int32)
td.Decimal1 = val
if i%2 == 0 {
td.DecimalRef = &val
} else {
td.DecimalRef = nil
}
c.Assert(writer.Write(td), IsNil)
}
c.Assert(writer.WriteStop(), IsNil)
c.Assert(pf.Close(), IsNil)

r, err = store.Open(context.TODO(), fileName)
c.Assert(err, IsNil)
reader, err = NewParquetParser(context.TODO(), store, r, fileName)
c.Assert(err, IsNil)
defer reader.Close()

for i, testCase := range cases {
c.Assert(reader.ReadRow(), IsNil)
vals := []types.Datum{types.NewCollationStringDatum(testCase[1].(string), "", 0)}
if i%2 == 0 {
vals = append(vals, vals[0])
} else {
vals = append(vals, types.Datum{})
}
// because we always reuse the datums in reader.lastRow.Row, we can't directly
// compare with `DeepEquals` here
eq, err := types.EqualDatums(nil, reader.lastRow.Row, vals)
c.Assert(err, IsNil)
c.Assert(eq, IsTrue)
}
}
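To run just this case locally, gocheck's name filter should work (assuming a tidb-lightning checkout with the repo's usual test dependencies):

go test ./lightning/mydump/ -check.f 'TestParquetVariousTypes'

The integration check in tests/parquet/run.sh below then exercises the same timestamp and decimal conversions end to end through all three backends.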
4 changes: 4 additions & 0 deletions tests/parquet/run.sh
@@ -45,4 +45,8 @@ for BACKEND in local importer tidb; do

run_sql 'select w_name from test.warehouse;'
check_contains "w_name: eLNEDIW"

run_sql 'select c_since, c_discount from test.customer where c_id = 20;'
check_contains "c_since: 2020-09-10 20:17:16"
check_contains "c_discount: 0.0585"
done