Skip to content

Commit

Permalink
Drop task on serialization error (#3803)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jan 12, 2023
1 parent bd06826 commit 26503cf
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 43 deletions.
3 changes: 2 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,11 +1433,12 @@ var (
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter")
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
TaskThrottledCounter = NewCounterDef("task_errors_throttled")
TaskCorruptionCounter = NewCounterDef("task_errors_corruption")
TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency")
TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter")
TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter")
TaskReschedulerPendingTasks = NewDimensionlessHistogramDef("task_rescheduler_pending_tasks")
TaskThrottledCounter = NewCounterDef("task_throttled_counter")
PendingTasksCounter = NewDimensionlessHistogramDef("pending_tasks")
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
Expand Down
9 changes: 4 additions & 5 deletions common/persistence/data_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,21 @@
package persistence

import (
"fmt"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
)

// NewDataBlob returns a new DataBlob
// TODO: return an UnknownEncodingTypeError with the actual type string when encodingTypeStr is invalid
func NewDataBlob(data []byte, encodingTypeStr string) *commonpb.DataBlob {
if len(data) == 0 {
return nil
}

encodingType, ok := enumspb.EncodingType_value[encodingTypeStr]
if !ok || (enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_PROTO3 &&
enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_JSON) {
panic(fmt.Sprintf("Invalid encoding: %v", encodingTypeStr))
if !ok {
// encodingTypeStr not valid, an error will be returned on deserialization
encodingType = int32(enumspb.ENCODING_TYPE_UNSPECIFIED)
}

return &commonpb.DataBlob{
Expand Down
12 changes: 5 additions & 7 deletions common/persistence/serialization/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
package serialization

import (
"fmt"

"github.com/gogo/protobuf/proto"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -137,7 +135,7 @@ func encode(
case enumspb.ENCODING_TYPE_PROTO3:
return proto3Encode(object)
default:
return commonpb.DataBlob{}, fmt.Errorf("unknown encoding type: %v", encoding)
return commonpb.DataBlob{}, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

Expand All @@ -156,27 +154,27 @@ func decode(
case enumspb.ENCODING_TYPE_PROTO3:
return proto3Decode(blob, encoding, result)
default:
return fmt.Errorf("unknown encoding type: %v", encoding)
return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

func proto3Encode(m proto.Message) (commonpb.DataBlob, error) {
blob := commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3}
data, err := proto.Marshal(m)
if err != nil {
return blob, fmt.Errorf("error serializing struct to blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err)
return blob, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
blob.Data = data
return blob, nil
}

func proto3Decode(blob []byte, encoding string, result proto.Message) error {
if e, ok := enumspb.EncodingType_value[encoding]; !ok || enumspb.EncodingType(e) != enumspb.ENCODING_TYPE_PROTO3 {
return fmt.Errorf("encoding %s doesn't match expected encoding %v", encoding, enumspb.ENCODING_TYPE_PROTO3)
return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_PROTO3)
}

if err := proto.Unmarshal(blob, result); err != nil {
return fmt.Errorf("error deserializing blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err)
return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
return nil
}
93 changes: 68 additions & 25 deletions common/persistence/serialization/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
package serialization

import (
"errors"
"fmt"
"reflect"
"strings"

"github.com/gogo/protobuf/proto"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -107,17 +109,20 @@ type (

// SerializationError is an error type for serialization
SerializationError struct {
msg string
encodingType enumspb.EncodingType
wrappedErr error
}

// DeserializationError is an error type for deserialization
DeserializationError struct {
msg string
encodingType enumspb.EncodingType
wrappedErr error
}

// UnknownEncodingTypeError is an error type for unknown or unsupported encoding type
UnknownEncodingTypeError struct {
encodingType enumspb.EncodingType
encodingTypeStr string
expectedEncodingStr []string
}

serializerImpl struct {
Expand Down Expand Up @@ -149,7 +154,7 @@ func (t *serializerImpl) DeserializeEvents(data *commonpb.DataBlob) ([]*historyp
// Client API currently specifies encodingType on requests which span multiple of these objects
err = events.Unmarshal(data.Data)
default:
return nil, NewDeserializationError("DeserializeEvents invalid encoding")
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,7 +184,7 @@ func (t *serializerImpl) DeserializeEvent(data *commonpb.DataBlob) (*historypb.H
// Client API currently specifies encodingType on requests which span multiple of these objects
err = event.Unmarshal(data.Data)
default:
return nil, NewDeserializationError("DeserializeEvent invalid encoding")
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err != nil {
Expand Down Expand Up @@ -212,7 +217,7 @@ func (t *serializerImpl) DeserializeClusterMetadata(data *commonpb.DataBlob) (*p
// Client API currently specifies encodingType on requests which span multiple of these objects
err = cm.Unmarshal(data.Data)
default:
return nil, NewDeserializationError("DeserializeClusterMetadata invalid encoding")
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err != nil {
Expand All @@ -235,11 +240,11 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod
// Client API currently specifies encodingType on requests which span multiple of these objects
data, err = p.Marshal()
default:
return nil, NewUnknownEncodingTypeError(encodingType)
return nil, NewUnknownEncodingTypeError(encodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err != nil {
return nil, NewSerializationError(err.Error())
return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}

// Shouldn't happen, but keeping
Expand All @@ -254,30 +259,68 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod
}

// NewUnknownEncodingTypeError returns a new instance of encoding type error
func NewUnknownEncodingTypeError(encodingType enumspb.EncodingType) error {
return &UnknownEncodingTypeError{encodingType: encodingType}
// NewUnknownEncodingTypeError builds an UnknownEncodingTypeError that records
// the offending encoding type string together with the names of the encodings
// the caller would have accepted.
//
// When no expected encodings are supplied, every value found in
// enumspb.EncodingType_name is listed instead.
// NOTE(review): EncodingType_name is presumably a generated map; if so, the
// order of the fallback list is not fixed between calls — confirm before
// relying on the exact error text.
func NewUnknownEncodingTypeError(
	encodingTypeStr string,
	expectedEncoding ...enumspb.EncodingType,
) error {
	supported := expectedEncoding
	if len(supported) == 0 {
		// Nothing specific was expected: fall back to all known encoding types.
		for value := range enumspb.EncodingType_name {
			supported = append(supported, enumspb.EncodingType(value))
		}
	}

	// Keep only the string form; that is all the error message needs.
	names := make([]string, len(supported))
	for i := range supported {
		names[i] = supported[i].String()
	}

	return &UnknownEncodingTypeError{
		encodingTypeStr:     encodingTypeStr,
		expectedEncodingStr: names,
	}
}

func (e *UnknownEncodingTypeError) Error() string {
return fmt.Sprintf("unknown or unsupported encoding type %v", e.encodingType)
return fmt.Sprintf("unknown or unsupported encoding type %v, supported types: %v",
e.encodingTypeStr,
strings.Join(e.expectedEncodingStr, ","),
)
}

// NewSerializationError returns a SerializationError
func NewSerializationError(msg string) error {
return &SerializationError{msg: msg}
// NewSerializationError returns a SerializationError that remembers which
// encoding type was being used and wraps the error that caused the failure.
func NewSerializationError(
	encodingType enumspb.EncodingType,
	serializationErr error,
) error {
	result := new(SerializationError)
	result.encodingType = encodingType
	result.wrappedErr = serializationErr
	return result
}

func (e *SerializationError) Error() string {
return fmt.Sprintf("serialization error: %v", e.msg)
return fmt.Sprintf("error serializing using %v encoding: %v", e.encodingType, e.wrappedErr)
}

// Unwrap returns the error wrapped by this SerializationError, which lets
// errors.Is and errors.As inspect the underlying cause.
func (e *SerializationError) Unwrap() error {
return e.wrappedErr
}

// NewDeserializationError returns a DeserializationError
func NewDeserializationError(msg string) error {
return &DeserializationError{msg: msg}
// NewDeserializationError returns a DeserializationError that remembers which
// encoding type was being used and wraps the error that caused the failure.
func NewDeserializationError(
	encodingType enumspb.EncodingType,
	deserializationErr error,
) error {
	result := new(DeserializationError)
	result.encodingType = encodingType
	result.wrappedErr = deserializationErr
	return result
}

func (e *DeserializationError) Error() string {
return fmt.Sprintf("deserialization error: %v", e.msg)
return fmt.Sprintf("error deserializing using %v encoding: %v", e.encodingType, e.wrappedErr)
}

// Unwrap returns the error wrapped by this DeserializationError, which lets
// errors.Is and errors.As inspect the underlying cause.
func (e *DeserializationError) Unwrap() error {
return e.wrappedErr
}

func (t *serializerImpl) ShardInfoToBlob(info *persistencespb.ShardInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
Expand Down Expand Up @@ -469,23 +512,23 @@ func (t *serializerImpl) ReplicationTaskFromBlob(data *commonpb.DataBlob) (*repl
func ProtoDecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
if data == nil {
// TODO: should we return nil or error?
return NewDeserializationError("cannot decode nil")
return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil"))
}

if data.EncodingType != enumspb.ENCODING_TYPE_PROTO3 {
return NewDeserializationError(fmt.Sprintf("encoding %v doesn't match expected encoding %v", data.EncodingType, enumspb.ENCODING_TYPE_PROTO3))
return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err := proto.Unmarshal(data.Data, result); err != nil {
return NewDeserializationError(fmt.Sprintf("error deserializing blob using %v encoding: %s", enumspb.ENCODING_TYPE_PROTO3, err))
return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
return nil
}

func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
if data == nil {
// TODO: should we return nil or error?
return NewDeserializationError("cannot decode nil")
return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil"))
}

if data.Data == nil {
Expand All @@ -498,7 +541,7 @@ func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
case enumspb.ENCODING_TYPE_PROTO3:
return ProtoDecodeBlob(data, result)
default:
return NewUnknownEncodingTypeError(data.EncodingType)
return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

Expand All @@ -523,13 +566,13 @@ func encodeBlob(o proto.Message, encoding enumspb.EncodingType) (*commonpb.DataB
case enumspb.ENCODING_TYPE_PROTO3:
return ProtoEncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3)
default:
return nil, NewUnknownEncodingTypeError(encoding)
return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) {
if encoding != enumspb.ENCODING_TYPE_PROTO3 {
return nil, NewUnknownEncodingTypeError(encoding)
return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if m == nil || (reflect.ValueOf(m).Kind() == reflect.Ptr && reflect.ValueOf(m).IsNil()) {
Expand All @@ -543,7 +586,7 @@ func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.
blob := &commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3}
data, err := proto.Marshal(m)
if err != nil {
return nil, NewSerializationError(err.Error())
return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
blob.Data = data
return blob, nil
Expand Down
27 changes: 22 additions & 5 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ package queues

import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"

Expand All @@ -42,6 +45,7 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/serialization"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/consts"
Expand Down Expand Up @@ -178,19 +182,22 @@ func (e *executableImpl) Execute() (retErr error) {
headers.NewBackgroundCallerInfo(ns.String()),
)

var panicErr error
defer func() {
if panicErr != nil {
retErr = panicErr
if panicObj := recover(); panicObj != nil {
err, ok := panicObj.(error)
if !ok {
err = serviceerror.NewInternal(fmt.Sprintf("panic: %v", panicObj))
}

e.logger.Error("Panic is captured", tag.SysStackTrace(string(debug.Stack())), tag.Error(err))
retErr = err

// we need to guess the metrics tags here as we don't know which execution logic
// is actually used, which is up to the executor implementation
e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...)
}
}()

defer log.CapturePanic(e.logger, &panicErr)

startTime := e.timeSource.Now()

metricsTags, isActive, err := e.executor.Execute(ctx, e)
Expand Down Expand Up @@ -291,6 +298,16 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
return err
}

var deserializationError *serialization.DeserializationError
var encodingTypeError *serialization.UnknownEncodingTypeError
if errors.As(err, &deserializationError) || errors.As(err, &encodingTypeError) {
// likely due to data corruption: emit logs and metrics, then drop the task by returning nil
// so that the task will be marked as completed.
e.taggedMetricsHandler.Counter(metrics.TaskCorruptionCounter.GetMetricName()).Record(1)
e.logger.Error("Drop task due to serialization error", tag.Error(err))
return nil
}

e.taggedMetricsHandler.Counter(metrics.TaskFailures.GetMetricName()).Record(1)

e.logger.Error("Fail to process task", tag.Error(err), tag.LifeCycleProcessingFailed)
Expand Down
Loading

0 comments on commit 26503cf

Please sign in to comment.