Add metrics + max buffer size to schedule workflow #3872

Merged
merged 2 commits on Feb 3, 2023
13 changes: 13 additions & 0 deletions common/metrics/metric_defs.go
@@ -1311,6 +1311,12 @@ const (
TaskTypeTimerStandbyTaskDeleteHistoryEvent = "TimerStandbyTaskDeleteHistoryEvent"
)

// Schedule action types
const (
ScheduleActionTypeTag = "schedule_action"
ScheduleActionStartWorkflow = "start_workflow"
)

var (
ServiceRequests = NewCounterDef("service_requests")
ServicePendingRequests = NewGaugeDef("service_pending_requests")
@@ -1703,6 +1709,13 @@ var (
NamespaceReplicationEnqueueDLQCount = NewCounterDef("namespace_replication_dlq_enqueue_requests")
ParentClosePolicyProcessorSuccess = NewCounterDef("parent_close_policy_processor_requests")
ParentClosePolicyProcessorFailures = NewCounterDef("parent_close_policy_processor_errors")
ScheduleMissedCatchupWindow = NewCounterDef("schedule_missed_catchup_window")
ScheduleRateLimited = NewCounterDef("schedule_rate_limited")
ScheduleBufferOverruns = NewCounterDef("schedule_buffer_overruns")
ScheduleActionSuccess = NewCounterDef("schedule_action_success")
ScheduleActionErrors = NewCounterDef("schedule_action_errors")
ScheduleCancelWorkflowErrors = NewCounterDef("schedule_cancel_workflow_errors")
ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
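
For reference, a minimal sketch (not part of this PR) of how these new counter definitions are meant to be emitted from workflow code through the SDK metrics handler; the tag wiring mirrors the workflow.go changes below, and the helper name is hypothetical:

package example

import (
	"go.temporal.io/sdk/workflow"

	"go.temporal.io/server/common/metrics"
)

// emitScheduleMetrics (hypothetical helper) illustrates the wiring used by the
// scheduler workflow: a per-namespace metrics handler obtained from the workflow
// context, plus a schedule_action tag on the action-level counters.
func emitScheduleMetrics(ctx workflow.Context, namespace string) {
	handler := workflow.GetMetricsHandler(ctx).WithTags(map[string]string{
		"namespace": namespace,
	})

	// Counters like buffer overruns are emitted with only the namespace tag.
	handler.Counter(metrics.ScheduleBufferOverruns.GetMetricName()).Inc(1)

	// Per-action counters add the schedule_action tag as well.
	handler.WithTags(map[string]string{
		metrics.ScheduleActionTypeTag: metrics.ScheduleActionStartWorkflow,
	}).Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1)
}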
65 changes: 45 additions & 20 deletions service/worker/scheduler/workflow.go
@@ -27,8 +27,10 @@ package scheduler
import (
"errors"
"fmt"
"strings"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"golang.org/x/exp/slices"
@@ -38,11 +40,13 @@ import (
schedpb "go.temporal.io/api/schedule/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

schedspb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
@@ -81,9 +85,10 @@ type (
scheduler struct {
schedspb.StartScheduleArgs

ctx workflow.Context
a *activities
logger sdklog.Logger
ctx workflow.Context
a *activities
logger sdklog.Logger
metrics sdkclient.MetricsHandler

cspec *CompiledSpec

@@ -111,6 +116,9 @@ type (
RecentActionCountForList int // The number of recent actual action results to include in List (search attr).
IterationsBeforeContinueAsNew int
SleepWhilePaused bool // If true, don't set timers while paused/out of actions
// MaxBufferSize limits the number of buffered starts. This also limits the number of
// workflows that can be backfilled at once (since they all have to fit in the buffer).
MaxBufferSize int
}
)

@@ -141,6 +149,7 @@ var (
RecentActionCountForList: 5,
IterationsBeforeContinueAsNew: 500,
SleepWhilePaused: true,
MaxBufferSize: 1000,
}

errUpdateConflict = errors.New("conflicting concurrent update")
@@ -151,14 +160,13 @@ func SchedulerWorkflow(ctx workflow.Context, args *schedspb.StartScheduleArgs) e
StartScheduleArgs: *args,
ctx: ctx,
a: nil,
logger: sdklog.With(workflow.GetLogger(ctx), "schedule-id", args.State.ScheduleId),
logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", args.State.Namespace, "schedule-id", args.State.ScheduleId),
metrics: workflow.GetMetricsHandler(ctx).WithTags(map[string]string{"namespace": args.State.Namespace}),
}
return scheduler.run()
}

func (s *scheduler) run() error {
s.logger.Info("Schedule starting", "schedule", s.Schedule)

s.updateTweakables()
s.ensureFields()
s.compileSpec()
@@ -171,7 +179,13 @@ func (s *scheduler) run() error {
}

if s.State.LastProcessedTime == nil {
s.logger.Debug("Initializing internal state")
// log these as json since it's more readable than the Go representation
var m jsonpb.Marshaler
var specJson, policiesJson strings.Builder
_ = m.Marshal(&specJson, s.Schedule.Spec)
_ = m.Marshal(&policiesJson, s.Schedule.Policies)
s.logger.Info("Starting schedule", "spec", specJson.String(), "policies", policiesJson.String())

s.State.LastProcessedTime = timestamp.TimePtr(s.now())
s.State.ConflictToken = InitialConflictToken
s.Info.CreateTime = s.State.LastProcessedTime
@@ -218,7 +232,7 @@ func (s *scheduler) run() error {

// Any watcher activities will get cancelled automatically if running.

s.logger.Info("Schedule doing continue-as-new")
s.logger.Debug("Schedule doing continue-as-new")
return workflow.NewContinueAsNewError(s.ctx, WorkflowType, &s.StartScheduleArgs)
}

Expand Down Expand Up @@ -287,7 +301,7 @@ func (s *scheduler) now() time.Time {
}

func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
s.logger.Info("Schedule patch", "patch", patch.String())
s.logger.Debug("Schedule patch", "patch", patch.String())

if trigger := patch.TriggerImmediately; trigger != nil {
now := s.now()
@@ -320,7 +334,7 @@ func (s *scheduler) processTimeRange(
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
) time.Duration {
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlap-policy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return invalidDuration
@@ -343,6 +357,7 @@
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.GetMetricName()).Inc(1)
s.Info.MissedCatchupWindow++
continue
}
@@ -410,7 +425,7 @@ func (s *scheduler) sleep(nextSleep time.Duration) {
sel.AddFuture(s.watchingFuture, s.wfWatcherReturned)
}

s.logger.Debug("sleeping", "nextSleep", nextSleep, "watching", s.watchingFuture != nil)
s.logger.Debug("sleeping", "next-sleep", nextSleep, "watching", s.watchingFuture != nil)
sel.Select(s.ctx)
for sel.HasPending() {
sel.Select(s.ctx)
@@ -456,10 +471,10 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
s.Schedule.State.Paused = true
if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_FAILED {
s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow failure: %s: %s", id, res.GetFailure().GetMessage())
s.logger.Info("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
s.logger.Debug("paused due to workflow failure", "workflow", id, "message", res.GetFailure().GetMessage())
} else if res.Status == enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT {
s.Schedule.State.Notes = fmt.Sprintf("paused due to workflow timeout: %s", id)
s.logger.Info("paused due to workflow timeout", "workflow", id)
s.logger.Debug("paused due to workflow timeout", "workflow", id)
}
s.incSeqNo()
}
@@ -473,7 +488,7 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) {
s.State.ContinuedFailure = res.GetFailure()
}

s.logger.Info("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
s.logger.Debug("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure)
}

func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
Expand All @@ -482,7 +497,7 @@ func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
return
}

s.logger.Info("Schedule update", "new-schedule", req.Schedule.String())
s.logger.Debug("Schedule update", "new-schedule", req.Schedule.String())

s.Schedule.Spec = req.Schedule.GetSpec()
s.Schedule.Action = req.Schedule.GetAction()
@@ -672,7 +687,12 @@ func (s *scheduler) resolveOverlapPolicy(overlapPolicy enumspb.ScheduleOverlapPo
}

func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy enumspb.ScheduleOverlapPolicy, manual bool) {
s.logger.Debug("addStart", "nominal", nominalTime, "actual", actualTime, "overlapPolicy", overlapPolicy, "manual", manual)
s.logger.Debug("addStart", "start-time", nominalTime, "actual-start-time", actualTime, "overlap-policy", overlapPolicy, "manual", manual)
if s.tweakables.MaxBufferSize > 0 && len(s.State.BufferedStarts) >= s.tweakables.MaxBufferSize {
s.logger.Warn("Buffer too large", "start-time", nominalTime, "overlap-policy", overlapPolicy, "manual", manual)
s.metrics.Counter(metrics.ScheduleBufferOverruns.GetMetricName()).Inc(1)
return
}
s.State.BufferedStarts = append(s.State.BufferedStarts, &schedspb.BufferedStart{
NominalTime: timestamp.TimePtr(nominalTime),
ActualTime: timestamp.TimePtr(actualTime),
@@ -688,7 +708,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
//
//nolint:revive
func (s *scheduler) processBuffer() bool {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "need-refresh", s.State.NeedRefresh)

// TODO: consider doing this always and removing needRefresh? we only end up here without
// needRefresh in the case of update, or patch without an immediate run, so it's not much
@@ -727,14 +747,18 @@ func (s *scheduler) processBuffer() bool {
continue
}
result, err := s.startWorkflow(start, req)
metricsWithTag := s.metrics.WithTags(map[string]string{
metrics.ScheduleActionTypeTag: metrics.ScheduleActionStartWorkflow})
if err != nil {
s.logger.Error("Failed to start workflow", "error", err)
metricsWithTag.Counter(metrics.ScheduleActionErrors.GetMetricName()).Inc(1)
// TODO: we could put this back in the buffer and retry (after a delay) up until
// the catchup window. of course, it's unlikely that this workflow would be making
// progress while we're unable to start a new one, so maybe it's not that valuable.
tryAgain = true
continue
}
metricsWithTag.Counter(metrics.ScheduleActionSuccess.GetMetricName()).Inc(1)
s.recordAction(result)
}

@@ -826,9 +850,8 @@ func (s *scheduler) startWorkflow(

var appErr *temporal.ApplicationError
var details rateLimitedDetails
if errors.As(err, &appErr) &&
appErr.Type() == rateLimitedErrorType &&
appErr.Details(&details) == nil {
if errors.As(err, &appErr) && appErr.Type() == rateLimitedErrorType && appErr.Details(&details) == nil {
s.metrics.Counter(metrics.ScheduleRateLimited.GetMetricName()).Inc(1)
workflow.Sleep(s.ctx, details.Delay)
req.CompletedRateLimitSleep = true // only use rate limiter once
continue
@@ -920,6 +943,7 @@ func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
s.metrics.Counter(metrics.ScheduleCancelWorkflowErrors.GetMetricName()).Inc(1)
}
// Note: the local activity has completed (or failed) here but the workflow might take time
// to close since a cancel is only a request.
@@ -937,6 +961,7 @@ func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
s.metrics.Counter(metrics.ScheduleTerminateWorkflowErrors.GetMetricName()).Inc(1)
}
// Note: the local activity has completed (or failed) here but we'll still wait until we
// observe the workflow close (with a watcher) to start the next one.