diff --git a/cmd/tusd/cli/hooks.go b/cmd/tusd/cli/hooks.go index 698de8da4..9d45467e0 100644 --- a/cmd/tusd/cli/hooks.go +++ b/cmd/tusd/cli/hooks.go @@ -9,87 +9,19 @@ import ( "github.com/tus/tusd/v2/pkg/hooks/grpc" "github.com/tus/tusd/v2/pkg/hooks/http" "github.com/tus/tusd/v2/pkg/hooks/plugin" - "golang.org/x/exp/slices" ) -// TODO: Move some parts into hooks package - -var hookHandler hooks.HookHandler = nil - -func preCreateCallback(event handler.HookEvent) (handler.HTTPResponse, handler.FileInfoChanges, error) { - ok, hookRes, err := invokeHookSync(hooks.HookPreCreate, event) - if !ok || err != nil { - return handler.HTTPResponse{}, handler.FileInfoChanges{}, err - } - - httpRes := hookRes.HTTPResponse - - // If the hook response includes the instruction to reject the upload, reuse the error code - // and message from ErrUploadRejectedByServer, but also include custom HTTP response values. - if hookRes.RejectUpload { - err := handler.ErrUploadRejectedByServer - err.HTTPResponse = err.HTTPResponse.MergeWith(httpRes) - - return handler.HTTPResponse{}, handler.FileInfoChanges{}, err - } - - // Pass any changes regarding file info from the hook to the handler. - changes := hookRes.ChangeFileInfo - return httpRes, changes, nil -} - -func preFinishCallback(event handler.HookEvent) (handler.HTTPResponse, error) { - ok, hookRes, err := invokeHookSync(hooks.HookPreFinish, event) - if !ok || err != nil { - return handler.HTTPResponse{}, err - } - - httpRes := hookRes.HTTPResponse - return httpRes, nil -} - -func postReceiveCallback(event handler.HookEvent) { - ok, hookRes, _ := invokeHookSync(hooks.HookPostReceive, event) - // invokeHookSync already logs the error, if any occurs. So by checking `ok`, we can ensure - // that the hook finished successfully - if !ok { - return - } - - if hookRes.StopUpload { - logEv(stdout, "HookStopUpload", "id", event.Upload.ID) - - // TODO: Control response for PATCH request - event.Upload.StopUpload() - } -} - -func SetupHookMetrics() { - MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostFinish)).Add(0) - MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostTerminate)).Add(0) - MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostReceive)).Add(0) - MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostCreate)).Add(0) - MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPreCreate)).Add(0) - MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPreFinish)).Add(0) - MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostFinish)).Add(0) - MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostTerminate)).Add(0) - MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostReceive)).Add(0) - MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPostCreate)).Add(0) - MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPreCreate)).Add(0) - MetricsHookInvocationsTotal.WithLabelValues(string(hooks.HookPreFinish)).Add(0) -} - -func SetupPreHooks(config *handler.Config) error { +func getHookHandler(config *handler.Config) hooks.HookHandler { if Flags.FileHooksDir != "" { stdout.Printf("Using '%s' for hooks", Flags.FileHooksDir) - hookHandler = &file.FileHook{ + return &file.FileHook{ Directory: Flags.FileHooksDir, } } else if Flags.HttpHooksEndpoint != "" { stdout.Printf("Using '%s' as the endpoint for hooks", Flags.HttpHooksEndpoint) - hookHandler = &http.HttpHook{ + return &http.HttpHook{ Endpoint: Flags.HttpHooksEndpoint, MaxRetries: Flags.HttpHooksRetry, Backoff: Flags.HttpHooksBackoff, @@ -98,7 +30,7 @@ func SetupPreHooks(config *handler.Config) error { } else if Flags.GrpcHooksEndpoint != "" { stdout.Printf("Using '%s' as the endpoint for gRPC hooks", Flags.GrpcHooksEndpoint) - hookHandler = &grpc.GrpcHook{ + return &grpc.GrpcHook{ Endpoint: Flags.GrpcHooksEndpoint, MaxRetries: Flags.GrpcHooksRetry, Backoff: Flags.GrpcHooksBackoff, @@ -106,89 +38,10 @@ func SetupPreHooks(config *handler.Config) error { } else if Flags.PluginHookPath != "" { stdout.Printf("Using '%s' to load plugin for hooks", Flags.PluginHookPath) - hookHandler = &plugin.PluginHook{ + return &plugin.PluginHook{ Path: Flags.PluginHookPath, } } else { return nil } - - var enabledHooksString []string - for _, h := range Flags.EnabledHooks { - enabledHooksString = append(enabledHooksString, string(h)) - } - - stdout.Printf("Enabled hook events: %s", strings.Join(enabledHooksString, ", ")) - - if err := hookHandler.Setup(); err != nil { - return err - } - - config.PreUploadCreateCallback = preCreateCallback - config.PreFinishResponseCallback = preFinishCallback - - return nil -} - -func SetupPostHooks(handler *handler.Handler) { - go func() { - for { - select { - case event := <-handler.CompleteUploads: - invokeHookAsync(hooks.HookPostFinish, event) - case event := <-handler.TerminatedUploads: - invokeHookAsync(hooks.HookPostTerminate, event) - case event := <-handler.CreatedUploads: - invokeHookAsync(hooks.HookPostCreate, event) - case event := <-handler.UploadProgress: - go postReceiveCallback(event) - } - } - }() -} - -func invokeHookAsync(typ hooks.HookType, event handler.HookEvent) { - go func() { - // Error handling is taken care by the function. - _, _, _ = invokeHookSync(typ, event) - }() -} - -// invokeHookSync executes a hook of the given type with the given event data. If -// the hook was not executed properly (e.g. an error occurred or not handler is installed), -// `ok` will be false and `res` is not filled. `err` can contain the underlying error. -// If `ok` is true, `res` contains the response as retrieved from the hook. -// Therefore, a caller should always check `ok` and `err` before assuming that the -// hook completed successfully. -func invokeHookSync(typ hooks.HookType, event handler.HookEvent) (ok bool, res hooks.HookResponse, err error) { - // Stop, if no hook handler is installed or this hook event is not enabled - if hookHandler == nil || !slices.Contains(Flags.EnabledHooks, typ) { - return false, hooks.HookResponse{}, nil - } - - MetricsHookInvocationsTotal.WithLabelValues(string(typ)).Add(1) - - id := event.Upload.ID - - if Flags.VerboseOutput { - logEv(stdout, "HookInvocationStart", "type", string(typ), "id", id) - } - - res, err = hookHandler.InvokeHook(hooks.HookRequest{ - Type: typ, - Event: event, - }) - if err != nil { - // If an error occurs during the hook execution, we log and track the error, but do not - // return a hook response. - logEv(stderr, "HookInvocationError", "type", string(typ), "id", id, "error", err.Error()) - MetricsHookErrorsTotal.WithLabelValues(string(typ)).Add(1) - return false, hooks.HookResponse{}, err - } - - if Flags.VerboseOutput { - logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", id) - } - - return true, res, nil } diff --git a/cmd/tusd/cli/metrics.go b/cmd/tusd/cli/metrics.go index 9fcb5656a..5a92fbc0d 100644 --- a/cmd/tusd/cli/metrics.go +++ b/cmd/tusd/cli/metrics.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/tus/tusd/v2/pkg/handler" + "github.com/tus/tusd/v2/pkg/hooks" "github.com/tus/tusd/v2/pkg/prometheuscollector" "github.com/prometheus/client_golang/prometheus" @@ -15,26 +16,10 @@ var MetricsOpenConnections = prometheus.NewGauge(prometheus.GaugeOpts{ Help: "Current number of open connections.", }) -var MetricsHookErrorsTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "tusd_hook_errors_total", - Help: "Total number of execution errors per hook type.", - }, - []string{"hooktype"}, -) - -var MetricsHookInvocationsTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "tusd_hook_invocations_total", - Help: "Total number of invocations per hook type.", - }, - []string{"hooktype"}, -) - func SetupMetrics(mux *http.ServeMux, handler *handler.Handler) { prometheus.MustRegister(MetricsOpenConnections) - prometheus.MustRegister(MetricsHookErrorsTotal) - prometheus.MustRegister(MetricsHookInvocationsTotal) + prometheus.MustRegister(hooks.MetricsHookErrorsTotal) + prometheus.MustRegister(hooks.MetricsHookInvocationsTotal) prometheus.MustRegister(prometheuscollector.New(handler.Metrics)) stdout.Printf("Using %s as the metrics path.\n", Flags.MetricsPath) diff --git a/cmd/tusd/cli/serve.go b/cmd/tusd/cli/serve.go index 4a2908bd6..984f03d3a 100644 --- a/cmd/tusd/cli/serve.go +++ b/cmd/tusd/cli/serve.go @@ -13,6 +13,8 @@ import ( "time" "github.com/tus/tusd/v2/pkg/handler" + handlerpkg "github.com/tus/tusd/v2/pkg/handler" + "github.com/tus/tusd/v2/pkg/hooks" ) const ( @@ -36,22 +38,31 @@ func Serve() { DisableTermination: Flags.DisableTermination, DisableCors: Flags.DisableCors, StoreComposer: Composer, - NotifyCompleteUploads: true, - NotifyTerminatedUploads: true, - NotifyUploadProgress: true, - NotifyCreatedUploads: true, UploadProgressInterval: time.Duration(Flags.ProgressHooksInterval) * time.Millisecond, } - if err := SetupPreHooks(&config); err != nil { - stderr.Fatalf("Unable to setup hooks for handler: %s", err) - } + var handler *handlerpkg.Handler + var err error + hookHandler := getHookHandler(&config) + if hookHandler != nil { + handler, err = hooks.NewHandlerWithHooks(&config, hookHandler, Flags.EnabledHooks) + + var enabledHooksString []string + for _, h := range Flags.EnabledHooks { + enabledHooksString = append(enabledHooksString, string(h)) + } - handler, err := handler.NewHandler(config) + stdout.Printf("Enabled hook events: %s", strings.Join(enabledHooksString, ", ")) + + } else { + handler, err = handlerpkg.NewHandler(config) + } if err != nil { stderr.Fatalf("Unable to create handler: %s", err) } + stdout.Printf("Supported tus extensions: %s\n", handler.SupportedExtensions()) + basepath := Flags.Basepath address := "" @@ -65,10 +76,6 @@ func Serve() { stdout.Printf("Using %s as the base path.\n", basepath) - SetupPostHooks(handler) - - stdout.Printf("Supported tus extensions: %s\n", handler.SupportedExtensions()) - mux := http.NewServeMux() if basepath == "/" { // If the basepath is set to the root path, only install the tusd handler @@ -91,7 +98,7 @@ func Serve() { if Flags.ExposeMetrics { SetupMetrics(mux, handler) - SetupHookMetrics() + hooks.SetupHookMetrics() } if Flags.ExposePprof { diff --git a/pkg/hooks/hooks.go b/pkg/hooks/hooks.go index 3162b7059..09d105350 100644 --- a/pkg/hooks/hooks.go +++ b/pkg/hooks/hooks.go @@ -2,7 +2,11 @@ package hooks import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" "github.com/tus/tusd/v2/pkg/handler" + "golang.org/x/exp/slices" ) // HookHandler is the main inferface to be implemented by all hook backends. @@ -72,4 +76,174 @@ const ( HookPreFinish HookType = "pre-finish" ) +// AvailableHooks is a slice of all hooks that are implemented by tusd. var AvailableHooks []HookType = []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPostTerminate, HookPostFinish, HookPreFinish} + +func preCreateCallback(event handler.HookEvent, hookHandler HookHandler) (handler.HTTPResponse, handler.FileInfoChanges, error) { + ok, hookRes, err := invokeHookSync(HookPreCreate, event, hookHandler) + if !ok || err != nil { + return handler.HTTPResponse{}, handler.FileInfoChanges{}, err + } + + httpRes := hookRes.HTTPResponse + + // If the hook response includes the instruction to reject the upload, reuse the error code + // and message from ErrUploadRejectedByServer, but also include custom HTTP response values. + if hookRes.RejectUpload { + err := handler.ErrUploadRejectedByServer + err.HTTPResponse = err.HTTPResponse.MergeWith(httpRes) + + return handler.HTTPResponse{}, handler.FileInfoChanges{}, err + } + + // Pass any changes regarding file info from the hook to the handler. + changes := hookRes.ChangeFileInfo + return httpRes, changes, nil +} + +func preFinishCallback(event handler.HookEvent, hookHandler HookHandler) (handler.HTTPResponse, error) { + ok, hookRes, err := invokeHookSync(HookPreFinish, event, hookHandler) + if !ok || err != nil { + return handler.HTTPResponse{}, err + } + + httpRes := hookRes.HTTPResponse + return httpRes, nil +} + +func postReceiveCallback(event handler.HookEvent, hookHandler HookHandler) { + ok, hookRes, _ := invokeHookSync(HookPostReceive, event, hookHandler) + // invokeHookSync already logs the error, if any occurs. So by checking `ok`, we can ensure + // that the hook finished successfully + if !ok { + return + } + + if hookRes.StopUpload { + // logEv(stdout, "HookStopUpload", "id", event.Upload.ID) + + // TODO: Control response for PATCH request + event.Upload.StopUpload() + } +} + +var MetricsHookErrorsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "tusd_hook_errors_total", + Help: "Total number of execution errors per hook type.", + }, + []string{"hooktype"}, +) + +var MetricsHookInvocationsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "tusd_hook_invocations_total", + Help: "Total number of invocations per hook type.", + }, + []string{"hooktype"}, +) + +func SetupHookMetrics() { + MetricsHookErrorsTotal.WithLabelValues(string(HookPostFinish)).Add(0) + MetricsHookErrorsTotal.WithLabelValues(string(HookPostTerminate)).Add(0) + MetricsHookErrorsTotal.WithLabelValues(string(HookPostReceive)).Add(0) + MetricsHookErrorsTotal.WithLabelValues(string(HookPostCreate)).Add(0) + MetricsHookErrorsTotal.WithLabelValues(string(HookPreCreate)).Add(0) + MetricsHookErrorsTotal.WithLabelValues(string(HookPreFinish)).Add(0) + MetricsHookInvocationsTotal.WithLabelValues(string(HookPostFinish)).Add(0) + MetricsHookInvocationsTotal.WithLabelValues(string(HookPostTerminate)).Add(0) + MetricsHookInvocationsTotal.WithLabelValues(string(HookPostReceive)).Add(0) + MetricsHookInvocationsTotal.WithLabelValues(string(HookPostCreate)).Add(0) + MetricsHookInvocationsTotal.WithLabelValues(string(HookPreCreate)).Add(0) + MetricsHookInvocationsTotal.WithLabelValues(string(HookPreFinish)).Add(0) +} + +func invokeHookAsync(typ HookType, event handler.HookEvent, hookHandler HookHandler) { + go func() { + // Error handling is taken care by the function. + _, _, _ = invokeHookSync(typ, event, hookHandler) + }() +} + +// invokeHookSync executes a hook of the given type with the given event data. If +// the hook was not executed properly (e.g. an error occurred or not handler is installed), +// `ok` will be false and `res` is not filled. `err` can contain the underlying error. +// If `ok` is true, `res` contains the response as retrieved from the hook. +// Therefore, a caller should always check `ok` and `err` before assuming that the +// hook completed successfully. +func invokeHookSync(typ HookType, event handler.HookEvent, hookHandler HookHandler) (ok bool, res HookResponse, err error) { + MetricsHookInvocationsTotal.WithLabelValues(string(typ)).Add(1) + + // id := event.Upload.ID + + // TODO: Re-enable logging with structured logging package slog. + // if Flags.VerboseOutput { + // logEv(stdout, "HookInvocationStart", "type", string(typ), "id", id) + // } + + res, err = hookHandler.InvokeHook(HookRequest{ + Type: typ, + Event: event, + }) + if err != nil { + // If an error occurs during the hook execution, we log and track the error, but do not + // return a hook response. + // logEv(stderr, "HookInvocationError", "type", string(typ), "id", id, "error", err.Error()) + MetricsHookErrorsTotal.WithLabelValues(string(typ)).Add(1) + return false, HookResponse{}, err + } + + // if Flags.VerboseOutput { + // logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", id) + // } + + return true, res, nil +} + +func NewHandlerWithHooks(config *handler.Config, hookHandler HookHandler, enabledHooks []HookType) (*handler.Handler, error) { + if err := hookHandler.Setup(); err != nil { + return nil, fmt.Errorf("unable to setup hooks for handler: %s", err) + } + + // Activate notifications for post-* hooks + config.NotifyCompleteUploads = slices.Contains(enabledHooks, HookPostFinish) + config.NotifyTerminatedUploads = slices.Contains(enabledHooks, HookPostTerminate) + config.NotifyUploadProgress = slices.Contains(enabledHooks, HookPostReceive) + config.NotifyCreatedUploads = slices.Contains(enabledHooks, HookPostCreate) + + // Install callbacks for pre-* hooks + if slices.Contains(enabledHooks, HookPreCreate) { + config.PreUploadCreateCallback = func(event handler.HookEvent) (handler.HTTPResponse, handler.FileInfoChanges, error) { + return preCreateCallback(event, hookHandler) + } + } + if slices.Contains(enabledHooks, HookPreFinish) { + config.PreFinishResponseCallback = func(event handler.HookEvent) (handler.HTTPResponse, error) { + return preFinishCallback(event, hookHandler) + } + } + + // Create handler + handler, err := handler.NewHandler(*config) + if err != nil { + return nil, err + } + + // Listen for notifications for post-* hooks + go func() { + for { + select { + case event := <-handler.CompleteUploads: + invokeHookAsync(HookPostFinish, event, hookHandler) + case event := <-handler.TerminatedUploads: + invokeHookAsync(HookPostTerminate, event, hookHandler) + case event := <-handler.CreatedUploads: + invokeHookAsync(HookPostCreate, event, hookHandler) + case event := <-handler.UploadProgress: + go postReceiveCallback(event, hookHandler) + } + } + }() + + return handler, nil +}