Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use tus for uploads (and support range requests for downloads?) #286

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"

rpcpb "github.com/cs3org/go-cs3apis/cs3/rpc"
Expand Down Expand Up @@ -185,8 +186,24 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *storageprovider
func (s *service) InitiateFileUpload(ctx context.Context, req *storageproviderv0alphapb.InitiateFileUploadRequest) (*storageproviderv0alphapb.InitiateFileUploadResponse, error) {
// TODO(labkode): same considerations as download
log := appctx.GetLogger(ctx)
var uploadLength int64
if req.Opaque != nil && req.Opaque.Map != nil && req.Opaque.Map["Upload-Length"] != nil {
var err error
uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64)
if err != nil {
return &storageproviderv0alphapb.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error parsing upload length"),
}, nil
}
}
uploadID, err := s.storage.NewUpload(ctx, req.Ref, uploadLength)
if err != nil {
return &storageproviderv0alphapb.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error getting upload id"),
}, nil
}
url := *s.dataServerURL
url.Path = path.Join("/", url.Path, path.Clean(req.Ref.GetPath()))
url.Path = path.Join("/", url.Path, uploadID)
log.Info().Str("data-server", url.String()).
Str("fn", req.Ref.GetPath()).
Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)).
Expand Down
114 changes: 85 additions & 29 deletions cmd/revad/svcs/httpsvcs/datasvc/datasvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ package datasvc
import (
"fmt"
"net/http"
"os"

"github.com/cs3org/reva/cmd/revad/httpserver"
"github.com/cs3org/reva/cmd/revad/svcs/httpsvcs"
tusd "github.com/cs3org/reva/cmd/revad/svcs/httpsvcs/datasvc/handler"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
Expand All @@ -37,7 +38,6 @@ func init() {
type config struct {
Prefix string `mapstructure:"prefix"`
Driver string `mapstructure:"driver"`
TmpFolder string `mapstructure:"tmp_folder"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
ProviderPath string `mapstructure:"provider_path"`
}
Expand All @@ -55,14 +55,6 @@ func New(m map[string]interface{}) (httpsvcs.Service, error) {
return nil, err
}

if conf.TmpFolder == "" {
conf.TmpFolder = os.TempDir()
}

if err := os.MkdirAll(conf.TmpFolder, 0755); err != nil {
return nil, err
}

fs, err := getFS(conf)
if err != nil {
return nil, err
Expand All @@ -72,15 +64,21 @@ func New(m map[string]interface{}) (httpsvcs.Service, error) {
storage: fs,
conf: conf,
}
s.setHandler()
return s, nil
err = s.setHandler()
return s, err
}

// Close performs cleanup.
func (s *svc) Close() error {
return nil
}

// Create a new DataStore instance which is responsible for
// storing the uploaded file on disk in the specified directory.
// This path _must_ exist before tusd will store uploads in it.
// If you want to save them on a different medium, for example
// a remote FTP server, you can implement your own storage backend
// by implementing the tusd.DataStore interface.
func getFS(c *config) (storage.FS, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
Expand All @@ -96,24 +94,82 @@ func (s *svc) Handler() http.Handler {
return s.handler
}

func (s *svc) setHandler() {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
func (s *svc) setHandler() (err error) {
composable, ok := s.storage.(tusd.Composable)
if ok {
// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()
// TODO use Terminater
// TODO use Locker
// TODO use Concater
// TODO use LenghtDeferrer
composable.UseIn(composer)

//logger := log.New(os.Stdout, "tusd ", log.Ldate|log.Ltime|log.Lshortfile)

config := tusd.Config{
BasePath: s.conf.Prefix,
StoreComposer: composer,
//Logger: logger,
}
})

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return err
}

s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

switch r.Method {
// old fashioned download.

// GET is not part of the tus.io protocol
// currently there is no way to GET an upload that is in progress
// TODO allow range based get requests? that end before the current offset
case "GET":
s.doGet(w, r)

// tus.id based upload

// uploads are initiated using the CS3 APIs Initiate Download call
//case "POST":
// handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
handler.PatchFile(w, r)
// TODO Only attach the DELETE handler if the Terminate() method is provided
case "DELETE":
handler.DelFile(w, r)
}
}))
} else {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
}
})
}

return err
}

func addCorsHeader(res http.ResponseWriter) {
Expand Down
124 changes: 124 additions & 0 deletions cmd/revad/svcs/httpsvcs/datasvc/handler/composer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2018-2019 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// this file is a fork of https://github.com/tus/tusd/tree/master/pkg/handler/composer.go
// TODO remove when PRs have been merged upstream

package handler

// StoreComposer represents a composable data store. It consists of the core
// data store and optional extensions. Please consult the package's overview
// for a more detailed introduction in how to use this structure.
type StoreComposer struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct of size 136 bytes could be of size 104 bytes (from maligned)

Core DataStore

UsesCreator bool
Creator CreatingDataStore
UsesTerminater bool
Terminater TerminaterDataStore
butonic marked this conversation as resolved.
Show resolved Hide resolved
UsesLocker bool
Locker Locker
UsesConcater bool
Concater ConcaterDataStore
UsesLengthDeferrer bool
LengthDeferrer LengthDeferrerDataStore
}

// NewStoreComposer creates a new and empty store composer.
func NewStoreComposer() *StoreComposer {
return &StoreComposer{}
}

// Capabilities returns a string representing the provided extensions in a
// human-readable format meant for debugging.
func (store *StoreComposer) Capabilities() string {
str := "Core: "

if store.Core != nil {
str += "✓"
} else {
str += "✗"
}

str += ` Creator: `
if store.UsesCreator {
str += "✓"
} else {
str += "✗"
}
str += ` Terminater: `
butonic marked this conversation as resolved.
Show resolved Hide resolved
if store.UsesTerminater {
str += "✓"
} else {
str += "✗"
}
str += ` Locker: `
if store.UsesLocker {
str += "✓"
} else {
str += "✗"
}
str += ` Concater: `
if store.UsesConcater {
str += "✓"
} else {
str += "✗"
}
str += ` LengthDeferrer: `
if store.UsesLengthDeferrer {
str += "✓"
} else {
str += "✗"
}

return str
}

// UseCore will set the used core data store. If the argument is nil, the
// property will be unset.
func (store *StoreComposer) UseCore(core DataStore) {
store.Core = core
}

func (store *StoreComposer) UseCreator(ext CreatingDataStore) {
butonic marked this conversation as resolved.
Show resolved Hide resolved
store.UsesCreator = ext != nil
store.Creator = ext
}
func (store *StoreComposer) UseTerminater(ext TerminaterDataStore) {
butonic marked this conversation as resolved.
Show resolved Hide resolved
store.UsesTerminater = ext != nil
store.Terminater = ext
}

func (store *StoreComposer) UseLocker(ext Locker) {
butonic marked this conversation as resolved.
Show resolved Hide resolved
store.UsesLocker = ext != nil
store.Locker = ext
}

func (store *StoreComposer) UseConcater(ext ConcaterDataStore) {
butonic marked this conversation as resolved.
Show resolved Hide resolved
store.UsesConcater = ext != nil
store.Concater = ext
}

func (store *StoreComposer) UseLengthDeferrer(ext LengthDeferrerDataStore) {
butonic marked this conversation as resolved.
Show resolved Hide resolved
store.UsesLengthDeferrer = ext != nil
store.LengthDeferrer = ext
}

type Composable interface {
butonic marked this conversation as resolved.
Show resolved Hide resolved
UseIn(composer *StoreComposer)
}
Loading