From 3002b02174266d5e64671afce65a73c39057d6fc Mon Sep 17 00:00:00 2001
From: David Reiss
Date: Tue, 31 Jan 2023 00:44:40 -0800
Subject: [PATCH 1/2] Add metrics + max buffer size to schedule workflow

---
 common/metrics/metric_defs.go        | 13 ++++++++
 service/worker/scheduler/workflow.go | 47 +++++++++++++++++++---------
 2 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go
index 2836a689559..89fc99a04da 100644
--- a/common/metrics/metric_defs.go
+++ b/common/metrics/metric_defs.go
@@ -1311,6 +1311,12 @@ const (
 	TaskTypeTimerStandbyTaskDeleteHistoryEvent = "TimerStandbyTaskDeleteHistoryEvent"
 )
 
+// Schedule action types
+const (
+	ScheduleActionTypeTag       = "schedule_action"
+	ScheduleActionStartWorkflow = "start_workflow"
+)
+
 var (
 	ServiceRequests        = NewCounterDef("service_requests")
 	ServicePendingRequests = NewGaugeDef("service_pending_requests")
@@ -1703,6 +1709,13 @@ var (
 	NamespaceReplicationEnqueueDLQCount   = NewCounterDef("namespace_replication_dlq_enqueue_requests")
 	ParentClosePolicyProcessorSuccess     = NewCounterDef("parent_close_policy_processor_requests")
 	ParentClosePolicyProcessorFailures    = NewCounterDef("parent_close_policy_processor_errors")
+	ScheduleMissedCatchupWindow           = NewCounterDef("schedule_missed_catchup_window")
+	ScheduleRateLimited                   = NewCounterDef("schedule_rate_limited")
+	ScheduleBufferOverruns                = NewCounterDef("schedule_buffer_overruns")
+	ScheduleActionSuccess                 = NewCounterDef("schedule_action_success")
+	ScheduleActionErrors                  = NewCounterDef("schedule_action_errors")
+	ScheduleCancelWorkflowErrors          = NewCounterDef("schedule_cancel_workflow_errors")
+	ScheduleTerminateWorkflowErrors       = NewCounterDef("schedule_terminate_workflow_errors")
 
 	// Replication
 	NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go
index 8154f4f4981..eeec7eda65c 100644
--- a/service/worker/scheduler/workflow.go
+++ b/service/worker/scheduler/workflow.go
@@ -38,11 +38,13 @@ import (
 	schedpb "go.temporal.io/api/schedule/v1"
 	workflowpb "go.temporal.io/api/workflow/v1"
 	"go.temporal.io/api/workflowservice/v1"
+	sdkclient "go.temporal.io/sdk/client"
 	sdklog "go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/temporal"
 	"go.temporal.io/sdk/workflow"
 
 	schedspb "go.temporal.io/server/api/schedule/v1"
+	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/payload"
 	"go.temporal.io/server/common/primitives/timestamp"
 	"go.temporal.io/server/common/searchattribute"
@@ -81,9 +83,10 @@ type (
 	scheduler struct {
 		schedspb.StartScheduleArgs
 
-		ctx    workflow.Context
-		a      *activities
-		logger sdklog.Logger
+		ctx     workflow.Context
+		a       *activities
+		logger  sdklog.Logger
+		metrics sdkclient.MetricsHandler
 
 		cspec *CompiledSpec
 
@@ -111,6 +114,9 @@ type (
 		RecentActionCountForList int // The number of recent actual action results to include in List (search attr).
 		IterationsBeforeContinueAsNew int
 		SleepWhilePaused bool // If true, don't set timers while paused/out of actions
+		// MaxBufferSize limits the number of buffered starts. This also limits the number of
+		// workflows that can be backfilled at once (since they all have to fit in the buffer).
+		MaxBufferSize int
 	}
 )
 
@@ -141,6 +147,7 @@ var (
 		RecentActionCountForList:      5,
 		IterationsBeforeContinueAsNew: 500,
 		SleepWhilePaused:              true,
+		MaxBufferSize:                 1000,
 	}
 
 	errUpdateConflict = errors.New("conflicting concurrent update")
@@ -151,13 +158,14 @@ func SchedulerWorkflow(ctx workflow.Context, args *schedspb.StartScheduleArgs) e
 		StartScheduleArgs: *args,
 		ctx:               ctx,
 		a:                 nil,
-		logger:            sdklog.With(workflow.GetLogger(ctx), "schedule-id", args.State.ScheduleId),
+		logger:            sdklog.With(workflow.GetLogger(ctx), "wf-namespace", args.State.Namespace, "schedule-id", args.State.ScheduleId),
+		metrics:           workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": args.State.Namespace}),
 	}
 	return scheduler.run()
 }
 
 func (s *scheduler) run() error {
-	s.logger.Info("Schedule starting", "schedule", s.Schedule)
+	s.logger.Debug("Schedule starting", "schedule", s.Schedule)
 
 	s.updateTweakables()
 	s.ensureFields()
@@ -218,7 +226,7 @@ func (s *scheduler) run() error {
 
 	// Any watcher activities will get cancelled automatically if running.
 
-	s.logger.Info("Schedule doing continue-as-new")
+	s.logger.Debug("Schedule doing continue-as-new")
 	return workflow.NewContinueAsNewError(s.ctx, WorkflowType, &s.StartScheduleArgs)
 }
 
@@ -287,7 +295,7 @@ func (s *scheduler) now() time.Time {
 }
 
 func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
-	s.logger.Info("Schedule patch", "patch", patch.String())
+	s.logger.Debug("Schedule patch", "patch", patch.String())
 
 	if trigger := patch.TriggerImmediately; trigger != nil {
 		now := s.now()
@@ -343,6 +351,7 @@ func (s *scheduler) processTimeRange(
 		}
 		if !manual && t2.Sub(t1) > catchupWindow {
 			s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
+			s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.GetMetricName()).Inc(1)
 			s.Info.MissedCatchupWindow++
 			continue
 		}
@@ -456,10 +465,10 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
 		s.Schedule.State.Paused = true
 		if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_FAILED {
 			s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow failure: %s: %s", id, res.GetFailure().GetMessage())
-			s.logger.Info("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
+			s.logger.Debug("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
 		} else if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT {
 			s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow timeout: %s", id)
-			s.logger.Info("paused due to workflow timeout", "workflow", id)
+			s.logger.Debug("paused due to workflow timeout", "workflow", id)
 		}
 		s.incSeqNo()
 	}
@@ -473,7 +482,7 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
 		s.State.ContinuedFailure = res.GetFailure()
 	}
 
-	s.logger.Info("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
+	s.logger.Debug("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
 }
 
 func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
@@ -482,7 +491,7 @@ func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
 		return
 	}
 
-	s.logger.Info("Schedule update", "new-schedule", req.Schedule.String())
+	s.logger.Debug("Schedule update", "new-schedule", req.Schedule.String())
 
 	s.Schedule.Spec = req.Schedule.GetSpec()
 	s.Schedule.Action = req.Schedule.GetAction()
@@ -673,6 +682,11 @@ func (s *scheduler) resolveOverlapPolicy(overlapPolicy enumspb.ScheduleOverlapPo
 
 func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy enumspb.ScheduleOverlapPolicy, manual bool) {
 	s.logger.Debug("addStart", "nominal", nominalTime, "actual", actualTime, "overlapPolicy", overlapPolicy, "manual", manual)
+	if s.tweakables.MaxBufferSize > 0 && len(s.State.BufferedStarts) >= s.tweakables.MaxBufferSize {
+		s.logger.Error("Buffer too large")
+		s.metrics.Counter(metrics.ScheduleBufferOverruns.GetMetricName()).Inc(1)
+		return
+	}
 	s.State.BufferedStarts = append(s.State.BufferedStarts, &schedspb.BufferedStart{
 		NominalTime:   timestamp.TimePtr(nominalTime),
 		ActualTime:    timestamp.TimePtr(actualTime),
@@ -727,14 +741,18 @@ func (s *scheduler) processBuffer() bool {
 			continue
 		}
 		result, err := s.startWorkflow(start, req)
+		metricsWithTag := s.metrics.WithTags(map[string]string{
+			metrics.ScheduleActionTypeTag: metrics.ScheduleActionStartWorkflow})
 		if err != nil {
 			s.logger.Error("Failed to start workflow", "error", err)
+			metricsWithTag.Counter(metrics.ScheduleActionErrors.GetMetricName()).Inc(1)
 			// TODO: we could put this back in the buffer and retry (after a delay) up until
 			// the catchup window. of course, it's unlikely that this workflow would be making
 			// progress while we're unable to start a new one, so maybe it's not that valuable.
 			tryAgain = true
 			continue
 		}
+		metricsWithTag.Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1)
 		s.recordAction(result)
 	}
 
@@ -826,9 +844,8 @@ func (s *scheduler) startWorkflow(
 
 		var appErr *temporal.ApplicationError
 		var details rateLimitedDetails
-		if errors.As(err, &appErr) &&
-			appErr.Type() == rateLimitedErrorType &&
-			appErr.Details(&details) == nil {
+		if errors.As(err, &appErr) && appErr.Type() == rateLimitedErrorType && appErr.Details(&details) == nil {
+			s.metrics.Counter(metrics.ScheduleRateLimited.GetMetricName()).Inc(1)
 			workflow.Sleep(s.ctx, details.Delay)
 			req.CompletedRateLimitSleep = true // only use rate limiter once
 			continue
@@ -920,6 +937,7 @@ func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
 	err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
 	if err != nil {
 		s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
+		s.metrics.Counter(metrics.ScheduleCancelWorkflowErrors.GetMetricName()).Inc(1)
 	}
 	// Note: the local activity has completed (or failed) here but the workflow might take time
 	// to close since a cancel is only a request.
@@ -937,6 +955,7 @@ func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
 	err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
 	if err != nil {
 		s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
+		s.metrics.Counter(metrics.ScheduleTerminateWorkflowErrors.GetMetricName()).Inc(1)
 	}
 	// Note: the local activity has completed (or failed) here but we'll still wait until we
 	// observe the workflow close (with a watcher) to start the next one.
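
Note on the buffer cap in patch 1: the new guard in addStart drops any start once MaxBufferSize is reached and bumps the new schedule_buffer_overruns counter instead of letting the buffer grow without bound. The standalone Go sketch below illustrates that same check-then-count pattern outside the scheduler; the boundedBuffer type and its fields are hypothetical and not part of this patch.

package main

import "fmt"

// boundedBuffer is a hypothetical stand-in for State.BufferedStarts plus the
// schedule_buffer_overruns counter: once the cap is reached, new items are
// dropped and counted rather than buffered.
type boundedBuffer struct {
	max      int
	items    []string
	overruns int // plays the role of the ScheduleBufferOverruns metric
}

// add mirrors the guard added to addStart: check the cap before appending;
// on overflow, count the overrun and drop the item instead of growing forever.
func (b *boundedBuffer) add(item string) bool {
	if b.max > 0 && len(b.items) >= b.max {
		b.overruns++
		return false
	}
	b.items = append(b.items, item)
	return true
}

func main() {
	b := &boundedBuffer{max: 3}
	for i := 0; i < 5; i++ {
		b.add(fmt.Sprintf("start-%d", i))
	}
	fmt.Printf("%d buffered, %d overruns\n", len(b.items), b.overruns) // 3 buffered, 2 overruns
}
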
From feb36d7ebb953cb8520b5ee22b18dcfa219f1d44 Mon Sep 17 00:00:00 2001
From: David Reiss
Date: Thu, 2 Feb 2023 13:13:26 -0800
Subject: [PATCH 2/2] tweak logs

---
 service/worker/scheduler/workflow.go | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go
index eeec7eda65c..bd67dc7ad89 100644
--- a/service/worker/scheduler/workflow.go
+++ b/service/worker/scheduler/workflow.go
@@ -27,8 +27,10 @@ package scheduler
 import (
 	"errors"
 	"fmt"
+	"strings"
 	"time"
 
+	"github.com/gogo/protobuf/jsonpb"
 	"github.com/gogo/protobuf/proto"
 	"github.com/google/uuid"
 	"golang.org/x/exp/slices"
@@ -165,8 +167,6 @@ func SchedulerWorkflow(ctx workflow.Context, args *schedspb.StartScheduleArgs) e
 }
 
 func (s *scheduler) run() error {
-	s.logger.Debug("Schedule starting", "schedule", s.Schedule)
-
 	s.updateTweakables()
 	s.ensureFields()
 	s.compileSpec()
@@ -179,7 +179,13 @@ func (s *scheduler) run() error {
 	}
 
 	if s.State.LastProcessedTime == nil {
-		s.logger.Debug("Initializing internal state")
+		// log these as json since it's more readable than the Go representation
+		var m jsonpb.Marshaler
+		var specJson, policiesJson strings.Builder
+		_ = m.Marshal(&specJson, s.Schedule.Spec)
+		_ = m.Marshal(&policiesJson, s.Schedule.Policies)
+		s.logger.Info("Starting schedule", "spec", specJson.String(), "policies", policiesJson.String())
+
 		s.State.LastProcessedTime = timestamp.TimePtr(s.now())
 		s.State.ConflictToken = InitialConflictToken
 		s.Info.CreateTime = s.State.LastProcessedTime
@@ -328,7 +334,7 @@ func (s *scheduler) processTimeRange(
 	overlapPolicy enumspb.ScheduleOverlapPolicy,
 	manual bool,
 ) time.Duration {
-	s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)
+	s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlap-policy", overlapPolicy, "manual", manual)
 
 	if s.cspec == nil {
 		return invalidDuration
@@ -419,7 +425,7 @@ func (s *scheduler) sleep(nextSleep time.Duration) {
 		sel.AddFuture(s.watchingFuture, s.wfWatcherReturned)
 	}
 
-	s.logger.Debug("sleeping", "nextSleep", nextSleep, "watching", s.watchingFuture != nil)
+	s.logger.Debug("sleeping", "next-sleep", nextSleep, "watching", s.watchingFuture != nil)
 	sel.Select(s.ctx)
 	for sel.HasPending() {
 		sel.Select(s.ctx)
@@ -681,9 +687,9 @@ func (s *scheduler) resolveOverlapPolicy(overlapPolicy enumspb.ScheduleOverlapPo
 }
 
 func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy enumspb.ScheduleOverlapPolicy, manual bool) {
-	s.logger.Debug("addStart", "nominal", nominalTime, "actual", actualTime, "overlapPolicy", overlapPolicy, "manual", manual)
+	s.logger.Debug("addStart", "start-time", nominalTime, "actual-start-time", actualTime, "overlap-policy", overlapPolicy, "manual", manual)
 	if s.tweakables.MaxBufferSize > 0 && len(s.State.BufferedStarts) >= s.tweakables.MaxBufferSize {
-		s.logger.Error("Buffer too large")
+		s.logger.Warn("Buffer too large", "start-time", nominalTime, "overlap-policy", overlapPolicy, "manual", manual)
 		s.metrics.Counter(metrics.ScheduleBufferOverruns.GetMetricName()).Inc(1)
 		return
 	}
@@ -702,7 +708,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
 //
 //nolint:revive
 func (s *scheduler) processBuffer() bool {
-	s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)
+	s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "need-refresh", s.State.NeedRefresh)
 	// TODO: consider doing this always and removing needRefresh? we only end up here without
 	// needRefresh in the case of update, or patch without an immediate run, so it's not much
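
As context for the "Starting schedule" log change in patch 2: the point is to render structured schedule config as JSON rather than Go's default struct formatting. The minimal standalone sketch below shows the same idea using encoding/json on a hypothetical spec struct; the actual patch marshals the schedule's proto messages with gogo's jsonpb.Marshaler, as shown in the diff above.

package main

import (
	"encoding/json"
	"log"
)

// spec is a hypothetical stand-in for a schedule spec; the patch itself logs
// the real proto messages via gogo's jsonpb.Marshaler rather than encoding/json.
type spec struct {
	Interval string `json:"interval"`
	Paused   bool   `json:"paused"`
}

func main() {
	s := spec{Interval: "1h", Paused: false}

	// JSON is easier to read in a log line than the default Go representation ({1h false}).
	b, err := json.Marshal(s)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Starting schedule spec=%s", b)
}
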