Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

layout: fix read byte not full from s3 #330

Merged
merged 1 commit into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions encoding/binaryread.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func BinaryReadINT32(r io.Reader, nums []interface{}) error {
buf := make([]byte, len(nums)*4)
n, err := r.Read(buf)
n, err := io.ReadFull(r, buf)
if err != nil {
return err
}
Expand All @@ -28,7 +28,7 @@ func BinaryReadINT32(r io.Reader, nums []interface{}) error {

func BinaryReadINT64(r io.Reader, nums []interface{}) error {
buf := make([]byte, len(nums)*8)
n, err := r.Read(buf)
n, err := io.ReadFull(r, ,buf)
Copy link

@kennytm kennytm Nov 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, shouldn't the compiler reject this?

edit: ok fixed in b09c49d.

if err != nil {
return err
}
Expand All @@ -51,7 +51,7 @@ func BinaryReadINT64(r io.Reader, nums []interface{}) error {

func BinaryReadFLOAT32(r io.Reader, nums []interface{}) error {
buf := make([]byte, len(nums)*4)
n, err := r.Read(buf)
n, err := io.ReadFull(r, buf)
if err != nil {
return err
}
Expand All @@ -70,7 +70,7 @@ func BinaryReadFLOAT32(r io.Reader, nums []interface{}) error {

func BinaryReadFLOAT64(r io.Reader, nums []interface{}) error {
buf := make([]byte, len(nums)*8)
n, err := r.Read(buf)
n, err := io.ReadFull(r, buf)
if err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions layout/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"math/bits"
"strings"

Expand Down Expand Up @@ -407,7 +408,7 @@ func ReadPageRawData(thriftReader *thrift.TBufferedTransport, schemaHandler *sch

compressedPageSize := pageHeader.GetCompressedPageSize()
buf := make([]byte, compressedPageSize)
if _, err := thriftReader.Read(buf); err != nil {
if _, err := io.ReadFull(thriftReader, buf); err != nil {
return nil, err
}

Expand Down Expand Up @@ -728,13 +729,13 @@ func ReadPage(thriftReader *thrift.TBufferedTransport, schemaHandler *schema.Sch
definitionLevelsBuf := make([]byte, dll)
dataBuf := make([]byte, compressedPageSize-rll-dll)

if _, err = thriftReader.Read(repetitionLevelsBuf); err != nil {
if _, err = io.ReadFull(thriftReader, repetitionLevelsBuf); err != nil {
return nil, 0, 0, err
}
if _, err = thriftReader.Read(definitionLevelsBuf); err != nil {
if _, err = io.ReadFull(thriftReader, definitionLevelsBuf); err != nil {
return nil, 0, 0, err
}
if _, err = thriftReader.Read(dataBuf); err != nil {
if _, err = io.ReadFull(thriftReader, dataBuf); err != nil {
return nil, 0, 0, err
}

Expand Down Expand Up @@ -762,7 +763,7 @@ func ReadPage(thriftReader *thrift.TBufferedTransport, schemaHandler *schema.Sch

} else {
buf = make([]byte, compressedPageSize)
if _, err = thriftReader.Read(buf); err != nil {
if _, err = io.ReadFull(thriftReader, buf); err != nil {
return nil, 0, 0, err
}
codec := colMetaData.GetCodec()
Expand Down
23 changes: 11 additions & 12 deletions reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"encoding/binary"
"io"
"reflect"
"sync"
"strings"
"sync"

"github.com/apache/thrift/lib/go/thrift"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/layout"
"github.com/xitongsys/parquet-go/marshal"
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/source"
)

type ParquetReader struct {
Expand All @@ -26,8 +26,8 @@ type ParquetReader struct {
ColumnBuffers map[string]*ColumnBufferType

//One reader can only read one type objects
ObjType reflect.Type
ObjPartialType reflect.Type
ObjType reflect.Type
ObjPartialType reflect.Type
}

//Create a parquet reader: obj is a object with schema tags or a JSON schema string
Expand Down Expand Up @@ -55,7 +55,7 @@ func NewParquetReader(pFile source.ParquetFile, obj interface{}, np int64) (*Par
}
}

}else{
} else {
res.SchemaHandler = schema.NewSchemaHandlerFromSchemaList(res.Footer.Schema)
}

Expand All @@ -80,7 +80,6 @@ func (self *ParquetReader) SetSchemaHandlerFromJSON(jsonSchema string) error {
return err
}


self.RenameSchema()
for i := 0; i < len(self.SchemaHandler.SchemaElements); i++ {
schemaElement := self.SchemaHandler.SchemaElements[i]
Expand Down Expand Up @@ -199,7 +198,7 @@ func (self *ParquetReader) Read(dstInterface interface{}) error {

// Read maxReadNumber objects
func (self *ParquetReader) ReadByNumber(maxReadNumber int) ([]interface{}, error) {
var err error
var err error
if self.ObjType == nil {
if self.ObjType, err = self.SchemaHandler.GetType(self.SchemaHandler.GetRootInName()); err != nil {
return nil, err
Expand Down Expand Up @@ -229,13 +228,13 @@ func (self *ParquetReader) ReadPartial(dstInterface interface{}, prefixPath stri
if err != nil {
return err
}

return self.read(dstInterface, prefixPath)
}

// Read maxReadNumber partial objects
// Read maxReadNumber partial objects
func (self *ParquetReader) ReadPartialByNumber(maxReadNumber int, prefixPath string) ([]interface{}, error) {
var err error
var err error
if self.ObjPartialType == nil {
if self.ObjPartialType, err = self.SchemaHandler.GetType(prefixPath); err != nil {
return nil, err
Expand Down Expand Up @@ -327,7 +326,7 @@ func (self *ParquetReader) read(dstInterface interface{}, prefixPath string) err
}
wg.Add(1)
go func(b, e, index int) {
defer func(){
defer func() {
wg.Done()
}()

Expand Down