From d47de15a46b2f0ef6e06072f6a04edca4a595793 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 17 Aug 2023 08:32:58 -0700 Subject: [PATCH] Add new fields to workflow info --- internal/internal_event_handlers.go | 4 +++- internal/internal_workflow_testsuite.go | 8 ++++++++ internal/workflow.go | 17 ++++++++++++++++- internal/workflow_testsuite.go | 18 ++++++++++++++++++ 4 files changed, 45 insertions(+), 2 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 9cb8fa9e8..3bb3f0589 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1053,8 +1053,10 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED: // Set replay clock. weh.SetCurrentReplayTime(common.TimeValue(event.GetEventTime())) - // Set history length as this event's ID + // Update workflow info fields weh.workflowInfo.currentHistoryLength = int(event.EventId) + weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew() + weh.workflowInfo.currentHistorySize = int(event.GetWorkflowTaskStartedEventAttributes().GetHistorySizeBytes()) // Reset the counter on command helper used for generating ID for commands weh.commandsHelper.setCurrentWorkflowTaskStartedEventID(event.GetEventId()) weh.workflowDefinition.OnWorkflowTaskStarted(weh.deadlockDetectionTimeout) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index e5104581f..d433189d4 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -345,6 +345,14 @@ func (env *testWorkflowEnvironmentImpl) setCurrentHistoryLength(length int) { env.workflowInfo.currentHistoryLength = length } +func (env *testWorkflowEnvironmentImpl) setCurrentHistorySize(size int) { + env.workflowInfo.currentHistorySize = size +} + +func (env *testWorkflowEnvironmentImpl) setContineAsNewSuggested(suggest bool) { + env.workflowInfo.continueAsNewSuggested = suggest +} + func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(params *ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) (*testWorkflowEnvironmentImpl, error) { // create a new test env childEnv := newTestWorkflowEnvironmentImpl(env.testSuite, env.registry) diff --git a/internal/workflow.go b/internal/workflow.go index 01d910314..7c869f694 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1002,7 +1002,9 @@ type WorkflowInfo struct { // workflow, it is this worker's current value. BinaryChecksum string - currentHistoryLength int + continueAsNewSuggested bool + currentHistorySize int + currentHistoryLength int } // UpdateInfo information about a currently running update @@ -1024,6 +1026,19 @@ func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int { return wInfo.currentHistoryLength } +// GetCurrentHistorySize returns the current byte size of history when called. +// This value may change throughout the life of the workflow. +func (wInfo *WorkflowInfo) GetCurrentHistorySize() int { + return wInfo.currentHistorySize +} + +// GetContinueAsNewSuggested returns true if the server is configured to suggest continue as new +// and it is suggested. +// This value may change throughout the life of the workflow. +func (wInfo *WorkflowInfo) GetContinueAsNewSuggested() bool { + return wInfo.continueAsNewSuggested +} + // GetWorkflowInfo extracts info of a current workflow from a context. func GetWorkflowInfo(ctx Context) *WorkflowInfo { i := getWorkflowOutboundInterceptor(ctx) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 2b633f3f2..b1f27d331 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -296,10 +296,28 @@ func (e *TestWorkflowEnvironment) SetStartTime(startTime time.Time) { // SetCurrentHistoryLength sets the value that is returned from // GetInfo(ctx).GetCurrentHistoryLength(). +// +// Note: this value may not be up to date if accessed inside a query. func (e *TestWorkflowEnvironment) SetCurrentHistoryLength(length int) { e.impl.setCurrentHistoryLength(length) } +// setCurrentHistoryLength sets the value that is returned from +// GetInfo(ctx).GetCurrentHistorySize(). +// +// Note: this value may not be up to date if accessed inside a query. +func (e *TestWorkflowEnvironment) SetCurrentHistorySize(length int) { + e.impl.setCurrentHistorySize(length) +} + +// SetContinueAsNewSuggested set sets the value that is returned from +// GetInfo(ctx).GetContinueAsNewSuggested(). +// +// Note: this value may not be up to date if accessed inside a query. +func (e *TestWorkflowEnvironment) SetContinueAsNewSuggested(suggest bool) { + e.impl.setContineAsNewSuggested(suggest) +} + // OnActivity setup a mock call for activity. Parameter activity must be activity function (func) or activity name (string). // You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The supplied parameters to // the Return() call should either be a function that has exact same signature as the mocked activity, or it should be