Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose replicateWorkflowState API in history #3783

Merged
merged 2 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
915 changes: 566 additions & 349 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

203 changes: 122 additions & 81 deletions api/historyservice/v1/service.pb.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions api/historyservicemock/v1/service.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions client/history/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions client/history/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions client/history/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ const (
HistoryClientVerifyChildExecutionCompletionRecordedScope = "HistoryClientVerifyChildExecutionCompletionRecorded"
// HistoryClientReplicateEventsV2Scope tracks RPC calls to history service
HistoryClientReplicateEventsV2Scope = "HistoryClientReplicateEventsV2"
// HistoryClientReplicateWorkflowStateScope tracks RPC calls to history service
HistoryClientReplicateWorkflowStateScope = "HistoryClientReplicateWorkflowState"
// HistoryClientSyncShardStatusScope tracks RPC calls to history service
HistoryClientSyncShardStatusScope = "HistoryClientSyncShardStatus"
// HistoryClientSyncActivityScope tracks RPC calls to history service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,16 @@ message ReplicateEventsV2Request {
temporal.api.common.v1.DataBlob new_run_events = 5;
}

message ReplicateEventsV2Response {
}

message ReplicateWorkflowStateRequest {
temporal.server.api.persistence.v1.WorkflowMutableState workflow_state = 1;
string remote_cluster = 2;
string namespace_id= 3;
}

message ReplicateEventsV2Response {
message ReplicateWorkflowStateResponse {
}

message SyncShardStatusRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,14 @@ service HistoryService {
rpc DescribeWorkflowExecution (DescribeWorkflowExecutionRequest) returns (DescribeWorkflowExecutionResponse) {
}

// ReplicateEventsV2 replicates workflow history events
rpc ReplicateEventsV2 (ReplicateEventsV2Request) returns (ReplicateEventsV2Response) {
}

// ReplicateWorkflowState replicates workflow state
rpc ReplicateWorkflowState(ReplicateWorkflowStateRequest) returns (ReplicateWorkflowStateResponse) {
}

// SyncShardStatus sync the status between shards.
rpc SyncShardStatus (SyncShardStatusRequest) returns (SyncShardStatusResponse) {
}
Expand Down Expand Up @@ -275,7 +280,7 @@ service HistoryService {
}

// (-- api-linter: core::0134=disabled
// aip.dev/not-precedent: This service does not follow the update method AIP --)
// aip.dev/not-precedent: This service does not follow the update method API --)
rpc UpdateWorkflow(UpdateWorkflowRequest) returns (UpdateWorkflowResponse) {
}
}
1 change: 1 addition & 0 deletions service/history/configs/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
"RemoveSignalMutableState": 0,
"RemoveTask": 0,
"ReplicateEventsV2": 0,
"ReplicateWorkflowState": 0,
"RequestCancelWorkflowExecution": 0,
"ResetStickyTaskQueue": 0,
"ResetWorkflowExecution": 0,
Expand Down
32 changes: 32 additions & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,38 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice
return &historyservice.ReplicateEventsV2Response{}, nil
}

// ReplicateWorkflowState is called by processor to replicate workflow state for passive namespaces
func (h *Handler) ReplicateWorkflowState(
ctx context.Context,
request *historyservice.ReplicateWorkflowStateRequest,
) (_ *historyservice.ReplicateWorkflowStateResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
h.startWG.Wait()

if h.isStopped() {
return nil, errShuttingDown
}

shardContext, err := h.controller.GetShardByNamespaceWorkflow(
namespace.ID(request.GetWorkflowState().GetExecutionInfo().GetNamespaceId()),
request.GetWorkflowState().GetExecutionInfo().GetWorkflowId(),
)
if err != nil {
return nil, h.convertError(err)
}

engine, err := shardContext.GetEngine(ctx)
if err != nil {
return nil, h.convertError(err)
}

err = engine.ReplicateWorkflowState(ctx, request)
if err != nil {
return nil, err
}
return &historyservice.ReplicateWorkflowStateResponse{}, nil
}

// SyncShardStatus is called by processor to sync history shard information from another cluster
func (h *Handler) SyncShardStatus(ctx context.Context, request *historyservice.SyncShardStatusRequest) (_ *historyservice.SyncShardStatusResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
Expand Down