Skip to content

Commit

Permalink
Move setup logic into hooks package
Browse files Browse the repository at this point in the history
  • Loading branch information
Acconut committed Aug 21, 2023
1 parent 124ce13 commit 1562258
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 183 deletions.
157 changes: 5 additions & 152 deletions cmd/tusd/cli/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,87 +9,19 @@ import (
"github.com/tus/tusd/v2/pkg/hooks/grpc"
"github.com/tus/tusd/v2/pkg/hooks/http"
"github.com/tus/tusd/v2/pkg/hooks/plugin"
"golang.org/x/exp/slices"
)

// TODO: Move some parts into hooks package

var hookHandler hooks.HookHandler = nil

func preCreateCallback(event handler.HookEvent) (handler.HTTPResponse, handler.FileInfoChanges, error) {
ok, hookRes, err := invokeHookSync(hooks.HookPreCreate, event)
if !ok || err != nil {
return handler.HTTPResponse{}, handler.FileInfoChanges{}, err
}

httpRes := hookRes.HTTPResponse

// If the hook response includes the instruction to reject the upload, reuse the error code
// and message from ErrUploadRejectedByServer, but also include custom HTTP response values.
if hookRes.RejectUpload {
err := handler.ErrUploadRejectedByServer
err.HTTPResponse = err.HTTPResponse.MergeWith(httpRes)

return handler.HTTPResponse{}, handler.FileInfoChanges{}, err
}

// Pass any changes regarding file info from the hook to the handler.
changes := hookRes.ChangeFileInfo
return httpRes, changes, nil
}

func preFinishCallback(event handler.HookEvent) (handler.HTTPResponse, error) {
ok, hookRes, err := invokeHookSync(hooks.HookPreFinish, event)
if !ok || err != nil {
return handler.HTTPResponse{}, err
}

httpRes := hookRes.HTTPResponse
return httpRes, nil
}

func postReceiveCallback(event handler.HookEvent) {
ok, hookRes, _ := invokeHookSync(hooks.HookPostReceive, event)
// invokeHookSync already logs the error, if any occurs. So by checking `ok`, we can ensure
// that the hook finished successfully
if !ok {
return
}

if hookRes.StopUpload {
logEv(stdout, "HookStopUpload", "id", event.Upload.ID)

// TODO: Control response for PATCH request
event.Upload.StopUpload()
}
}

func SetupHookMetrics() {
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostFinish)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostTerminate)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostReceive)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostCreate)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPreCreate)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPreFinish)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostFinish)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostTerminate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostReceive)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostCreate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPreCreate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPreFinish)).Add(0)
}

