From ff76e9bf38912fe0f1c62f307573d085a526aeca Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 23 Jan 2023 19:13:04 -0800 Subject: [PATCH 1/3] add initial memo on schedule workflows --- service/frontend/workflow_handler.go | 18 ++++++++++++++++++ service/worker/scheduler/workflow.go | 21 ++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 80c7eb19a09..cced6043db7 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -2982,6 +2982,8 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow if err != nil { return nil, err } + // Add initial memo for list schedules + wh.addInitialScheduleMemo(request, input) // Add namespace division searchattribute.AddSearchAttribute(&request.SearchAttributes, searchattribute.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision)) // Create StartWorkflowExecutionRequest @@ -4837,6 +4839,22 @@ func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo return memo } +// This mutates request (but idempotent so safe for retries) +func (wh *WorkflowHandler) addInitialScheduleMemo(request *workflowservice.CreateScheduleRequest, args *schedspb.StartScheduleArgs) { + info := scheduler.GetListInfoFromStartArgs(args) + p, err := sdk.PreferProtoDataConverter.ToPayload(info) + if err != nil { + wh.logger.Error("encoding initial schedule memo failed", tag.Error(err)) + } + if request.Memo == nil { + request.Memo = &commonpb.Memo{} + } + if request.Memo.Fields == nil { + request.Memo.Fields = make(map[string]*commonpb.Payload) + } + request.Memo.Fields[scheduler.MemoFieldInfo] = p +} + func getBatchOperationState(workflowState enumspb.WorkflowExecutionStatus) enumspb.BatchOperationState { var operationState enumspb.BatchOperationState switch workflowState { diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index d1d46ee9464..53d4d18d5d8 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -252,7 +252,9 @@ func (s *scheduler) ensureFields() { func (s *scheduler) compileSpec() { cspec, err := NewCompiledSpec(s.Schedule.Spec) if err != nil { - s.logger.Error("Invalid schedule", "error", err) + if s.logger != nil { + s.logger.Error("Invalid schedule", "error", err) + } s.Info.InvalidScheduleError = err.Error() s.cspec = nil } else { @@ -516,6 +518,8 @@ func (s *scheduler) processSignals() bool { } func (s *scheduler) getFutureActionTimes(n int) []*time.Time { + // Note that `s` may be a fake scheduler used to compute list info at creation time. + if s.cspec == nil { return nil } @@ -571,6 +575,9 @@ func (s *scheduler) incSeqNo() { } func (s *scheduler) getListInfo() *schedpb.ScheduleListInfo { + // Note that `s` may be a fake scheduler used to compute list info at creation time, before + // the first workflow task. This function and anything it calls should not use s.ctx. + // make shallow copy spec := *s.Schedule.Spec // clear fields that are too large/not useful for the list view @@ -676,6 +683,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en } // processBuffer should return true if there might be more work to do right now. +// //nolint:revive func (s *scheduler) processBuffer() bool { s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh) @@ -941,3 +949,14 @@ func panicIfErr(err error) { panic(err) } } + +func GetListInfoFromStartArgs(args *schedspb.StartScheduleArgs) *schedpb.ScheduleListInfo { + // note that this does not take into account InitialPatch + fakeScheduler := &scheduler{ + StartScheduleArgs: *args, + tweakables: currentTweakablePolicies, + } + fakeScheduler.ensureFields() + fakeScheduler.compileSpec() + return fakeScheduler.getListInfo() +} From d19c0d812a6932d4835328b76ce08ebe95692973 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 23 Jan 2023 21:12:10 -0800 Subject: [PATCH 2/3] add test and fix --- service/frontend/workflow_handler.go | 8 ++- tests/schedule_test.go | 87 +++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index cced6043db7..bccfad4f8a2 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4842,9 +4842,15 @@ func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo // This mutates request (but idempotent so safe for retries) func (wh *WorkflowHandler) addInitialScheduleMemo(request *workflowservice.CreateScheduleRequest, args *schedspb.StartScheduleArgs) { info := scheduler.GetListInfoFromStartArgs(args) - p, err := sdk.PreferProtoDataConverter.ToPayload(info) + infoBytes, err := info.Marshal() if err != nil { wh.logger.Error("encoding initial schedule memo failed", tag.Error(err)) + return + } + p, err := sdk.PreferProtoDataConverter.ToPayload(infoBytes) + if err != nil { + wh.logger.Error("encoding initial schedule memo failed", tag.Error(err)) + return } if request.Memo == nil { request.Memo = &commonpb.Memo{} diff --git a/tests/schedule_test.go b/tests/schedule_test.go index 5eed6467935..163ba359fae 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" @@ -259,7 +260,7 @@ func (s *scheduleIntegrationSuite) TestBasics() { Namespace: s.namespace, MaximumPageSize: 5, }) - if err != nil || len(listResp.Schedules) != 1 || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 { + if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 { return false } s.NoError(err) @@ -466,6 +467,14 @@ func (s *scheduleIntegrationSuite) TestInput() { _, err = s.engine.CreateSchedule(NewContext(), req) s.NoError(err) s.Eventually(func() bool { return atomic.LoadInt32(&runs) == 1 }, 5*time.Second, 200*time.Millisecond) + + // cleanup + _, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Identity: "test", + }) + s.NoError(err) } func (s *scheduleIntegrationSuite) TestRefresh() { @@ -546,4 +555,80 @@ func (s *scheduleIntegrationSuite) TestRefresh() { // scheduler has done some stuff events3 := s.getHistory(s.namespace, &commonpb.WorkflowExecution{WorkflowId: scheduler.WorkflowIDPrefix + sid}) s.Greater(len(events3), len(events2)) + + // cleanup + _, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Identity: "test", + }) + s.NoError(err) +} + +func (s *scheduleIntegrationSuite) TestListBeforeRun() { + sid := "sched-test-list-before-run" + wid := "sched-test-list-before-run-wf" + wt := "sched-test-list-before-run-wt" + + // disable per-ns worker so that the schedule workflow never runs + dc := s.testCluster.host.dcClient + dc.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0) + s.testCluster.host.workerService.RefreshPerNSWorkerManager() + + schedule := &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{ + Interval: []*schedulepb.IntervalSpec{ + {Interval: timestamp.DurationPtr(3 * time.Second)}, + }, + }, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: wid, + WorkflowType: &commonpb.WorkflowType{Name: wt}, + TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue}, + }, + }, + }, + } + req := &workflowservice.CreateScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Schedule: schedule, + Identity: "test", + RequestId: uuid.New(), + } + + _, err := s.engine.CreateSchedule(NewContext(), req) + s.NoError(err) + + s.Eventually(func() bool { // wait for visibility + listResp, err := s.engine.ListSchedules(NewContext(), &workflowservice.ListSchedulesRequest{ + Namespace: s.namespace, + MaximumPageSize: 5, + }) + if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid { + return false + } + s.NoError(err) + entry := listResp.Schedules[0] + s.Equal(sid, entry.ScheduleId) + s.NotNil(entry.Info) + s.Equal(schedule.Spec, entry.Info.Spec) + s.Equal(wt, entry.Info.WorkflowType.Name) + s.False(entry.Info.Paused) + s.Greater(len(entry.Info.FutureActionTimes), 1) + return true + }, 10*time.Second, 1*time.Second) + + // cleanup + _, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{ + Namespace: s.namespace, + ScheduleId: sid, + Identity: "test", + }) + s.NoError(err) + + dc.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount) + s.testCluster.host.workerService.RefreshPerNSWorkerManager() } From 2e6279db5911d05d2cb97f2d7c94d558b87f8f77 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 25 Jan 2023 22:39:27 -0800 Subject: [PATCH 3/3] add sleeps after refresh just to make test more reliable --- tests/schedule_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/schedule_test.go b/tests/schedule_test.go index 163ba359fae..6fa1388d744 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -574,6 +574,7 @@ func (s *scheduleIntegrationSuite) TestListBeforeRun() { dc := s.testCluster.host.dcClient dc.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0) s.testCluster.host.workerService.RefreshPerNSWorkerManager() + time.Sleep(2 * time.Second) schedule := &schedulepb.Schedule{ Spec: &schedulepb.ScheduleSpec{ @@ -631,4 +632,5 @@ func (s *scheduleIntegrationSuite) TestListBeforeRun() { dc.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount) s.testCluster.host.workerService.RefreshPerNSWorkerManager() + time.Sleep(2 * time.Second) }