Skip to content

Commit

Permalink
make sure that the request is sent first by partition then by region
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Jul 20, 2022
1 parent 423f599 commit f940c71
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 9 deletions.
6 changes: 6 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
return builder
}

// SetPartitionKeyRanges sets the "KeyRangesWithPartition" for "kv.Request".
func (builder *RequestBuilder) SetPartitionKeyRanges(keyRanges [][]kv.KeyRange) *RequestBuilder {
builder.Request.KeyRangesWithPartition = keyRanges
return builder
}

// SetStartTS sets "StartTS" for "kv.Request".
func (builder *RequestBuilder) SetStartTS(startTS uint64) *RequestBuilder {
builder.Request.StartTs = startTS
Expand Down
9 changes: 4 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4045,17 +4045,16 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*r
return pids, ret, nil
}

func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error) {
//nolint: prealloc
var ret []kv.KeyRange
for _, p := range h.partitions {
func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error) {
ret := make([][]kv.KeyRange, len(h.partitions))
for i, p := range h.partitions {
pid := p.GetPhysicalID()
meta := p.Meta()
kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil)
if err != nil {
return nil, err
}
ret = append(ret, kvRange...)
ret[i] = append(ret[i], kvRange...)
}
return ret, nil
}
Expand Down
4 changes: 2 additions & 2 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Con
}

type kvRangeBuilder interface {
buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error)
buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error)
buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error)
}

Expand Down Expand Up @@ -394,7 +394,7 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
if err != nil {
return nil, err
}
reqBuilder = builder.SetKeyRanges(kvRange)
reqBuilder = builder.SetPartitionKeyRanges(kvRange)
} else {
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
Expand Down
4 changes: 4 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ type Request struct {
Data []byte
KeyRanges []KeyRange

// KeyRangesWithPartition makes sure that the request is sent first by partition then by region.
// When the table is small, it's possible that multiple partitions are in the same region.
KeyRangesWithPartition [][]KeyRange

// For PartitionTableScan used by tiflash.
PartitionIDAndRanges []PartitionIDAndRanges

Expand Down
23 changes: 21 additions & 2 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,27 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
ctx = context.WithValue(ctx, util.RequestSourceKey, req.RequestSource)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb)
var (
tasks []*copTask
err error
)
if len(req.KeyRanges) > 0 {
ranges := NewKeyRanges(req.KeyRanges)
tasks, err = buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb)
} else {
// Here we build the task by partition, not directly by region.
// This is because it's possible that TiDB merge multiple small partition into one region which break some assumption.
// Keep it split by partition would be more safe.
for _, kvRanges := range req.KeyRangesWithPartition {
ranges := NewKeyRanges(kvRanges)
var tasksInPartition []*copTask
tasksInPartition, err = buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb)
if err != nil {
break
}
tasks = append(tasks, tasksInPartition...)
}
}
if err != nil {
return copErrorResponse{err}
}
Expand Down

0 comments on commit f940c71

Please sign in to comment.