This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit fb49286: fix parquet

glorv committed Oct 29, 2020
Parent: 3ecec63
Showing 3 changed files with 153 additions and 18 deletions.
lightning/mydump/parquet_parser.go (83 additions, 18 deletions)
@@ -2,8 +2,14 @@ package mydump

import (
	"context"
+	"fmt"
	"io"
	"reflect"
+	"time"
+
+	"github.com/pingcap/parser/mysql"
+
+	"github.com/xitongsys/parquet-go/parquet"

	"go.uber.org/zap"

Expand All @@ -21,14 +27,15 @@ const (
)

type ParquetParser struct {
Reader *preader.ParquetReader
columns []string
rows []interface{}
readRows int64
curStart int64
curIndex int
lastRow Row
logger log.Logger
Reader *preader.ParquetReader
columns []string
columnMetas []*parquet.SchemaElement
rows []interface{}
readRows int64
curStart int64
curIndex int
lastRow Row
logger log.Logger
}

// readerWrapper is used to implement `source.ParquetFile`
@@ -82,18 +89,21 @@ func NewParquetParser(
		return nil, errors.Trace(err)
	}

-	columns := make([]string, 0, len(reader.Footer.Schema))
-	for i, c := range reader.Footer.Schema {
+	columns := make([]string, 0, len(reader.Footer.Schema)-1)
+	columnMetas := make([]*parquet.SchemaElement, 0, len(reader.Footer.Schema)-1)
+	for i, c := range reader.SchemaHandler.SchemaElements {
		if c.GetNumChildren() == 0 {
			// the SchemaElement.Name is capitalized, we should use the original name
			columns = append(columns, reader.SchemaHandler.Infos[i].ExName)
+			columnMetas = append(columnMetas, c)
		}
	}

	return &ParquetParser{
-		Reader:  reader,
-		columns: columns,
-		logger:  log.L(),
+		Reader:      reader,
+		columns:     columns,
+		columnMetas: columnMetas,
+		logger:      log.L(),
	}, nil
}
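The `-1` in the slice capacities accounts for the schema root: parquet-go stores the schema as a flattened tree whose first element is the root, and only leaf elements (those with `GetNumChildren() == 0`) are real columns. A minimal sketch of the same walk, assuming an already opened reader; the helper name is hypothetical, not part of the commit:

	// leafColumns is a hypothetical helper: it lists the leaf columns of an
	// already opened parquet-go reader, mirroring the loop in NewParquetParser.
	func leafColumns(r *preader.ParquetReader) []string {
		var cols []string
		for i, elem := range r.SchemaHandler.SchemaElements {
			if elem.GetNumChildren() == 0 {
				// Infos[i].ExName keeps the original column name; elem.Name is capitalized
				cols = append(cols, r.SchemaHandler.Infos[i].ExName)
			}
		}
		return cols
	}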

@@ -166,33 +176,88 @@ func (pp *ParquetParser) ReadRow() error {
		pp.lastRow.Row = pp.lastRow.Row[:length]
	}
	for i := 0; i < length; i++ {
-		setDatumValue(&pp.lastRow.Row[i], v.Field(i))
+		setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i])
	}
	return nil
}

-func setDatumValue(d *types.Datum, v reflect.Value) {
+// convert a parquet value to Datum
+//
+// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) {
	switch v.Kind() {
	case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		d.SetUint64(v.Uint())
-	case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+	case reflect.Int8, reflect.Int16:
		d.SetInt64(v.Int())
+	case reflect.Int32, reflect.Int64:
+		setDatumByInt(d, v.Int(), meta)
	case reflect.String:
-		d.SetString(v.String(), "")
+		d.SetString(v.String(), mysql.DefaultCollationName)
	case reflect.Float32, reflect.Float64:
		d.SetFloat64(v.Float())
	case reflect.Ptr:
		if v.IsNil() {
			d.SetNull()
		} else {
-			setDatumValue(d, v.Elem())
+			setDatumValue(d, v.Elem(), meta)
		}
	default:
		log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()),
			zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface()))
	}
}
+
+func abs(v int64) int64 {
+	if v >= 0 {
+		return v
+	}
+	return -v
+}
+
+// when the value type is int32/int64, convert the value to the target logical type in TiDB
+func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) {
+	if meta.ConvertedType == nil {
+		d.SetInt64(v)
+		return
+	}
+	switch *meta.ConvertedType {
+	// decimal
+	case parquet.ConvertedType_DECIMAL:
+		scale := int64(1)
+		for i := 0; i < int(*meta.Scale); i++ {
+			scale *= 10
+		}
+		fmtStr := fmt.Sprintf("%%d.%%0%dd", *meta.Scale)
+		val := fmt.Sprintf(fmtStr, v/scale, abs(v%scale))
+		d.SetString(val, mysql.DefaultCollationName)
+	case parquet.ConvertedType_DATE:
+		dateStr := time.Unix(v*86400, 0).Format("2006-01-02")
+		d.SetString(dateStr, mysql.DefaultCollationName)
+	// convert all timestamp types (datetime/timestamp) to string
+	case parquet.ConvertedType_TIMESTAMP_MICROS:
+		dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999")
+		d.SetString(dateStr, mysql.DefaultCollationName)
+	case parquet.ConvertedType_TIMESTAMP_MILLIS:
+		dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999")
+		d.SetString(dateStr, mysql.DefaultCollationName)
+	// convert time types to string
+	case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS:
+		if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS {
+			v /= 1e3
+		}
+		millis := v % 1e3
+		v /= 1e3
+		sec := v % 60
+		v /= 60
+		min := v % 60
+		v /= 60
+		d.SetString(fmt.Sprintf("%02d:%02d:%02d.%03d", v, min, sec, millis), mysql.DefaultCollationName)
+	default:
+		d.SetInt64(v)
+	}
+}

func (pp *ParquetParser) LastRow() Row {
	return pp.lastRow
}
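The integer-backed branches are easiest to follow with concrete values. Below is a standalone sketch of the DECIMAL and TIME_MILLIS arithmetic in setDatumByInt, using only the standard library; the values come from the test file in this commit, the variable names are illustrative:

	package main

	import "fmt"

	func main() {
		// DECIMAL with scale=2: v = -12345678 should render as "-123456.78"
		v, scaleDigits := int64(-12345678), 2
		scale := int64(1)
		for i := 0; i < scaleDigits; i++ {
			scale *= 10
		}
		frac := v % scale // -78
		if frac < 0 {
			frac = -frac
		}
		fmtStr := fmt.Sprintf("%%d.%%0%dd", scaleDigits) // builds "%d.%02d"
		fmt.Printf(fmtStr+"\n", v/scale, frac)           // -123456.78

		// TIME_MILLIS: 62775123 ms since midnight should render as "17:26:15.123"
		t := int64(62775123)
		millis := t % 1e3 // 123
		t /= 1e3          // 62775 seconds
		sec := t % 60     // 15
		t /= 60
		min := t % 60 // 26
		t /= 60       // 17 hours
		fmt.Printf("%02d:%02d:%02d.%03d\n", t, min, sec, millis) // 17:26:15.123
	}

One caveat with the decimal formula: for values strictly between -1 and 0 (say v = -45 with scale 2), v/scale truncates to 0 and the sign is lost; the test fixtures below stay outside that range.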
lightning/mydump/parquet_parser_test.go (66 additions, 0 deletions)
@@ -81,3 +81,69 @@ func (s testParquetParserSuite) TestParquetParser(c *C) {

	c.Assert(reader.ReadRow(), Equals, io.EOF)
}
+
+func (s testParquetParserSuite) TestParquetVariousTypes(c *C) {
+	type Test struct {
+		Date            int32 `parquet:"name=date, type=DATE"`
+		TimeMillis      int32 `parquet:"name=timemillis, type=TIME_MILLIS"`
+		TimeMicros      int64 `parquet:"name=timemicros, type=TIME_MICROS"`
+		TimestampMillis int64 `parquet:"name=timestampmillis, type=TIMESTAMP_MILLIS"`
+		TimestampMicros int64 `parquet:"name=timestampmicros, type=TIMESTAMP_MICROS"`
+
+		Decimal1 int32  `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"`
+		Decimal2 int32  `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"`
+		Decimal3 int64  `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"`
+		Decimal4 string `parquet:"name=decimal4, type=DECIMAL, scale=2, precision=10, basetype=FIXED_LEN_BYTE_ARRAY, length=12"`
+		Decimal5 string `parquet:"name=decimal5, type=DECIMAL, scale=2, precision=20, basetype=BYTE_ARRAY"`
+	}
+
+	dir := c.MkDir()
+	// prepare data
+	name := "test123.parquet"
+	testPath := filepath.Join(dir, name)
+	pf, err := local.NewLocalFileWriter(testPath)
+	c.Assert(err, IsNil)
+	test := &Test{}
+	writer, err := writer2.NewParquetWriter(pf, test, 2)
+	c.Assert(err, IsNil)
+
+	v := &Test{
+		Date:            18564,              // 2020-10-29
+		TimeMillis:      62775123,           // 17:26:15.123
+		TimeMicros:      62775123000,        // 17:26:15.123
+		TimestampMillis: 1603963672356,      // 2020-10-29T17:27:52.356
+		TimestampMicros: 1603963672356956,   // 2020-10-29T17:27:52.356956
+		Decimal1:        -12345678,          // -123456.78
+		Decimal2:        456,                // 0.0456
+		Decimal3:        123456789012345678, // 1234567890123456.78
+		Decimal4:        "-12345678.09",
+		Decimal5:        "-1234567890123456.78",
+	}
+	c.Assert(writer.Write(v), IsNil)
+	c.Assert(writer.WriteStop(), IsNil)
+	c.Assert(pf.Close(), IsNil)
+
+	store, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+	r, err := store.Open(context.TODO(), name)
+	c.Assert(err, IsNil)
+	reader, err := NewParquetParser(context.TODO(), store, r, name)
+	c.Assert(err, IsNil)
+	defer reader.Close()
+
+	c.Assert(len(reader.columns), Equals, 10)
+
+	c.Assert(reader.ReadRow(), IsNil)
+	c.Assert(reader.lastRow.Row, DeepEquals, []types.Datum{
+		types.NewStringDatum("2020-10-29"),
+		types.NewStringDatum("17:26:15.123"),
+		types.NewStringDatum("17:26:15.123"),
+		types.NewStringDatum("2020-10-29 17:27:52.356"),
+		types.NewStringDatum("2020-10-29 17:27:52.356"),
+		types.NewStringDatum("-123456.78"),
+		types.NewStringDatum("0.0456"),
+		types.NewStringDatum("1234567890123456.78"),
+		types.NewStringDatum("-12345678.09"),
+		types.NewStringDatum("-1234567890123456.78"),
+	})
+}
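The date and timestamp expectations depend on the process timezone, since setDatumByInt formats with time.Unix in local time; the strings above correspond to a UTC+8 environment. A standalone check with that assumption made explicit (illustrative, not part of the commit):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		cst := time.FixedZone("UTC+8", 8*3600) // timezone assumed by the expected strings

		// DATE is days since the Unix epoch
		days := int64(18564)
		fmt.Println(time.Unix(days*86400, 0).In(cst).Format("2006-01-02")) // 2020-10-29

		// TIMESTAMP_MILLIS
		ms := int64(1603963672356)
		fmt.Println(time.Unix(ms/1e3, (ms%1e3)*1e6).In(cst).Format("2006-01-02 15:04:05.999")) // 2020-10-29 17:27:52.356

		// TIMESTAMP_MICROS: the ".999" layout keeps at most three fractional digits,
		// so the microsecond part 356956 is truncated to .356
		us := int64(1603963672356956)
		fmt.Println(time.Unix(us/1e6, (us%1e6)*1e3).In(cst).Format("2006-01-02 15:04:05.999")) // 2020-10-29 17:27:52.356
	}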
tests/parquet/run.sh (4 additions, 0 deletions)
@@ -45,4 +45,8 @@ for BACKEND in local importer tidb; do

	run_sql 'select w_name from test.warehouse;'
	check_contains "w_name: eLNEDIW"
+
+	run_sql 'select c_since, c_discount from test.customer where c_id = 20;'
+	check_contains "c_since: 2020-09-10 20:17:16"
+	check_contains "c_discount: 0.0585"
done
