diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 2e0f274650140..ca463c66af52d 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -947,9 +947,11 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter { return newDupDetectIter(e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt) } -// getFirstAndLastKey reads the first and last key in range [lowerBound, upperBound) +var _ ingestData = (*Engine)(nil) + +// GetFirstAndLastKey reads the first and last key in range [lowerBound, upperBound) // in the engine. Empty upperBound means unbounded. -func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) { +func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) { if len(upperBound) == 0 { // we use empty slice for unbounded upper bound, but it means max value in pebble // so reset to nil @@ -980,6 +982,22 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by return firstKey, lastKey, nil } +// NewIter implements ingestData interface. +func (e *Engine) NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter { + return e.newKVIter(ctx, &pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound}) +} + +// GetTS implements ingestData interface. +func (e *Engine) GetTS() uint64 { + return e.TS +} + +// Finish implements ingestData interface. +func (e *Engine) Finish(totalBytes, totalCount int64) { + e.importedKVSize.Add(totalBytes) + e.importedKVCount.Add(totalCount) +} + type sstMeta struct { path string minKey []byte diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go index 93a007f867016..c2604caff2f9a 100644 --- a/br/pkg/lightning/backend/local/engine_test.go +++ b/br/pkg/lightning/backend/local/engine_test.go @@ -142,27 +142,27 @@ func TestGetFirstAndLastKey(t *testing.T) { err = db.Set([]byte("e"), []byte("e"), nil) require.NoError(t, err) - first, last, err := f.getFirstAndLastKey(nil, nil) + first, last, err := f.GetFirstAndLastKey(nil, nil) require.NoError(t, err) require.Equal(t, []byte("a"), first) require.Equal(t, []byte("e"), last) - first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("d")) + first, last, err = f.GetFirstAndLastKey([]byte("b"), []byte("d")) require.NoError(t, err) require.Equal(t, []byte("c"), first) require.Equal(t, []byte("c"), last) - first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("f")) + first, last, err = f.GetFirstAndLastKey([]byte("b"), []byte("f")) require.NoError(t, err) require.Equal(t, []byte("c"), first) require.Equal(t, []byte("e"), last) - first, last, err = f.getFirstAndLastKey([]byte("y"), []byte("z")) + first, last, err = f.GetFirstAndLastKey([]byte("y"), []byte("z")) require.NoError(t, err) require.Nil(t, first) require.Nil(t, last) - first, last, err = f.getFirstAndLastKey([]byte("e"), []byte("")) + first, last, err = f.GetFirstAndLastKey([]byte("e"), []byte("")) require.NoError(t, err) require.Equal(t, []byte("e"), first) require.Equal(t, []byte("e"), last) diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index 4543fede07b02..556d030bee9a8 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -28,25 +28,12 @@ import ( // Iter abstract iterator method for Ingester. 
type Iter interface { + ForwardIter // Seek seek to specify position. // if key not found, seeks next key position in iter. Seek(key []byte) bool - // Error return current error on this iter. - Error() error - // First moves this iter to the first key. - First() bool // Last moves this iter to the last key. Last() bool - // Valid check this iter reach the end. - Valid() bool - // Next moves this iter forward. - Next() bool - // Key represents current position pair's key. - Key() []byte - // Value represents current position pair's Value. - Value() []byte - // Close close this iter. - Close() error // OpType represents operations of pair. currently we have two types. // 1. Put // 2. Delete diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 7c6609e1720c4..6727246b69768 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1024,7 +1024,7 @@ func (local *Backend) readAndSplitIntoRange( sizeLimit int64, keysLimit int64, ) ([]Range, error) { - firstKey, lastKey, err := engine.getFirstAndLastKey(nil, nil) + firstKey, lastKey, err := engine.GetFirstAndLastKey(nil, nil) if err != nil { return nil, err } @@ -1191,7 +1191,7 @@ var fakeRegionJobs map[[2]string]struct { // It will retry internally when scan region meet error. func (local *Backend) generateJobForRange( ctx context.Context, - engine *Engine, + engine ingestData, keyRange Range, regionSplitSize, regionSplitKeys int64, ) ([]*regionJob, error) { @@ -1210,7 +1210,7 @@ func (local *Backend) generateJobForRange( }) start, end := keyRange.start, keyRange.end - pairStart, pairEnd, err := engine.getFirstAndLastKey(start, end) + pairStart, pairEnd, err := engine.GetFirstAndLastKey(start, end) if err != nil { return nil, err } @@ -1247,7 +1247,7 @@ func (local *Backend) generateJobForRange( keyRange: intersectRange(region.Region, Range{start: start, end: end}), region: region, stage: regionScanned, - engine: engine, + ingestData: engine, regionSplitSize: regionSplitSize, regionSplitKeys: regionSplitKeys, metrics: local.metrics, @@ -1283,7 +1283,7 @@ func (local *Backend) startWorker( case needRescan: jobs, err2 := local.generateJobForRange( ctx, - job.engine, + job.ingestData, job.keyRange, job.regionSplitSize, job.regionSplitKeys, diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 6c2aa6fb435d1..6f775390fd9cc 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1129,6 +1129,80 @@ func TestLocalIsRetryableTiKVWriteError(t *testing.T) { require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF))) } +// mockIngestData must be ordered on the first element of each [2][]byte. 
+type mockIngestData [][2][]byte
+
+func (m mockIngestData) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
+	i, j := m.getFirstAndLastKeyIdx(lowerBound, upperBound)
+	if i == -1 {
+		return nil, nil, nil
+	}
+	return m[i][0], m[j][0], nil
+}
+
+func (m mockIngestData) getFirstAndLastKeyIdx(lowerBound, upperBound []byte) (int, int) {
+	var first int
+	if len(lowerBound) == 0 {
+		first = 0
+	} else {
+		i, _ := sort.Find(len(m), func(i int) int {
+			return bytes.Compare(lowerBound, m[i][0])
+		})
+		if i == len(m) {
+			return -1, -1
+		}
+		first = i
+	}
+
+	var last int
+	if len(upperBound) == 0 {
+		last = len(m) - 1
+	} else {
+		i, _ := sort.Find(len(m), func(i int) int {
+			return bytes.Compare(upperBound, m[i][0])
+		})
+		if i == 0 {
+			return -1, -1
+		}
+		last = i - 1
+	}
+	return first, last
+}
+
+type mockIngestIter struct {
+	data                     mockIngestData
+	startIdx, endIdx, curIdx int
+}
+
+func (m *mockIngestIter) First() bool {
+	m.curIdx = m.startIdx
+	return true
+}
+
+func (m *mockIngestIter) Valid() bool { return m.curIdx < m.endIdx }
+
+func (m *mockIngestIter) Next() bool {
+	m.curIdx++
+	return m.Valid()
+}
+
+func (m *mockIngestIter) Key() []byte { return m.data[m.curIdx][0] }
+
+func (m *mockIngestIter) Value() []byte { return m.data[m.curIdx][1] }
+
+func (m *mockIngestIter) Close() error { return nil }
+
+func (m *mockIngestIter) Error() error { return nil }
+
+func (m mockIngestData) NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter {
+	i, j := m.getFirstAndLastKeyIdx(lowerBound, upperBound)
+	return &mockIngestIter{data: m, startIdx: i, endIdx: j + 1, curIdx: i}
+}
+
+func (m mockIngestData) GetTS() uint64 { return 0 }
+
+func (m mockIngestData) Finish(_, _ int64) {}
+
 func TestCheckPeersBusy(t *testing.T) {
 	backup := maxRetryBackoffSecond
 	maxRetryBackoffSecond = 300
@@ -1177,23 +1251,7 @@ func TestCheckPeersBusy(t *testing.T) {
 		tikvCodec: keyspace.CodecV1,
 	}
 
-	db, tmpPath := makePebbleDB(t, nil)
-	_, engineUUID := backend.MakeUUID("ww", 0)
-	engineCtx, cancel2 := context.WithCancel(context.Background())
-	f := &Engine{
-		UUID:         engineUUID,
-		sstDir:       tmpPath,
-		ctx:          engineCtx,
-		cancel:       cancel2,
-		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   noopKeyAdapter{},
-		logger:       log.L(),
-	}
-	f.db.Store(db)
-	err := db.Set([]byte("a"), []byte("a"), nil)
-	require.NoError(t, err)
-	err = db.Set([]byte("b"), []byte("b"), nil)
-	require.NoError(t, err)
+	data := mockIngestData{{[]byte("a"), []byte("a")}, {[]byte("b"), []byte("b")}}
 
 	jobCh := make(chan *regionJob, 10)
 
@@ -1211,7 +1269,7 @@ func TestCheckPeersBusy(t *testing.T) {
 			Leader: &metapb.Peer{Id: 1, StoreId: 11},
 		},
 		stage:      regionScanned,
-		engine:     f,
+		ingestData: data,
 		retryCount: 20,
 		waitUntil:  time.Now().Add(-time.Second),
 	}
@@ -1231,7 +1289,7 @@ func TestCheckPeersBusy(t *testing.T) {
 			Leader: &metapb.Peer{Id: 4, StoreId: 21},
 		},
 		stage:      regionScanned,
-		engine:     f,
+		ingestData: data,
 		retryCount: 20,
 		waitUntil:  time.Now().Add(-time.Second),
 	}
@@ -1313,21 +1371,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
 		tikvCodec: keyspace.CodecV1,
 	}
 
-	db, tmpPath := makePebbleDB(t, nil)
-	_, engineUUID := backend.MakeUUID("ww", 0)
-	engineCtx, cancel2 := context.WithCancel(context.Background())
-	f := &Engine{
-		UUID:         engineUUID,
-		sstDir:       tmpPath,
-		ctx:          engineCtx,
-		cancel:       cancel2,
-		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   noopKeyAdapter{},
-		logger:       log.L(),
-	}
-	f.db.Store(db)
-	err := db.Set([]byte("a"), []byte("a"), nil)
-	require.NoError(t, err)
+	data := 
mockIngestData{{[]byte("a"), []byte("a")}} jobCh := make(chan *regionJob, 10) @@ -1344,8 +1388,8 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { }, Leader: &metapb.Peer{Id: 1, StoreId: 1}, }, - stage: regionScanned, - engine: f, + stage: regionScanned, + ingestData: data, } var jobWg sync.WaitGroup jobWg.Add(1) @@ -1385,7 +1429,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { require.Equal(t, []uint64{1, 2, 3, 1, 11, 12, 13, 11}, apiInvokeRecorder["MultiIngest"]) } -func TestPartialWriteIngestErrorWillPanic(t *testing.T) { +func TestPartialWriteIngestErrorWontPanic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1420,23 +1464,7 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) { tikvCodec: keyspace.CodecV1, } - db, tmpPath := makePebbleDB(t, nil) - _, engineUUID := backend.MakeUUID("ww", 0) - engineCtx, cancel2 := context.WithCancel(context.Background()) - f := &Engine{ - UUID: engineUUID, - sstDir: tmpPath, - ctx: engineCtx, - cancel: cancel2, - sstMetasChan: make(chan metaOrFlush, 64), - keyAdapter: noopKeyAdapter{}, - logger: log.L(), - } - f.db.Store(db) - err := db.Set([]byte("a"), []byte("a"), nil) - require.NoError(t, err) - err = db.Set([]byte("a2"), []byte("a2"), nil) - require.NoError(t, err) + data := mockIngestData{{[]byte("a"), []byte("a")}, {[]byte("a2"), []byte("a2")}} jobCh := make(chan *regionJob, 10) @@ -1453,8 +1481,8 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) { }, Leader: &metapb.Peer{Id: 1, StoreId: 1}, }, - stage: regionScanned, - engine: f, + stage: regionScanned, + ingestData: data, // use small regionSplitSize to trigger partial write regionSplitSize: 1, } @@ -1560,8 +1588,8 @@ func TestPartialWriteIngestBusy(t *testing.T) { }, Leader: &metapb.Peer{Id: 1, StoreId: 1}, }, - stage: regionScanned, - engine: f, + stage: regionScanned, + ingestData: f, // use small regionSplitSize to trigger partial write regionSplitSize: 1, } @@ -1775,17 +1803,17 @@ func TestDoImport(t *testing.T) { {"a", "b"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, {"b", "c"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, - engine: &Engine{}, + keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + ingestData: &Engine{}, injected: []injectedBehaviour{ { write: injectedWriteBehaviour{ @@ -1818,13 +1846,13 @@ func TestDoImport(t *testing.T) { {"c", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, - engine: &Engine{}, - injected: getNeedRescanWhenIngestBehaviour(), + keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + ingestData: &Engine{}, + injected: getNeedRescanWhenIngestBehaviour(), }, { - keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, - engine: &Engine{}, + keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, + ingestData: &Engine{}, injected: []injectedBehaviour{ { write: injectedWriteBehaviour{ @@ -1839,18 +1867,18 @@ func TestDoImport(t *testing.T) { {"c", "c2"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, {"c2", "d"}: { 
jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, @@ -1880,9 +1908,9 @@ func TestDoImport(t *testing.T) { {"a", "b"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, @@ -1902,32 +1930,32 @@ func TestDoImport(t *testing.T) { {"a", "b"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'a'}, end: []byte{'a', '2'}}, - engine: &Engine{}, - injected: getNeedRescanWhenIngestBehaviour(), + keyRange: Range{start: []byte{'a'}, end: []byte{'a', '2'}}, + ingestData: &Engine{}, + injected: getNeedRescanWhenIngestBehaviour(), }, { - keyRange: Range{start: []byte{'a', '2'}, end: []byte{'b'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'a', '2'}, end: []byte{'b'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, {"b", "c"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, {"c", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, @@ -1949,7 +1977,7 @@ func TestDoImport(t *testing.T) { jobs: []*regionJob{ { keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, - engine: &Engine{}, + ingestData: &Engine{}, retryCount: maxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), }, @@ -1959,7 +1987,7 @@ func TestDoImport(t *testing.T) { jobs: []*regionJob{ { keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, - engine: &Engine{}, + ingestData: &Engine{}, retryCount: maxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), }, @@ -1969,7 +1997,7 @@ func TestDoImport(t *testing.T) { jobs: []*regionJob{ { keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, - engine: &Engine{}, + ingestData: &Engine{}, retryCount: maxWriteAndIngestRetryTimes - 2, injected: []injectedBehaviour{ { @@ -2019,13 +2047,13 @@ func TestRegionJobResetRetryCounter(t *testing.T) { jobs: []*regionJob{ { keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, - engine: &Engine{}, + ingestData: &Engine{}, injected: getNeedRescanWhenIngestBehaviour(), retryCount: maxWriteAndIngestRetryTimes, }, { keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, - engine: &Engine{}, + ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), retryCount: maxWriteAndIngestRetryTimes, }, @@ -2034,9 +2062,9 @@ func TestRegionJobResetRetryCounter(t *testing.T) { {"c", "c2"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, - engine: &Engine{}, - injected: getSuccessInjectedBehaviour(), + keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + ingestData: &Engine{}, + injected: getSuccessInjectedBehaviour(), }, }, }, @@ -2087,18 +2115,18 @@ func 
TestCtxCancelIsIgnored(t *testing.T) {
 		{"c", "d"}: {
 			jobs: []*regionJob{
 				{
-					keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
-					engine:   &Engine{},
-					injected: getSuccessInjectedBehaviour(),
+					keyRange:   Range{start: []byte{'c'}, end: []byte{'d'}},
+					ingestData: &Engine{},
+					injected:   getSuccessInjectedBehaviour(),
 				},
 			},
 		},
 		{"d", "e"}: {
 			jobs: []*regionJob{
 				{
-					keyRange: Range{start: []byte{'d'}, end: []byte{'e'}},
-					engine:   &Engine{},
-					injected: getSuccessInjectedBehaviour(),
+					keyRange:   Range{start: []byte{'d'}, end: []byte{'e'}},
+					ingestData: &Engine{},
+					injected:   getSuccessInjectedBehaviour(),
 				},
 			},
 		},
diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go
index 42d7facd52e4b..56df9e7d70afd 100644
--- a/br/pkg/lightning/backend/local/region_job.go
+++ b/br/pkg/lightning/backend/local/region_job.go
@@ -22,7 +22,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/cockroachdb/pebble"
 	"github.com/google/uuid"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -103,7 +102,7 @@ type regionJob struct {
 	// writeResult is available only in wrote and ingested stage
 	writeResult *tikvWriteResult
 
-	engine          *Engine
+	ingestData      ingestData
 	regionSplitSize int64
 	regionSplitKeys int64
 	metrics         *metric.Metrics
@@ -123,6 +122,40 @@ type tikvWriteResult struct {
 	remainingStartKey []byte
 }
 
+// ingestData describes a common interface that is needed by the TiKV write and
+// ingest RPCs.
+// TODO(lance6716): make it public so that the remote backend can use it.
+type ingestData interface {
+	// GetFirstAndLastKey returns the first and last key of the data reader in the
+	// range [lowerBound, upperBound). Empty or nil bounds mean unbounded.
+	// lowerBound must be less than upperBound.
+	// When there is no data in the range, it returns (nil, nil, nil).
+	GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)
+	NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter
+	// GetTS will be used as the start/commit TS of the data.
+	GetTS() uint64
+	// Finish will be called when the data is ingested successfully.
+	Finish(totalBytes, totalCount int64)
+}
+
+// ForwardIter describes an iterator that can only move forward.
+type ForwardIter interface {
+	// First moves this iter to the first key.
+	First() bool
+	// Valid reports whether this iter is positioned at a valid key-value pair.
+	Valid() bool
+	// Next moves this iter forward.
+	Next() bool
+	// Key returns the current position pair's key.
+	Key() []byte
+	// Value returns the current position pair's value.
+	Value() []byte
+	// Close closes this iter.
+	Close() error
+	// Error returns the current error on this iter.
+	Error() error
+}
+
 type injectedBehaviour struct {
 	write  injectedWriteBehaviour
 	ingest injectedIngestBehaviour
@@ -149,8 +182,7 @@ func (j *regionJob) convertStageTo(stage jobStageTp) {
 			return
 		}
 
-		j.engine.importedKVSize.Add(j.writeResult.totalBytes)
-		j.engine.importedKVCount.Add(j.writeResult.count)
+		j.ingestData.Finish(j.writeResult.totalBytes, j.writeResult.count)
 		if j.metrics != nil {
 			j.metrics.BytesCounter.WithLabelValues(metric.StateImported).
Add(float64(j.writeResult.totalBytes)) @@ -191,7 +223,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { begin := time.Now() region := j.region.Region - firstKey, lastKey, err := j.engine.getFirstAndLastKey(j.keyRange.start, j.keyRange.end) + firstKey, lastKey, err := j.ingestData.GetFirstAndLastKey(j.keyRange.start, j.keyRange.end) if err != nil { return errors.Trace(err) } @@ -259,7 +291,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { } req.Chunk = &sst.WriteRequest_Batch{ Batch: &sst.WriteBatch{ - CommitTs: j.engine.TS, + CommitTs: j.ingestData.GetTS(), }, } @@ -300,8 +332,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { return nil } - opt := &pebble.IterOptions{LowerBound: j.keyRange.start, UpperBound: j.keyRange.end} - iter := j.engine.newKVIter(ctx, opt) + iter := j.ingestData.NewIter(ctx, j.keyRange.start, j.keyRange.end) //nolint: errcheck defer iter.Close()
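
For reference, here is a minimal sketch (not part of this patch; the helper name `writeRange` is hypothetical, and it assumes it sits in the `local` package next to the new interfaces) of how a caller drives the `ingestData` and `ForwardIter` contracts end to end. The real consumer is `writeToTiKV` in region_job.go, which additionally batches the pairs into TiKV write RPCs and splits oversized ranges.

```go
package local

import "context"

// writeRange is a sketch only: probe the range, scan it with a ForwardIter,
// and report the totals back to the data source. It is not code from this PR.
func writeRange(ctx context.Context, data ingestData, start, end []byte) error {
	// Probe the range first; (nil, nil) means there is nothing to write.
	first, last, err := data.GetFirstAndLastKey(start, end)
	if err != nil {
		return err
	}
	if first == nil || last == nil {
		return nil
	}

	// GetTS supplies the timestamp that the real write RPC places into
	// sst.WriteBatch.CommitTs; this sketch only reads it.
	_ = data.GetTS()

	iter := data.NewIter(ctx, start, end)
	//nolint: errcheck
	defer iter.Close()

	var totalBytes, totalCount int64
	for iter.First(); iter.Valid(); iter.Next() {
		// The real code sends each Key/Value pair to every peer of the
		// target region here; this sketch only tallies the totals.
		totalBytes += int64(len(iter.Key()) + len(iter.Value()))
		totalCount++
	}
	if err := iter.Error(); err != nil {
		return err
	}

	// Finish is invoked once ingest succeeds; in the real code this happens
	// in convertStageTo when the job reaches the ingested stage, and
	// Engine.Finish feeds importedKVSize/importedKVCount.
	data.Finish(totalBytes, totalCount)
	return nil
}
```

Both `*Engine` and the test-only `mockIngestData` satisfy this contract, which is what lets the region-job tests above drop the Pebble-backed `Engine` fixtures.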