From b7feb6f815dc94cec3b8c3cdaefcfe9e54ff70a0 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Fri, 16 Dec 2022 10:43:08 -0800 Subject: [PATCH] Improve archival_test --- host/archival_test.go | 75 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 11 deletions(-) diff --git a/host/archival_test.go b/host/archival_test.go index 83b2ae85b0b..4f86670427e 100644 --- a/host/archival_test.go +++ b/host/archival_test.go @@ -46,12 +46,15 @@ import ( "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/searchattribute" ) const ( @@ -100,7 +103,7 @@ func (s *archivalSuite) TestArchival_TimerQueueProcessor() { WorkflowId: workflowID, RunId: runID, } - s.True(s.isHistoryArchived(s.archivalNamespace, execution)) + s.True(s.isArchived(s.archivalNamespace, execution)) s.True(s.isHistoryDeleted(execution)) s.True(s.isMutableStateDeleted(namespaceID, execution)) } @@ -121,7 +124,7 @@ func (s *archivalSuite) TestArchival_ContinueAsNew() { WorkflowId: workflowID, RunId: runID, } - s.True(s.isHistoryArchived(s.archivalNamespace, execution)) + s.True(s.isArchived(s.archivalNamespace, execution)) s.True(s.isHistoryDeleted(execution)) s.True(s.isMutableStateDeleted(namespaceID, execution)) } @@ -143,7 +146,7 @@ func (s *archivalSuite) TestArchival_ArchiverWorker() { WorkflowId: workflowID, RunId: runID, } - s.True(s.isHistoryArchived(s.archivalNamespace, execution)) + s.True(s.isArchived(s.archivalNamespace, execution)) s.True(s.isHistoryDeleted(execution)) s.True(s.isMutableStateDeleted(namespaceID, execution)) } @@ -201,18 +204,68 @@ func (s *IntegrationBase) getNamespaceID(namespace string) string { return namespaceResp.NamespaceInfo.GetId() } -func (s *archivalSuite) isHistoryArchived(namespace string, execution *commonpb.WorkflowExecution) bool { - request := &workflowservice.GetWorkflowExecutionHistoryRequest{ - Namespace: s.archivalNamespace, - Execution: execution, - } +// isArchived returns true if both the workflow history and workflow visibility are archived. +func (s *archivalSuite) isArchived(namespace string, execution *commonpb.WorkflowExecution) bool { + serviceName := string(primitives.HistoryService) + historyURI, err := archiver.NewURI(s.testCluster.archiverBase.historyURI) + s.NoError(err) + historyArchiver, err := s.testCluster.archiverBase.provider.GetHistoryArchiver( + historyURI.Scheme(), + serviceName, + ) + s.NoError(err) + + visibilityURI, err := archiver.NewURI(s.testCluster.archiverBase.visibilityURI) + s.NoError(err) + visibilityArchiver, err := s.testCluster.archiverBase.provider.GetVisibilityArchiver( + visibilityURI.Scheme(), + serviceName, + ) + s.NoError(err) for i := 0; i < retryLimit; i++ { - getHistoryResp, err := s.engine.GetWorkflowExecutionHistory(NewContext(), request) - if err == nil && getHistoryResp != nil && getHistoryResp.GetArchived() { + ctx := NewContext() + if i > 0 { + time.Sleep(retryBackoffTime) + } + namespaceID := s.getNamespaceID(namespace) + var historyResponse *archiver.GetHistoryResponse + historyResponse, err = historyArchiver.Get(ctx, historyURI, &archiver.GetHistoryRequest{ + NamespaceID: namespaceID, + WorkflowID: execution.GetWorkflowId(), + RunID: execution.GetRunId(), + PageSize: 1, + }) + if err != nil { + continue + } + if len(historyResponse.HistoryBatches) == 0 { + continue + } + var visibilityResponse *archiver.QueryVisibilityResponse + visibilityResponse, err = visibilityArchiver.Query( + ctx, + visibilityURI, + &archiver.QueryVisibilityRequest{ + NamespaceID: namespaceID, + PageSize: 1, + Query: fmt.Sprintf( + "WorkflowId = '%s' and RunId = '%s'", + execution.GetWorkflowId(), + execution.GetRunId(), + ), + }, + searchattribute.NameTypeMap{}, + ) + if err != nil { + continue + } + if len(visibilityResponse.Executions) > 0 { return true } - time.Sleep(retryBackoffTime) + } + if err != nil { + fmt.Println("isArchived failed with error: ", err) } return false }