Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial memo to scheduler workflows #3839

Merged
merged 3 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2982,6 +2982,8 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
if err != nil {
return nil, err
}
// Add initial memo for list schedules
wh.addInitialScheduleMemo(request, input)
// Add namespace division
searchattribute.AddSearchAttribute(&request.SearchAttributes, searchattribute.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision))
// Create StartWorkflowExecutionRequest
Expand Down Expand Up @@ -4837,6 +4839,28 @@ func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo
return memo
}

// This mutates request (but idempotent so safe for retries)
func (wh *WorkflowHandler) addInitialScheduleMemo(request *workflowservice.CreateScheduleRequest, args *schedspb.StartScheduleArgs) {
info := scheduler.GetListInfoFromStartArgs(args)
infoBytes, err := info.Marshal()
if err != nil {
wh.logger.Error("encoding initial schedule memo failed", tag.Error(err))
return
}
p, err := sdk.PreferProtoDataConverter.ToPayload(infoBytes)
if err != nil {
wh.logger.Error("encoding initial schedule memo failed", tag.Error(err))
return
}
if request.Memo == nil {
request.Memo = &commonpb.Memo{}
}
if request.Memo.Fields == nil {
request.Memo.Fields = make(map[string]*commonpb.Payload)
}
request.Memo.Fields[scheduler.MemoFieldInfo] = p
}

func getBatchOperationState(workflowState enumspb.WorkflowExecutionStatus) enumspb.BatchOperationState {
var operationState enumspb.BatchOperationState
switch workflowState {
Expand Down
21 changes: 20 additions & 1 deletion service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ func (s *scheduler) ensureFields() {
func (s *scheduler) compileSpec() {
cspec, err := NewCompiledSpec(s.Schedule.Spec)
if err != nil {
s.logger.Error("Invalid schedule", "error", err)
if s.logger != nil {
s.logger.Error("Invalid schedule", "error", err)
}
s.Info.InvalidScheduleError = err.Error()
s.cspec = nil
} else {
Expand Down Expand Up @@ -516,6 +518,8 @@ func (s *scheduler) processSignals() bool {
}

func (s *scheduler) getFutureActionTimes(n int) []*time.Time {
// Note that `s` may be a fake scheduler used to compute list info at creation time.

if s.cspec == nil {
return nil
}
Expand Down Expand Up @@ -571,6 +575,9 @@ func (s *scheduler) incSeqNo() {
}

func (s *scheduler) getListInfo() *schedpb.ScheduleListInfo {
// Note that `s` may be a fake scheduler used to compute list info at creation time, before
// the first workflow task. This function and anything it calls should not use s.ctx.

// make shallow copy
spec := *s.Schedule.Spec
// clear fields that are too large/not useful for the list view
Expand Down Expand Up @@ -676,6 +683,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
}

// processBuffer should return true if there might be more work to do right now.
//
//nolint:revive
func (s *scheduler) processBuffer() bool {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)
Expand Down Expand Up @@ -941,3 +949,14 @@ func panicIfErr(err error) {
panic(err)
}
}

func GetListInfoFromStartArgs(args *schedspb.StartScheduleArgs) *schedpb.ScheduleListInfo {
// note that this does not take into account InitialPatch
fakeScheduler := &scheduler{
StartScheduleArgs: *args,
tweakables: currentTweakablePolicies,
}
fakeScheduler.ensureFields()
fakeScheduler.compileSpec()
return fakeScheduler.getListInfo()
}
89 changes: 88 additions & 1 deletion tests/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
Expand Down Expand Up @@ -259,7 +260,7 @@ func (s *scheduleIntegrationSuite) TestBasics() {
Namespace: s.namespace,
MaximumPageSize: 5,
})
if err != nil || len(listResp.Schedules) != 1 || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 {
if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 {
return false
}
s.NoError(err)
Expand Down Expand Up @@ -466,6 +467,14 @@ func (s *scheduleIntegrationSuite) TestInput() {
_, err = s.engine.CreateSchedule(NewContext(), req)
s.NoError(err)
s.Eventually(func() bool { return atomic.LoadInt32(&runs) == 1 }, 5*time.Second, 200*time.Millisecond)

// cleanup
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Identity: "test",
})
s.NoError(err)
}

func (s *scheduleIntegrationSuite) TestRefresh() {
Expand Down Expand Up @@ -546,4 +555,82 @@ func (s *scheduleIntegrationSuite) TestRefresh() {
// scheduler has done some stuff
events3 := s.getHistory(s.namespace, &commonpb.WorkflowExecution{WorkflowId: scheduler.WorkflowIDPrefix + sid})
s.Greater(len(events3), len(events2))

// cleanup
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Identity: "test",
})
s.NoError(err)
}

func (s *scheduleIntegrationSuite) TestListBeforeRun() {
sid := "sched-test-list-before-run"
wid := "sched-test-list-before-run-wf"
wt := "sched-test-list-before-run-wt"

// disable per-ns worker so that the schedule workflow never runs
dc := s.testCluster.host.dcClient
dc.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
s.testCluster.host.workerService.RefreshPerNSWorkerManager()
time.Sleep(2 * time.Second)

schedule := &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{
Interval: []*schedulepb.IntervalSpec{
{Interval: timestamp.DurationPtr(3 * time.Second)},
},
},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: wid,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue},
},
},
},
}
req := &workflowservice.CreateScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Schedule: schedule,
Identity: "test",
RequestId: uuid.New(),
}

_, err := s.engine.CreateSchedule(NewContext(), req)
s.NoError(err)

s.Eventually(func() bool { // wait for visibility
listResp, err := s.engine.ListSchedules(NewContext(), &workflowservice.ListSchedulesRequest{
Namespace: s.namespace,
MaximumPageSize: 5,
})
if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid {
return false
}
s.NoError(err)
entry := listResp.Schedules[0]
s.Equal(sid, entry.ScheduleId)
s.NotNil(entry.Info)
s.Equal(schedule.Spec, entry.Info.Spec)
s.Equal(wt, entry.Info.WorkflowType.Name)
s.False(entry.Info.Paused)
s.Greater(len(entry.Info.FutureActionTimes), 1)
return true
}, 10*time.Second, 1*time.Second)

// cleanup
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Identity: "test",
})
s.NoError(err)

dc.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount)
s.testCluster.host.workerService.RefreshPerNSWorkerManager()
time.Sleep(2 * time.Second)
}