Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove scheduled and started events from transient workflow task info #3736

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 44 additions & 185 deletions api/history/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 3 additions & 8 deletions proto/internal/temporal/server/api/history/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,10 @@ import "dependencies/gogoproto/gogo.proto";
import "temporal/api/history/v1/message.proto";

message TransientWorkflowTaskInfo {
//TODO (mmcshane): remove these deprecated fields after v1.18 is released
reserved 1;
reserved 2;

// Rather than use this field, instead add the event to the history_suffix list.
temporal.api.history.v1.HistoryEvent scheduled_event = 1 [deprecated = true];
// Rather than use this field, instead add the event to the history_suffix list.
temporal.api.history.v1.HistoryEvent started_event = 2 [deprecated = true];

// A list of history events that are to be appended to the "real" workflow
// history.
// A list of history events that are to be appended to the "real" workflow history.
repeated temporal.api.history.v1.HistoryEvent history_suffix = 3;
}

Expand Down
27 changes: 3 additions & 24 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4078,9 +4078,7 @@ func (wh *WorkflowHandler) getRawHistory(
return nil, nil, err
}

suffix := extractHistorySuffix(transientWorkflowTaskInfo)

for _, event := range suffix {
for _, event := range transientWorkflowTaskInfo.HistorySuffix {
blob, err := wh.payloadSerializer.SerializeEvent(event, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -4161,7 +4159,7 @@ func (wh *WorkflowHandler) getHistory(
tag.Error(err))
}
// Append the transient workflow task events once we are done enumerating everything from the events table
historyEvents = append(historyEvents, extractHistorySuffix(transientWorkflowTaskInfo)...)
historyEvents = append(historyEvents, transientWorkflowTaskInfo.HistorySuffix...)
}

if err := wh.processOutgoingSearchAttributes(historyEvents, namespace); err != nil {
Expand Down Expand Up @@ -4277,8 +4275,7 @@ func (wh *WorkflowHandler) validateTransientWorkflowTaskEvents(
eventIDOffset int64,
transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo,
) error {
suffix := extractHistorySuffix(transientWorkflowTaskInfo)
for i, event := range suffix {
for i, event := range transientWorkflowTaskInfo.HistorySuffix {
expectedEventID := eventIDOffset + int64(i)
if event.GetEventId() != expectedEventID {
return serviceerror.NewInternal(
Expand All @@ -4293,24 +4290,6 @@ func (wh *WorkflowHandler) validateTransientWorkflowTaskEvents(
return nil
}

func extractHistorySuffix(transientWorkflowTask *historyspb.TransientWorkflowTaskInfo) []*historypb.HistoryEvent {
// TODO (mmcshane): remove this function after v1.18 is release as we will
// be able to just use transientWorkflowTask.HistorySuffix directly and the other
// fields will be removed.

suffix := transientWorkflowTask.HistorySuffix
if len(suffix) == 0 {
// HistorySuffix is a new field - we may still need to handle
// instances that carry the separate ScheduledEvent and StartedEvent
// fields

// One might be tempted to check for nil here but the old code did not
// make that check and we aim to preserve compatiblity
suffix = append(suffix, transientWorkflowTask.ScheduledEvent, transientWorkflowTask.StartedEvent)
}
return suffix
}

func (wh *WorkflowHandler) validateTaskQueue(t *taskqueuepb.TaskQueue) error {
if t == nil || t.GetName() == "" {
return errTaskQueueNotSet
Expand Down
Loading