This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: support restoring Apache Parquet format source files (#373)
glorv authored Sep 16, 2020
1 parent 4ea4e76 commit 6e4c898
Showing 37 changed files with 822 additions and 15 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -93,6 +93,7 @@ lightning_for_integration_test: ensure_failpoint_ctl
-coverpkg=github.com/pingcap/tidb-lightning/... \
-o $(LIGHTNING_CTL_BIN).test \
github.com/pingcap/tidb-lightning/cmd/tidb-lightning-ctl || ( $(FAILPOINT_DISABLE) && exit 1 )
$(GOBUILD) $(RACE_FLAG) -o bin/parquet_gen tests/checkpoint_parquet/*.go
$(FAILPOINT_DISABLE)

integration_test: lightning_for_integration_test
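
Note: the new `bin/parquet_gen` binary is built from `tests/checkpoint_parquet/*.go`, which is not rendered on this page. Below is a minimal sketch of what such a parquet fixture generator could look like, assuming it uses the same parquet-go writer API that the new unit test in this commit exercises; the output path, schema struct, and row count are illustrative placeholders, not the actual tool.

package main

import (
	"strconv"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

// row mirrors the struct tags used by the unit test added in this commit.
type row struct {
	S string `parquet:"name=s, type=UTF8, encoding=PLAIN_DICTIONARY"`
	A int32  `parquet:"name=a, type=INT32"`
}

func main() {
	// Output path and row count are placeholders for illustration only.
	pf, err := local.NewLocalFileWriter("/tmp/test.parquet")
	if err != nil {
		panic(err)
	}
	w, err := writer.NewParquetWriter(pf, new(row), 2)
	if err != nil {
		panic(err)
	}
	for i := 0; i < 100; i++ {
		if err := w.Write(row{S: strconv.Itoa(i), A: int32(i)}); err != nil {
			panic(err)
		}
	}
	// Flush remaining row groups and close the underlying file.
	if err := w.WriteStop(); err != nil {
		panic(err)
	}
	if err := pf.Close(); err != nil {
		panic(err)
	}
}
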
10 changes: 8 additions & 2 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/pingcap/tidb-lightning
go 1.13

require (
cloud.google.com/go/bigquery v1.4.0 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/carlmjohnson/flagext v0.0.11
@@ -12,6 +13,7 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.3
github.com/google/go-cmp v0.5.0 // indirect
github.com/joho/sqltocsv v0.0.0-20190824231449-5650f27fd5b6
github.com/juju/loggo v0.0.0-20180524022052-584905176618 // indirect
github.com/onsi/ginkgo v1.13.0 // indirect
@@ -30,11 +32,15 @@ require (
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tikv/pd v1.1.0-beta.0.20200818122340-ef1a4e920b2f
github.com/xitongsys/parquet-go v1.5.2
github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5
go.opencensus.io v0.22.3 // indirect
go.uber.org/zap v1.15.0
golang.org/x/net v0.0.0-20200602114024-627f9648deb9
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 // indirect
golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed // indirect
golang.org/x/text v0.3.3
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
google.golang.org/grpc v1.26.0
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
modernc.org/mathutil v1.0.0
36 changes: 32 additions & 4 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lightning/mydump/loader.go
@@ -303,7 +303,7 @@ func (s *mdLoaderSetup) listFiles(ctx context.Context, store storage.ExternalSto
s.dbSchemas = append(s.dbSchemas, info)
case SourceTypeTableSchema:
s.tableSchemas = append(s.tableSchemas, info)
case SourceTypeSQL, SourceTypeCSV:
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
s.tableDatas = append(s.tableDatas, info)
}

216 changes: 216 additions & 0 deletions lightning/mydump/parquet_parser.go
@@ -0,0 +1,216 @@
package mydump

import (
"context"
"io"
"reflect"

"go.uber.org/zap"

"github.com/pingcap/br/pkg/storage"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb/types"
preader "github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source"
)

const (
batchReadRowSize = 32
)

type ParquetParser struct {
Reader *preader.ParquetReader
columns []string
rows []interface{}
readRows int64
curStart int64
curIndex int
lastRow Row
logger log.Logger
}

// readerWrapper is used to implement `source.ParquetFile`
type readerWrapper struct {
ReadSeekCloser
store storage.ExternalStorage
ctx context.Context
// current file path
path string
}

// Write implements source.ParquetFile; the wrapper is read-only, so writes always fail.
func (r *readerWrapper) Write(p []byte) (n int, err error) {
return 0, errors.New("unsupported operation")
}

func (r *readerWrapper) Open(name string) (source.ParquetFile, error) {
if len(name) == 0 {
name = r.path
}
reader, err := r.store.Open(r.ctx, name)
if err != nil {
return nil, errors.Trace(err)
}
return &readerWrapper{
ReadSeekCloser: reader,
store: r.store,
ctx: r.ctx,
path: name,
}, nil
}

// Create implements source.ParquetFile; creating files is not needed when reading source data.
func (r *readerWrapper) Create(name string) (source.ParquetFile, error) {
return nil, errors.New("unsupported operation")
}

func NewParquetParser(
ctx context.Context,
store storage.ExternalStorage,
r storage.ReadSeekCloser,
path string,
) (*ParquetParser, error) {
wrapper := &readerWrapper{
ReadSeekCloser: r,
store: store,
ctx: ctx,
path: path,
}

// FIXME: need to benchmark to find the best concurrency for the parquet reader
reader, err := preader.NewParquetReader(wrapper, nil, 2)
if err != nil {
return nil, errors.Trace(err)
}

columns := make([]string, 0, len(reader.Footer.Schema))
for i, c := range reader.Footer.Schema {
if c.GetNumChildren() == 0 {
// SchemaElement.Name is capitalized; use the original (external) name instead
columns = append(columns, reader.SchemaHandler.Infos[i].ExName)
}
}

return &ParquetParser{
Reader: reader,
columns: columns,
logger: log.L(),
}, nil
}

// Pos returns the current row number of the parquet file
func (pp *ParquetParser) Pos() (pos int64, rowID int64) {
return pp.curStart + int64(pp.curIndex), pp.lastRow.RowID
}

// SetPos seeks the parser forward to the given row position and records the row ID; seeking backwards is not supported.
func (pp *ParquetParser) SetPos(pos int64, rowID int64) error {
if pos < pp.curStart {
panic("don't support seek back yet")
}
pp.lastRow.RowID = rowID

if pos < pp.curStart+int64(len(pp.rows)) {
pp.curIndex = int(pos - pp.curStart)
pp.readRows = pos
return nil
}

if pos > pp.curStart+int64(len(pp.rows)) {
if err := pp.Reader.SkipRows(pos - pp.curStart - int64(len(pp.rows))); err != nil {
return errors.Trace(err)
}
}
pp.curStart = pos
pp.readRows = pos
pp.curIndex = 0
if len(pp.rows) > 0 {
pp.rows = pp.rows[:0]
}

return nil
}

func (pp *ParquetParser) Close() error {
pp.Reader.ReadStop()
return pp.Reader.PFile.Close()
}

func (pp *ParquetParser) ReadRow() error {
pp.lastRow.RowID++
if pp.curIndex >= len(pp.rows) {
if pp.readRows >= pp.Reader.GetNumRows() {
return io.EOF
}
count := batchReadRowSize
if pp.Reader.GetNumRows()-pp.readRows < int64(count) {
count = int(pp.Reader.GetNumRows() - pp.readRows)
}

var err error
pp.rows, err = pp.Reader.ReadByNumber(count)
if err != nil {
return errors.Trace(err)
}
pp.curStart = pp.readRows
pp.readRows += int64(len(pp.rows))
pp.curIndex = 0
}

row := pp.rows[pp.curIndex]
pp.curIndex++

v := reflect.ValueOf(row)
length := v.NumField()
if cap(pp.lastRow.Row) < length {
pp.lastRow.Row = make([]types.Datum, length)
} else {
pp.lastRow.Row = pp.lastRow.Row[:length]
}
for i := 0; i < length; i++ {
setDatumValue(&pp.lastRow.Row[i], v.Field(i))
}
return nil
}

func setDatumValue(d *types.Datum, v reflect.Value) {
switch v.Kind() {
case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
d.SetUint64(v.Uint())
case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
d.SetInt64(v.Int())
case reflect.String:
d.SetString(v.String(), "")
case reflect.Float32, reflect.Float64:
d.SetFloat64(v.Float())
case reflect.Ptr:
if v.IsNil() {
d.SetNull()
} else {
setDatumValue(d, v.Elem())
}
default:
log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()),
zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface()))
}
}

func (pp *ParquetParser) LastRow() Row {
return pp.lastRow
}

// RecycleRow is a no-op for the parquet parser.
func (pp *ParquetParser) RecycleRow(row Row) {
}

// Columns returns the _lower-case_ column names corresponding to values in
// the LastRow.
func (pp *ParquetParser) Columns() []string {
return pp.columns
}

// SetColumns is a no-op; parquet column names are taken from the file schema itself.
func (pp *ParquetParser) SetColumns(cols []string) {
}

func (pp *ParquetParser) SetLogger(l log.Logger) {
pp.logger = l
}
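
Note: a short usage sketch (not part of this commit) showing how a caller outside the package might iterate a parquet data file with the new parser; the directory and file name below are placeholders, while the storage and parser calls mirror the unit test that follows.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/pingcap/br/pkg/storage"
	"github.com/pingcap/tidb-lightning/lightning/mydump"
)

func main() {
	ctx := context.Background()
	// Directory and file name are placeholders for illustration only.
	store, err := storage.NewLocalStorage("/data/export")
	if err != nil {
		panic(err)
	}
	r, err := store.Open(ctx, "test.tbl.parquet")
	if err != nil {
		panic(err)
	}
	parser, err := mydump.NewParquetParser(ctx, store, r, "test.tbl.parquet")
	if err != nil {
		panic(err)
	}
	defer parser.Close()

	// Column names come from the parquet schema.
	fmt.Println("columns:", parser.Columns())

	// Read rows until the parser reports end of file.
	for {
		if err := parser.ReadRow(); err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}
		row := parser.LastRow()
		fmt.Println(row.RowID, row.Row)
	}
}
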
83 changes: 83 additions & 0 deletions lightning/mydump/parquet_parser_test.go
@@ -0,0 +1,83 @@
package mydump

import (
"context"
"io"
"path/filepath"
"strconv"

"github.com/pingcap/br/pkg/storage"

"github.com/pingcap/tidb/types"

. "github.com/pingcap/check"
"github.com/xitongsys/parquet-go-source/local"
writer2 "github.com/xitongsys/parquet-go/writer"
)

type testParquetParserSuite struct{}

var _ = Suite(testParquetParserSuite{})

func (s testParquetParserSuite) TestParquetParser(c *C) {
type Test struct {
S string `parquet:"name=s, type=UTF8, encoding=PLAIN_DICTIONARY"`
A int32 `parquet:"name=a, type=INT32"`
}

dir := c.MkDir()
// prepare data
name := "test123.parquet"
testPath := filepath.Join(dir, name)
pf, err := local.NewLocalFileWriter(testPath)
c.Assert(err, IsNil)
test := &Test{}
writer, err := writer2.NewParquetWriter(pf, test, 2)
c.Assert(err, IsNil)

for i := 0; i < 100; i++ {
test.A = int32(i)
test.S = strconv.Itoa(i)
c.Assert(writer.Write(test), IsNil)
}

c.Assert(writer.WriteStop(), IsNil)
c.Assert(pf.Close(), IsNil)

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)
r, err := store.Open(context.TODO(), name)
c.Assert(err, IsNil)
reader, err := NewParquetParser(context.TODO(), store, r, name)
c.Assert(err, IsNil)
defer reader.Close()

c.Assert(reader.Columns(), DeepEquals, []string{"s", "a"})

verifyRow := func(i int) {
c.Assert(reader.lastRow.RowID, Equals, int64(i+1))
c.Assert(len(reader.lastRow.Row), Equals, 2)
c.Assert(reader.lastRow.Row[0], DeepEquals, types.NewCollationStringDatum(strconv.Itoa(i), "", 0))
c.Assert(reader.lastRow.Row[1], DeepEquals, types.NewIntDatum(int64(i)))
}

// test read some rows
for i := 0; i < 10; i++ {
c.Assert(reader.ReadRow(), IsNil)
verifyRow(i)
}

// test set pos to pos < curpos + batchReadRowSize
c.Assert(reader.SetPos(15, 15), IsNil)
c.Assert(reader.ReadRow(), IsNil)
verifyRow(15)

// test set pos to pos > curpos + batchReadRowSize
c.Assert(reader.SetPos(80, 80), IsNil)
for i := 80; i < 100; i++ {
c.Assert(reader.ReadRow(), IsNil)
verifyRow(i)
}

c.Assert(reader.ReadRow(), Equals, io.EOF)
}