func SetupPreHooks(config *handler.Config) error {
func getHookHandler(config *handler.Config) hooks.HookHandler {
if Flags.FileHooksDir != "" {
stdout.Printf("Using '%s' for hooks", Flags.FileHooksDir)

hookHandler = &file.FileHook{
return &file.FileHook{
Directory: Flags.FileHooksDir,
}
} else if Flags.HttpHooksEndpoint != "" {
stdout.Printf("Using '%s' as the endpoint for hooks", Flags.HttpHooksEndpoint)

hookHandler = &http.HttpHook{
return &http.HttpHook{
Endpoint: Flags.HttpHooksEndpoint,
MaxRetries: Flags.HttpHooksRetry,
Backoff: Flags.HttpHooksBackoff,
Expand All @@ -98,97 +30,18 @@ func SetupPreHooks(config *handler.Config) error {
} else if Flags.GrpcHooksEndpoint != "" {
stdout.Printf("Using '%s' as the endpoint for gRPC hooks", Flags.GrpcHooksEndpoint)

hookHandler = &grpc.GrpcHook{
return &grpc.GrpcHook{
Endpoint: Flags.GrpcHooksEndpoint,
MaxRetries: Flags.GrpcHooksRetry,
Backoff: Flags.GrpcHooksBackoff,
}
} else if Flags.PluginHookPath != "" {
stdout.Printf("Using '%s' to load plugin for hooks", Flags.PluginHookPath)

hookHandler = &plugin.PluginHook{
return &plugin.PluginHook{
Path: Flags.PluginHookPath,
}
} else {
return nil
}

var enabledHooksString []string
for _, h := range Flags.EnabledHooks {
enabledHooksString = append(enabledHooksString, string(h))
}

stdout.Printf("Enabled hook events: %s", strings.Join(enabledHooksString, ", "))

if err := hookHandler.Setup(); err != nil {
return err
}

config.PreUploadCreateCallback = preCreateCallback
config.PreFinishResponseCallback = preFinishCallback

return nil
}

func SetupPostHooks(handler *handler.Handler) {
go func() {
for {
select {
case event := <-handler.CompleteUploads:
invokeHookAsync(hooks.HookPostFinish, event)
case event := <-handler.TerminatedUploads:
invokeHookAsync(hooks.HookPostTerminate, event)
case event := <-handler.CreatedUploads:
invokeHookAsync(hooks.HookPostCreate, event)
case event := <-handler.UploadProgress:
go postReceiveCallback(event)
}
}
}()
}

func invokeHookAsync(typ hooks.HookType, event handler.HookEvent) {
go func() {
// Error handling is taken care by the function.
_, _, _ = invokeHookSync(typ, event)
}()
}

// invokeHookSync executes a hook of the given type with the given event data. If
// the hook was not executed properly (e.g. an error occurred or not handler is installed),
// `ok` will be false and `res` is not filled. `err` can contain the underlying error.
// If `ok` is true, `res` contains the response as retrieved from the hook.
// Therefore, a caller should always check `ok` and `err` before assuming that the
// hook completed successfully.
func invokeHookSync(typ hooks.HookType, event handler.HookEvent) (ok bool, res hooks.HookResponse, err error) {
// Stop, if no hook handler is installed or this hook event is not enabled
if hookHandler == nil || !slices.Contains(Flags.EnabledHooks, typ) {
return false, hooks.HookResponse{}, nil
}

MetricsHookInvocationsTotal.WithLabelValues(string(typ)).Add(1)

id := event.Upload.ID

if Flags.VerboseOutput {
logEv(stdout, "HookInvocationStart", "type", string(typ), "id", id)
}

res, err = hookHandler.InvokeHook(hooks.HookRequest{
Type: typ,
Event: event,
})
if err != nil {
// If an error occurs during the hook execution, we log and track the error, but do not
// return a hook response.
logEv(stderr, "HookInvocationError", "type", string(typ), "id", id, "error", err.Error())
MetricsHookErrorsTotal.WithLabelValues(string(typ)).Add(1)
return false, hooks.HookResponse{}, err
}

if Flags.VerboseOutput {
logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", id)
}

return true, res, nil
}
21 changes: 3 additions & 18 deletions cmd/tusd/cli/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/tus/tusd/v2/pkg/handler"
"github.com/tus/tusd/v2/pkg/hooks"
"github.com/tus/tusd/v2/pkg/prometheuscollector"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -15,26 +16,10 @@ var MetricsOpenConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Help: "Current number of open connections.",
})

var MetricsHookErrorsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tusd_hook_errors_total",
Help: "Total number of execution errors per hook type.",
},
[]string{"hooktype"},
)

var MetricsHookInvocationsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tusd_hook_invocations_total",
Help: "Total number of invocations per hook type.",
},
[]string{"hooktype"},
)

func SetupMetrics(mux *http.ServeMux, handler *handler.Handler) {
prometheus.MustRegister(MetricsOpenConnections)
prometheus.MustRegister(MetricsHookErrorsTotal)
prometheus.MustRegister(MetricsHookInvocationsTotal)
prometheus.MustRegister(hooks.MetricsHookErrorsTotal)
prometheus.MustRegister(hooks.MetricsHookInvocationsTotal)
prometheus.MustRegister(prometheuscollector.New(handler.Metrics))

stdout.Printf("Using %s as the metrics path.\n", Flags.MetricsPath)
Expand Down
33 changes: 20 additions & 13 deletions cmd/tusd/cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"

"github.com/tus/tusd/v2/pkg/handler"
handlerpkg "github.com/tus/tusd/v2/pkg/handler"
"github.com/tus/tusd/v2/pkg/hooks"
)

const (
Expand All @@ -36,22 +38,31 @@ func Serve() {
DisableTermination: Flags.DisableTermination,
DisableCors: Flags.DisableCors,
StoreComposer: Composer,
NotifyCompleteUploads: true,
NotifyTerminatedUploads: true,
NotifyUploadProgress: true,
NotifyCreatedUploads: true,
UploadProgressInterval: time.Duration(Flags.ProgressHooksInterval) * time.Millisecond,
}

if err := SetupPreHooks(&config); err != nil {
stderr.Fatalf("Unable to setup hooks for handler: %s", err)
}
var handler *handlerpkg.Handler
var err error
hookHandler := getHookHandler(&config)
if hookHandler != nil {
handler, err = hooks.NewHandlerWithHooks(&config, hookHandler, Flags.EnabledHooks)

var enabledHooksString []string
for _, h := range Flags.EnabledHooks {
enabledHooksString = append(enabledHooksString, string(h))
}

handler, err := handler.NewHandler(config)
stdout.Printf("Enabled hook events: %s", strings.Join(enabledHooksString, ", "))

} else {
handler, err = handlerpkg.NewHandler(config)
}
if err != nil {
stderr.Fatalf("Unable to create handler: %s", err)
}

stdout.Printf("Supported tus extensions: %s\n", handler.SupportedExtensions())

basepath := Flags.Basepath
address := ""

Expand All @@ -65,10 +76,6 @@ func Serve() {

stdout.Printf("Using %s as the base path.\n", basepath)

SetupPostHooks(handler)

stdout.Printf("Supported tus extensions: %s\n", handler.SupportedExtensions())

mux := http.NewServeMux()
if basepath == "/" {
// If the basepath is set to the root path, only install the tusd handler
Expand All @@ -91,7 +98,7 @@ func Serve() {

if Flags.ExposeMetrics {
SetupMetrics(mux, handler)
SetupHookMetrics()
hooks.SetupHookMetrics()
}

if Flags.ExposePprof {
Expand Down
Loading

0 comments on commit 1562258

Please sign in to comment.