Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): reuse buf to reduce memory allocation when checksum enabled #11522

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 95 additions & 83 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@
metricTotalRows prometheus.Gauge
metricIgnoredDMLEventCounter prometheus.Counter

integrity *integrity.Config
verifier *checksumVerifier

// decoder and preDecoder are used to decode the raw value, also used to extract checksum,
// they should not be nil after decode at least one event in the row format v2.
decoder *rowcodec.DatumMapDecoder
preDecoder *rowcodec.DatumMapDecoder

lastSkipOldValueTime time.Time
}

// NewMounter creates a mounter
Expand All @@ -119,8 +117,8 @@
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricIgnoredDMLEventCounter: ignoredDMLEventCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
integrity: integrity,
tz: tz,
verifier: newChecksumVerifier(changefeedID, integrity, tz),
}
}

Expand Down Expand Up @@ -450,34 +448,38 @@
return cols, rawCols, columnInfos, nil
}

func calculateColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, tz *time.Location,
func (v *checksumVerifier) calculateColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
) (uint32, error) {
columns := make([]rowcodec.ColData, 0, len(rawColumns))
v.buf = v.buf[:0]
v.columns = v.columns[:0]
for idx, col := range columnInfos {
column := rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
}
columns = append(columns, column)
v.columns = append(v.columns, column)
}
sort.Slice(columns, func(i, j int) bool {
return columns[i].ID < columns[j].ID
sort.Slice(v.columns, func(i, j int) bool {
return v.columns[i].ID < v.columns[j].ID
})

calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
Cols: v.columns,
Data: v.buf,
}

checksum, err := calculator.Checksum(tz)
checksum, err := calculator.Checksum(v.tz)
if err != nil {
return 0, errors.Trace(err)
}
return checksum, nil
}

func (m *mounter) verifyColumnChecksum(
// return error if calculate checksum failed, this should not happen.
// return true if the checksum matched.
// return the matched checksum.
func (v *checksumVerifier) verifyColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
decoder *rowcodec.DatumMapDecoder, skipFail bool,
) (uint32, bool, error) {
Expand All @@ -488,7 +490,7 @@
return 0, true, nil
}

checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
checksum, err := v.calculateColumnChecksum(columnInfos, rawColumns)

Check warning on line 493 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L493

Added line #L493 was not covered by tests
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, false, err
Expand All @@ -508,16 +510,16 @@
}

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
log.Error("verify the previous checksum v1 failed",

Check warning on line 513 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L513

Added line #L513 was not covered by tests
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
if time.Since(v.lastSkipOldValueTime) > time.Minute {

Check warning on line 518 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L518

Added line #L518 was not covered by tests
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
m.lastSkipOldValueTime = time.Now()
v.lastSkipOldValueTime = time.Now()

Check warning on line 522 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L522

Added line #L522 was not covered by tests
}
return checksum, true, nil
}
Expand Down Expand Up @@ -598,18 +600,41 @@
return types.Datum{}, nil
}

func verifyRawBytesChecksum(
type checksumVerifier struct {
changefeed model.ChangeFeedID
integrity *integrity.Config

tz *time.Location
buf []byte
columns []rowcodec.ColData

// used for the checksum v2.
datums []*types.Datum
columnIDs []int64

lastSkipOldValueTime time.Time
}

func newChecksumVerifier(changefeed model.ChangeFeedID, integrity *integrity.Config, tz *time.Location) *checksumVerifier {
return &checksumVerifier{
changefeed: changefeed,
integrity: integrity,
tz: tz,
buf: make([]byte, 0, 1024),
}
}

func (v *checksumVerifier) verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
) (uint32, bool, error) {
key kv.Key,
) (bool, error) {
expected, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
return true, nil

Check warning on line 633 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L633

Added line #L633 was not covered by tests
}
var (
columnIDs []int64
datums []*types.Datum
)
v.buf = v.buf[:0]
v.datums = v.datums[:0]
v.columnIDs = v.columnIDs[:0]
for _, col := range columns {
// TiDB does not encode null value into the bytes, so just ignore it.
if col.Value == nil {
Expand All @@ -619,67 +644,74 @@
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
return 0, false, errors.Trace(err)
return false, errors.Trace(err)

Check warning on line 647 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L647

Added line #L647 was not covered by tests
}
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
v.datums = append(v.datums, &datum)
v.columnIDs = append(v.columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
obtained, err := decoder.CalculateRawChecksum(v.tz, v.columnIDs, v.datums, key, v.buf)
if err != nil {
return 0, false, errors.Trace(err)
return false, errors.Trace(err)

Check warning on line 654 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L654

Added line #L654 was not covered by tests
}
if obtained == expected {
return expected, true, nil
return true, nil
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))

return expected, false, nil
return false, nil

Check warning on line 662 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L662

Added line #L662 was not covered by tests
}

