Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use namespace from schedule activity worker #3680

Merged
merged 5 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/adminservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions api/historyservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions api/matchingservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions api/namespace/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

547 changes: 77 additions & 470 deletions api/schedule/v1/message.pb.go

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions proto/internal/temporal/server/api/schedule/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ message DescribeResponse {
}

message WatchWorkflowRequest {
string namespace = 1;
string namespace_id = 2;
// Note: this will be sent to the activity with empty execution.run_id, and
// the run id that we started in first_execution_run_id.
temporal.api.common.v1.WorkflowExecution execution = 3;
Expand All @@ -102,7 +100,6 @@ message WatchWorkflowResponse {
}

message StartWorkflowRequest {
string namespace_id = 1;
temporal.api.workflowservice.v1.StartWorkflowExecutionRequest request = 2;
reserved 3;
temporal.api.common.v1.Payloads last_completion_result = 4;
Expand All @@ -115,8 +112,6 @@ message StartWorkflowResponse {
}

message CancelWorkflowRequest {
string namespace = 1;
string namespace_id = 2;
string request_id = 3;
string identity = 4;
// Note: run id in execution is first execution run id
Expand All @@ -125,8 +120,6 @@ message CancelWorkflowRequest {
}

message TerminateWorkflowRequest {
string namespace = 1;
string namespace_id = 2;
string request_id = 3;
string identity = 4;
// Note: run id in execution is first execution run id
Expand Down
37 changes: 8 additions & 29 deletions service/worker/scheduler/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,11 @@ var (

func (e errFollow) Error() string { return string(e) }

func (a *activities) checkNamespace(namespace, namespaceID string) error {
if namespace != a.namespace.String() || namespaceID != a.namespaceID.String() {
return errNamespaceMismatch
}
return nil
}

func (a *activities) StartWorkflow(ctx context.Context, req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
if err := a.checkNamespace(req.Request.Namespace, req.NamespaceId); err != nil {
return nil, err
}
req.Request.Namespace = a.namespace.String()

request := common.CreateHistoryStartWorkflowRequest(
req.NamespaceId,
a.namespaceID.String(),
req.Request,
nil,
time.Now().UTC(),
Expand Down Expand Up @@ -118,7 +109,7 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo
// the desired chain or not). if we have to follow (unlikely), we'll end up
// back here with non-empty RunId.
pollReq := &historyservice.PollMutableStateRequest{
NamespaceId: req.NamespaceId,
NamespaceId: a.namespaceID.String(),
Execution: req.Execution,
}
if req.LongPoll {
Expand Down Expand Up @@ -166,7 +157,7 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo

// get last event from history
histReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: req.Namespace,
Namespace: a.namespace.String(),
Execution: req.Execution,
MaximumPageSize: 1,
HistoryEventFilterType: enumspb.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
Expand Down Expand Up @@ -224,10 +215,6 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo
}

func (a *activities) WatchWorkflow(ctx context.Context, req *schedspb.WatchWorkflowRequest) (*schedspb.WatchWorkflowResponse, error) {
if err := a.checkNamespace(req.Namespace, req.NamespaceId); err != nil {
return nil, err
}

for ctx.Err() == nil {
activity.RecordHeartbeat(ctx)
res, err := a.tryWatchWorkflow(ctx, req)
Expand All @@ -246,14 +233,10 @@ func (a *activities) WatchWorkflow(ctx context.Context, req *schedspb.WatchWorkf
}

func (a *activities) CancelWorkflow(ctx context.Context, req *schedspb.CancelWorkflowRequest) error {
if err := a.checkNamespace(req.Namespace, req.NamespaceId); err != nil {
return err
}

rreq := &historyservice.RequestCancelWorkflowExecutionRequest{
NamespaceId: req.NamespaceId,
NamespaceId: a.namespaceID.String(),
CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: req.Namespace,
Namespace: a.namespace.String(),
// only set WorkflowId so we cancel the latest, but restricted by FirstExecutionRunId
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: req.Execution.WorkflowId},
Identity: req.Identity,
Expand All @@ -268,14 +251,10 @@ func (a *activities) CancelWorkflow(ctx context.Context, req *schedspb.CancelWor
}

func (a *activities) TerminateWorkflow(ctx context.Context, req *schedspb.TerminateWorkflowRequest) error {
if err := a.checkNamespace(req.Namespace, req.NamespaceId); err != nil {
return err
}

rreq := &historyservice.TerminateWorkflowExecutionRequest{
NamespaceId: req.NamespaceId,
NamespaceId: a.namespaceID.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: req.Namespace,
Namespace: a.namespace.String(),
// only set WorkflowId so we cancel the latest, but restricted by FirstExecutionRunId
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: req.Execution.WorkflowId},
Reason: req.Reason,
Expand Down
26 changes: 8 additions & 18 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,7 @@ func (s *scheduler) startWorkflow(
ctx := workflow.WithLocalActivityOptions(s.ctx, options)

req := &schedspb.StartWorkflowRequest{
NamespaceId: s.State.NamespaceId,
Request: &workflowservice.StartWorkflowExecutionRequest{
Namespace: s.State.Namespace,
WorkflowId: workflowID,
WorkflowType: newWorkflow.WorkflowType,
TaskQueue: newWorkflow.TaskQueue,
Expand Down Expand Up @@ -851,8 +849,6 @@ func (s *scheduler) refreshWorkflows(executions []*commonpb.WorkflowExecution) {
futures := make([]workflow.Future, len(executions))
for i, ex := range executions {
req := &schedspb.WatchWorkflowRequest{
Namespace: s.State.Namespace,
NamespaceId: s.State.NamespaceId,
// Note: do not send runid here so that we always get the latest one
Execution: &commonpb.WorkflowExecution{WorkflowId: ex.WorkflowId},
FirstExecutionRunId: ex.RunId,
Expand Down Expand Up @@ -880,8 +876,6 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
HeartbeatTimeout: 65 * time.Second,
})
req := &schedspb.WatchWorkflowRequest{
Namespace: s.State.Namespace,
NamespaceId: s.State.NamespaceId,
// Note: do not send runid here so that we always get the latest one
Execution: &commonpb.WorkflowExecution{WorkflowId: ex.WorkflowId},
FirstExecutionRunId: ex.RunId,
Expand All @@ -894,12 +888,10 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
areq := &schedspb.CancelWorkflowRequest{
NamespaceId: s.State.NamespaceId,
Namespace: s.State.Namespace,
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "cancelled by schedule overlap policy",
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "cancelled by schedule overlap policy",
}
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
Expand All @@ -912,12 +904,10 @@ func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
areq := &schedspb.TerminateWorkflowRequest{
NamespaceId: s.State.NamespaceId,
Namespace: s.State.Namespace,
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "terminated by schedule overlap policy",
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "terminated by schedule overlap policy",
}
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
Expand Down