Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new fields to workflow info #1202

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,10 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
// Set replay clock.
weh.SetCurrentReplayTime(common.TimeValue(event.GetEventTime()))
// Set history length as this event's ID
// Update workflow info fields
weh.workflowInfo.currentHistoryLength = int(event.EventId)
weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew()
weh.workflowInfo.currentHistorySize = int(event.GetWorkflowTaskStartedEventAttributes().GetHistorySizeBytes())
// Reset the counter on command helper used for generating ID for commands
weh.commandsHelper.setCurrentWorkflowTaskStartedEventID(event.GetEventId())
weh.workflowDefinition.OnWorkflowTaskStarted(weh.deadlockDetectionTimeout)
Expand Down
8 changes: 8 additions & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,14 @@ func (env *testWorkflowEnvironmentImpl) setCurrentHistoryLength(length int) {
env.workflowInfo.currentHistoryLength = length
}

func (env *testWorkflowEnvironmentImpl) setCurrentHistorySize(size int) {
env.workflowInfo.currentHistorySize = size
}

func (env *testWorkflowEnvironmentImpl) setContineAsNewSuggested(suggest bool) {
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved Hide resolved
env.workflowInfo.continueAsNewSuggested = suggest
}

func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(params *ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) (*testWorkflowEnvironmentImpl, error) {
// create a new test env
childEnv := newTestWorkflowEnvironmentImpl(env.testSuite, env.registry)
Expand Down
17 changes: 16 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,9 @@ type WorkflowInfo struct {
// workflow, it is this worker's current value.
BinaryChecksum string

currentHistoryLength int
continueAsNewSuggested bool
currentHistorySize int
currentHistoryLength int
}

// UpdateInfo information about a currently running update
Expand All @@ -1024,6 +1026,19 @@ func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int {
return wInfo.currentHistoryLength
}

// GetCurrentHistorySize returns the current byte size of history when called.
// This value may change throughout the life of the workflow.
func (wInfo *WorkflowInfo) GetCurrentHistorySize() int {
return wInfo.currentHistorySize
}

// GetContinueAsNewSuggested returns true if the server is configured to suggest continue as new
// and it is suggested.
// This value may change throughout the life of the workflow.
func (wInfo *WorkflowInfo) GetContinueAsNewSuggested() bool {
return wInfo.continueAsNewSuggested
}

// GetWorkflowInfo extracts info of a current workflow from a context.
func GetWorkflowInfo(ctx Context) *WorkflowInfo {
i := getWorkflowOutboundInterceptor(ctx)
Expand Down
18 changes: 18 additions & 0 deletions internal/workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,28 @@ func (e *TestWorkflowEnvironment) SetStartTime(startTime time.Time) {

// SetCurrentHistoryLength sets the value that is returned from
// GetInfo(ctx).GetCurrentHistoryLength().
//
// Note: this value may not be up to date if accessed inside a query.
func (e *TestWorkflowEnvironment) SetCurrentHistoryLength(length int) {
e.impl.setCurrentHistoryLength(length)
}

// setCurrentHistoryLength sets the value that is returned from
// GetInfo(ctx).GetCurrentHistorySize().
//
// Note: this value may not be up to date if accessed inside a query.
func (e *TestWorkflowEnvironment) SetCurrentHistorySize(length int) {
e.impl.setCurrentHistorySize(length)
}

// SetContinueAsNewSuggested set sets the value that is returned from
// GetInfo(ctx).GetContinueAsNewSuggested().
//
// Note: this value may not be up to date if accessed inside a query.
func (e *TestWorkflowEnvironment) SetContinueAsNewSuggested(suggest bool) {
e.impl.setContineAsNewSuggested(suggest)
}

// OnActivity setup a mock call for activity. Parameter activity must be activity function (func) or activity name (string).
// You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The supplied parameters to
// the Return() call should either be a function that has exact same signature as the mocked activity, or it should be
Expand Down
Loading