From 26503cfbb18a0f98a589ae283be32710327493fc Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 12 Jan 2023 15:20:17 -0800 Subject: [PATCH] Drop task on serialization error (#3803) --- common/metrics/metric_defs.go | 3 +- common/persistence/data_blob.go | 9 +- common/persistence/serialization/blob.go | 12 +-- .../persistence/serialization/serializer.go | 93 ++++++++++++++----- service/history/queues/executable.go | 27 +++++- service/history/queues/executable_test.go | 15 +++ 6 files changed, 116 insertions(+), 43 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index d99cc86da7a..e4cac2ce829 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1433,11 +1433,12 @@ var ( TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter") TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter") TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover") + TaskThrottledCounter = NewCounterDef("task_errors_throttled") + TaskCorruptionCounter = NewCounterDef("task_errors_corruption") TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency") TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter") TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter") TaskReschedulerPendingTasks = NewDimensionlessHistogramDef("task_rescheduler_pending_tasks") - TaskThrottledCounter = NewCounterDef("task_throttled_counter") PendingTasksCounter = NewDimensionlessHistogramDef("pending_tasks") QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count") diff --git a/common/persistence/data_blob.go b/common/persistence/data_blob.go index 3ed1aa50852..bf4204d30e5 100644 --- a/common/persistence/data_blob.go +++ b/common/persistence/data_blob.go @@ -25,22 +25,21 @@ package persistence import ( - "fmt" - commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" ) // NewDataBlob returns a new DataBlob +// TODO: return an UnknowEncodingType error with the actual type string when encodingTypeStr is invalid func NewDataBlob(data []byte, encodingTypeStr string) *commonpb.DataBlob { if len(data) == 0 { return nil } encodingType, ok := enumspb.EncodingType_value[encodingTypeStr] - if !ok || (enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_PROTO3 && - enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_JSON) { - panic(fmt.Sprintf("Invalid encoding: %v", encodingTypeStr)) + if !ok { + // encodingTypeStr not valid, an error will be returned on deserialization + encodingType = int32(enumspb.ENCODING_TYPE_UNSPECIFIED) } return &commonpb.DataBlob{ diff --git a/common/persistence/serialization/blob.go b/common/persistence/serialization/blob.go index 614fffe24f5..fa2895a6cbc 100644 --- a/common/persistence/serialization/blob.go +++ b/common/persistence/serialization/blob.go @@ -25,8 +25,6 @@ package serialization import ( - "fmt" - "github.com/gogo/protobuf/proto" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -137,7 +135,7 @@ func encode( case enumspb.ENCODING_TYPE_PROTO3: return proto3Encode(object) default: - return commonpb.DataBlob{}, fmt.Errorf("unknown encoding type: %v", encoding) + return commonpb.DataBlob{}, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3) } } @@ -156,7 +154,7 @@ func decode( case enumspb.ENCODING_TYPE_PROTO3: return proto3Decode(blob, encoding, result) default: - return fmt.Errorf("unknown encoding type: %v", encoding) + return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3) } } @@ -164,7 +162,7 @@ func proto3Encode(m proto.Message) (commonpb.DataBlob, error) { blob := commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3} data, err := proto.Marshal(m) if err != nil { - return blob, fmt.Errorf("error serializing struct to blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err) + return blob, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err) } blob.Data = data return blob, nil @@ -172,11 +170,11 @@ func proto3Encode(m proto.Message) (commonpb.DataBlob, error) { func proto3Decode(blob []byte, encoding string, result proto.Message) error { if e, ok := enumspb.EncodingType_value[encoding]; !ok || enumspb.EncodingType(e) != enumspb.ENCODING_TYPE_PROTO3 { - return fmt.Errorf("encoding %s doesn't match expected encoding %v", encoding, enumspb.ENCODING_TYPE_PROTO3) + return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_PROTO3) } if err := proto.Unmarshal(blob, result); err != nil { - return fmt.Errorf("error deserializing blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err) + return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err) } return nil } diff --git a/common/persistence/serialization/serializer.go b/common/persistence/serialization/serializer.go index 1b033a08735..2dc7f803534 100644 --- a/common/persistence/serialization/serializer.go +++ b/common/persistence/serialization/serializer.go @@ -25,8 +25,10 @@ package serialization import ( + "errors" "fmt" "reflect" + "strings" "github.com/gogo/protobuf/proto" commonpb "go.temporal.io/api/common/v1" @@ -107,17 +109,20 @@ type ( // SerializationError is an error type for serialization SerializationError struct { - msg string + encodingType enumspb.EncodingType + wrappedErr error } // DeserializationError is an error type for deserialization DeserializationError struct { - msg string + encodingType enumspb.EncodingType + wrappedErr error } // UnknownEncodingTypeError is an error type for unknown or unsupported encoding type UnknownEncodingTypeError struct { - encodingType enumspb.EncodingType + encodingTypeStr string + expectedEncodingStr []string } serializerImpl struct { @@ -149,7 +154,7 @@ func (t *serializerImpl) DeserializeEvents(data *commonpb.DataBlob) ([]*historyp // Client API currently specifies encodingType on requests which span multiple of these objects err = events.Unmarshal(data.Data) default: - return nil, NewDeserializationError("DeserializeEvents invalid encoding") + return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3) } if err != nil { return nil, err @@ -179,7 +184,7 @@ func (t *serializerImpl) DeserializeEvent(data *commonpb.DataBlob) (*historypb.H // Client API currently specifies encodingType on requests which span multiple of these objects err = event.Unmarshal(data.Data) default: - return nil, NewDeserializationError("DeserializeEvent invalid encoding") + return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3) } if err != nil { @@ -212,7 +217,7 @@ func (t *serializerImpl) DeserializeClusterMetadata(data *commonpb.DataBlob) (*p // Client API currently specifies encodingType on requests which span multiple of these objects err = cm.Unmarshal(data.Data) default: - return nil, NewDeserializationError("DeserializeClusterMetadata invalid encoding") + return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3) } if err != nil { @@ -235,11 +240,11 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod // Client API currently specifies encodingType on requests which span multiple of these objects data, err = p.Marshal() default: - return nil, NewUnknownEncodingTypeError(encodingType) + return nil, NewUnknownEncodingTypeError(encodingType.String(), enumspb.ENCODING_TYPE_PROTO3) } if err != nil { - return nil, NewSerializationError(err.Error()) + return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err) } // Shouldn't happen, but keeping @@ -254,30 +259,68 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod } // NewUnknownEncodingTypeError returns a new instance of encoding type error -func NewUnknownEncodingTypeError(encodingType enumspb.EncodingType) error { - return &UnknownEncodingTypeError{encodingType: encodingType} +func NewUnknownEncodingTypeError( + encodingTypeStr string, + expectedEncoding ...enumspb.EncodingType, +) error { + if len(expectedEncoding) == 0 { + for encodingType := range enumspb.EncodingType_name { + expectedEncoding = append(expectedEncoding, enumspb.EncodingType(encodingType)) + } + } + expectedEncodingStr := make([]string, 0, len(expectedEncoding)) + for _, encodingType := range expectedEncoding { + expectedEncodingStr = append(expectedEncodingStr, encodingType.String()) + } + return &UnknownEncodingTypeError{ + encodingTypeStr: encodingTypeStr, + expectedEncodingStr: expectedEncodingStr, + } } func (e *UnknownEncodingTypeError) Error() string { - return fmt.Sprintf("unknown or unsupported encoding type %v", e.encodingType) + return fmt.Sprintf("unknown or unsupported encoding type %v, supported types: %v", + e.encodingTypeStr, + strings.Join(e.expectedEncodingStr, ","), + ) } // NewSerializationError returns a SerializationError -func NewSerializationError(msg string) error { - return &SerializationError{msg: msg} +func NewSerializationError( + encodingType enumspb.EncodingType, + serializationErr error, +) error { + return &SerializationError{ + encodingType: encodingType, + wrappedErr: serializationErr, + } } func (e *SerializationError) Error() string { - return fmt.Sprintf("serialization error: %v", e.msg) + return fmt.Sprintf("error serializing using %v encoding: %v", e.encodingType, e.wrappedErr) +} + +func (e *SerializationError) Unwrap() error { + return e.wrappedErr } // NewDeserializationError returns a DeserializationError -func NewDeserializationError(msg string) error { - return &DeserializationError{msg: msg} +func NewDeserializationError( + encodingType enumspb.EncodingType, + deserializationErr error, +) error { + return &DeserializationError{ + encodingType: encodingType, + wrappedErr: deserializationErr, + } } func (e *DeserializationError) Error() string { - return fmt.Sprintf("deserialization error: %v", e.msg) + return fmt.Sprintf("error deserializing using %v encoding: %v", e.encodingType, e.wrappedErr) +} + +func (e *DeserializationError) Unwrap() error { + return e.wrappedErr } func (t *serializerImpl) ShardInfoToBlob(info *persistencespb.ShardInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) { @@ -469,15 +512,15 @@ func (t *serializerImpl) ReplicationTaskFromBlob(data *commonpb.DataBlob) (*repl func ProtoDecodeBlob(data *commonpb.DataBlob, result proto.Message) error { if data == nil { // TODO: should we return nil or error? - return NewDeserializationError("cannot decode nil") + return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil")) } if data.EncodingType != enumspb.ENCODING_TYPE_PROTO3 { - return NewDeserializationError(fmt.Sprintf("encoding %v doesn't match expected encoding %v", data.EncodingType, enumspb.ENCODING_TYPE_PROTO3)) + return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3) } if err := proto.Unmarshal(data.Data, result); err != nil { - return NewDeserializationError(fmt.Sprintf("error deserializing blob using %v encoding: %s", enumspb.ENCODING_TYPE_PROTO3, err)) + return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err) } return nil } @@ -485,7 +528,7 @@ func ProtoDecodeBlob(data *commonpb.DataBlob, result proto.Message) error { func decodeBlob(data *commonpb.DataBlob, result proto.Message) error { if data == nil { // TODO: should we return nil or error? - return NewDeserializationError("cannot decode nil") + return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil")) } if data.Data == nil { @@ -498,7 +541,7 @@ func decodeBlob(data *commonpb.DataBlob, result proto.Message) error { case enumspb.ENCODING_TYPE_PROTO3: return ProtoDecodeBlob(data, result) default: - return NewUnknownEncodingTypeError(data.EncodingType) + return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3) } } @@ -523,13 +566,13 @@ func encodeBlob(o proto.Message, encoding enumspb.EncodingType) (*commonpb.DataB case enumspb.ENCODING_TYPE_PROTO3: return ProtoEncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3) default: - return nil, NewUnknownEncodingTypeError(encoding) + return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3) } } func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) { if encoding != enumspb.ENCODING_TYPE_PROTO3 { - return nil, NewUnknownEncodingTypeError(encoding) + return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_PROTO3) } if m == nil || (reflect.ValueOf(m).Kind() == reflect.Ptr && reflect.ValueOf(m).IsNil()) { @@ -543,7 +586,7 @@ func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb. blob := &commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3} data, err := proto.Marshal(m) if err != nil { - return nil, NewSerializationError(err.Error()) + return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err) } blob.Data = data return blob, nil diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index 1f79b3ac82b..d3b7e7199a0 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -28,6 +28,9 @@ package queues import ( "context" + "errors" + "fmt" + "runtime/debug" "sync" "time" @@ -42,6 +45,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence/serialization" ctasks "go.temporal.io/server/common/tasks" "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/consts" @@ -178,10 +182,15 @@ func (e *executableImpl) Execute() (retErr error) { headers.NewBackgroundCallerInfo(ns.String()), ) - var panicErr error defer func() { - if panicErr != nil { - retErr = panicErr + if panicObj := recover(); panicObj != nil { + err, ok := panicObj.(error) + if !ok { + err = serviceerror.NewInternal(fmt.Sprintf("panic: %v", panicObj)) + } + + e.logger.Error("Panic is captured", tag.SysStackTrace(string(debug.Stack())), tag.Error(err)) + retErr = err // we need to guess the metrics tags here as we don't know which execution logic // is actually used which is upto the executor implementation @@ -189,8 +198,6 @@ func (e *executableImpl) Execute() (retErr error) { } }() - defer log.CapturePanic(e.logger, &panicErr) - startTime := e.timeSource.Now() metricsTags, isActive, err := e.executor.Execute(ctx, e) @@ -291,6 +298,16 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { return err } + var deserializationError *serialization.DeserializationError + var encodingTypeError *serialization.UnknownEncodingTypeError + if errors.As(err, &deserializationError) || errors.As(err, &encodingTypeError) { + // likely due to data corruption, emit logs, metrics & drop the task by return nil so that + // task will be marked as completed. + e.taggedMetricsHandler.Counter(metrics.TaskCorruptionCounter.GetMetricName()).Record(1) + e.logger.Error("Drop task due to serialization error", tag.Error(err)) + return nil + } + e.taggedMetricsHandler.Counter(metrics.TaskFailures.GetMetricName()).Record(1) e.logger.Error("Fail to process task", tag.Error(err), tag.LifeCycleProcessingFailed) diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index 65f78d9530a..591a7c77cd8 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/clock" @@ -42,6 +43,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence/serialization" ctasks "go.temporal.io/server/common/tasks" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/tasks" @@ -124,6 +126,19 @@ func (s *executableSuite) TestExecute_CapturePanic() { s.Error(executable.Execute()) } +func (s *executableSuite) TestExecuteHandleErr_Corrupted() { + executable := s.newTestExecutable() + + s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn( + func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) { + panic(serialization.NewUnknownEncodingTypeError("unknownEncoding", enumspb.ENCODING_TYPE_PROTO3)) + }, + ) + err := executable.Execute() + s.Error(err) + s.NoError(executable.HandleErr(err)) +} + func (s *executableSuite) TestHandleErr_EntityNotExists() { executable := s.newTestExecutable()