// return error when calculate the checksum.
// return false if the checksum is not matched
func (m *mounter) verifyChecksum(
func (v *checksumVerifier) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
key kv.Key, decoder *rowcodec.DatumMapDecoder, isPreRow bool,
) (checksum uint32, matched bool, err error) {
if !v.integrity.Enabled() {
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
if isPreRow {
decoder = m.preDecoder
} else {
decoder = m.decoder
}

version := decoder.ChecksumVersion()
switch version {
case 0:
// skip old value checksum verification for the checksum v1, since it cannot handle
// Update / Delete event correctly, after Add Column / Drop column DDL,
// since the table schema does not contain complete column information.
return m.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
checksum, matched, err = v.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)

Check warning on line 682 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L682

Added line #L682 was not covered by tests
if err != nil {
return 0, false, errors.Trace(err)
}
if !matched {
return expected, matched, err
}
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
case 1:
matched, err = v.verifyRawBytesChecksum(tableInfo, columns, decoder, key)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
if matched {
checksum, err = v.calculateColumnChecksum(columnInfos, rawColumns)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
return 0, false, errors.Trace(err)
}

Check warning on line 696 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L694-L696

Added lines #L694 - L696 were not covered by tests
}
default:
return 0, false, errors.Errorf("unknown checksum version %d", version)

Check warning on line 699 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L699

Added line #L699 was not covered by tests
}

if matched {
return checksum, true, nil
}

log.Error("columns checksum mismatch",
zap.Uint32("checksum", checksum),
zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns))
if v.integrity.ErrorHandle() {
return 0, false, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(v.changefeed.Namespace, v.changefeed.ID)

Check warning on line 712 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L706-L712

Added lines #L706 - L712 were not covered by tests
}
return 0, false, errors.Errorf("unknown checksum version %d", version)
return checksum, false, nil

Check warning on line 714 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L714

Added line #L714 was not covered by tests
}

func (m *mounter) mountRowKVEntry(
Expand All @@ -688,10 +720,9 @@
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
matched bool
err error

checksum *integrity.Checksum
matched bool
checksum *integrity.Checksum

checksumVersion int
corrupted bool
Expand All @@ -717,23 +748,14 @@
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
preChecksum, matched, err = m.verifier.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, m.preDecoder, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
log.Error("verify the previous columns checksum failed",

Check warning on line 753 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L753

Added line #L753 was not covered by tests
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
return nil, rawRow, errors.Trace(err)
}

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
}
corrupted = true
}
}
Expand All @@ -749,24 +771,14 @@
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
currentChecksum, matched, err = m.verifier.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, m.decoder, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
log.Error("verify the current columns checksum failed",

Check warning on line 776 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L776

Added line #L776 was not covered by tests
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
}
corrupted = true
}
corrupted = corrupted || !matched
}

var intRowID int64
Expand Down
9 changes: 6 additions & 3 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er
log.Info("begin emit ddl event",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Any("DDL", ddl))
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query))

doWrite := func() (err error) {
if err = s.makeSinkReady(ctx); err == nil {
Expand All @@ -251,14 +252,16 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er
log.Error("Execute DDL failed",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Any("DDL", ddl),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query),
zap.Error(err))
} else {
ddl.Done.Store(true)
log.Info("Execute DDL succeeded",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Any("DDL", ddl))
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query))
}
return
}
Expand Down
6 changes: 4 additions & 2 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,10 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
zap.String("table", job.TableName),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.String("query", job.Query),
zap.Uint64("pullerResolvedTs", p.getResolvedTs()))
zap.Uint64("pullerResolvedTs", p.getResolvedTs()),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
zap.Int64("schemaVersion", p.schemaVersion),
zap.String("query", job.Query))
return true, nil
}

Expand Down
Loading
Loading