Skip to content

Commit

Permalink
lightning: extract region job's accessing data to an interface (#45717)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
lance6716 authored Aug 2, 2023
1 parent fc9738b commit 412fa13
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 137 deletions.
22 changes: 20 additions & 2 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,9 +947,11 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
return newDupDetectIter(e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
}

// getFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
var _ ingestData = (*Engine)(nil)

// GetFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
// in the engine. Empty upperBound means unbounded.
func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
if len(upperBound) == 0 {
// we use empty slice for unbounded upper bound, but it means max value in pebble
// so reset to nil
Expand Down Expand Up @@ -980,6 +982,22 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
return firstKey, lastKey, nil
}

// NewIter implements ingestData interface.
func (e *Engine) NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter {
return e.newKVIter(ctx, &pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
}

// GetTS implements ingestData interface.
func (e *Engine) GetTS() uint64 {
return e.TS
}

// Finish implements ingestData interface.
func (e *Engine) Finish(totalBytes, totalCount int64) {
e.importedKVSize.Add(totalBytes)
e.importedKVCount.Add(totalCount)
}

type sstMeta struct {
path string
minKey []byte
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,27 +142,27 @@ func TestGetFirstAndLastKey(t *testing.T) {
err = db.Set([]byte("e"), []byte("e"), nil)
require.NoError(t, err)

first, last, err := f.getFirstAndLastKey(nil, nil)
first, last, err := f.GetFirstAndLastKey(nil, nil)
require.NoError(t, err)
require.Equal(t, []byte("a"), first)
require.Equal(t, []byte("e"), last)

first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("d"))
first, last, err = f.GetFirstAndLastKey([]byte("b"), []byte("d"))
require.NoError(t, err)
require.Equal(t, []byte("c"), first)
require.Equal(t, []byte("c"), last)

first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("f"))
first, last, err = f.GetFirstAndLastKey([]byte("b"), []byte("f"))
require.NoError(t, err)
require.Equal(t, []byte("c"), first)
require.Equal(t, []byte("e"), last)

first, last, err = f.getFirstAndLastKey([]byte("y"), []byte("z"))
first, last, err = f.GetFirstAndLastKey([]byte("y"), []byte("z"))
require.NoError(t, err)
require.Nil(t, first)
require.Nil(t, last)

first, last, err = f.getFirstAndLastKey([]byte("e"), []byte(""))
first, last, err = f.GetFirstAndLastKey([]byte("e"), []byte(""))
require.NoError(t, err)
require.Equal(t, []byte("e"), first)
require.Equal(t, []byte("e"), last)
Expand Down
15 changes: 1 addition & 14 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,12 @@ import (

// Iter abstract iterator method for Ingester.
type Iter interface {
ForwardIter
// Seek seek to specify position.
// if key not found, seeks next key position in iter.
Seek(key []byte) bool
// Error return current error on this iter.
Error() error
// First moves this iter to the first key.
First() bool
// Last moves this iter to the last key.
Last() bool
// Valid check this iter reach the end.
Valid() bool
// Next moves this iter forward.
Next() bool
// Key represents current position pair's key.
Key() []byte
// Value represents current position pair's Value.
Value() []byte
// Close close this iter.
Close() error
// OpType represents operations of pair. currently we have two types.
// 1. Put
// 2. Delete
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func (local *Backend) readAndSplitIntoRange(
sizeLimit int64,
keysLimit int64,
) ([]Range, error) {
firstKey, lastKey, err := engine.getFirstAndLastKey(nil, nil)
firstKey, lastKey, err := engine.GetFirstAndLastKey(nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1191,7 +1191,7 @@ var fakeRegionJobs map[[2]string]struct {
// It will retry internally when scan region meet error.
func (local *Backend) generateJobForRange(
ctx context.Context,
engine *Engine,
engine ingestData,
keyRange Range,
regionSplitSize, regionSplitKeys int64,
) ([]*regionJob, error) {
Expand All @@ -1210,7 +1210,7 @@ func (local *Backend) generateJobForRange(
})

start, end := keyRange.start, keyRange.end
pairStart, pairEnd, err := engine.getFirstAndLastKey(start, end)
pairStart, pairEnd, err := engine.GetFirstAndLastKey(start, end)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1247,7 +1247,7 @@ func (local *Backend) generateJobForRange(
keyRange: intersectRange(region.Region, Range{start: start, end: end}),
region: region,
stage: regionScanned,
engine: engine,
ingestData: engine,
regionSplitSize: regionSplitSize,
regionSplitKeys: regionSplitKeys,
metrics: local.metrics,
Expand Down Expand Up @@ -1283,7 +1283,7 @@ func (local *Backend) startWorker(
case needRescan:
jobs, err2 := local.generateJobForRange(
ctx,
job.engine,
job.ingestData,
job.keyRange,
job.regionSplitSize,
job.regionSplitKeys,
Expand Down
Loading

0 comments on commit 412fa13

Please sign in to comment.