Skip to content

Commit

Permalink
Expose replicateWorkflowState API in history (#3783)
Browse files Browse the repository at this point in the history
* Expose replicateWorkflowState API in history
  • Loading branch information
yux0 committed Jan 13, 2023
1 parent d321415 commit 0e4d618
Show file tree
Hide file tree
Showing 11 changed files with 822 additions and 432 deletions.
915 changes: 566 additions & 349 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

203 changes: 122 additions & 81 deletions api/historyservice/v1/service.pb.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions api/historyservicemock/v1/service.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions client/history/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions client/history/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions client/history/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ const (
HistoryClientVerifyChildExecutionCompletionRecordedScope = "HistoryClientVerifyChildExecutionCompletionRecorded"
// HistoryClientReplicateEventsV2Scope tracks RPC calls to history service
HistoryClientReplicateEventsV2Scope = "HistoryClientReplicateEventsV2"
// HistoryClientReplicateWorkflowStateScope tracks RPC calls to history service
HistoryClientReplicateWorkflowStateScope = "HistoryClientReplicateWorkflowState"
// HistoryClientSyncShardStatusScope tracks RPC calls to history service
HistoryClientSyncShardStatusScope = "HistoryClientSyncShardStatus"
// HistoryClientSyncActivityScope tracks RPC calls to history service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,16 @@ message ReplicateEventsV2Request {
temporal.api.common.v1.DataBlob new_run_events = 5;
}

message ReplicateEventsV2Response {
}

message ReplicateWorkflowStateRequest {
temporal.server.api.persistence.v1.WorkflowMutableState workflow_state = 1;
string remote_cluster = 2;
string namespace_id= 3;
}

message ReplicateEventsV2Response {
message ReplicateWorkflowStateResponse {
}

message SyncShardStatusRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,14 @@ service HistoryService {
rpc DescribeWorkflowExecution (DescribeWorkflowExecutionRequest) returns (DescribeWorkflowExecutionResponse) {
}

// ReplicateEventsV2 replicates workflow history events
rpc ReplicateEventsV2 (ReplicateEventsV2Request) returns (ReplicateEventsV2Response) {
}

// ReplicateWorkflowState replicates workflow state
rpc ReplicateWorkflowState(ReplicateWorkflowStateRequest) returns (ReplicateWorkflowStateResponse) {
}

// SyncShardStatus sync the status between shards.
rpc SyncShardStatus (SyncShardStatusRequest) returns (SyncShardStatusResponse) {
}
Expand Down Expand Up @@ -275,7 +280,7 @@ service HistoryService {
}

// (-- api-linter: core::0134=disabled
// aip.dev/not-precedent: This service does not follow the update method AIP --)
// aip.dev/not-precedent: This service does not follow the update method API --)
rpc UpdateWorkflow(UpdateWorkflowRequest) returns (UpdateWorkflowResponse) {
}
}
1 change: 1 addition & 0 deletions service/history/configs/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
"RemoveSignalMutableState": 0,
"RemoveTask": 0,
"ReplicateEventsV2": 0,
"ReplicateWorkflowState": 0,
"RequestCancelWorkflowExecution": 0,
"ResetStickyTaskQueue": 0,
"ResetWorkflowExecution": 0,
Expand Down
32 changes: 32 additions & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,38 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice
return &historyservice.ReplicateEventsV2Response{}, nil
}

// ReplicateWorkflowState is called by processor to replicate workflow state for passive namespaces
func (h *Handler) ReplicateWorkflowState(
ctx context.Context,
request *historyservice.ReplicateWorkflowStateRequest,
) (_ *historyservice.ReplicateWorkflowStateResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
h.startWG.Wait()

if h.isStopped() {
return nil, errShuttingDown
}

shardContext, err := h.controller.GetShardByNamespaceWorkflow(
namespace.ID(request.GetWorkflowState().GetExecutionInfo().GetNamespaceId()),
request.GetWorkflowState().GetExecutionInfo().GetWorkflowId(),
)
if err != nil {
return nil, h.convertError(err)
}

engine, err := shardContext.GetEngine(ctx)
if err != nil {
return nil, h.convertError(err)
}

err = engine.ReplicateWorkflowState(ctx, request)
if err != nil {
return nil, err
}
return &historyservice.ReplicateWorkflowStateResponse{}, nil
}

// SyncShardStatus is called by processor to sync history shard information from another cluster
func (h *Handler) SyncShardStatus(ctx context.Context, request *historyservice.SyncShardStatusRequest) (_ *historyservice.SyncShardStatusResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
Expand Down

0 comments on commit 0e4d618

Please sign in to comment.