Skip to content

Commit

Permalink
Stat assembled path instead of chunk
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Oct 30, 2020
1 parent dbcbe85 commit c03f4cc
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 47 deletions.
12 changes: 0 additions & 12 deletions internal/http/services/owncloud/ocdav/ocdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"path"
"strings"
"time"
Expand Down Expand Up @@ -67,21 +66,14 @@ type Config struct {
// and received path is /docs the internal path will be:
// /users/<first char of username>/<username>/docs
WebdavNamespace string `mapstructure:"webdav_namespace"`
ChunkFolder string `mapstructure:"chunk_folder"`
GatewaySvc string `mapstructure:"gatewaysvc"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
}

func (c *Config) init() {
// note: default c.Prefix is an empty string

c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc)

if c.ChunkFolder == "" {
c.ChunkFolder = "/var/tmp/reva/tmp/davchunks"
}

}

type svc struct {
Expand All @@ -100,10 +92,6 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)

conf.init()

if err := os.MkdirAll(conf.ChunkFolder, 0755); err != nil {
return nil, err
}

s := &svc{
c: conf,
webDavHandler: new(WebDavHandler),
Expand Down
19 changes: 19 additions & 0 deletions internal/http/services/owncloud/ocdav/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/utils"
)

Expand Down Expand Up @@ -262,6 +263,24 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io
}
}

ok, err := chunking.IsChunked(fn)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if ok {
chunk, err := chunking.GetChunkBLOBInfo(fn)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
sReq = &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: chunk.Path},
}}
}

// stat again to check the new file's metadata
sRes, err = client.Stat(ctx, sReq)
if err != nil {
Expand Down
74 changes: 40 additions & 34 deletions pkg/storage/utils/chunking/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,28 @@ import (
"strings"
)

// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory
// until it gets the final chunk which is then returned.
type ChunkHandler struct {
ChunkFolder string `mapstructure:"chunk_folder"`
}

// NewChunkHandler creates a handler for chunked uploads.
func NewChunkHandler(chunkFolder string) *ChunkHandler {
return &ChunkHandler{chunkFolder}
// IsChunked checks if a given path refers to a chunk or not
func IsChunked(fn string) (bool, error) {
// FIXME: also need to check whether the OC-Chunked header is set
return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn)
}

type chunkBLOBInfo struct {
path string
transferID string
totalChunks int64
currentChunk int64
// ChunkBLOBInfo stores info about a particular chunk
type ChunkBLOBInfo struct {
Path string
TransferID string
TotalChunks int64
CurrentChunk int64
}

// Not using the resource path in the chunk folder name allows uploading to
// the same folder after a move without having to restart the chunk upload
func (c *chunkBLOBInfo) uploadID() string {
return fmt.Sprintf("chunking-%s-%d", c.transferID, c.totalChunks)
func (c *ChunkBLOBInfo) uploadID() string {
return fmt.Sprintf("chunking-%s-%d", c.TransferID, c.TotalChunks)
}

func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) {
// GetChunkBLOBInfo decodes a chunk name to retrieve info about it.
func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) {
parts := strings.Split(path, "-chunking-")
tail := strings.Split(parts[1], "-")

Expand All @@ -70,14 +67,25 @@ func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) {
return nil, fmt.Errorf("current chunk:%d exceeds total number of chunks:%d", currentChunk, totalChunks)
}

return &chunkBLOBInfo{
path: parts[0],
transferID: tail[0],
totalChunks: totalChunks,
currentChunk: currentChunk,
return &ChunkBLOBInfo{
Path: parts[0],
TransferID: tail[0],
TotalChunks: totalChunks,
CurrentChunk: currentChunk,
}, nil
}

// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory
// until it gets the final chunk which is then returned.
type ChunkHandler struct {
ChunkFolder string `mapstructure:"chunk_folder"`
}

// NewChunkHandler creates a handler for chunked uploads.
func NewChunkHandler(chunkFolder string) *ChunkHandler {
return &ChunkHandler{chunkFolder}
}

func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) {
file, err := ioutil.TempFile(fmt.Sprintf("/%s", c.ChunkFolder), "")
if err != nil {
Expand All @@ -87,7 +95,7 @@ func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) {
return file.Name(), file, nil
}

func (c *ChunkHandler) getChunkFolderName(i *chunkBLOBInfo) (string, error) {
func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (string, error) {
path := "/" + c.ChunkFolder + filepath.Clean("/"+i.uploadID())
if err := os.MkdirAll(path, 0755); err != nil {
return "", err
Expand All @@ -96,7 +104,7 @@ func (c *ChunkHandler) getChunkFolderName(i *chunkBLOBInfo) (string, error) {
}

func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, error) {
chunkInfo, err := getChunkBLOBInfo(path)
chunkInfo, err := GetChunkBLOBInfo(path)
if err != nil {
err := fmt.Errorf("error getting chunk info from path: %s", path)
return false, "", err
Expand Down Expand Up @@ -124,7 +132,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er
}
//c.logger.Info().Log("chunkfolder", chunksFolderName)

chunkTarget := chunksFolderName + "/" + fmt.Sprintf("%d", chunkInfo.currentChunk)
chunkTarget := chunksFolderName + "/" + fmt.Sprintf("%d", chunkInfo.CurrentChunk)
if err = os.Rename(chunkTempFilename, chunkTarget); err != nil {
return false, "", err
}
Expand All @@ -151,7 +159,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er
// not complete and requires more actions.
// This code is needed to notify the owncloud webservice that the upload has not yet been
// completed and needs to continue uploading chunks.
if len(chunks) < int(chunkInfo.totalChunks) {
if len(chunks) < int(chunkInfo.TotalChunks) {
return false, "", nil
}

Expand Down Expand Up @@ -190,11 +198,8 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er
return true, assembledFileName, nil
}

func (c *ChunkHandler) IsChunked(fn string) (bool, error) {
// FIXME: also need to check whether the OC-Chunked header is set
return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn)
}

// Write chunk saves an intermediate chunk temporarily and assembles all chunks
// once the final one is received.
func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) {
finish, chunk, err := c.saveChunk(fn, r)
if err != nil {
Expand All @@ -209,14 +214,15 @@ func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCl
if err != nil {
return "", nil, err
}
defer fd.Close()

chunkInfo, err := getChunkBLOBInfo(fn)
chunkInfo, err := GetChunkBLOBInfo(fn)
if err != nil {
return "", nil, err
}

return chunkInfo.path, fd, nil
// Since we're returning a ReadCloser, it is the responsibility of the
// caller function to close it to prevent file descriptor leaks.
return chunkInfo.Path, fd, nil

// TODO(labkode): implement old chunking

Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/utils/eosfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/pkg/errors"
)

Expand All @@ -46,7 +47,7 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC
return errtypes.PermissionDenied("eos: cannot upload under the virtual share folder")
}

ok, err := fs.chunkHandler.IsChunked(p)
ok, err := chunking.IsChunked(p)
if err != nil {
return errors.Wrap(err, "eos: error resolving reference")
}
Expand All @@ -58,6 +59,7 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC
if p == "" {
return errtypes.PartialContent(ref.String())
}
defer r.Close()
}

fn := fs.wrap(ctx, p)
Expand Down

0 comments on commit c03f4cc

Please sign in to comment.