Skip to content

Commit

Permalink
Add new fields to workflow info
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Aug 17, 2023
1 parent 7dbde86 commit eb3809e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 2 deletions.
4 changes: 3 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,10 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
// Set replay clock.
weh.SetCurrentReplayTime(common.TimeValue(event.GetEventTime()))
// Set history length as this event's ID
// Update workflow info fields
weh.workflowInfo.currentHistoryLength = int(event.EventId)
weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew()
weh.workflowInfo.currentHistorySize = int(event.GetWorkflowTaskStartedEventAttributes().GetHistorySizeBytes())
// Reset the counter on command helper used for generating ID for commands
weh.commandsHelper.setCurrentWorkflowTaskStartedEventID(event.GetEventId())
weh.workflowDefinition.OnWorkflowTaskStarted(weh.deadlockDetectionTimeout)
Expand Down
8 changes: 8 additions & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,14 @@ func (env *testWorkflowEnvironmentImpl) setCurrentHistoryLength(length int) {
env.workflowInfo.currentHistoryLength = length
}

func (env *testWorkflowEnvironmentImpl) setCurrentHistorySize(size int) {

Check failure on line 348 in internal/internal_workflow_testsuite.go

View workflow job for this annotation

GitHub Actions / build-and-test (1.20)

func (*testWorkflowEnvironmentImpl).setCurrentHistorySize is unused (U1000)
env.workflowInfo.currentHistorySize = size
}

func (env *testWorkflowEnvironmentImpl) setContineAsNewSuggested(suggest bool) {
env.workflowInfo.continueAsNewSuggested = suggest
}

func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(params *ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) (*testWorkflowEnvironmentImpl, error) {
// create a new test env
childEnv := newTestWorkflowEnvironmentImpl(env.testSuite, env.registry)
Expand Down
17 changes: 16 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,9 @@ type WorkflowInfo struct {
// workflow, it is this worker's current value.
BinaryChecksum string

currentHistoryLength int
continueAsNewSuggested bool
currentHistorySize int
currentHistoryLength int
}

// UpdateInfo information about a currently running update
Expand All @@ -1024,6 +1026,19 @@ func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int {
return wInfo.currentHistoryLength
}

// GetCurrentHistorySize returns the current byte size of history when called.
// This value may change throughout the life of the workflow.
func (wInfo *WorkflowInfo) GetCurrentHistorySize() int {
return wInfo.currentHistorySize
}

// GetContinueAsNewSuggested returns true if the server is configured to suggest continue as new
// and it is suggested.
// This value may change throughout the life of the workflow.
func (wInfo *WorkflowInfo) GetContinueAsNewSuggested() bool {
return wInfo.continueAsNewSuggested
}

// GetWorkflowInfo extracts info of a current workflow from a context.
func GetWorkflowInfo(ctx Context) *WorkflowInfo {
i := getWorkflowOutboundInterceptor(ctx)
Expand Down
18 changes: 18 additions & 0 deletions internal/workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,28 @@ func (e *TestWorkflowEnvironment) SetStartTime(startTime time.Time) {

// SetCurrentHistoryLength sets the value that is returned from
// GetInfo(ctx).GetCurrentHistoryLength().
//
// Note: this value may not be up to date if accessed inside a query.
func (e *TestWorkflowEnvironment) SetCurrentHistoryLength(length int) {
e.impl.setCurrentHistoryLength(length)
}

// setCurrentHistoryLength sets the value that is returned from
// GetInfo(ctx).GetCurrentHistorySize().
//
// Note: this value may not be up to date if accessed inside a query.
func (e *TestWorkflowEnvironment) SetCurrentHistorySize(length int) {
e.impl.setCurrentHistoryLength(length)
}

// SetContinueAsNewSuggested set sets the value that is returned from
// GetInfo(ctx).GetContinueAsNewSuggested().
//
// Note: this value may not be up to date if accessed inside a query.
func (e *TestWorkflowEnvironment) SetContinueAsNewSuggested(suggest bool) {
e.impl.setContineAsNewSuggested(suggest)
}

// OnActivity setup a mock call for activity. Parameter activity must be activity function (func) or activity name (string).
// You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The supplied parameters to
// the Return() call should either be a function that has exact same signature as the mocked activity, or it should be
Expand Down

0 comments on commit eb3809e

Please sign in to comment.