Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
add uploader compress writer (#566)
Browse files Browse the repository at this point in the history
* add uploader compress writer

* fix hounder

* fix hounder again

* ut

* update Makefile

* address comments

* delete unused code

* fix go fmt

* address comments, add zlib compressor

* fix hound

* fix dot

* address comment

* Revert "address comment"

This reverts commit f0317d7.

* address comment

* fix fmt

Co-authored-by: 3pointer <luancheng@pingcap.com>
  • Loading branch information
lichunzhu and 3pointer authored Nov 16, 2020
1 parent a3517d6 commit 0b4e2a9
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 8 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ check:
@make tools errdoc static lint

static: export GO111MODULE=on
static: tools
static: prepare tools
@ # Not running vet and fmt through metalinter because it ends up looking at vendor
tools/bin/goimports -w -d -format-only -local $(BR_PKG) $$($(PACKAGE_DIRECTORIES)) 2>&1 | $(CHECKER)
tools/bin/govet --shadow $$($(PACKAGE_DIRECTORIES)) 2>&1 | $(CHECKER)
Expand Down Expand Up @@ -125,10 +125,12 @@ static: tools
@# TODO: allow more APIs when we need to support "workaound".
grep -Rn --exclude="*_test.go" -E "(\t| )errors\.[A-Z]" cmd pkg | \
grep -vE "Normalize|Annotate|Trace|Cause" 2>&1 | $(CHECKER)
$(FINISH_MOD)

lint: tools
lint: prepare tools
@echo "linting"
CGO_ENABLED=0 tools/bin/revive -formatter friendly -config revive.toml $$($(PACKAGES))
$(FINISH_MOD)

tidy:
@echo "go mod tidy"
Expand Down
109 changes: 104 additions & 5 deletions pkg/storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,108 @@ package storage

import (
"bytes"
"compress/gzip"
"context"
"io"
)

// CompressType represents the type of compression applied to bytes
// before they are handed to an uploader.
type CompressType uint8

const (
	// NoCompression won't compress given bytes.
	NoCompression CompressType = iota
	// Gzip will compress given bytes in gzip format.
	Gzip
)

// interceptBuffer is an in-memory staging buffer that may transform
// (e.g. gzip-compress) the bytes written to it before upload.
// Len reports how many bytes Write has accepted, Bytes returns the
// buffered (possibly compressed) output, Flush forces any pending
// transformed data into the buffer, and Reset discards buffered output
// so the buffer can stage the next chunk.
type interceptBuffer interface {
	io.WriteCloser
	Len() int
	Cap() int
	Bytes() []byte
	Flush() error
	Reset()
}

// newInterceptBuffer builds the staging buffer matching compressType,
// pre-sized to chunkSize. An unrecognized compressType yields nil.
func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {
	if compressType == NoCompression {
		return newNoCompressionBuffer(chunkSize)
	}
	if compressType == Gzip {
		return newGzipBuffer(chunkSize)
	}
	return nil
}

// noCompressionBuffer stages bytes untouched: it is a plain
// bytes.Buffer plus no-op Flush and Close methods so it satisfies the
// interceptBuffer contract.
type noCompressionBuffer struct {
	*bytes.Buffer
}

// Flush is a no-op: there is no compressor holding back data.
func (n *noCompressionBuffer) Flush() error { return nil }

// Close is a no-op: the underlying bytes.Buffer needs no teardown.
func (n *noCompressionBuffer) Close() error { return nil }

// newNoCompressionBuffer returns a pass-through buffer whose backing
// storage is pre-sized to chunkSize bytes.
func newNoCompressionBuffer(chunkSize int) *noCompressionBuffer {
	backing := make([]byte, 0, chunkSize)
	return &noCompressionBuffer{Buffer: bytes.NewBuffer(backing)}
}

type simpleCompressWriter interface {
io.WriteCloser
Flush() error
}

type simpleCompressBuffer struct {
*bytes.Buffer
compressWriter simpleCompressWriter
len int
cap int
}

func (b *simpleCompressBuffer) Write(p []byte) (int, error) {
written, err := b.compressWriter.Write(p)
b.len += written
return written, err
}

func (b *simpleCompressBuffer) Len() int {
return b.len
}

func (b *simpleCompressBuffer) Cap() int {
return b.cap
}

func (b *simpleCompressBuffer) Reset() {
b.len = 0
b.Buffer.Reset()
}

func (b *simpleCompressBuffer) Flush() error {
return b.compressWriter.Flush()
}

func (b *simpleCompressBuffer) Close() error {
return b.compressWriter.Close()
}

func newGzipBuffer(chunkSize int) *simpleCompressBuffer {
bf := bytes.NewBuffer(make([]byte, 0, chunkSize))
return &simpleCompressBuffer{
Buffer: bf,
len: 0,
cap: chunkSize,
compressWriter: gzip.NewWriter(bf),
}
}

type uploaderWriter struct {
buf *bytes.Buffer
buf interceptBuffer
uploader Uploader
}

Expand All @@ -27,6 +124,7 @@ func (u *uploaderWriter) Write(ctx context.Context, p []byte) (int, error) {
}
p = p[w:]
}
u.buf.Flush()
err := u.uploadChunk(ctx)
if err != nil {
return 0, err
Expand All @@ -47,6 +145,7 @@ func (u *uploaderWriter) uploadChunk(ctx context.Context) error {
}

func (u *uploaderWriter) Close(ctx context.Context) error {
u.buf.Close()
err := u.uploadChunk(ctx)
if err != nil {
return err
Expand All @@ -55,15 +154,15 @@ func (u *uploaderWriter) Close(ctx context.Context) error {
}

// NewUploaderWriter wraps the Writer interface over an uploader.
func NewUploaderWriter(uploader Uploader, chunkSize int) Writer {
return newUploaderWriter(uploader, chunkSize)
func NewUploaderWriter(uploader Uploader, chunkSize int, compressType CompressType) Writer {
return newUploaderWriter(uploader, chunkSize, compressType)
}

// newUploaderWriter is used for testing only.
func newUploaderWriter(uploader Uploader, chunkSize int) *uploaderWriter {
func newUploaderWriter(uploader Uploader, chunkSize int, compressType CompressType) *uploaderWriter {
return &uploaderWriter{
uploader: uploader,
buf: bytes.NewBuffer(make([]byte, 0, chunkSize))}
buf: newInterceptBuffer(chunkSize, compressType)}
}

// BufferWriter is a Writer implementation on top of bytes.Buffer that is useful for testing.
Expand Down
87 changes: 86 additions & 1 deletion pkg/storage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
package storage

import (
"bytes"
"compress/gzip"
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"

Expand All @@ -29,7 +33,7 @@ func (r *testStorageSuite) TestUploaderWriter(c *C) {
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt"
uploader, err := storage.CreateUploader(ctx, fileName)
c.Assert(err, IsNil)
writer := newUploaderWriter(uploader, test.chunkSize)
writer := newUploaderWriter(uploader, test.chunkSize, NoCompression)
for _, str := range test.content {
p := []byte(str)
written, err2 := writer.Write(ctx, p)
Expand Down Expand Up @@ -91,3 +95,84 @@ func (r *testStorageSuite) TestUploaderWriter(c *C) {
testFn(&tests[i], c)
}
}

// TestUploaderCompressWriter round-trips content through a compressing
// uploaderWriter backed by local storage and verifies that
// decompressing the uploaded file reproduces the original text.
//
// Fix: the local io.Reader was named "r", shadowing the suite receiver
// "r *testStorageSuite" — the repo's own `govet --shadow` check flags
// exactly this; renamed to "reader".
func (r *testStorageSuite) TestUploaderCompressWriter(c *C) {
	dir := c.MkDir()

	type testcase struct {
		name         string
		content      []string
		chunkSize    int
		compressType CompressType
	}
	testFn := func(test *testcase, c *C) {
		c.Log(test.name)
		backend, err := ParseBackend("local:///"+dir, nil)
		c.Assert(err, IsNil)
		ctx := context.Background()
		storage, err := Create(ctx, backend, true)
		c.Assert(err, IsNil)
		fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.gz"
		uploader, err := storage.CreateUploader(ctx, fileName)
		c.Assert(err, IsNil)
		writer := newUploaderWriter(uploader, test.chunkSize, test.compressType)
		for _, str := range test.content {
			p := []byte(str)
			written, err2 := writer.Write(ctx, p)
			c.Assert(err2, IsNil)
			c.Assert(written, Equals, len(p))
		}
		err = writer.Close(ctx)
		c.Assert(err, IsNil)
		file, err := os.Open(filepath.Join(dir, fileName))
		c.Assert(err, IsNil)
		var reader io.Reader
		switch test.compressType {
		case Gzip:
			reader, err = gzip.NewReader(file)
		default:
			c.Fatal("unknown compressType")
		}
		c.Assert(err, IsNil)
		var bf bytes.Buffer
		_, err = bf.ReadFrom(reader)
		c.Assert(err, IsNil)
		c.Assert(bf.String(), Equals, strings.Join(test.content, ""))
		// Sanity check we didn't write past the chunk size
		c.Assert(writer.buf.Cap(), Equals, test.chunkSize)
		c.Assert(file.Close(), IsNil)
	}
	compressTypeArr := []CompressType{Gzip}
	tests := []testcase{
		{
			name: "long text medium chunks",
			content: []string{
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
			},
			chunkSize: 30,
		},
		{
			name: "long text large chunks",
			content: []string{
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
				"hello world",
			},
			chunkSize: 500,
		},
	}
	for i := range tests {
		for _, compressType := range compressTypeArr {
			tests[i].compressType = compressType
			testFn(&tests[i], c)
		}
	}
}

0 comments on commit 0b4e2a9

Please sign in to comment.