Skip to content

Commit

Permalink
Add chunking support for local, owncloud and ocis fs
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Oct 30, 2020
1 parent c03f4cc commit e1d32b8
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 21 deletions.
19 changes: 11 additions & 8 deletions pkg/storage/fs/ocis/ocis.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cs3org/reva/pkg/logger"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/storage/utils/templates"
"github.com/cs3org/reva/pkg/user"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -160,18 +161,20 @@ func New(m map[string]interface{}) (storage.FS, error) {
}

return &ocisfs{
tp: tp,
lu: lu,
o: o,
p: &Permissions{lu: lu},
tp: tp,
lu: lu,
o: o,
p: &Permissions{lu: lu},
chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")),
}, nil
}

type ocisfs struct {
tp TreePersistence
lu *Lookup
o *Options
p *Permissions
tp TreePersistence
lu *Lookup
o *Options
p *Permissions
chunkHandler *chunking.ChunkHandler
}

func (fs *ocisfs) Shutdown(ctx context.Context) error {
Expand Down
22 changes: 22 additions & 0 deletions pkg/storage/fs/ocis/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/logger"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/user"
"github.com/google/uuid"
"github.com/pkg/errors"
Expand All @@ -48,6 +49,27 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read
}

uploadInfo := upload.(*fileUpload)

p := uploadInfo.info.Storage["InternalDestination"]
ok, err := chunking.IsChunked(p)
if err != nil {
return errors.Wrap(err, "ocfs: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
if p == "" {
if err = uploadInfo.Terminate(ctx); err != nil {
return errors.Wrap(err, "ocfs: error removing auxiliary files")
}
return errtypes.PartialContent(ref.String())
}
uploadInfo.info.Storage["InternalDestination"] = p
defer r.Close()
}

if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil {
return errors.Wrap(err, "ocisfs: error writing to binary file")
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/storage/fs/owncloud/owncloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/cs3org/reva/pkg/storage/utils/ace"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/storage/utils/templates"
"github.com/cs3org/reva/pkg/user"
"github.com/gofrs/uuid"
Expand Down Expand Up @@ -198,12 +199,17 @@ func New(m map[string]interface{}) (storage.FS, error) {
},
}

return &ocfs{c: c, pool: pool}, nil
return &ocfs{
c: c,
pool: pool,
chunkHandler: chunking.NewChunkHandler(c.UploadInfoDir),
}, nil
}

type ocfs struct {
c *config
pool *redis.Pool
c *config
pool *redis.Pool
chunkHandler *chunking.ChunkHandler
}

func (fs *ocfs) Shutdown(ctx context.Context) error {
Expand Down
22 changes: 22 additions & 0 deletions pkg/storage/fs/owncloud/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/logger"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/storage/utils/templates"
"github.com/cs3org/reva/pkg/user"
"github.com/google/uuid"
Expand All @@ -47,6 +48,27 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl
}

uploadInfo := upload.(*fileUpload)

p := uploadInfo.info.Storage["InternalDestination"]
ok, err := chunking.IsChunked(p)
if err != nil {
return errors.Wrap(err, "ocfs: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
if p == "" {
if err = uploadInfo.Terminate(ctx); err != nil {
return errors.Wrap(err, "ocfs: error removing auxiliary files")
}
return errtypes.PartialContent(ref.String())
}
uploadInfo.info.Storage["InternalDestination"] = p
defer r.Close()
}

if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil {
return errors.Wrap(err, "ocfs: error writing to binary file")
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/utils/chunking/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func IsChunked(fn string) (bool, error) {
type ChunkBLOBInfo struct {
Path string
TransferID string
TotalChunks int64
CurrentChunk int64
TotalChunks int
CurrentChunk int
}

// Not using the resource path in the chunk folder name allows uploading to
Expand All @@ -54,12 +54,12 @@ func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) {
parts := strings.Split(path, "-chunking-")
tail := strings.Split(parts[1], "-")

totalChunks, err := strconv.ParseInt(tail[1], 10, 64)
totalChunks, err := strconv.Atoi(tail[1])
if err != nil {
return nil, err
}

currentChunk, err := strconv.ParseInt(tail[2], 10, 64)
currentChunk, err := strconv.Atoi(tail[2])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er
// not complete and requires more actions.
// This code is needed to notify the owncloud webservice that the upload has not yet been
// completed and needs to continue uploading chunks.
if len(chunks) < int(chunkInfo.TotalChunks) {
if len(chunks) < chunkInfo.TotalChunks {
return false, "", nil
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er
return true, assembledFileName, nil
}

// Write chunk saves an intermediate chunk temporarily and assembles all chunks
// WriteChunk saves an intermediate chunk temporarily and assembles all chunks
// once the final one is received.
func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) {
finish, chunk, err := c.saveChunk(fn, r)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/eosfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC

ok, err := chunking.IsChunked(p)
if err != nil {
return errors.Wrap(err, "eos: error resolving reference")
return errors.Wrap(err, "eos: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
Expand Down
12 changes: 9 additions & 3 deletions pkg/storage/utils/localfs/localfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/mime"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/storage/utils/grants"
"github.com/cs3org/reva/pkg/storage/utils/templates"
"github.com/cs3org/reva/pkg/user"
Expand Down Expand Up @@ -84,8 +85,9 @@ func (c *Config) init() {
}

type localfs struct {
conf *Config
db *sql.DB
conf *Config
db *sql.DB
chunkHandler *chunking.ChunkHandler
}

// NewLocalFS returns a storage.FS interface implementation that controls then
Expand All @@ -111,7 +113,11 @@ func NewLocalFS(c *Config) (storage.FS, error) {
return nil, errors.Wrap(err, "localfs: error initializing db")
}

return &localfs{conf: c, db: db}, nil
return &localfs{
conf: c,
db: db,
chunkHandler: chunking.NewChunkHandler(c.Uploads),
}, nil
}

func (fs *localfs) Shutdown(ctx context.Context) error {
Expand Down
23 changes: 23 additions & 0 deletions pkg/storage/utils/localfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/user"
"github.com/google/uuid"
"github.com/pkg/errors"
Expand All @@ -44,6 +46,27 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea
}

uploadInfo := upload.(*fileUpload)

p := uploadInfo.info.Storage["InternalDestination"]
ok, err := chunking.IsChunked(p)
if err != nil {
return errors.Wrap(err, "ocfs: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
if p == "" {
if err = uploadInfo.Terminate(ctx); err != nil {
return errors.Wrap(err, "ocfs: error removing auxiliary files")
}
return errtypes.PartialContent(ref.String())
}
uploadInfo.info.Storage["InternalDestination"] = p
defer r.Close()
}

if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil {
return errors.Wrap(err, "ocisfs: error writing to binary file")
}
Expand Down

0 comments on commit e1d32b8

Please sign in to comment.