Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): reuse buf to reduce memory allocation when checksum enabled #11522

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 105 additions & 101 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@
metricTotalRows prometheus.Gauge
metricIgnoredDMLEventCounter prometheus.Counter

integrity *integrity.Config
verifier *checksumVerifier

// decoder and preDecoder are used to decode the raw value and to extract the checksum;
// they should not be nil once at least one event in the row format v2 has been decoded.
decoder *rowcodec.DatumMapDecoder
preDecoder *rowcodec.DatumMapDecoder

lastSkipOldValueTime time.Time
}

// NewMounter creates a mounter
Expand All @@ -119,8 +117,8 @@
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricIgnoredDMLEventCounter: ignoredDMLEventCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
integrity: integrity,
tz: tz,
verifier: newChecksumVerifier(changefeedID, integrity, tz),
}
}

Expand Down Expand Up @@ -450,74 +448,76 @@
return cols, rawCols, columnInfos, nil
}

func calculateColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, tz *time.Location,
func (v *checksumVerifier) calculateColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
) (uint32, error) {
columns := make([]rowcodec.ColData, 0, len(rawColumns))
v.buf = v.buf[:0]
v.columns = v.columns[:0]
for idx, col := range columnInfos {
column := rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
}
columns = append(columns, column)
v.columns = append(v.columns, column)
}
sort.Slice(columns, func(i, j int) bool {
return columns[i].ID < columns[j].ID
sort.Slice(v.columns, func(i, j int) bool {
return v.columns[i].ID < v.columns[j].ID
})

calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
Cols: v.columns,
Data: v.buf,
}

checksum, err := calculator.Checksum(tz)
if err != nil {
return 0, errors.Trace(err)
}
return checksum, nil
checksum, err := calculator.Checksum(v.tz)
return checksum, errors.Trace(err)
}

func (m *mounter) verifyColumnChecksum(
// returns an error if calculating the checksum fails, which should not happen.
// returns corrupted as false if the checksum matched.
// returns the calculated checksum.
func (v *checksumVerifier) verifyColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
decoder *rowcodec.DatumMapDecoder, skipFail bool,
) (uint32, bool, error) {
) (checksum uint32, corrupted bool, err error) {

Check warning on line 482 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L482

Added line #L482 was not covered by tests
// if the checksum cannot be found, the upstream TiDB checksum is not enabled,
// so there is nothing to verify and the event is not treated as corrupted.
first, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
return 0, false, nil

Check warning on line 487 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L487

Added line #L487 was not covered by tests
}

checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
checksum, err = v.calculateColumnChecksum(columnInfos, rawColumns)

Check warning on line 490 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L490

Added line #L490 was not covered by tests
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, false, err
log.Error("failed to calculate the checksum",
zap.Uint32("first", first), zap.Uint32("checksum", checksum), zap.Error(err))
return checksum, false, err

Check warning on line 494 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L492-L494

Added lines #L492 - L494 were not covered by tests
}

// the first checksum matched, which is the most common case.
if checksum == first {
log.Debug("checksum matched", zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, true, nil
return checksum, false, nil

Check warning on line 500 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L500

Added line #L500 was not covered by tests
}

extra, ok := decoder.GetExtraChecksum()
if ok && checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL execution phase",
zap.Uint32("checksum", checksum), zap.Uint32("extra", extra))
return checksum, true, nil
return checksum, false, nil

Check warning on line 507 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L507

Added line #L507 was not covered by tests
}

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
log.Error("verify the previous checksum v1 failed",

Check warning on line 511 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L511

Added line #L511 was not covered by tests
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
if time.Since(v.lastSkipOldValueTime) > time.Minute {

Check warning on line 516 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L516

Added line #L516 was not covered by tests
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
m.lastSkipOldValueTime = time.Now()
v.lastSkipOldValueTime = time.Now()

Check warning on line 520 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L520

Added line #L520 was not covered by tests
}
return checksum, true, nil
}
Expand Down Expand Up @@ -598,18 +598,41 @@
return types.Datum{}, nil
}

func verifyRawBytesChecksum(
type checksumVerifier struct {
changefeed model.ChangeFeedID
integrity *integrity.Config

tz *time.Location
buf []byte
columns []rowcodec.ColData

// used for the checksum v2.
datums []*types.Datum
columnIDs []int64

lastSkipOldValueTime time.Time
}

func newChecksumVerifier(changefeed model.ChangeFeedID, integrity *integrity.Config, tz *time.Location) *checksumVerifier {
return &checksumVerifier{
changefeed: changefeed,
integrity: integrity,
tz: tz,
buf: make([]byte, 0, 1024),
}
}

func (v *checksumVerifier) verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
) (uint32, bool, error) {
key kv.Key,
) (corrupted bool, err error) {
expected, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
return false, nil

Check warning on line 631 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L631

Added line #L631 was not covered by tests
}
var (
columnIDs []int64
datums []*types.Datum
)
v.buf = v.buf[:0]
v.datums = v.datums[:0]
v.columnIDs = v.columnIDs[:0]
for _, col := range columns {
// TiDB does not encode null value into the bytes, so just ignore it.
if col.Value == nil {
Expand All @@ -619,41 +642,33 @@
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
return 0, false, errors.Trace(err)
return false, errors.Trace(err)

Check warning on line 645 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L645

Added line #L645 was not covered by tests
}
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
v.datums = append(v.datums, &datum)
v.columnIDs = append(v.columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
obtained, err := decoder.CalculateRawChecksum(v.tz, v.columnIDs, v.datums, key, v.buf)
if err != nil {
return 0, false, errors.Trace(err)
return false, errors.Trace(err)

Check warning on line 652 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L652

Added line #L652 was not covered by tests
}
if obtained == expected {
return expected, true, nil
return false, nil
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))

return expected, false, nil
return true, nil

Check warning on line 660 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L660

Added line #L660 was not covered by tests
}

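// returns an error when calculating the checksum fails.
// the boolean result reports the verification outcome: the old signature returned false
// when the checksum did not match; the new signature names it `corrupted`, so a mismatch
// is expected to yield true.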
func (m *mounter) verifyChecksum(
func (v *checksumVerifier) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
if isPreRow {
decoder = m.preDecoder
} else {
decoder = m.decoder
key kv.Key, decoder *rowcodec.DatumMapDecoder, isPreRow bool,
) (checksum uint32, corrupted bool, err error) {
if !v.integrity.Enabled() {
return 0, false, nil
}

version := decoder.ChecksumVersion()
Expand All @@ -662,24 +677,37 @@
// skip old value checksum verification for the checksum v1, since it cannot handle
// Update / Delete event correctly, after Add Column / Drop column DDL,
// since the table schema does not contain complete column information.
return m.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
checksum, corrupted, err = v.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)

Check warning on line 680 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L680

Added line #L680 was not covered by tests
if err != nil {
return 0, false, errors.Trace(err)
}
if !matched {
return expected, matched, err
}
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
case 1:
corrupted, err = v.verifyRawBytesChecksum(tableInfo, columns, decoder, key)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
if !corrupted {
checksum, err = v.calculateColumnChecksum(columnInfos, rawColumns)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
return 0, false, errors.Trace(err)
}

Check warning on line 694 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L692-L694

Added lines #L692 - L694 were not covered by tests
}
default:
return 0, false, errors.Errorf("unknown checksum version %d", version)

Check warning on line 697 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L697

Added line #L697 was not covered by tests
}

if corrupted {
log.Error("columns checksum mismatch",
zap.Uint32("checksum", checksum),
zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns))
if v.integrity.ErrorHandle() {
return 0, true, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(v.changefeed.Namespace, v.changefeed.ID)
}

Check warning on line 708 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L701-L708

Added lines #L701 - L708 were not covered by tests
}
return 0, false, errors.Errorf("unknown checksum version %d", version)
return checksum, corrupted, nil
}

func (m *mounter) mountRowKVEntry(
Expand All @@ -688,10 +716,8 @@
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
matched bool
err error

checksum *integrity.Checksum
checksum *integrity.Checksum

checksumVersion int
corrupted bool
Expand All @@ -717,56 +743,34 @@
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
preChecksum, corrupted, err = m.verifier.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, m.preDecoder, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
log.Error("verify the previous columns checksum failed",

Check warning on line 748 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L748

Added line #L748 was not covered by tests
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
return nil, rawRow, errors.Trace(err)
}

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
}
corrupted = true
}
}

var (
cols []*model.ColumnData
rawCols []types.Datum
currentChecksum uint32
cols []*model.ColumnData
rawCols []types.Datum
currentChecksum uint32
currentCorrupted bool
)
if row.RowExist {
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz)
if err != nil {
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
currentChecksum, currentCorrupted, err = m.verifier.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, m.decoder, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
log.Error("verify the current columns checksum failed",

Check warning on line 768 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L768

Added line #L768 was not covered by tests
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
}
corrupted = true
}
corrupted = corrupted || currentCorrupted
}

var intRowID int64
Expand Down
9 changes: 6 additions & 3 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er
log.Info("begin emit ddl event",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Any("DDL", ddl))
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query))

doWrite := func() (err error) {
if err = s.makeSinkReady(ctx); err == nil {
Expand All @@ -251,14 +252,16 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er
log.Error("Execute DDL failed",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Any("DDL", ddl),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query),
zap.Error(err))
} else {
ddl.Done.Store(true)
log.Info("Execute DDL succeeded",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Any("DDL", ddl))
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query))
}
return
}
Expand Down
6 changes: 4 additions & 2 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,10 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
zap.String("table", job.TableName),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.String("query", job.Query),
zap.Uint64("pullerResolvedTs", p.getResolvedTs()))
zap.Uint64("pullerResolvedTs", p.getResolvedTs()),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
zap.Int64("schemaVersion", p.schemaVersion),
zap.String("query", job.Query))
return true, nil
}

Expand Down
Loading
Loading