
Expose history size to workflows #3055

Merged: 5 commits merged into master on Feb 9, 2023

Conversation

@dnr (Member) commented on Jul 5, 2022:

What changed?
This fills in HistorySizeBytes and SuggestContinueAsNew on WorkflowTaskStartedEventAttributes, added in temporalio/api#178.

Why?
So workflows can decide whether to continue-as-new with less guessing. Fixes #2726 and #1114

How did you test it?
New integration test

Potential risks
Bugs in this logic could lead to inconsistency between the values sent in transient workflow tasks and values actually recorded in history, which could lead to determinism errors on replay.

Is hotfix candidate?
no
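
For context on how workflows are expected to consume this: below is a minimal sketch of a long-running workflow acting on the new signal, assuming the SDK surfaces it through workflow.GetInfo. The accessor name, BatchState, and processNextItem are illustrative assumptions, not part of this PR.

package sample

import "go.temporal.io/sdk/workflow"

// BatchState is a hypothetical stand-in for whatever state a long-running
// workflow carries across continue-as-new boundaries.
type BatchState struct {
	NextIndex int
}

func BatchWorkflow(ctx workflow.Context, state BatchState) error {
	for {
		if err := processNextItem(ctx, &state); err != nil {
			return err
		}
		// If the server suggests continuing as new (driven by the new
		// SuggestContinueAsNew / HistorySizeBytes fields on the workflow
		// task started event), carry the state into a fresh run instead of
		// letting history grow without bound.
		if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
			return workflow.NewContinueAsNewError(ctx, BatchWorkflow, state)
		}
	}
}

func processNextItem(ctx workflow.Context, state *BatchState) error {
	// Placeholder for real work (activities, timers, etc.).
	state.NextIndex++
	return nil
}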

@@ -221,9 +233,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduleToStartTimeoutEvent(
return nil, m.ms.createInternalServerError(opTag)
}

// clear stickiness whenever workflow task fails
m.ms.ClearStickyness()
dnr (Member Author):
this isn't needed here because ReplicateWorkflowTaskTimedOutEvent(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START) below will always call ClearStickyness itself

Member:

It could affect the incrementTimeout calculation in FailWorkflowTask, but because the timeout type is enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, it is false anyway, so I think it is safe to remove it from here.

@yiminc (Member) left a comment:

I think we only need the values in WorkflowTaskStartedEventAttributes.
Could you explain the purpose of the values in ExecutionInfo, and the purpose of those in WorkflowTaskInfo?

Comment on lines 101 to 106
bool workflow_task_suggest_continue_as_new = 67;
int64 workflow_task_history_size_bytes = 68;

bool cancel_requested = 29;
Member:

Shall we keep the field numbers in order?

dnr (Member Author):
no, I'd much rather see fields grouped by function

Comment on lines 398 to 436
HistorySizeSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 2*1024*1024),
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 2*1024),

Member:

Feels a bit aggressive. Maybe double it to 4 MB and 4K, which is still arbitrary. But the gRPC default message size limit is also 4 MB. :)

dnr (Member Author):

I don't have a great feel for it, so I'll trust your judgement here. I'm curious what the SDK team would say.

if stats == nil {
return false, 0
}
// QUESTION: in some cases we might have history events in memory that we haven't written
Member:

Do you mean buffered events? Those are not visible to the workflow yet, so I think they should not be counted.

dnr (Member Author):

Not exactly... look at where this is called. Consider the case where we had buffered events, and then we retry the WFT. So on line 400, we do AddWorkflowTaskScheduledEvent and then reset Attempt to 1, so we end up here. That WorkflowTaskScheduled event will be visible to the workflow, but it won't be counted in HistorySize here, since that only gets updated when the transaction is closed.

Member:

If I get it right, stats is updated on write. We get here when some events have been added to history but not persisted yet, and stats doesn't reflect them, while m.ms.GetNextEventID() reflects the current in-memory last event. I think they are not consistent here, but this is probably not a big deal.

dnr (Member Author):

Not consistent (in the sense I described) would be a big deal. But I think they are consistent, just not accurate, which isn't a big deal.

Member:
Sorry, by "consistent" I mean consistency between "size" and "count". "Accurate" is a better word, yes.
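
Tying the two threads above together, here is a hedged sketch of how the suggestion could be derived from mutable-state stats plus the two dynamic-config thresholds. The receiver, stats field, and config accessors are approximations of the server code, not the PR's exact implementation.

// Sketch only. ExecutionStats.HistorySize is updated when a transaction
// closes, so events still in memory are not counted; the value can be a
// little stale, but it matches what gets recorded in history, which is what
// replay determinism needs.
func (m *workflowTaskStateMachine) getHistorySizeInfo() (bool, int64) {
	stats := m.ms.GetExecutionInfo().ExecutionStats
	if stats == nil {
		return false, 0
	}
	historySizeBytes := stats.HistorySize
	historyCount := m.ms.GetNextEventID() - 1 // in-memory, slightly ahead of stats

	namespaceName := m.ms.GetNamespaceEntry().Name().String()
	sizeLimit := int64(m.ms.config.HistorySizeSuggestContinueAsNew(namespaceName))
	countLimit := int64(m.ms.config.HistoryCountSuggestContinueAsNew(namespaceName))

	suggestContinueAsNew := historySizeBytes >= sizeLimit || historyCount >= countLimit
	return suggestContinueAsNew, historySizeBytes
}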

Comment on lines 139 to 155
// QUESTION: should we preserve these here? this is used by mutable state rebuilder. it
// seems like the same logic as case 1 above applies: if a failover happens right after
// this, then AddWorkflowTaskStartedEvent will rewrite these anyway. is that correct?
Member:
I'm not sure about this. cc @yycptt

@yycptt (Member) commented on Jul 19, 2022:
Yeah I think the value here doesn't matter as it will get overwritten anyway, either when starting the workflow task (if failover happens) or when replicating started event (no failover).

Member:

Same here. Let's not set them until the WT is started.

@dnr (Member Author) commented on Jul 18, 2022:

> I think we only need the values in WorkflowTaskStartedEventAttributes. Could you explain the purpose of the values in ExecutionInfo, and the purpose of those in WorkflowTaskInfo?

It's an obscure corner case, but I thought in the discussion we decided we needed to handle it:

What if you send the worker a wftstartedevent with suggestcontinueasnew == false, and it fails/times out. Then you send a second attempt, which is now a transient wft, with suggestcontinueasnew == false. Then you change dynamic config so that the same history size now makes suggestcontinueasnew == true. Now the worker responds to the wft successfully, and you have to write out the transient events to history. If you re-evaluate suggestcontinueasnew at that point and write a wftstartedevent with it as true, you'll get a determinism error on replay. (Assuming the workflow follows the suggestion.)

If we didn't use dynamic config I agree we wouldn't have to keep it in mutable state.
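
A sketch of the mechanism being argued for at this point in the discussion (the helper name and the Attempt check are assumptions; the approach gets simplified later in this thread): compute the values once, store them on the in-flight workflow task, and reuse the stored copies for transient attempts, so a dynamic-config change between attempts cannot make the event the worker consumed diverge from the event eventually written to history.

// Sketch only, not the PR's final code.
func (m *workflowTaskStateMachine) startedEventValues(workflowTask *WorkflowTaskInfo) (bool, int64) {
	if workflowTask.Attempt == 1 {
		// First attempt: evaluate the thresholds and remember the result in
		// mutable state.
		workflowTask.SuggestContinueAsNew, workflowTask.HistorySizeBytes = m.getHistorySizeInfo()
	}
	// Transient retries (Attempt > 1) reuse the stored copies instead of
	// re-evaluating possibly-changed dynamic config.
	return workflowTask.SuggestContinueAsNew, workflowTask.HistorySizeBytes
}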

@@ -1209,32 +1209,6 @@ func (e *MutableStateImpl) DeleteUserTimer(
return nil
}

// nolint:unused
Member:
This is already gone.

}

// workflow logic
stage := 0
Member:
I call it wtHandlerCalls.

@@ -87,16 +87,21 @@ message WorkflowExecutionInfo {
int64 last_workflow_task_started_event_id = 19;
google.protobuf.Timestamp start_time = 20 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp last_update_time = 21 [(gogoproto.stdtime) = true];

// This group of fields contains info about the current in-flight workflow task
Member:
Suggested change:
- // This group of fields contains info about the current in-flight workflow task
+ // This group of fields contains info about the current workflow task

"in-flight" means running in other places. I reordered these already too.

Comment on lines +230 to +231
SuggestContinueAsNew: suggestContinueAsNew,
HistorySizeBytes: historySizeBytes,
Member:

Oh, the attributes have already been there for half a year.

Comment on lines 87 to 98
// These two fields are sent to workers in the WorkflowTaskStarted event. We need to save a
// copy here to ensure that we send the same values with every transient WorkflowTaskStarted
// event, otherwise a dynamic config change of the suggestion threshold could cause the
// event that the worker used to not match the event we saved in history.
SuggestContinueAsNew bool
HistorySizeBytes int64
Member:

Are you trying to make history deterministic? I don't think it is necessary. First of all, SDKs can ignore these fields in the non-determinism detector the same way they do for activity arguments. But even if they don't, the SDK will just replay history from the beginning, which is OK for workflows with continuously failing WTs. I think the SDK already does this (maybe not).

Member:
But you still need these fields just to pass this data around. All WT related fields from executions.proto must be here.

dnr (Member Author):

As we discussed: they can change across attempts, but we do need to keep them and can't just recompute them, because of determinism. Updated the comment.



@@ -614,7 +633,10 @@ func (m *workflowTaskStateMachine) DeleteWorkflowTask() {

TaskQueue: nil,
// Keep the last original scheduled Timestamp, so that AddWorkflowTaskScheduledEventAsHeartbeat can continue with it.
OriginalScheduledTime: m.getWorkflowTaskInfo().OriginalScheduledTime,
OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime,
Member:

Please leave getWorkflowTaskInfo(). It will help me refactor the WT state machine in the future.


Comment on lines 412 to 454
workflowTask.SuggestContinueAsNew, workflowTask.HistorySizeBytes = m.getHistorySizeInfo()

Member:

Why not compute it for every attempt?

dnr (Member Author):
If we compute it for every attempt, but only actually write the event to history on the first attempt, then the value used by the workflow may be different from the value written to history, so replay would cause a determinism error. Like 90% of the complexity of this PR is just for that

dnr (Member Author):

For the record, the answer here is that we write a new started event when the retried WFT completes, so as long as that new started event has the right values, we're good.

@dnr (Member Author) left a comment:
rebased!


Comment on lines +575 to +576
workflowTask.SuggestContinueAsNew,
workflowTask.HistorySizeBytes,
dnr (Member Author):
I was a little unsure about these. I think probably it should recompute them here? (It seems logical that it should recompute every time hBuilder.AddWorkflowTaskStartedEvent is called, and not any other time.) But I'm not sure how speculative works...

dnr (Member Author):
this is a failure, so it doesn't matter


@dnr marked this pull request as ready for review on February 9, 2023 06:32
@dnr requested a review from a team as a code owner on February 9, 2023 06:32
@dnr (Member Author) left a comment:
simplified a little based on discussion


// events. That's okay, it doesn't have to be 100% accurate. It just has to be kept
// consistent between the started event in history and the event that was sent to the SDK
// that resulted in the successful completion.
suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo()
dnr (Member Author):
as discussed, just compute every time we get here
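
A minimal sketch of the simplified flow this lands on (the helper name and the AddWorkflowTaskStartedEvent signature are illustrative, not the PR's exact code): evaluate the values each time a started event is actually appended and pass the same values into the event attributes, so whatever the worker was told is exactly what replay will see.

// Sketch only: evaluate at append time and feed the same values into the event.
func (m *workflowTaskStateMachine) addStartedEvent(
	workflowTask *WorkflowTaskInfo,
	requestID string,
	taskQueue *taskqueuepb.TaskQueue,
	identity string,
) *historypb.HistoryEvent {
	suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo()
	return m.ms.hBuilder.AddWorkflowTaskStartedEvent(
		workflowTask.ScheduledEventID,
		requestID,
		taskQueue,
		identity,
		suggestContinueAsNew,
		historySizeBytes,
	)
}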


Comment on lines +93 to +97
// These two fields are sent to workers in the WorkflowTaskStarted event. We need to save a
// copy in mutable state to know the last values we sent (which might have been in a
// transient event), otherwise a dynamic config change of the suggestion threshold could
// cause the WorkflowTaskStarted event that the worker used to not match the event we saved
// in history.
Member:
I guess this comment also needs to be updated.

dnr (Member Author):
what part is wrong? I just updated this one

@dnr merged commit d4498d1 into temporalio:master on Feb 9, 2023
@dnr deleted the wfsize branch on February 9, 2023 20:24
@alexshtin (Member):

Feb 9 - Jul 5 = 219 (days from opening to merge)

Successfully merging this pull request may close these issues: Communicate shouldContinueAsNew to Workers
4 participants