Skip to content

Commit

Permalink
add test and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jan 26, 2023
1 parent ff76e9b commit d19c0d8
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 2 deletions.
8 changes: 7 additions & 1 deletion service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4842,9 +4842,15 @@ func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo
// This mutates request (but idempotent so safe for retries)
func (wh *WorkflowHandler) addInitialScheduleMemo(request *workflowservice.CreateScheduleRequest, args *schedspb.StartScheduleArgs) {
info := scheduler.GetListInfoFromStartArgs(args)
p, err := sdk.PreferProtoDataConverter.ToPayload(info)
infoBytes, err := info.Marshal()
if err != nil {
wh.logger.Error("encoding initial schedule memo failed", tag.Error(err))
return
}
p, err := sdk.PreferProtoDataConverter.ToPayload(infoBytes)
if err != nil {
wh.logger.Error("encoding initial schedule memo failed", tag.Error(err))
return
}
if request.Memo == nil {
request.Memo = &commonpb.Memo{}
Expand Down
87 changes: 86 additions & 1 deletion tests/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
Expand Down Expand Up @@ -259,7 +260,7 @@ func (s *scheduleIntegrationSuite) TestBasics() {
Namespace: s.namespace,
MaximumPageSize: 5,
})
if err != nil || len(listResp.Schedules) != 1 || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 {
if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 {
return false
}
s.NoError(err)
Expand Down Expand Up @@ -466,6 +467,14 @@ func (s *scheduleIntegrationSuite) TestInput() {
_, err = s.engine.CreateSchedule(NewContext(), req)
s.NoError(err)
s.Eventually(func() bool { return atomic.LoadInt32(&runs) == 1 }, 5*time.Second, 200*time.Millisecond)

// cleanup
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Identity: "test",
})
s.NoError(err)
}

func (s *scheduleIntegrationSuite) TestRefresh() {
Expand Down Expand Up @@ -546,4 +555,80 @@ func (s *scheduleIntegrationSuite) TestRefresh() {
// scheduler has done some stuff
events3 := s.getHistory(s.namespace, &commonpb.WorkflowExecution{WorkflowId: scheduler.WorkflowIDPrefix + sid})
s.Greater(len(events3), len(events2))

// cleanup
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Identity: "test",
})
s.NoError(err)
}

func (s *scheduleIntegrationSuite) TestListBeforeRun() {
sid := "sched-test-list-before-run"
wid := "sched-test-list-before-run-wf"
wt := "sched-test-list-before-run-wt"

// disable per-ns worker so that the schedule workflow never runs
dc := s.testCluster.host.dcClient
dc.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
s.testCluster.host.workerService.RefreshPerNSWorkerManager()

schedule := &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{
Interval: []*schedulepb.IntervalSpec{
{Interval: timestamp.DurationPtr(3 * time.Second)},
},
},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: wid,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue},
},
},
},
}
req := &workflowservice.CreateScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Schedule: schedule,
Identity: "test",
RequestId: uuid.New(),
}

_, err := s.engine.CreateSchedule(NewContext(), req)
s.NoError(err)

s.Eventually(func() bool { // wait for visibility
listResp, err := s.engine.ListSchedules(NewContext(), &workflowservice.ListSchedulesRequest{
Namespace: s.namespace,
MaximumPageSize: 5,
})
if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid {
return false
}
s.NoError(err)
entry := listResp.Schedules[0]
s.Equal(sid, entry.ScheduleId)
s.NotNil(entry.Info)
s.Equal(schedule.Spec, entry.Info.Spec)
s.Equal(wt, entry.Info.WorkflowType.Name)
s.False(entry.Info.Paused)
s.Greater(len(entry.Info.FutureActionTimes), 1)
return true
}, 10*time.Second, 1*time.Second)

// cleanup
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
Namespace: s.namespace,
ScheduleId: sid,
Identity: "test",
})
s.NoError(err)

dc.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount)
s.testCluster.host.workerService.RefreshPerNSWorkerManager()
}

0 comments on commit d19c0d8

Please sign in to comment.