From 6060157edda50dd48f59313706b9ed17944b4672 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 28 Oct 2020 12:17:57 +0100 Subject: [PATCH 01/16] Refactor the uploading files workflow from various clients --- changelog/unreleased/uploads-refactor.md | 8 + .../storageprovider/storageprovider.go | 74 ++--- .../services/dataprovider/dataprovider.go | 146 ++++----- internal/http/services/dataprovider/put.go | 90 ------ internal/http/services/owncloud/ocdav/copy.go | 83 +---- internal/http/services/owncloud/ocdav/move.go | 4 +- .../http/services/owncloud/ocdav/ocdav.go | 1 - .../http/services/owncloud/ocdav/options.go | 2 +- .../http/services/owncloud/ocdav/propfind.go | 8 +- internal/http/services/owncloud/ocdav/put.go | 70 ++-- .../services/owncloud/ocdav/putchunked.go | 25 -- internal/http/services/owncloud/ocdav/tus.go | 3 +- .../http/services/owncloud/ocdav/webdav.go | 6 +- pkg/storage/utils/eosfs/upload.go | 305 +----------------- pkg/storage/utils/localfs/localfs.go | 1 + pkg/storage/utils/localfs/upload.go | 4 + 16 files changed, 178 insertions(+), 652 deletions(-) create mode 100644 changelog/unreleased/uploads-refactor.md diff --git a/changelog/unreleased/uploads-refactor.md b/changelog/unreleased/uploads-refactor.md new file mode 100644 index 0000000000..f3ebb8e6e9 --- /dev/null +++ b/changelog/unreleased/uploads-refactor.md @@ -0,0 +1,8 @@ +Enhancement: Refactor the uploading files workflow from various clients + +Previously, we were implementing the tus client logic in the ocdav service, +leading to restricting the whole of tus logic to the internal services. This PR +refactors that workflow to accept incoming requests following the tus protocol +while using simpler transmission internally. 
+ +https://github.com/cs3org/reva/pull/1285 diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 7457670e2f..08689f00ab 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -57,7 +57,6 @@ type config struct { TmpFolder string `mapstructure:"tmp_folder" docs:"/var/tmp;Path to temporary folder."` DataServerURL string `mapstructure:"data_server_url" docs:"http://localhost/data;The URL for the data server."` ExposeDataServer bool `mapstructure:"expose_data_server" docs:"false;Whether to expose data server."` // if true the client will be able to upload/download directly to it - DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."` AvailableXS map[string]uint32 `mapstructure:"available_checksums" docs:"nil;List of available checksums."` MimeTypes map[string]string `mapstructure:"mimetypes" docs:"nil;List of supported mime types and corresponding file extensions."` } @@ -298,50 +297,53 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate Status: status.NewInternal(ctx, errors.New("can't upload to mount path"), "can't upload to mount path"), }, nil } - url := *s.dataServerURL - if s.conf.DisableTus { - url.Path = path.Join("/", url.Path, newRef.GetPath()) - } else { - metadata := map[string]string{} - var uploadLength int64 - if req.Opaque != nil && req.Opaque.Map != nil { - if req.Opaque.Map["Upload-Length"] != nil { - var err error - uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64) - if err != nil { - return &provider.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "error parsing upload length"), - }, nil - } - } - if req.Opaque.Map["X-OC-Mtime"] != nil { - metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value) + + metadata := map[string]string{} + var uploadLength 
int64 + if req.Opaque != nil && req.Opaque.Map != nil { + if req.Opaque.Map["Upload-Length"] != nil { + var err error + uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64) + if err != nil { + return &provider.InitiateFileUploadResponse{ + Status: status.NewInternal(ctx, err, "error parsing upload length"), + }, nil } } - uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata) - if err != nil { - var st *rpc.Status - switch err.(type) { - case errtypes.IsNotFound: - st = status.NewNotFound(ctx, "path not found when initiating upload") - case errtypes.PermissionDenied: - st = status.NewPermissionDenied(ctx, err, "permission denied") - default: - st = status.NewInternal(ctx, err, "error getting upload id: "+req.Ref.String()) - } - return &provider.InitiateFileUploadResponse{ - Status: st, - }, nil + if req.Opaque.Map["X-OC-Mtime"] != nil { + metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value) } - url.Path = path.Join("/", url.Path, uploadID) + } + uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata) + if err != nil { + var st *rpc.Status + switch err.(type) { + case errtypes.IsNotFound: + st = status.NewNotFound(ctx, "path not found when initiating upload") + case errtypes.PermissionDenied: + st = status.NewPermissionDenied(ctx, err, "permission denied") + default: + st = status.NewInternal(ctx, err, "error getting upload id: "+req.Ref.String()) + } + return &provider.InitiateFileUploadResponse{ + Status: st, + }, nil + } + + u := s.dataServerURL + u.Path = path.Join(u.Path, uploadID) + if err != nil { + return &provider.InitiateFileUploadResponse{ + Status: status.NewInternal(ctx, err, "error parsing data server URL"), + }, nil } - log.Info().Str("data-server", url.String()). + log.Info().Str("data-server", u.String()). Str("fn", req.Ref.GetPath()). Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)). 
Msg("file upload") res := &provider.InitiateFileUploadResponse{ - UploadEndpoint: url.String(), + UploadEndpoint: u.String(), Status: status.NewOK(ctx), AvailableChecksums: s.availableXS, Expose: s.conf.ExposeDataServer, diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 7b62ea64d9..d12cbfdc7e 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -41,7 +41,6 @@ type config struct { Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` Timeout int64 `mapstructure:"timeout"` Insecure bool `mapstructure:"insecure"` - DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."` TempDirectory string `mapstructure:"temp_directory"` } @@ -100,10 +99,7 @@ func (s *svc) Unprotected() []string { // Create a new DataStore instance which is responsible for // storing the uploaded file on disk in the specified directory. -// This path _must_ exist before tusd will store uploads in it. -// If you want to save them on a different medium, for example -// a remote FTP server, you can implement your own storage backend -// by implementing the tusd.DataStore interface. +// This path _must_ exist before we store uploads in it. 
func getFS(c *config) (storage.FS, error) { if f, ok := registry.NewFuncs[c.Driver]; ok { return f(c.Drivers[c.Driver]) @@ -119,14 +115,76 @@ func (s *svc) Handler() http.Handler { return s.handler } -// Composable is the interface that a struct needs to implement to be composable by this composer -type Composable interface { +func (s *svc) setHandler() error { + + tusHandler := s.getTusHandler() + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log := appctx.GetLogger(r.Context()) + log.Info().Msgf("dataprovider routing: path=%s", r.URL.Path) + + method := r.Method + // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override + if r.Header.Get("X-HTTP-Method-Override") != "" { + method = r.Header.Get("X-HTTP-Method-Override") + } + + switch method { + // old fashioned download. + // GET is not part of the tus.io protocol + // TODO allow range based get requests? that end before the current offset + case "GET": + s.doGet(w, r) + case "PUT": + s.doPut(w, r) + case "HEAD": + w.WriteHeader(http.StatusOK) + + // tus.io based uploads + // uploads are initiated using the CS3 APIs Initiate Upload call + case "POST": + if tusHandler != nil { + tusHandler.PostFile(w, r) + } else { + w.WriteHeader(http.StatusNotImplemented) + } + case "PATCH": + if tusHandler != nil { + tusHandler.PatchFile(w, r) + } else { + w.WriteHeader(http.StatusNotImplemented) + } + // TODO Only attach the DELETE handler if the Terminate() method is provided + case "DELETE": + if tusHandler != nil { + tusHandler.DelFile(w, r) + } else { + w.WriteHeader(http.StatusNotImplemented) + } + default: + w.WriteHeader(http.StatusNotImplemented) + return + } + }) + + if tusHandler != nil { + s.handler = tusHandler.Middleware(handler) + } else { + s.handler = handler + } + + return nil +} + +// Composable is the interface that a struct needs to implement +// to be composable, so that it can support the TUS methods +type composable interface 
{ UseIn(composer *tusd.StoreComposer) } -func (s *svc) setHandler() (err error) { - composable, ok := s.storage.(Composable) - if ok && !s.conf.DisableTus { +func (s *svc) getTusHandler() *tusd.UnroutedHandler { + composable, ok := s.storage.(composable) + if ok { // A storage backend for tusd may consist of multiple different parts which // handle upload creation, locking, termination and so on. The composer is a // place where all those separated pieces are joined together. In this example @@ -144,71 +202,9 @@ func (s *svc) setHandler() (err error) { handler, err := tusd.NewUnroutedHandler(config) if err != nil { - return err + return nil } - - s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - log := appctx.GetLogger(r.Context()) - log.Info().Msgf("tusd routing: path=%s", r.URL.Path) - - method := r.Method - // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override - if r.Header.Get("X-HTTP-Method-Override") != "" { - method = r.Header.Get("X-HTTP-Method-Override") - } - - switch method { - // old fashioned download. - - // GET is not part of the tus.io protocol - // currently there is no way to GET an upload that is in progress - // TODO allow range based get requests? that end before the current offset - case "GET": - s.doGet(w, r) - - // tus.io based upload - - // uploads are initiated using the CS3 APIs Initiate Download call - case "POST": - handler.PostFile(w, r) - case "HEAD": - handler.HeadFile(w, r) - case "PATCH": - handler.PatchFile(w, r) - // PUT provides a wrapper around the POST call, to save the caller from - // the trouble of configuring the tus client. 
- case "PUT": - s.doTusPut(w, r) - // TODO Only attach the DELETE handler if the Terminate() method is provided - case "DELETE": - handler.DelFile(w, r) - } - })) - } else { - s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - method := r.Method - // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override - if r.Header.Get("X-HTTP-Method-Override") != "" { - method = r.Header.Get("X-HTTP-Method-Override") - } - - switch method { - case "HEAD": - w.WriteHeader(http.StatusOK) - return - case "GET": - s.doGet(w, r) - return - case "PUT": - s.doPut(w, r) - return - default: - w.WriteHeader(http.StatusNotImplemented) - return - } - }) + return handler } - - return err + return nil } diff --git a/internal/http/services/dataprovider/put.go b/internal/http/services/dataprovider/put.go index 13e957688e..cc70af5c5c 100644 --- a/internal/http/services/dataprovider/put.go +++ b/internal/http/services/dataprovider/put.go @@ -19,22 +19,11 @@ package dataprovider import ( - "fmt" - "io" - "io/ioutil" "net/http" - "os" - "path" - "strconv" "strings" - "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" - "github.com/cs3org/reva/pkg/rhttp" - "github.com/cs3org/reva/pkg/token" - "github.com/eventials/go-tus" - "github.com/eventials/go-tus/memorystore" ) func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { @@ -55,82 +44,3 @@ func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { r.Body.Close() w.WriteHeader(http.StatusOK) } - -func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - - fp := r.Header.Get("File-Path") - if fp == "" { - w.WriteHeader(http.StatusBadRequest) - return - } - - length, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64) - if err != nil { - // Fallback to Upload-Length - length, err = strconv.ParseInt(r.Header.Get("Upload-Length"), 10, 64) - 
if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - } - - fd, err := ioutil.TempFile(fmt.Sprintf("/%s", s.conf.TempDirectory), "") - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - defer os.RemoveAll(fd.Name()) - defer fd.Close() - if _, err := io.Copy(fd, r.Body); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI) - - // create the tus client. - c := tus.DefaultConfig() - c.Resume = true - c.HttpClient = rhttp.GetHTTPClient( - rhttp.Context(ctx), - rhttp.Timeout(time.Duration(s.conf.Timeout*int64(time.Second))), - rhttp.Insecure(s.conf.Insecure), - ) - c.Store, err = memorystore.NewMemoryStore() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - c.Header.Set(token.TokenHeader, token.ContextMustGetToken(ctx)) - - tusc, err := tus.NewClient(dataServerURL, c) - if err != nil { - log.Error().Err(err).Msg("error starting TUS client") - w.WriteHeader(http.StatusInternalServerError) - return - } - - metadata := map[string]string{ - "filename": path.Base(fp), - "dir": path.Dir(fp), - } - - upload := tus.NewUpload(fd, length, metadata, "") - defer r.Body.Close() - - // create the uploader. - c.Store.Set(upload.Fingerprint, dataServerURL) - uploader := tus.NewUploader(tusc, dataServerURL, upload, 0) - - // start the uploading process. 
- err = uploader.Upload() - if err != nil { - log.Error().Err(err).Msg("Could not start TUS upload") - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) -} diff --git a/internal/http/services/owncloud/ocdav/copy.go b/internal/http/services/owncloud/ocdav/copy.go index 631d002120..33b7ec3f8d 100644 --- a/internal/http/services/owncloud/ocdav/copy.go +++ b/internal/http/services/owncloud/ocdav/copy.go @@ -21,9 +21,7 @@ package ocdav import ( "context" "fmt" - "io" "net/http" - "os" "path" "strings" @@ -34,9 +32,6 @@ import ( "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/rhttp" - tokenpkg "github.com/cs3org/reva/pkg/token" - "github.com/eventials/go-tus" - "github.com/eventials/go-tus/memorystore" ) func (s *svc) handleCopy(w http.ResponseWriter, r *http.Request, ns string) { @@ -135,7 +130,7 @@ func (s *svc) handleCopy(w http.ResponseWriter, r *http.Request, ns string) { return } - var successCode int + successCode := http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.8.5 if dstStatRes.Status.Code == rpc.Code_CODE_OK { successCode = http.StatusNoContent // 204 if target already existed, see https://tools.ietf.org/html/rfc4918#section-9.8.5 @@ -146,8 +141,6 @@ func (s *svc) handleCopy(w http.ResponseWriter, r *http.Request, ns string) { } } else { - successCode = http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.8.5 - // check if an intermediate path / the parent exists intermediateDir := path.Dir(dst) ref = &provider.Reference{ @@ -233,6 +226,7 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src // copy file // 1. 
get download url + dReq := &provider.InitiateFileDownloadRequest{ Ref: &provider.Reference{ Spec: &provider.Reference_Path{Path: src.Path}, @@ -282,84 +276,31 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src } httpDownloadReq.Header.Set(datagateway.TokenTransportHeader, dRes.Token) - httpDownloadClient := s.client - - httpDownloadRes, err := httpDownloadClient.Do(httpDownloadReq) + httpDownloadRes, err := s.client.Do(httpDownloadReq) if err != nil { return err } defer httpDownloadRes.Body.Close() - if httpDownloadRes.StatusCode != http.StatusOK { return fmt.Errorf("status code %d", httpDownloadRes.StatusCode) } - fileName, fd, err := s.createChunkTempFile() + // 4. do upload + + httpUploadReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, httpDownloadRes.Body) if err != nil { return err } - defer os.RemoveAll(fileName) - defer fd.Close() - if _, err := io.Copy(fd, httpDownloadRes.Body); err != nil { - return err - } + httpUploadReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) - // do upload - err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, fd, int64(src.GetSize())) + httpUploadRes, err := s.client.Do(httpUploadReq) if err != nil { return err } - } - return nil -} - -func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.ReadSeeker, length int64) error { - var err error - log := appctx.GetLogger(ctx) - - // create the tus client. - c := tus.DefaultConfig() - c.Resume = true - c.HttpClient = s.client - c.Store, err = memorystore.NewMemoryStore() - if err != nil { - return err - } - - log.Debug(). - Str("header", tokenpkg.TokenHeader). - Str("token", tokenpkg.ContextMustGetToken(ctx)). 
- Msg("adding token to header") - c.Header.Set(tokenpkg.TokenHeader, tokenpkg.ContextMustGetToken(ctx)) - c.Header.Set(datagateway.TokenTransportHeader, transferToken) - - tusc, err := tus.NewClient(dataServerURL, c) - if err != nil { - return nil - } - - // TODO: also copy properties: https://tools.ietf.org/html/rfc4918#section-9.8.2 - metadata := map[string]string{ - "filename": path.Base(fn), - "dir": path.Dir(fn), - //"checksum": fmt.Sprintf("%s %s", storageprovider.GRPC2PKGXS(xsType).String(), xs), - } - log.Debug(). - Str("length", fmt.Sprintf("%d", length)). - Str("filename", path.Base(fn)). - Str("dir", path.Dir(fn)). - Msg("tus.NewUpload") - - upload := tus.NewUpload(body, length, metadata, "") - - // create the uploader. - c.Store.Set(upload.Fingerprint, dataServerURL) - uploader := tus.NewUploader(tusc, dataServerURL, upload, 0) - - // start the uploading process. - err = uploader.Upload() - if err != nil { - return err + defer httpUploadRes.Body.Close() + if httpUploadRes.StatusCode != http.StatusOK { + return err + } } return nil } diff --git a/internal/http/services/owncloud/ocdav/move.go b/internal/http/services/owncloud/ocdav/move.go index 4c98d8e625..cc94cdd285 100644 --- a/internal/http/services/owncloud/ocdav/move.go +++ b/internal/http/services/owncloud/ocdav/move.go @@ -114,7 +114,7 @@ func (s *svc) handleMove(w http.ResponseWriter, r *http.Request, ns string) { return } - var successCode int + successCode := http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.9.4 if dstStatRes.Status.Code == rpc.Code_CODE_OK { successCode = http.StatusNoContent // 204 if target already existed, see https://tools.ietf.org/html/rfc4918#section-9.9.4 @@ -145,8 +145,6 @@ func (s *svc) handleMove(w http.ResponseWriter, r *http.Request, ns string) { return } } else { - successCode = http.StatusCreated // 201 if new resource was created, see https://tools.ietf.org/html/rfc4918#section-9.9.4 - // check if an 
intermediate path / the parent exists intermediateDir := path.Dir(dst) ref2 := &provider.Reference{ diff --git a/internal/http/services/owncloud/ocdav/ocdav.go b/internal/http/services/owncloud/ocdav/ocdav.go index f34e37a340..76363ef154 100644 --- a/internal/http/services/owncloud/ocdav/ocdav.go +++ b/internal/http/services/owncloud/ocdav/ocdav.go @@ -71,7 +71,6 @@ type Config struct { GatewaySvc string `mapstructure:"gatewaysvc"` Timeout int64 `mapstructure:"timeout"` Insecure bool `mapstructure:"insecure"` - DisableTus bool `mapstructure:"disable_tus"` } func (c *Config) init() { diff --git a/internal/http/services/owncloud/ocdav/options.go b/internal/http/services/owncloud/ocdav/options.go index b3eb4e929a..3d95863aa0 100644 --- a/internal/http/services/owncloud/ocdav/options.go +++ b/internal/http/services/owncloud/ocdav/options.go @@ -34,7 +34,7 @@ func (s *svc) handleOptions(w http.ResponseWriter, r *http.Request, ns string) { w.Header().Set("Allow", allow) w.Header().Set("DAV", "1, 2") w.Header().Set("MS-Author-Via", "DAV") - if !s.c.DisableTus && !isPublic { + if !isPublic { w.Header().Add("Access-Control-Allow-Headers", "Tus-Resumable") w.Header().Add("Access-Control-Expose-Headers", "Tus-Resumable, Tus-Version, Tus-Extension") w.Header().Set("Tus-Resumable", "1.0.0") // TODO(jfd): only for dirs? 
diff --git a/internal/http/services/owncloud/ocdav/propfind.go b/internal/http/services/owncloud/ocdav/propfind.go index f570315087..0f3fb1cf5b 100644 --- a/internal/http/services/owncloud/ocdav/propfind.go +++ b/internal/http/services/owncloud/ocdav/propfind.go @@ -194,7 +194,7 @@ func (s *svc) handlePropfind(w http.ResponseWriter, r *http.Request, ns string) w.Header().Set("DAV", "1, 3, extended-mkcol") w.Header().Set("Content-Type", "application/xml; charset=utf-8") // let clients know this collection supports tus.io POST requests to start uploads - if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER && !s.c.DisableTus { + if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER { w.Header().Add("Access-Control-Expose-Headers", "Tus-Resumable, Tus-Version, Tus-Extension") w.Header().Set("Tus-Resumable", "1.0.0") w.Header().Set("Tus-Version", "1.0.0") @@ -344,8 +344,8 @@ func (s *svc) mdToPropResponse(ctx context.Context, pf *propfindXML, md *provide ) } } - // Finder needs the the getLastModified property to work. - t := utils.TSToTime(md.Mtime).UTC() + // Finder needs the getLastModified property to work. 
+ t := utils.TSToTime(md.Mtime) lastModifiedString := t.Format(time.RFC1123Z) response.Propstat[0].Prop = append(response.Propstat[0].Prop, s.newProp("d:getlastmodified", lastModifiedString)) @@ -500,7 +500,7 @@ func (s *svc) mdToPropResponse(ctx context.Context, pf *propfindXML, md *provide } case "getlastmodified": // both // TODO we cannot find out if md.Mtime is set or not because ints in go default to 0 - t := utils.TSToTime(md.Mtime).UTC() + t := utils.TSToTime(md.Mtime) lastModifiedString := t.Format(time.RFC1123Z) propstatOK.Prop = append(propstatOK.Prop, s.newProp("d:getlastmodified", lastModifiedString)) default: diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 9ac9c40878..e40ee39fa8 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -21,7 +21,6 @@ package ocdav import ( "io" "net/http" - "os" "path" "regexp" "strconv" @@ -30,7 +29,9 @@ import ( rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/rhttp" "github.com/cs3org/reva/pkg/utils" ) @@ -109,7 +110,6 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) { log := appctx.GetLogger(ctx) ns = applyLayout(ctx, ns) - fn := path.Join(ns, r.URL.Path) if r.Body == nil { @@ -163,22 +163,11 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) { return } } - fileName, fd, err := s.createChunkTempFile() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - defer os.RemoveAll(fileName) - defer fd.Close() - if _, err := io.Copy(fd, r.Body); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - s.handlePutHelper(w, r, fd, fn, length) + 
s.handlePutHelper(w, r, r.Body, fn, length) } -func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, fn string, length int64) { +func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io.Reader, fn string, length int64) { ctx := r.Context() log := appctx.GetLogger(ctx) @@ -199,7 +188,6 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io w.WriteHeader(http.StatusInternalServerError) return } - if sRes.Status.Code != rpc.Code_CODE_OK && sRes.Status.Code != rpc.Code_CODE_NOT_FOUND { switch sRes.Status.Code { case rpc.Code_CODE_PERMISSION_DENIED: @@ -213,13 +201,12 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io } info := sRes.Info - if info != nil && info.Type != provider.ResourceType_RESOURCE_TYPE_FILE { - log.Warn().Msg("resource is not a file") - w.WriteHeader(http.StatusConflict) - return - } - if info != nil { + if info.Type != provider.ResourceType_RESOURCE_TYPE_FILE { + log.Warn().Msg("resource is not a file") + w.WriteHeader(http.StatusConflict) + return + } clientETag := r.Header.Get("If-Match") serverETag := info.Etag if clientETag != "" { @@ -238,8 +225,7 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io }, } - mtime := r.Header.Get("X-OC-Mtime") - if mtime != "" { + if mtime := r.Header.Get("X-OC-Mtime"); mtime != "" { opaqueMap["X-OC-Mtime"] = &typespb.OpaqueEntry{ Decoder: "plain", Value: []byte(mtime), @@ -250,10 +236,8 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io } uReq := &provider.InitiateFileUploadRequest{ - Ref: ref, - Opaque: &typespb.Opaque{ - Map: opaqueMap, - }, + Ref: ref, + Opaque: &typespb.Opaque{Map: opaqueMap}, } // where to upload the file? 
@@ -279,8 +263,22 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } - if err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, fn, content, length); err != nil { - log.Error().Err(err).Msg("TUS upload failed") + httpReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + httpReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) + + httpRes, err := s.client.Do(httpReq) + if err != nil { + log.Err(err).Msg("error doing PUT request to data service") + w.WriteHeader(http.StatusInternalServerError) + return + } + defer httpRes.Body.Close() + if httpRes.StatusCode != http.StatusOK { + log.Err(err).Msg("PUT request to data server failed") w.WriteHeader(http.StatusInternalServerError) return } @@ -308,13 +306,13 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } - info2 := sRes.Info + newInfo := sRes.Info - w.Header().Add("Content-Type", info2.MimeType) - w.Header().Set("ETag", info2.Etag) - w.Header().Set("OC-FileId", wrapResourceID(info2.Id)) - w.Header().Set("OC-ETag", info2.Etag) - t := utils.TSToTime(info2.Mtime) + w.Header().Add("Content-Type", newInfo.MimeType) + w.Header().Set("ETag", newInfo.Etag) + w.Header().Set("OC-FileId", wrapResourceID(newInfo.Id)) + w.Header().Set("OC-ETag", newInfo.Etag) + t := utils.TSToTime(newInfo.Mtime) lastModifiedString := t.Format(time.RFC1123Z) w.Header().Set("Last-Modified", lastModifiedString) diff --git a/internal/http/services/owncloud/ocdav/putchunked.go b/internal/http/services/owncloud/ocdav/putchunked.go index 2708643d5f..f8c3812205 100644 --- a/internal/http/services/owncloud/ocdav/putchunked.go +++ b/internal/http/services/owncloud/ocdav/putchunked.go @@ -94,7 +94,6 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool chunkInfo, err := getChunkBLOBInfo(path) if err != nil { err := fmt.Errorf("error getting 
chunk info from path: %s", path) - //c.logger.Error().Log("error", err) return false, "", err } @@ -103,38 +102,31 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool chunkTempFilename, chunkTempFile, err := s.createChunkTempFile() if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer chunkTempFile.Close() if _, err := io.Copy(chunkTempFile, r); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } // force close of the file here because if it is the last chunk to // assemble the big file we must have all the chunks already closed. if err = chunkTempFile.Close(); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } chunksFolderName, err := s.getChunkFolderName(chunkInfo) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } //c.logger.Info().Log("chunkfolder", chunksFolderName) chunkTarget := chunksFolderName + "/" + fmt.Sprintf("%d", chunkInfo.currentChunk) if err = os.Rename(chunkTempFilename, chunkTarget); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } - //c.logger.Info().Log("chunktarget", chunkTarget) - // Check that all chunks are uploaded. // This is very inefficient, the server has to check that it has all the // chunks after each uploaded chunk. @@ -142,7 +134,6 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool // assembly the chunks when the client asks for it. 
chunksFolder, err := os.Open(chunksFolderName) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer chunksFolder.Close() @@ -150,10 +141,8 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool // read all the chunks inside the chunk folder; -1 == all chunks, err := chunksFolder.Readdir(-1) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } - //c.logger.Info().Log("msg", "chunkfolder readed", "nchunks", len(chunks)) // there is still some chunks to be uploaded. // we return CodeUploadIsPartial to notify upper layers that the upload is still @@ -166,34 +155,27 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool assembledFileName, assembledFile, err := s.createChunkTempFile() if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer assembledFile.Close() - //c.logger.Info().Log("assembledfile", assembledFileName) - // walk all chunks and append to assembled file for i := range chunks { target := chunksFolderName + "/" + fmt.Sprintf("%d", i) chunk, err := os.Open(target) if err != nil { - //c.logger.Error().Log("error", err) return false, "", err } defer chunk.Close() if _, err = io.Copy(assembledFile, chunk); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } - //c.logger.Debug().Log("msg", "chunk appended to assembledfile") // we close the chunk here because if the assembled file contains hundreds of chunks // we will end up with hundreds of open file descriptors if err = chunk.Close(); err != nil { - //c.logger.Error().Log("error", err) return false, "", err } @@ -207,13 +189,6 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool } }() - // when writing to the assembled file the write pointer points to the end of the file - // so we need to seek it to the beginning - if _, err = assembledFile.Seek(0, 0); err != nil { - //c.logger.Error().Log("error", err) - return 
false, "", err - } - return true, assembledFileName, nil } func (s *svc) handlePutChunked(w http.ResponseWriter, r *http.Request, ns string) { diff --git a/internal/http/services/owncloud/ocdav/tus.go b/internal/http/services/owncloud/ocdav/tus.go index d9319cc051..0dd2490f2f 100644 --- a/internal/http/services/owncloud/ocdav/tus.go +++ b/internal/http/services/owncloud/ocdav/tus.go @@ -193,7 +193,6 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) { var httpRes *http.Response if length != 0 { - httpClient := s.client httpReq, err := rhttp.NewRequest(ctx, "PATCH", uRes.UploadEndpoint, r.Body) if err != nil { log.Err(err).Msg("wrong request") @@ -210,7 +209,7 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) { } httpReq.Header.Set("Tus-Resumable", r.Header.Get("Tus-Resumable")) - httpRes, err = httpClient.Do(httpReq) + httpRes, err = s.client.Do(httpReq) if err != nil { log.Err(err).Msg("error doing GET request to data service") w.WriteHeader(http.StatusInternalServerError) diff --git a/internal/http/services/owncloud/ocdav/webdav.go b/internal/http/services/owncloud/ocdav/webdav.go index 22d05d784d..5c7e5be7c1 100644 --- a/internal/http/services/owncloud/ocdav/webdav.go +++ b/internal/http/services/owncloud/ocdav/webdav.go @@ -58,11 +58,7 @@ func (h *WebDavHandler) Handler(s *svc) http.Handler { case http.MethodPut: s.handlePut(w, r, h.namespace) case http.MethodPost: - if !s.c.DisableTus { - s.handleTusPost(w, r, h.namespace) - } else { - w.WriteHeader(http.StatusNotFound) - } + s.handleTusPost(w, r, h.namespace) case http.MethodOptions: s.handleOptions(w, r, h.namespace) case http.MethodHead: diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index 39a6da9501..8c97cac101 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -20,24 +20,13 @@ package eosfs import ( "context" - "encoding/json" "io" - "io/ioutil" - "os" - "path/filepath" 
- "strings" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" - "github.com/google/uuid" "github.com/pkg/errors" - tusd "github.com/tus/tusd/pkg/handler" ) -var defaultFilePerm = os.FileMode(0664) - -// TODO deprecated ... use tus func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { u, err := getUser(ctx) if err != nil { @@ -62,296 +51,6 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC return fs.c.Write(ctx, uid, gid, fn, r) } -// InitiateUpload returns an upload id that can be used for uploads with tus -// TODO read optional content for small files in this request -func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { - u, err := getUser(ctx) - if err != nil { - return "", errors.Wrap(err, "eos: no user in ctx") - } - - np, err := fs.resolve(ctx, u, ref) - if err != nil { - return "", errors.Wrap(err, "eos: error resolving reference") - } - - p := fs.wrap(ctx, np) - - info := tusd.FileInfo{ - MetaData: tusd.MetaData{ - "filename": filepath.Base(p), - "dir": filepath.Dir(p), - }, - Size: uploadLength, - } - - if metadata != nil && metadata["mtime"] != "" { - info.MetaData["mtime"] = metadata["mtime"] - } - - upload, err := fs.NewUpload(ctx, info) - if err != nil { - return "", err - } - - info, _ = upload.GetInfo(ctx) - - return info.ID, nil -} - -// UseIn tells the tus upload middleware which extensions it supports. -func (fs *eosfs) UseIn(composer *tusd.StoreComposer) { - composer.UseCore(fs) - composer.UseTerminater(fs) -} - -// NewUpload creates a new upload using the size as the file's length. To determine where to write the binary data -// the Fileinfo metadata must contain a dir and a filename. -// returns a unique id which is used to identify the upload. 
The properties Size and MetaData will be filled. -func (fs *eosfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { - - log := appctx.GetLogger(ctx) - log.Debug().Interface("info", info).Msg("eos: NewUpload") - - fn := info.MetaData["filename"] - if fn == "" { - return nil, errors.New("eos: missing filename in metadata") - } - info.MetaData["filename"] = filepath.Clean(info.MetaData["filename"]) - - dir := info.MetaData["dir"] - if dir == "" { - return nil, errors.New("eos: missing dir in metadata") - } - info.MetaData["dir"] = filepath.Clean(info.MetaData["dir"]) - - log.Debug().Interface("info", info).Msg("eos: resolved filename") - - info.ID = uuid.New().String() - - binPath, err := fs.getUploadPath(ctx, info.ID) - if err != nil { - return nil, errors.Wrap(err, "eos: error resolving upload path") - } - user, err := getUser(ctx) - if err != nil { - return nil, errors.Wrap(err, "eos: no user in ctx") - } - uid, gid, err := fs.getUserUIDAndGID(ctx, user) - if err != nil { - return nil, err - } - - info.Storage = map[string]string{ - "Type": "EOSStore", - "Username": user.Username, - "UID": uid, - "GID": gid, - } - // Create binary file with no content - - file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) - if err != nil { - return nil, err - } - defer file.Close() - - u := &fileUpload{ - info: info, - binPath: binPath, - infoPath: binPath + ".info", - fs: fs, - } - - if !info.SizeIsDeferred && info.Size == 0 { - log.Debug().Interface("info", info).Msg("eos: finishing upload for empty file") - // no need to create info file and finish directly - err := u.FinishUpload(ctx) - if err != nil { - return nil, err - } - return u, nil - } - - // writeInfo creates the file by itself if necessary - err = u.writeInfo() - if err != nil { - return nil, err - } - - return u, nil -} - -// TODO use a subdirectory in the shadow tree -func (fs *eosfs) getUploadPath(ctx context.Context, uploadID string) (string, error) { - 
return filepath.Join(fs.conf.CacheDirectory, uploadID), nil -} - -// GetUpload returns the Upload for the given upload id -func (fs *eosfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { - binPath, err := fs.getUploadPath(ctx, id) - if err != nil { - return nil, err - } - infoPath := binPath + ".info" - info := tusd.FileInfo{} - data, err := ioutil.ReadFile(infoPath) - if err != nil { - return nil, err - } - if err := json.Unmarshal(data, &info); err != nil { - return nil, err - } - - stat, err := os.Stat(binPath) - if err != nil { - return nil, err - } - - info.Offset = stat.Size() - - return &fileUpload{ - info: info, - binPath: binPath, - infoPath: infoPath, - fs: fs, - }, nil -} - -type fileUpload struct { - // info stores the current information about the upload - info tusd.FileInfo - // infoPath is the path to the .info file - infoPath string - // binPath is the path to the binary file (which has no extension) - binPath string - // only fs knows how to handle metadata and versions - fs *eosfs -} - -// GetInfo returns the FileInfo -func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { - return upload.info, nil -} - -// GetReader returns an io.Reader for the upload -func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) { - f, err := os.Open(upload.binPath) - if err != nil { - return nil, err - } - defer f.Close() - return f, nil -} - -// WriteChunk writes the stream from the reader to the given offset of the upload -// TODO use the grpc api to directly stream to a temporary uploads location in the eos shadow tree -func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { - file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) - if err != nil { - return 0, err - } - defer file.Close() - - n, err := io.Copy(file, src) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. 
because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for OwnCloudStore it's not important whether the stream has ended - // on purpose or accidentally. - if err != nil { - if err != io.ErrUnexpectedEOF { - return n, err - } - } - - upload.info.Offset += n - err = upload.writeInfo() - - return n, err -} - -// writeInfo updates the entire information. Everything will be overwritten. -func (upload *fileUpload) writeInfo() error { - data, err := json.Marshal(upload.info) - if err != nil { - return err - } - return ioutil.WriteFile(upload.infoPath, data, defaultFilePerm) -} - -// FinishUpload finishes an upload and moves the file to the internal destination -func (upload *fileUpload) FinishUpload(ctx context.Context) error { - - checksum := upload.info.MetaData["checksum"] - if checksum != "" { - // check checksum - s := strings.SplitN(checksum, " ", 2) - if len(s) == 2 { - alg, hash := s[0], s[1] - - log := appctx.GetLogger(ctx) - log.Debug(). - Interface("info", upload.info). - Str("alg", alg). - Str("hash", hash). - Msg("eos: TODO check checksum") // TODO this is done by eos if we write chunks to it directly - - } - } - np := filepath.Join(upload.info.MetaData["dir"], upload.info.MetaData["filename"]) - - // TODO check etag with If-Match header - // if destination exists - //if _, err := os.Stat(np); err == nil { - // copy attributes of existing file to tmp file before overwriting the target? 
- // eos creates revisions internally - //} - - err := upload.fs.c.WriteFile(ctx, upload.info.Storage["UID"], upload.info.Storage["GID"], np, upload.binPath) - - // only delete the upload if it was successfully written to eos - if err == nil { - if err := os.Remove(upload.infoPath); err != nil { - if !os.IsNotExist(err) { - log := appctx.GetLogger(ctx) - log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload info") - } - } - if err := os.Remove(upload.binPath); err != nil { - if !os.IsNotExist(err) { - log := appctx.GetLogger(ctx) - log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload binary") - } - } - } - - // TODO: set mtime if specified in metadata - - // metadata propagation is left to the storage implementation - return err -} - -// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination -// - the storage needs to implement AsTerminatableUpload -// - the upload needs to implement Terminate - -// AsTerminatableUpload returns a TerminatableUpload -func (fs *eosfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpload { - return upload.(*fileUpload) -} - -// Terminate terminates the upload -func (upload *fileUpload) Terminate(ctx context.Context) error { - if err := os.Remove(upload.infoPath); err != nil { - if !os.IsNotExist(err) { - return err - } - } - if err := os.Remove(upload.binPath); err != nil { - if !os.IsNotExist(err) { - return err - } - } - return nil +func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (string, error) { + return ref.GetPath(), nil } diff --git a/pkg/storage/utils/localfs/localfs.go b/pkg/storage/utils/localfs/localfs.go index 5f643499d5..1a37788b7e 100644 --- a/pkg/storage/utils/localfs/localfs.go +++ b/pkg/storage/utils/localfs/localfs.go @@ -47,6 +47,7 @@ import ( type Config struct { Root string `mapstructure:"root"` DisableHome bool 
`mapstructure:"disable_home"` + DisableTus bool `mapstructure:"disable_tus"` UserLayout string `mapstructure:"user_layout"` ShareFolder string `mapstructure:"share_folder"` Uploads string `mapstructure:"uploads"` diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index 301fd51e57..826726bc3e 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -80,6 +80,10 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea // TODO to implement LengthDeferrerDataStore make size optional // TODO read optional content for small files in this request func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { + if fs.conf.DisableTus { + return ref.GetPath(), nil + } + np, err := fs.resolve(ctx, ref) if err != nil { return "", errors.Wrap(err, "localfs: error resolving reference") From 41aab4efcf528e8d2810770a20056383e86f2046 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 28 Oct 2020 15:47:53 +0100 Subject: [PATCH 02/16] Handle non-tus uploads from upload ID in owncloudfs and ocisfs --- pkg/storage/fs/ocis/upload.go | 20 +++++++++++++++++++- pkg/storage/fs/owncloud/upload.go | 14 +++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index b6cf86373c..178d7afb2c 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -46,7 +46,7 @@ var defaultFilePerm = os.FileMode(0664) func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) (err error) { var n *Node - if n, err = fs.lu.NodeFromResource(ctx, ref); err != nil { + if n, err = fs.resolveUploadIDToNode(ctx, ref); err != nil { return } @@ -148,6 +148,24 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read return fs.tp.Propagate(ctx, n) } +func (fs *ocisfs) 
resolveUploadIDToNode(ctx context.Context, ref *provider.Reference) (*Node, error) { + upload, err := fs.GetUpload(ctx, ref.GetPath()) + if err != nil { + return nil, err + } + uploadInfo := upload.(*fileUpload) + if uploadInfo.info.MetaData == nil { + return nil, errors.New("storage for the upload ID is nil") + } + + p := filepath.Join(uploadInfo.info.MetaData["dir"], uploadInfo.info.MetaData["filename"]) + var n *Node + if n, err = fs.lu.NodeFromResource(ctx, &provider.Reference{Spec: &provider.Reference_Path{Path: p}}); err != nil { + return nil, err + } + return n, nil +} + // InitiateUpload returns an upload id that can be used for uploads with tus // TODO read optional content for small files in this request func (fs *ocisfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 99c8c2958d..878d74f2e9 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -42,7 +42,7 @@ var defaultFilePerm = os.FileMode(0664) // TODO deprecated ... 
use tus func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { - ip, err := fs.resolve(ctx, ref) + ip, err := fs.resolveUploadIDToPath(ctx, ref) if err != nil { return errors.Wrap(err, "ocfs: error resolving reference") } @@ -101,6 +101,18 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl return nil } +func (fs *ocfs) resolveUploadIDToPath(ctx context.Context, ref *provider.Reference) (string, error) { + upload, err := fs.GetUpload(ctx, ref.GetPath()) + if err != nil { + return "", err + } + uploadInfo := upload.(*fileUpload) + if uploadInfo.info.Storage == nil { + return "", errors.New("storage for the upload ID is nil") + } + return uploadInfo.info.Storage["InternalDestination"], nil +} + // InitiateUpload returns an upload id that can be used for uploads with tus // TODO read optional content for small files in this request func (fs *ocfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { From 8457f81a6507b547d392ef484e83ddd6e4addce6 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 28 Oct 2020 18:11:15 +0100 Subject: [PATCH 03/16] Don't add tus handler as middleware --- internal/http/services/dataprovider/dataprovider.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index d12cbfdc7e..889816ad65 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -119,7 +119,7 @@ func (s *svc) setHandler() error { tusHandler := s.getTusHandler() - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { log := appctx.GetLogger(r.Context()) log.Info().Msgf("dataprovider routing: path=%s", r.URL.Path) @@ -167,12 +167,6 
@@ func (s *svc) setHandler() error { } }) - if tusHandler != nil { - s.handler = tusHandler.Middleware(handler) - } else { - s.handler = handler - } - return nil } From ca5ccc4ac5b1c866bff77965991ded5546e0898a Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 28 Oct 2020 18:57:20 +0100 Subject: [PATCH 04/16] Copy value of data server URL --- .../grpc/services/storageprovider/storageprovider.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 08689f00ab..957491920d 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -266,17 +266,17 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia // For example, https://data-server.example.org/home/docs/myfile.txt // or ownclouds://data-server.example.org/home/docs/myfile.txt log := appctx.GetLogger(ctx) - url := *s.dataServerURL + u := *s.dataServerURL newRef, err := s.unwrap(ctx, req.Ref) if err != nil { return &provider.InitiateFileDownloadResponse{ Status: status.NewInternal(ctx, err, "error unwrapping path"), }, nil } - url.Path = path.Join("/", url.Path, newRef.GetPath()) - log.Info().Str("data-server", url.String()).Str("fn", req.Ref.GetPath()).Msg("file download") + u.Path = path.Join(u.Path, newRef.GetPath()) + log.Info().Str("data-server", u.String()).Str("fn", req.Ref.GetPath()).Msg("file download") res := &provider.InitiateFileDownloadResponse{ - DownloadEndpoint: url.String(), + DownloadEndpoint: u.String(), Status: status.NewOK(ctx), Expose: s.conf.ExposeDataServer, } @@ -330,7 +330,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate }, nil } - u := s.dataServerURL + u := *s.dataServerURL u.Path = path.Join(u.Path, uploadID) if err != nil { return &provider.InitiateFileUploadResponse{ From 
5adbef1750a965f0165181a7a145d8768378b082 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 28 Oct 2020 19:14:50 +0100 Subject: [PATCH 05/16] Handle non-tus uploads in localfs --- pkg/storage/utils/localfs/localfs.go | 1 - pkg/storage/utils/localfs/upload.go | 18 +++++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/storage/utils/localfs/localfs.go b/pkg/storage/utils/localfs/localfs.go index 1a37788b7e..5f643499d5 100644 --- a/pkg/storage/utils/localfs/localfs.go +++ b/pkg/storage/utils/localfs/localfs.go @@ -47,7 +47,6 @@ import ( type Config struct { Root string `mapstructure:"root"` DisableHome bool `mapstructure:"disable_home"` - DisableTus bool `mapstructure:"disable_tus"` UserLayout string `mapstructure:"user_layout"` ShareFolder string `mapstructure:"share_folder"` Uploads string `mapstructure:"uploads"` diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index 826726bc3e..b979ec920f 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -39,11 +39,10 @@ var defaultFilePerm = os.FileMode(0664) // TODO deprecated ... 
use tus func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { - fn, err := fs.resolve(ctx, ref) + fn, err := fs.resolveUploadIDToPath(ctx, ref) if err != nil { return errors.Wrap(err, "error resolving ref") } - fn = fs.wrap(ctx, fn) // we cannot rely on /tmp as it can live in another partition and we can // hit invalid cross-device link errors, so we create the tmp file in the same directory @@ -74,15 +73,24 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea return nil } +func (fs *localfs) resolveUploadIDToPath(ctx context.Context, ref *provider.Reference) (string, error) { + upload, err := fs.GetUpload(ctx, ref.GetPath()) + if err != nil { + return "", err + } + uploadInfo := upload.(*fileUpload) + if uploadInfo.info.Storage == nil { + return "", errors.New("storage for the upload ID is nil") + } + return uploadInfo.info.Storage["InternalDestination"], nil +} + // InitiateUpload returns an upload id that can be used for uploads with tus // It resolves the resource and then reuses the NewUpload function // Currently requires the uploadLength to be set // TODO to implement LengthDeferrerDataStore make size optional // TODO read optional content for small files in this request func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { - if fs.conf.DisableTus { - return ref.GetPath(), nil - } np, err := fs.resolve(ctx, ref) if err != nil { From 261f21db1f4cb06878f4d9ec435a8932d5ed7473 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 28 Oct 2020 19:27:46 +0100 Subject: [PATCH 06/16] Fix bug in handlePutHelper --- internal/http/services/owncloud/ocdav/put.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index e40ee39fa8..3691ac2040 100644 --- 
a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -263,7 +263,7 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } - httpReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, r.Body) + httpReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, content) if err != nil { w.WriteHeader(http.StatusInternalServerError) return From ac8ec902b7cb8f18b02074911963c5a3c862ad48 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Thu, 29 Oct 2020 11:24:14 +0100 Subject: [PATCH 07/16] Revert not explicitly converting timestamp to UTC --- internal/http/services/owncloud/ocdav/propfind.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/http/services/owncloud/ocdav/propfind.go b/internal/http/services/owncloud/ocdav/propfind.go index 0f3fb1cf5b..e6fbdd591f 100644 --- a/internal/http/services/owncloud/ocdav/propfind.go +++ b/internal/http/services/owncloud/ocdav/propfind.go @@ -345,7 +345,7 @@ func (s *svc) mdToPropResponse(ctx context.Context, pf *propfindXML, md *provide } } // Finder needs the getLastModified property to work. 
- t := utils.TSToTime(md.Mtime) + t := utils.TSToTime(md.Mtime).UTC() lastModifiedString := t.Format(time.RFC1123Z) response.Propstat[0].Prop = append(response.Propstat[0].Prop, s.newProp("d:getlastmodified", lastModifiedString)) @@ -500,7 +500,7 @@ func (s *svc) mdToPropResponse(ctx context.Context, pf *propfindXML, md *provide } case "getlastmodified": // both // TODO we cannot find out if md.Mtime is set or not because ints in go default to 0 - t := utils.TSToTime(md.Mtime) + t := utils.TSToTime(md.Mtime).UTC() lastModifiedString := t.Format(time.RFC1123Z) propstatOK.Prop = append(propstatOK.Prop, s.newProp("d:getlastmodified", lastModifiedString)) default: From e403c88606e5cb0fe76c416d51256eaca4dfe3f3 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Thu, 29 Oct 2020 11:50:01 +0100 Subject: [PATCH 08/16] Set mtime in fs if present in metadata --- pkg/storage/fs/ocis/upload.go | 32 ++++++++++++------------------- pkg/storage/fs/owncloud/upload.go | 28 +++++++++++++-------------- 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index 178d7afb2c..7a45835054 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -45,9 +45,19 @@ var defaultFilePerm = os.FileMode(0664) func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) (err error) { + upload, err := fs.GetUpload(ctx, ref.GetPath()) + if err != nil { + return errors.Wrap(err, "ocisfs: error retrieving upload") + } + uploadInfo := upload.(*fileUpload) + if uploadInfo.info.MetaData == nil { + return errors.New("ocisfs: storage for the upload ID is nil") + } + p := filepath.Join(uploadInfo.info.MetaData["dir"], uploadInfo.info.MetaData["filename"]) + var n *Node - if n, err = fs.resolveUploadIDToNode(ctx, ref); err != nil { - return + if n, err = fs.lu.NodeFromResource(ctx, &provider.Reference{Spec: &provider.Reference_Path{Path: p}}); err != nil { + return err } // check 
permissions @@ -148,24 +158,6 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read return fs.tp.Propagate(ctx, n) } -func (fs *ocisfs) resolveUploadIDToNode(ctx context.Context, ref *provider.Reference) (*Node, error) { - upload, err := fs.GetUpload(ctx, ref.GetPath()) - if err != nil { - return nil, err - } - uploadInfo := upload.(*fileUpload) - if uploadInfo.info.MetaData == nil { - return nil, errors.New("storage for the upload ID is nil") - } - - p := filepath.Join(uploadInfo.info.MetaData["dir"], uploadInfo.info.MetaData["filename"]) - var n *Node - if n, err = fs.lu.NodeFromResource(ctx, &provider.Reference{Spec: &provider.Reference_Path{Path: p}}); err != nil { - return nil, err - } - return n, nil -} - // InitiateUpload returns an upload id that can be used for uploads with tus // TODO read optional content for small files in this request func (fs *ocisfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 878d74f2e9..07e95af577 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -42,10 +42,15 @@ var defaultFilePerm = os.FileMode(0664) // TODO deprecated ... 
use tus func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { - ip, err := fs.resolveUploadIDToPath(ctx, ref) + upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return errors.Wrap(err, "ocfs: error resolving reference") + return errors.Wrap(err, "ocfs: error retrieving upload") } + uploadInfo := upload.(*fileUpload) + if uploadInfo.info.Storage == nil { + return errors.New("ocfs: storage for the upload ID is nil") + } + ip := uploadInfo.info.Storage["InternalDestination"] var perm *provider.ResourcePermissions var perr error @@ -98,19 +103,14 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl return errors.Wrap(err, "ocfs: error renaming from "+tmp.Name()+" to "+ip) } - return nil -} - -func (fs *ocfs) resolveUploadIDToPath(ctx context.Context, ref *provider.Reference) (string, error) { - upload, err := fs.GetUpload(ctx, ref.GetPath()) - if err != nil { - return "", err - } - uploadInfo := upload.(*fileUpload) - if uploadInfo.info.Storage == nil { - return "", errors.New("storage for the upload ID is nil") + if uploadInfo.info.MetaData["mtime"] != "" { + err := uploadInfo.fs.setMtime(ctx, ip, uploadInfo.info.MetaData["mtime"]) + if err != nil { + return errors.Wrap(err, "ocfs: could not set mtime metadata") + } } - return uploadInfo.info.Storage["InternalDestination"], nil + + return nil } // InitiateUpload returns an upload id that can be used for uploads with tus From a40d3c74dff8de2313113d7f3f2cfca641b674cd Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 12:33:16 +0100 Subject: [PATCH 09/16] Handle zero-size uploads and refactor fs uploads --- internal/http/services/owncloud/ocdav/copy.go | 26 +++-- internal/http/services/owncloud/ocdav/put.go | 36 +++--- pkg/storage/fs/ocis/upload.go | 109 +----------------- pkg/storage/fs/owncloud/upload.go | 66 +---------- 4 files changed, 41 insertions(+), 196 deletions(-) diff --git 
a/internal/http/services/owncloud/ocdav/copy.go b/internal/http/services/owncloud/ocdav/copy.go index 33b7ec3f8d..54606421d3 100644 --- a/internal/http/services/owncloud/ocdav/copy.go +++ b/internal/http/services/owncloud/ocdav/copy.go @@ -287,19 +287,21 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src // 4. do upload - httpUploadReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, httpDownloadRes.Body) - if err != nil { - return err - } - httpUploadReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) + if src.GetSize() > 0 { + httpUploadReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, httpDownloadRes.Body) + if err != nil { + return err + } + httpUploadReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) - httpUploadRes, err := s.client.Do(httpUploadReq) - if err != nil { - return err - } - defer httpUploadRes.Body.Close() - if httpUploadRes.StatusCode != http.StatusOK { - return err + httpUploadRes, err := s.client.Do(httpUploadReq) + if err != nil { + return err + } + defer httpUploadRes.Body.Close() + if httpUploadRes.StatusCode != http.StatusOK { + return err + } } } return nil diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 3691ac2040..5d2afb788a 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -263,24 +263,26 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } - httpReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, content) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - httpReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) + if length > 0 { + httpReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, content) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + httpReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) - 
httpRes, err := s.client.Do(httpReq) - if err != nil { - log.Err(err).Msg("error doing PUT request to data service") - w.WriteHeader(http.StatusInternalServerError) - return - } - defer httpRes.Body.Close() - if httpRes.StatusCode != http.StatusOK { - log.Err(err).Msg("PUT request to data server failed") - w.WriteHeader(http.StatusInternalServerError) - return + httpRes, err := s.client.Do(httpReq) + if err != nil { + log.Err(err).Msg("error doing PUT request to data service") + w.WriteHeader(http.StatusInternalServerError) + return + } + defer httpRes.Body.Close() + if httpRes.StatusCode != http.StatusOK { + log.Err(err).Msg("PUT request to data server failed") + w.WriteHeader(http.StatusInternalServerError) + return + } } // stat again to check the new file's metadata diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index 7a45835054..b58e221b92 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -44,118 +44,17 @@ var defaultFilePerm = os.FileMode(0664) // TODO deprecated ... 
use tus func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) (err error) { - upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { return errors.Wrap(err, "ocisfs: error retrieving upload") } - uploadInfo := upload.(*fileUpload) - if uploadInfo.info.MetaData == nil { - return errors.New("ocisfs: storage for the upload ID is nil") - } - p := filepath.Join(uploadInfo.info.MetaData["dir"], uploadInfo.info.MetaData["filename"]) - - var n *Node - if n, err = fs.lu.NodeFromResource(ctx, &provider.Reference{Spec: &provider.Reference_Path{Path: p}}); err != nil { - return err - } - - // check permissions - var ok bool - if n.Exists { - // check permissions of file to be overwritten - ok, err = fs.p.HasPermission(ctx, n, func(rp *provider.ResourcePermissions) bool { - return rp.InitiateFileUpload - }) - } else { - // check permissions of parent - p, perr := n.Parent() - if perr != nil { - return errors.Wrap(perr, "ocisfs: error getting parent "+n.ParentID) - } - - ok, err = fs.p.HasPermission(ctx, p, func(rp *provider.ResourcePermissions) bool { - return rp.InitiateFileUpload - }) - } - switch { - case err != nil: - return errtypes.InternalError(err.Error()) - case !ok: - return errtypes.PermissionDenied(filepath.Join(n.ParentID, n.Name)) - } - - if n.ID == "" { - n.ID = uuid.New().String() - } - - nodePath := fs.lu.toInternalPath(n.ID) - - var tmp *os.File - tmp, err = ioutil.TempFile(filepath.Dir(nodePath), "._reva_atomic_upload") - if err != nil { - return errors.Wrap(err, "ocisfs: error creating tmp fn at "+nodePath) - } - - _, err = io.Copy(tmp, r) - r.Close() - tmp.Close() - if err != nil { - return errors.Wrap(err, "ocisfs: error writing to tmp file "+tmp.Name()) - } - // TODO move old content to version - //_ = os.RemoveAll(path.Join(nodePath, "content")) - appctx.GetLogger(ctx).Warn().Msg("TODO move old content to version") - - appctx.GetLogger(ctx).Debug().Str("tmp", tmp.Name()).Str("ipath", nodePath).Msg("moving tmp to 
content") - if err = os.Rename(tmp.Name(), nodePath); err != nil { - return - } - - // who will become the owner? - u, ok := user.ContextGetUser(ctx) - switch { - case ok: - err = n.writeMetadata(u.Id) - case fs.o.EnableHome: - log := appctx.GetLogger(ctx) - log.Error().Msg("home support enabled but no user in context") - err = errors.Wrap(errtypes.UserRequired("userrequired"), "error getting user from ctx") - case fs.o.Owner != "": - err = n.writeMetadata(&userpb.UserId{ - OpaqueId: fs.o.Owner, - }) - default: - // fallback to parent owner? - err = n.writeMetadata(nil) - } - if err != nil { - return - } - - // link child name to parent if it is new - childNameLink := filepath.Join(fs.lu.toInternalPath(n.ParentID), n.Name) - var link string - link, err = os.Readlink(childNameLink) - if err == nil && link != "../"+n.ID { - log.Err(err). - Interface("node", n). - Str("childNameLink", childNameLink). - Str("link", link). - Msg("ocisfs: child name link has wrong target id, repairing") - - if err = os.Remove(childNameLink); err != nil { - return errors.Wrap(err, "ocisfs: could not remove symlink child entry") - } - } - if os.IsNotExist(err) || link != "../"+n.ID { - if err = os.Symlink("../"+n.ID, childNameLink); err != nil { - return errors.Wrap(err, "ocisfs: could not symlink child entry") - } + uploadInfo := upload.(*fileUpload) + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "ocisfs: error writing to binary file") } - return fs.tp.Propagate(ctx, n) + return uploadInfo.FinishUpload(ctx) } // InitiateUpload returns an upload id that can be used for uploads with tus diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 07e95af577..6c7b059e03 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -46,71 +46,13 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl if err != nil { return errors.Wrap(err, "ocfs: error retrieving 
upload") } - uploadInfo := upload.(*fileUpload) - if uploadInfo.info.Storage == nil { - return errors.New("ocfs: storage for the upload ID is nil") - } - ip := uploadInfo.info.Storage["InternalDestination"] - - var perm *provider.ResourcePermissions - var perr error - // if destination exists - if _, err := os.Stat(ip); err == nil { - // check permissions of file to be overwritten - perm, perr = fs.readPermissions(ctx, ip) - } else { - // check permissions - perm, perr = fs.readPermissions(ctx, filepath.Dir(ip)) - } - if perr == nil { - if !perm.InitiateFileUpload { - return errtypes.PermissionDenied("") - } - } else { - if isNotFound(perr) { - return errtypes.NotFound(fs.toStoragePath(ctx, filepath.Dir(ip))) - } - return errors.Wrap(perr, "ocfs: error reading permissions") - } - // we cannot rely on /tmp as it can live in another partition and we can - // hit invalid cross-device link errors, so we create the tmp file in the same directory - // the file is supposed to be written. - tmp, err := ioutil.TempFile(filepath.Dir(ip), "._reva_atomic_upload") - if err != nil { - return errors.Wrap(err, "ocfs: error creating tmp file at "+filepath.Dir(ip)) - } - - _, err = io.Copy(tmp, r) - if err != nil { - return errors.Wrap(err, "ocfs: error writing to tmp file "+tmp.Name()) - } - - // if destination exists - if _, err := os.Stat(ip); err == nil { - // copy attributes of existing file to tmp file - if err := fs.copyMD(ip, tmp.Name()); err != nil { - return errors.Wrap(err, "ocfs: error copying metadata from "+ip+" to "+tmp.Name()) - } - // create revision - if err := fs.archiveRevision(ctx, fs.getVersionsPath(ctx, ip), ip); err != nil { - return err - } - } - - // TODO(jfd): make sure rename is atomic, missing fsync ... 
- if err := os.Rename(tmp.Name(), ip); err != nil { - return errors.Wrap(err, "ocfs: error renaming from "+tmp.Name()+" to "+ip) - } - - if uploadInfo.info.MetaData["mtime"] != "" { - err := uploadInfo.fs.setMtime(ctx, ip, uploadInfo.info.MetaData["mtime"]) - if err != nil { - return errors.Wrap(err, "ocfs: could not set mtime metadata") - } + uploadInfo := upload.(*fileUpload) + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "ocfs: error writing to binary file") } - return nil + return uploadInfo.FinishUpload(ctx) } // InitiateUpload returns an upload id that can be used for uploads with tus From 01664510a10399d8888a251d687e35ffabd2cfbc Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 12:46:02 +0100 Subject: [PATCH 10/16] Refactor localfs uploads --- pkg/storage/fs/ocis/upload.go | 2 -- pkg/storage/fs/owncloud/upload.go | 1 - pkg/storage/utils/localfs/upload.go | 46 ++++------------------------- 3 files changed, 6 insertions(+), 43 deletions(-) diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index b58e221b92..a42954387e 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -41,8 +41,6 @@ import ( var defaultFilePerm = os.FileMode(0664) -// TODO deprecated ... use tus - func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) (err error) { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 6c7b059e03..508f5e8b61 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -40,7 +40,6 @@ import ( var defaultFilePerm = os.FileMode(0664) -// TODO deprecated ... 
use tus func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index b979ec920f..bffa861dd8 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -37,52 +37,18 @@ import ( var defaultFilePerm = os.FileMode(0664) -// TODO deprecated ... use tus func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { - fn, err := fs.resolveUploadIDToPath(ctx, ref) - if err != nil { - return errors.Wrap(err, "error resolving ref") - } - - // we cannot rely on /tmp as it can live in another partition and we can - // hit invalid cross-device link errors, so we create the tmp file in the same directory - // the file is supposed to be written. - tmp, err := ioutil.TempFile(filepath.Dir(fn), "._reva_atomic_upload") - if err != nil { - return errors.Wrap(err, "localfs: error creating tmp fn at "+filepath.Dir(fn)) - } - - _, err = io.Copy(tmp, r) - if err != nil { - return errors.Wrap(err, "localfs: error writing to tmp file "+tmp.Name()) - } - - // if destination exists - if _, err := os.Stat(fn); err == nil { - // create revision - if err := fs.archiveRevision(ctx, fn); err != nil { - return err - } - } - - // TODO(labkode): make sure rename is atomic, missing fsync ... 
- if err := os.Rename(tmp.Name(), fn); err != nil { - return errors.Wrap(err, "localfs: error renaming from "+tmp.Name()+" to "+fn) - } - - return nil -} - -func (fs *localfs) resolveUploadIDToPath(ctx context.Context, ref *provider.Reference) (string, error) { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return "", err + return errors.Wrap(err, "ocisfs: error retrieving upload") } + uploadInfo := upload.(*fileUpload) - if uploadInfo.info.Storage == nil { - return "", errors.New("storage for the upload ID is nil") + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "ocisfs: error writing to binary file") } - return uploadInfo.info.Storage["InternalDestination"], nil + + return uploadInfo.FinishUpload(ctx) } // InitiateUpload returns an upload id that can be used for uploads with tus From a8568c533d04b8390efaf890a11c18dddcb3d8ee Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 15:28:10 +0100 Subject: [PATCH 11/16] Move chunking assembly logic to fs --- internal/http/services/dataprovider/put.go | 7 +- internal/http/services/owncloud/ocdav/put.go | 31 +------ pkg/errtypes/errtypes.go | 19 +++- .../storage/utils/chunking/chunking.go | 91 ++++++++----------- pkg/storage/utils/eosfs/eosfs.go | 7 +- pkg/storage/utils/eosfs/upload.go | 15 ++- 6 files changed, 84 insertions(+), 86 deletions(-) rename internal/http/services/owncloud/ocdav/putchunked.go => pkg/storage/utils/chunking/chunking.go (77%) diff --git a/internal/http/services/dataprovider/put.go b/internal/http/services/dataprovider/put.go index cc70af5c5c..b9b1fe9d4d 100644 --- a/internal/http/services/dataprovider/put.go +++ b/internal/http/services/dataprovider/put.go @@ -24,23 +24,28 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" ) func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := 
appctx.GetLogger(ctx) fn := r.URL.Path + defer r.Body.Close() fsfn := strings.TrimPrefix(fn, s.conf.Prefix) ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} err := s.storage.Upload(ctx, ref, r.Body) if err != nil { + if _, ok := err.(errtypes.IsPartialContent); ok { + w.WriteHeader(http.StatusPartialContent) + return + } log.Error().Err(err).Msg("error uploading file") w.WriteHeader(http.StatusInternalServerError) return } - r.Body.Close() w.WriteHeader(http.StatusOK) } diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 5d2afb788a..5822a67b75 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -22,7 +22,6 @@ import ( "io" "net/http" "path" - "regexp" "strconv" "time" @@ -35,11 +34,6 @@ import ( "github.com/cs3org/reva/pkg/utils" ) -func isChunked(fn string) (bool, error) { - // FIXME: also need to check whether the OC-Chunked header is set - return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) -} - func sufferMacOSFinder(r *http.Request) bool { return r.Header.Get("X-Expected-Entity-Length") != "" } @@ -118,27 +112,6 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) { return } - ok, err := isChunked(fn) - if err != nil { - log.Error().Err(err).Msg("error checking if request is chunked or not") - w.WriteHeader(http.StatusInternalServerError) - return - } - - if ok { - // TODO: disable if chunking capability is turned off in config - /** - if s.c.Capabilities.Dav.Chunking == "1.0" { - s.handlePutChunked(w, r) - } else { - log.Error().Err(err).Msg("chunking 1.0 is not enabled") - w.WriteHeader(http.StatusBadRequest) - } - */ - s.handlePutChunked(w, r, ns) - return - } - if isContentRange(r) { log.Warn().Msg("Content-Range not supported for PUT") w.WriteHeader(http.StatusNotImplemented) @@ -279,6 +252,10 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io } 
defer httpRes.Body.Close()
 	if httpRes.StatusCode != http.StatusOK {
+		if httpRes.StatusCode == http.StatusPartialContent {
+			w.WriteHeader(http.StatusPartialContent)
+			return
+		}
 		log.Err(err).Msg("PUT request to data server failed")
 		w.WriteHeader(http.StatusInternalServerError)
 		return
diff --git a/pkg/errtypes/errtypes.go b/pkg/errtypes/errtypes.go
index a01a4679fb..dbd6c351a1 100644
--- a/pkg/errtypes/errtypes.go
+++ b/pkg/errtypes/errtypes.go
@@ -35,6 +35,9 @@ type InternalError string
 
 func (e InternalError) Error() string { return "internal error: " + string(e) }
 
+// IsInternalError implements the IsInternalError interface.
+func (e InternalError) IsInternalError() {}
+
 // PermissionDenied is the error to use when a resource cannot be access because of missing permissions.
 type PermissionDenied string
 
@@ -75,6 +78,14 @@ func (e NotSupported) Error() string { return "error: not supported: " + string(
 // IsNotSupported implements the IsNotSupported interface.
 func (e NotSupported) IsNotSupported() {}
 
+// PartialContent is the error to use when the client request has partial data.
+type PartialContent string
+
+func (e PartialContent) Error() string { return "error: partial content: " + string(e) }
+
+// IsPartialContent implements the IsPartialContent interface.
+func (e PartialContent) IsPartialContent() {}
+
 // IsNotFound is the interface to implement
 // to specify that an a resource is not found.
 type IsNotFound interface {
@@ -112,7 +123,13 @@ type IsPermissionDenied interface {
 }
 
 // IsPermissionDenied is the interface to implement
-// to specify that an action is not supported.
+// to specify that an action is denied.
 type IsPermissionDenied interface {
 	IsPermissionDenied()
 }
+
+// IsPartialContent is the interface to implement
+// to specify that the client request has partial data. 
+type IsPartialContent interface { + IsPartialContent() +} diff --git a/internal/http/services/owncloud/ocdav/putchunked.go b/pkg/storage/utils/chunking/chunking.go similarity index 77% rename from internal/http/services/owncloud/ocdav/putchunked.go rename to pkg/storage/utils/chunking/chunking.go index f8c3812205..fbda9484e5 100644 --- a/internal/http/services/owncloud/ocdav/putchunked.go +++ b/pkg/storage/utils/chunking/chunking.go @@ -16,23 +16,30 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -package ocdav +package chunking import ( - "context" "fmt" "io" "io/ioutil" - "net/http" "os" - "path" "path/filepath" + "regexp" "strconv" "strings" - - "github.com/cs3org/reva/pkg/appctx" ) +// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory +// until it gets the final chunk which is then returned. +type ChunkHandler struct { + ChunkFolder string `mapstructure:"chunk_folder"` +} + +// NewChunkHandler creates a handler for chunked uploads. 
+func NewChunkHandler(chunkFolder string) *ChunkHandler { + return &ChunkHandler{chunkFolder} +} + type chunkBLOBInfo struct { path string transferID string @@ -40,9 +47,8 @@ type chunkBLOBInfo struct { currentChunk int64 } -// not using the resource path in the chunk folder name allows uploading -// to the same folder after a move without having to restart the chunk -// upload +// Not using the resource path in the chunk folder name allows uploading to +// the same folder after a move without having to restart the chunk upload func (c *chunkBLOBInfo) uploadID() string { return fmt.Sprintf("chunking-%s-%d", c.transferID, c.totalChunks) } @@ -72,8 +78,8 @@ func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) { }, nil } -func (s *svc) createChunkTempFile() (string, *os.File, error) { - file, err := ioutil.TempFile(fmt.Sprintf("/%s", s.c.ChunkFolder), "") +func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) { + file, err := ioutil.TempFile(fmt.Sprintf("/%s", c.ChunkFolder), "") if err != nil { return "", nil, err } @@ -81,26 +87,22 @@ func (s *svc) createChunkTempFile() (string, *os.File, error) { return file.Name(), file, nil } -func (s *svc) getChunkFolderName(i *chunkBLOBInfo) (string, error) { - path := "/" + s.c.ChunkFolder + filepath.Clean("/"+i.uploadID()) +func (c *ChunkHandler) getChunkFolderName(i *chunkBLOBInfo) (string, error) { + path := "/" + c.ChunkFolder + filepath.Clean("/"+i.uploadID()) if err := os.MkdirAll(path, 0755); err != nil { return "", err } return path, nil } -func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool, string, error) { - log := appctx.GetLogger(ctx) +func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, error) { chunkInfo, err := getChunkBLOBInfo(path) if err != nil { err := fmt.Errorf("error getting chunk info from path: %s", path) return false, "", err } - //c.logger.Info().Log("chunknum", chunkInfo.currentChunk, "chunks", chunkInfo.totalChunks, - 
//"transferid", chunkInfo.transferID, "uploadid", chunkInfo.uploadID()) - - chunkTempFilename, chunkTempFile, err := s.createChunkTempFile() + chunkTempFilename, chunkTempFile, err := c.createChunkTempFile() if err != nil { return false, "", err } @@ -116,7 +118,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool return false, "", err } - chunksFolderName, err := s.getChunkFolderName(chunkInfo) + chunksFolderName, err := c.getChunkFolderName(chunkInfo) if err != nil { return false, "", err } @@ -144,7 +146,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool return false, "", err } - // there is still some chunks to be uploaded. + // there are still some chunks to be uploaded. // we return CodeUploadIsPartial to notify upper layers that the upload is still // not complete and requires more actions. // This code is needed to notify the owncloud webservice that the upload has not yet been @@ -153,7 +155,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool return false, "", nil } - assembledFileName, assembledFile, err := s.createChunkTempFile() + assembledFileName, assembledFile, err := c.createChunkTempFile() if err != nil { return false, "", err } @@ -183,57 +185,38 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool // at this point the assembled file is complete // so we free space removing the chunks folder - defer func() { - if err = os.RemoveAll(chunksFolderName); err != nil { - log.Warn().Err(err).Msg("error deleting chunk folder, remove folder manually/cron to not leak storage space") - } - }() + defer os.RemoveAll(chunksFolderName) return true, assembledFileName, nil } -func (s *svc) handlePutChunked(w http.ResponseWriter, r *http.Request, ns string) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - - fn := r.URL.Path - if r.Body == nil { - log.Warn().Msg("body is nil") - w.WriteHeader(http.StatusBadRequest) - return - } 
+func (c *ChunkHandler) IsChunked(fn string) (bool, error) { + // FIXME: also need to check whether the OC-Chunked header is set + return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) +} - finish, chunk, err := s.saveChunk(ctx, fn, r.Body) +func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) { + finish, chunk, err := c.saveChunk(fn, r) if err != nil { - log.Error().Err(err).Msg("error saving chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", nil, err } if !finish { - w.WriteHeader(http.StatusPartialContent) - return + return "", nil, nil } fd, err := os.Open(chunk) if err != nil { - log.Error().Err(err).Msg("error opening chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", nil, err } defer fd.Close() - md, err := fd.Stat() + chunkInfo, err := getChunkBLOBInfo(fn) if err != nil { - log.Error().Err(err).Msg("error statting chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", nil, err } - chunkInfo, _ := getChunkBLOBInfo(fn) - fn = path.Join(applyLayout(ctx, ns), chunkInfo.path) - - s.handlePutHelper(w, r, fd, fn, md.Size()) + return chunkInfo.path, fd, nil // TODO(labkode): implement old chunking diff --git a/pkg/storage/utils/eosfs/eosfs.go b/pkg/storage/utils/eosfs/eosfs.go index b2865c2411..087f1e696f 100644 --- a/pkg/storage/utils/eosfs/eosfs.go +++ b/pkg/storage/utils/eosfs/eosfs.go @@ -42,6 +42,7 @@ import ( "github.com/cs3org/reva/pkg/sharedconf" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/utils/acl" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/grants" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" @@ -175,6 +176,7 @@ func (c *Config) init() { type eosfs struct { c *eosclient.Client conf *Config + chunkHandler *chunking.ChunkHandler singleUserUID string singleUserGID string } @@ -208,8 +210,9 @@ func NewEOSFS(c 
*Config) (storage.FS, error) { eosClient := eosclient.New(eosClientOpts) eosfs := &eosfs{ - c: eosClient, - conf: c, + c: eosClient, + conf: c, + chunkHandler: chunking.NewChunkHandler(c.CacheDirectory), } return eosfs, nil diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index 8c97cac101..8c07d1285b 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -46,8 +46,21 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC return errtypes.PermissionDenied("eos: cannot upload under the virtual share folder") } - fn := fs.wrap(ctx, p) + ok, err := fs.chunkHandler.IsChunked(p) + if err != nil { + return errors.Wrap(err, "eos: error resolving reference") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + return errtypes.PartialContent(ref.String()) + } + } + fn := fs.wrap(ctx, p) return fs.c.Write(ctx, uid, gid, fn, r) } From 86dfe4ee396afc7046cd95f02d4a7092489294b2 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 16:19:24 +0100 Subject: [PATCH 12/16] Stat assembled path instead of chunk --- .../http/services/owncloud/ocdav/ocdav.go | 12 --- internal/http/services/owncloud/ocdav/put.go | 19 +++++ pkg/storage/utils/chunking/chunking.go | 74 ++++++++++--------- pkg/storage/utils/eosfs/upload.go | 4 +- 4 files changed, 62 insertions(+), 47 deletions(-) diff --git a/internal/http/services/owncloud/ocdav/ocdav.go b/internal/http/services/owncloud/ocdav/ocdav.go index 76363ef154..291d4811fa 100644 --- a/internal/http/services/owncloud/ocdav/ocdav.go +++ b/internal/http/services/owncloud/ocdav/ocdav.go @@ -24,7 +24,6 @@ import ( "fmt" "net/http" "net/url" - "os" "path" "strings" "time" @@ -67,7 +66,6 @@ type Config struct { // and received path is /docs the internal path will be: // /users///docs WebdavNamespace string `mapstructure:"webdav_namespace"` - ChunkFolder string `mapstructure:"chunk_folder"` 
GatewaySvc string `mapstructure:"gatewaysvc"` Timeout int64 `mapstructure:"timeout"` Insecure bool `mapstructure:"insecure"` @@ -75,13 +73,7 @@ type Config struct { func (c *Config) init() { // note: default c.Prefix is an empty string - c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc) - - if c.ChunkFolder == "" { - c.ChunkFolder = "/var/tmp/reva/tmp/davchunks" - } - } type svc struct { @@ -100,10 +92,6 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) conf.init() - if err := os.MkdirAll(conf.ChunkFolder, 0755); err != nil { - return nil, err - } - s := &svc{ c: conf, webDavHandler: new(WebDavHandler), diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 5822a67b75..eec782664c 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -31,6 +31,7 @@ import ( "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/rhttp" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/utils" ) @@ -262,6 +263,24 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io } } + ok, err := chunking.IsChunked(fn) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + if ok { + chunk, err := chunking.GetChunkBLOBInfo(fn) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + sReq = &provider.StatRequest{ + Ref: &provider.Reference{ + Spec: &provider.Reference_Path{ + Path: chunk.Path}, + }} + } + // stat again to check the new file's metadata sRes, err = client.Stat(ctx, sReq) if err != nil { diff --git a/pkg/storage/utils/chunking/chunking.go b/pkg/storage/utils/chunking/chunking.go index fbda9484e5..13df7103eb 100644 --- a/pkg/storage/utils/chunking/chunking.go +++ b/pkg/storage/utils/chunking/chunking.go @@ -29,31 +29,28 @@ import ( "strings" ) -// ChunkHandler 
manages chunked uploads, storing the chunks in a temporary directory -// until it gets the final chunk which is then returned. -type ChunkHandler struct { - ChunkFolder string `mapstructure:"chunk_folder"` -} - -// NewChunkHandler creates a handler for chunked uploads. -func NewChunkHandler(chunkFolder string) *ChunkHandler { - return &ChunkHandler{chunkFolder} +// IsChunked checks if a given path refers to a chunk or not +func IsChunked(fn string) (bool, error) { + // FIXME: also need to check whether the OC-Chunked header is set + return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) } -type chunkBLOBInfo struct { - path string - transferID string - totalChunks int64 - currentChunk int64 +// ChunkBLOBInfo stores info about a particular chunk +type ChunkBLOBInfo struct { + Path string + TransferID string + TotalChunks int64 + CurrentChunk int64 } // Not using the resource path in the chunk folder name allows uploading to // the same folder after a move without having to restart the chunk upload -func (c *chunkBLOBInfo) uploadID() string { - return fmt.Sprintf("chunking-%s-%d", c.transferID, c.totalChunks) +func (c *ChunkBLOBInfo) uploadID() string { + return fmt.Sprintf("chunking-%s-%d", c.TransferID, c.TotalChunks) } -func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) { +// GetChunkBLOBInfo decodes a chunk name to retrieve info about it. 
+func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) { parts := strings.Split(path, "-chunking-") tail := strings.Split(parts[1], "-") @@ -70,14 +67,25 @@ func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) { return nil, fmt.Errorf("current chunk:%d exceeds total number of chunks:%d", currentChunk, totalChunks) } - return &chunkBLOBInfo{ - path: parts[0], - transferID: tail[0], - totalChunks: totalChunks, - currentChunk: currentChunk, + return &ChunkBLOBInfo{ + Path: parts[0], + TransferID: tail[0], + TotalChunks: totalChunks, + CurrentChunk: currentChunk, }, nil } +// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory +// until it gets the final chunk which is then returned. +type ChunkHandler struct { + ChunkFolder string `mapstructure:"chunk_folder"` +} + +// NewChunkHandler creates a handler for chunked uploads. +func NewChunkHandler(chunkFolder string) *ChunkHandler { + return &ChunkHandler{chunkFolder} +} + func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) { file, err := ioutil.TempFile(fmt.Sprintf("/%s", c.ChunkFolder), "") if err != nil { @@ -87,7 +95,7 @@ func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) { return file.Name(), file, nil } -func (c *ChunkHandler) getChunkFolderName(i *chunkBLOBInfo) (string, error) { +func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (string, error) { path := "/" + c.ChunkFolder + filepath.Clean("/"+i.uploadID()) if err := os.MkdirAll(path, 0755); err != nil { return "", err @@ -96,7 +104,7 @@ func (c *ChunkHandler) getChunkFolderName(i *chunkBLOBInfo) (string, error) { } func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, error) { - chunkInfo, err := getChunkBLOBInfo(path) + chunkInfo, err := GetChunkBLOBInfo(path) if err != nil { err := fmt.Errorf("error getting chunk info from path: %s", path) return false, "", err @@ -124,7 +132,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) 
(bool, string, er
 }
 
 	//c.logger.Info().Log("chunkfolder", chunksFolderName)
-	chunkTarget := chunksFolderName + "/" + fmt.Sprintf("%d", chunkInfo.currentChunk)
+	chunkTarget := chunksFolderName + "/" + fmt.Sprintf("%d", chunkInfo.CurrentChunk)
 	if err = os.Rename(chunkTempFilename, chunkTarget); err != nil {
 		return false, "", err
 	}
@@ -151,7 +159,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er
 	// not complete and requires more actions.
 	// This code is needed to notify the owncloud webservice that the upload has not yet been
 	// completed and needs to continue uploading chunks.
-	if len(chunks) < int(chunkInfo.totalChunks) {
+	if len(chunks) < int(chunkInfo.TotalChunks) {
 		return false, "", nil
 	}
 
@@ -190,11 +198,8 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er
 	return true, assembledFileName, nil
 }
 
-func (c *ChunkHandler) IsChunked(fn string) (bool, error) {
-	// FIXME: also need to check whether the OC-Chunked header is set
-	return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn)
-}
-
+// WriteChunk saves an intermediate chunk temporarily and assembles all chunks
+// once the final one is received.
 func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) {
 	finish, chunk, err := c.saveChunk(fn, r)
 	if err != nil {
@@ -209,14 +214,15 @@ func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCl
 	if err != nil {
 		return "", nil, err
 	}
-	defer fd.Close()
 
-	chunkInfo, err := getChunkBLOBInfo(fn)
+	chunkInfo, err := GetChunkBLOBInfo(fn)
 	if err != nil {
 		return "", nil, err
 	}
 
-	return chunkInfo.path, fd, nil
+	// Since we're returning a ReadCloser, it is the responsibility of the
+	// caller function to close it to prevent file descriptor leaks. 
+ return chunkInfo.Path, fd, nil // TODO(labkode): implement old chunking diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index 8c07d1285b..f86dcc2217 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -24,6 +24,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/pkg/errors" ) @@ -46,7 +47,7 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC return errtypes.PermissionDenied("eos: cannot upload under the virtual share folder") } - ok, err := fs.chunkHandler.IsChunked(p) + ok, err := chunking.IsChunked(p) if err != nil { return errors.Wrap(err, "eos: error resolving reference") } @@ -58,6 +59,7 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC if p == "" { return errtypes.PartialContent(ref.String()) } + defer r.Close() } fn := fs.wrap(ctx, p) From 744cebb184e0c03a5618ae77be20985940c79d37 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 17:09:22 +0100 Subject: [PATCH 13/16] Add chunking support for local, owncloud and ocis fs --- pkg/storage/fs/ocis/ocis.go | 19 +++++++++++-------- pkg/storage/fs/ocis/upload.go | 22 ++++++++++++++++++++++ pkg/storage/fs/owncloud/owncloud.go | 12 +++++++++--- pkg/storage/fs/owncloud/upload.go | 22 ++++++++++++++++++++++ pkg/storage/utils/chunking/chunking.go | 12 ++++++------ pkg/storage/utils/eosfs/upload.go | 2 +- pkg/storage/utils/localfs/localfs.go | 12 +++++++++--- pkg/storage/utils/localfs/upload.go | 23 +++++++++++++++++++++++ 8 files changed, 103 insertions(+), 21 deletions(-) diff --git a/pkg/storage/fs/ocis/ocis.go b/pkg/storage/fs/ocis/ocis.go index 8fc1da1d75..59f092997b 100644 --- a/pkg/storage/fs/ocis/ocis.go +++ b/pkg/storage/fs/ocis/ocis.go @@ -33,6 +33,7 @@ import ( "github.com/cs3org/reva/pkg/logger" 
"github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/mitchellh/mapstructure" @@ -160,18 +161,20 @@ func New(m map[string]interface{}) (storage.FS, error) { } return &ocisfs{ - tp: tp, - lu: lu, - o: o, - p: &Permissions{lu: lu}, + tp: tp, + lu: lu, + o: o, + p: &Permissions{lu: lu}, + chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")), }, nil } type ocisfs struct { - tp TreePersistence - lu *Lookup - o *Options - p *Permissions + tp TreePersistence + lu *Lookup + o *Options + p *Permissions + chunkHandler *chunking.ChunkHandler } func (fs *ocisfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index a42954387e..d45354d040 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -32,6 +32,7 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/logger" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" @@ -48,6 +49,27 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read } uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) + if err != nil { + return errors.Wrap(err, "ocfs: error checking path") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + defer r.Close() + } + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { return 
errors.Wrap(err, "ocisfs: error writing to binary file") } diff --git a/pkg/storage/fs/owncloud/owncloud.go b/pkg/storage/fs/owncloud/owncloud.go index ab32fe1f42..6e08cf4047 100644 --- a/pkg/storage/fs/owncloud/owncloud.go +++ b/pkg/storage/fs/owncloud/owncloud.go @@ -44,6 +44,7 @@ import ( "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" "github.com/cs3org/reva/pkg/storage/utils/ace" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/gofrs/uuid" @@ -198,12 +199,17 @@ func New(m map[string]interface{}) (storage.FS, error) { }, } - return &ocfs{c: c, pool: pool}, nil + return &ocfs{ + c: c, + pool: pool, + chunkHandler: chunking.NewChunkHandler(c.UploadInfoDir), + }, nil } type ocfs struct { - c *config - pool *redis.Pool + c *config + pool *redis.Pool + chunkHandler *chunking.ChunkHandler } func (fs *ocfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 508f5e8b61..9e64740358 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -31,6 +31,7 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/logger" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" @@ -47,6 +48,27 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl } uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) + if err != nil { + return errors.Wrap(err, "ocfs: error checking path") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error 
removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + defer r.Close() + } + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { return errors.Wrap(err, "ocfs: error writing to binary file") } diff --git a/pkg/storage/utils/chunking/chunking.go b/pkg/storage/utils/chunking/chunking.go index 13df7103eb..9877f9d653 100644 --- a/pkg/storage/utils/chunking/chunking.go +++ b/pkg/storage/utils/chunking/chunking.go @@ -39,8 +39,8 @@ func IsChunked(fn string) (bool, error) { type ChunkBLOBInfo struct { Path string TransferID string - TotalChunks int64 - CurrentChunk int64 + TotalChunks int + CurrentChunk int } // Not using the resource path in the chunk folder name allows uploading to @@ -54,12 +54,12 @@ func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) { parts := strings.Split(path, "-chunking-") tail := strings.Split(parts[1], "-") - totalChunks, err := strconv.ParseInt(tail[1], 10, 64) + totalChunks, err := strconv.Atoi(tail[1]) if err != nil { return nil, err } - currentChunk, err := strconv.ParseInt(tail[2], 10, 64) + currentChunk, err := strconv.Atoi(tail[2]) if err != nil { return nil, err } @@ -159,7 +159,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er // not complete and requires more actions. // This code is needed to notify the owncloud webservice that the upload has not yet been // completed and needs to continue uploading chunks. - if len(chunks) < int(chunkInfo.TotalChunks) { + if len(chunks) < chunkInfo.TotalChunks { return false, "", nil } @@ -198,7 +198,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er return true, assembledFileName, nil } -// Write chunk saves an intermediate chunk temporarily and assembles all chunks +// WriteChunk saves an intermediate chunk temporarily and assembles all chunks // once the final one is received. 
func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) { finish, chunk, err := c.saveChunk(fn, r) diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index f86dcc2217..a73197c723 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -49,7 +49,7 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC ok, err := chunking.IsChunked(p) if err != nil { - return errors.Wrap(err, "eos: error resolving reference") + return errors.Wrap(err, "eos: error checking path") } if ok { p, r, err = fs.chunkHandler.WriteChunk(p, r) diff --git a/pkg/storage/utils/localfs/localfs.go b/pkg/storage/utils/localfs/localfs.go index 5f643499d5..909b1acd9d 100644 --- a/pkg/storage/utils/localfs/localfs.go +++ b/pkg/storage/utils/localfs/localfs.go @@ -37,6 +37,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/mime" "github.com/cs3org/reva/pkg/storage" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/grants" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" @@ -84,8 +85,9 @@ func (c *Config) init() { } type localfs struct { - conf *Config - db *sql.DB + conf *Config + db *sql.DB + chunkHandler *chunking.ChunkHandler } // NewLocalFS returns a storage.FS interface implementation that controls then @@ -111,7 +113,11 @@ func NewLocalFS(c *Config) (storage.FS, error) { return nil, errors.Wrap(err, "localfs: error initializing db") } - return &localfs{conf: c, db: db}, nil + return &localfs{ + conf: c, + db: db, + chunkHandler: chunking.NewChunkHandler(c.Uploads), + }, nil } func (fs *localfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index bffa861dd8..fe4b68fb20 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -29,6 +29,8 @@ 
import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" @@ -44,6 +46,27 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea } uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) + if err != nil { + return errors.Wrap(err, "ocfs: error checking path") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + defer r.Close() + } + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { return errors.Wrap(err, "ocisfs: error writing to binary file") } From 698683fc0c35ee30881667c0dc53750c9e050078 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 18:44:57 +0100 Subject: [PATCH 14/16] Delete assembled chunks after writing them to fs --- pkg/storage/fs/ocis/upload.go | 16 ++++++++++++---- pkg/storage/fs/owncloud/upload.go | 11 +++++++++-- pkg/storage/utils/chunking/chunking.go | 17 +++++------------ pkg/storage/utils/eosfs/upload.go | 12 ++++++++++-- pkg/storage/utils/localfs/upload.go | 11 +++++++++-- 5 files changed, 45 insertions(+), 22 deletions(-) diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index d45354d040..41e95b2cfe 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -36,7 +36,6 @@ import ( "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" - "github.com/rs/zerolog/log" tusd 
"github.com/tus/tusd/pkg/handler" ) @@ -56,7 +55,8 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read return errors.Wrap(err, "ocfs: error checking path") } if ok { - p, r, err = fs.chunkHandler.WriteChunk(p, r) + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { return err } @@ -66,8 +66,14 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read } return errtypes.PartialContent(ref.String()) } - uploadInfo.info.Storage["InternalDestination"] = p - defer r.Close() + uploadInfo.info.Storage["NodeName"] = p + fd, err := os.Open(assembledFile) + if err != nil { + return errors.Wrap(err, "eos: error opening assembled file") + } + defer fd.Close() + defer os.RemoveAll(assembledFile) + r = fd } if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { @@ -364,6 +370,8 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { n.ID = uuid.New().String() } targetPath := upload.fs.lu.toInternalPath(n.ID) + log := appctx.GetLogger(upload.ctx) + log.Info().Msgf("targetPath: %+v, ID: %+v", targetPath, n.ID) // if target exists create new version var fi os.FileInfo diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 9e64740358..d8e166ac42 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -55,7 +55,8 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl return errors.Wrap(err, "ocfs: error checking path") } if ok { - p, r, err = fs.chunkHandler.WriteChunk(p, r) + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { return err } @@ -66,7 +67,13 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl return errtypes.PartialContent(ref.String()) } uploadInfo.info.Storage["InternalDestination"] = p - defer r.Close() + fd, err := os.Open(assembledFile) + if err != nil { + 
return errors.Wrap(err, "eos: error opening assembled file") + } + defer fd.Close() + defer os.RemoveAll(assembledFile) + r = fd } if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { diff --git a/pkg/storage/utils/chunking/chunking.go b/pkg/storage/utils/chunking/chunking.go index 9877f9d653..57508a376e 100644 --- a/pkg/storage/utils/chunking/chunking.go +++ b/pkg/storage/utils/chunking/chunking.go @@ -200,29 +200,22 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er // WriteChunk saves an intermediate chunk temporarily and assembles all chunks // once the final one is received. -func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) { +func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, error) { finish, chunk, err := c.saveChunk(fn, r) if err != nil { - return "", nil, err + return "", "", err } if !finish { - return "", nil, nil - } - - fd, err := os.Open(chunk) - if err != nil { - return "", nil, err + return "", "", nil } chunkInfo, err := GetChunkBLOBInfo(fn) if err != nil { - return "", nil, err + return "", "", err } - // Since we're returning a ReadCloser, it is the responsibility of the - // caller function to close it to prevent file descriptor leaks. 
- return chunkInfo.Path, fd, nil + return chunkInfo.Path, chunk, nil // TODO(labkode): implement old chunking diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index a73197c723..e60c539680 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -21,6 +21,7 @@ package eosfs import ( "context" "io" + "os" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/errtypes" @@ -52,14 +53,21 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC return errors.Wrap(err, "eos: error checking path") } if ok { - p, r, err = fs.chunkHandler.WriteChunk(p, r) + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { return err } if p == "" { return errtypes.PartialContent(ref.String()) } - defer r.Close() + fd, err := os.Open(assembledFile) + if err != nil { + return errors.Wrap(err, "eos: error opening assembled file") + } + defer fd.Close() + defer os.RemoveAll(assembledFile) + r = fd } fn := fs.wrap(ctx, p) diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index fe4b68fb20..20fc1fa805 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -53,7 +53,8 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea return errors.Wrap(err, "ocfs: error checking path") } if ok { - p, r, err = fs.chunkHandler.WriteChunk(p, r) + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { return err } @@ -64,7 +65,13 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea return errtypes.PartialContent(ref.String()) } uploadInfo.info.Storage["InternalDestination"] = p - defer r.Close() + fd, err := os.Open(assembledFile) + if err != nil { + return errors.Wrap(err, "eos: error opening assembled file") + } + defer fd.Close() + defer 
os.RemoveAll(assembledFile) + r = fd } if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { From 4ba631afa34df545685b8e0901e796aab5997710 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 19:49:03 +0100 Subject: [PATCH 15/16] Use correct key for looking up file name --- pkg/storage/fs/ocis/upload.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index 41e95b2cfe..3b8f0e9563 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -49,7 +49,7 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read uploadInfo := upload.(*fileUpload) - p := uploadInfo.info.Storage["InternalDestination"] + p := uploadInfo.info.Storage["NodeName"] ok, err := chunking.IsChunked(p) if err != nil { return errors.Wrap(err, "ocfs: error checking path") @@ -358,6 +358,7 @@ func (upload *fileUpload) writeInfo() error { // FinishUpload finishes an upload and moves the file to the internal destination func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { + log := appctx.GetLogger(upload.ctx) n := &Node{ lu: upload.fs.lu, @@ -370,8 +371,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { n.ID = uuid.New().String() } targetPath := upload.fs.lu.toInternalPath(n.ID) - log := appctx.GetLogger(upload.ctx) - log.Info().Msgf("targetPath: %+v, ID: %+v", targetPath, n.ID) // if target exists create new version var fi os.FileInfo @@ -380,7 +379,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { versionsPath := upload.fs.lu.toInternalPath(n.ID + ".REV." + fi.ModTime().UTC().Format(time.RFC3339Nano)) if err = os.Rename(targetPath, versionsPath); err != nil { - log := appctx.GetLogger(upload.ctx) log.Err(err).Interface("info", upload.info). Str("binPath", upload.binPath). Str("targetPath", targetPath). 
From cf62669472c536b7fb058d3ba860595a6f132282 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Mon, 2 Nov 2020 09:32:10 +0100 Subject: [PATCH 16/16] Remove unused parameters --- .../http/services/dataprovider/dataprovider.go | 17 +++++------------ internal/http/services/owncloud/ocdav/put.go | 6 ++++-- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 889816ad65..fea3b918df 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -36,27 +36,20 @@ func init() { } type config struct { - Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` - Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` - Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` - Timeout int64 `mapstructure:"timeout"` - Insecure bool `mapstructure:"insecure"` - TempDirectory string `mapstructure:"temp_directory"` + Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` + Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` + Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` + Timeout int64 `mapstructure:"timeout"` + Insecure bool `mapstructure:"insecure"` } func (c *config) init() { if c.Prefix == "" { c.Prefix = "data" } - if c.Driver == "" { c.Driver = "localhome" } - - if c.TempDirectory == "" { - c.TempDirectory = "/var/tmp/reva/tmp" - } - } type svc struct { diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index eec782664c..61c4875b28 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ 
b/internal/http/services/owncloud/ocdav/put.go @@ -277,8 +277,10 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io sReq = &provider.StatRequest{ Ref: &provider.Reference{ Spec: &provider.Reference_Path{ - Path: chunk.Path}, - }} + Path: chunk.Path, + }, + }, + } } // stat again to check the new file's metadata