Merge branch 'master' into copr-cache-paging
tiancaiamao authored Jul 13, 2022
2 parents 7616634 + 2ff12e8 commit 8e45d8f
Showing 4 changed files with 35 additions and 18 deletions.
br/pkg/stream/stream_status.go (10 changes: 3 additions & 7 deletions)
@@ -136,6 +136,8 @@ func (p *printByTable) AddTask(task TaskStatus) {
         info := fmt.Sprintf("%s; gap=%s", pTime, gapColor.Sprint(gap))
         return info
     }
+    cp := task.GetMinStoreCheckpoint()
+    table.Add("checkpoint[global]", formatTS(cp.TS))
     p.addCheckpoints(&task, table, formatTS)
     for store, e := range task.LastErrors {
         table.Add(fmt.Sprintf("error[store=%d]", store), e.ErrorCode)
@@ -147,21 +149,15 @@ func (p *printByTable) AddTask(task TaskStatus) {

 func (p *printByTable) addCheckpoints(task *TaskStatus, table *glue.Table, formatTS func(uint64) string) {
     cp := task.GetMinStoreCheckpoint()
-    items := make([][2]string, 0, len(task.Checkpoints))
     if cp.Type() != CheckpointTypeGlobal {
         for _, cp := range task.Checkpoints {
             switch cp.Type() {
             case CheckpointTypeStore:
-                items = append(items, [2]string{fmt.Sprintf("checkpoint[store=%d]", cp.ID), formatTS(cp.TS)})
+                table.Add(fmt.Sprintf("checkpoint[store=%d]", cp.ID), formatTS(cp.TS))
             }
         }
-    } else {
-        items = append(items, [2]string{"checkpoint[central-global]", formatTS(cp.TS)})
-    }
-
-    for _, item := range items {
-        table.Add(item[0], item[1])
     }
 }

 func (p *printByTable) PrintTasks() {
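
A side note on the br refactor above: the global checkpoint row moves up into AddTask, and addCheckpoints now writes per-store rows straight into the glue.Table instead of buffering them in an items slice and flushing at the end. A small self-contained sketch of the two shapes (the table type is illustrative, not the br glue API):

package main

import "fmt"

type table struct{ rows [][2]string }

func (t *table) Add(k, v string) { t.rows = append(t.rows, [2]string{k, v}) }

func main() {
    checkpoints := map[int]uint64{1: 100, 2: 90}
    before, after := &table{}, &table{}

    // Before: buffer rows in an intermediate slice, then flush it.
    items := make([][2]string, 0, len(checkpoints))
    for store, ts := range checkpoints {
        items = append(items, [2]string{fmt.Sprintf("checkpoint[store=%d]", store), fmt.Sprint(ts)})
    }
    for _, item := range items {
        before.Add(item[0], item[1])
    }

    // After: add each row directly, no intermediate buffer.
    for store, ts := range checkpoints {
        after.Add(fmt.Sprintf("checkpoint[store=%d]", store), fmt.Sprint(ts))
    }

    fmt.Println(len(before.rows) == len(after.rows)) // true
}
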
store/mockstore/unistore/cophandler/cop_handler.go (3 changes: 2 additions & 1 deletion)
@@ -179,6 +179,7 @@ func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest, pagingS
     if pagingSize > 0 {
         lastRange = &coprocessor.KeyRange{}
         builder.paging = lastRange
+        builder.pagingSize = pagingSize
     }
     exec, err := builder.buildMPPExecutor(rootExec)
     if err != nil {
@@ -221,7 +222,7 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest, pagin
             if pagingSize > 0 {
                 totalRows += uint64(chk.NumRows())
                 if totalRows > pagingSize {
-                    break
+                    return
                 }
             }
     default:
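
The break-to-return swap in mppExecute is the subtle part: both forms leave the row-pulling loop once the page quota is crossed, but return also skips any code after the loop, so nothing can append further chunks to an already-full page. A standalone sketch of that control flow, under assumed names (drainWithPaging, next, and send are illustrative, not the unistore API):

package paging

// drainWithPaging pulls row batches until the source is exhausted or the
// page quota is crossed. Returning (rather than breaking) guarantees that
// no code after the loop keeps producing rows for this page.
func drainWithPaging(next func() (rows int, ok bool), pagingSize uint64, send func(rows int)) {
    var totalRows uint64
    for {
        n, ok := next()
        if !ok {
            return // source exhausted
        }
        send(n)
        if pagingSize > 0 {
            totalRows += uint64(n)
            if totalRows > pagingSize {
                return // page is full: stop the whole produce path
            }
        }
    }
}
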
store/mockstore/unistore/cophandler/mpp.go (25 changes: 16 additions & 9 deletions)
@@ -51,14 +51,15 @@ const (
 )
 
 type mppExecBuilder struct {
-    sc       *stmtctx.StatementContext
-    dbReader *dbreader.DBReader
-    mppCtx   *MPPCtx
-    dagReq   *tipb.DAGRequest
-    dagCtx   *dagContext
-    counts   []int64
-    ndvs     []int64
-    paging   *coprocessor.KeyRange
+    sc         *stmtctx.StatementContext
+    dbReader   *dbreader.DBReader
+    mppCtx     *MPPCtx
+    dagReq     *tipb.DAGRequest
+    dagCtx     *dagContext
+    counts     []int64
+    ndvs       []int64
+    paging     *coprocessor.KeyRange
+    pagingSize uint64
 }

 func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, error) {
@@ -199,7 +200,7 @@ func (b *mppExecBuilder) buildLimit(pb *tipb.Limit) (*limitExec, error) {
     return exec, nil
 }
 
-func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (*topNExec, error) {
+func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (mppExec, error) {
     child, err := b.buildMPPExecutor(pb.Child)
     if err != nil {
         return nil, err
@@ -227,6 +228,12 @@ func (b *mppExecBuilder) buildTopN(pb *tipb.TopN) (*topNExec, error) {
         row:  newTopNSortRow(len(conds)),
         topn: pb.Limit,
     }
+
+    // When using the paging protocol, if the paging size is smaller than the topN limit, the topN exec degenerates to doing nothing.
+    if b.paging != nil && b.pagingSize < pb.Limit {
+        exec.dummy = true
+    }
+
     return exec, nil
 }
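
The dummy flag is the heart of the paging interaction. A paged coprocessor response stops after roughly pagingSize rows, so when pagingSize < pb.Limit a single page can never even fill the limit; sorting and truncating inside unistore buys nothing, and the final top-N is computed on the TiDB side after all pages are merged. A toy pass-through executor, under assumed names (exec, tableScan, and topN are illustrative, not the unistore types):

package main

import "fmt"

// exec is a toy executor interface: next returns a batch, or nil when drained.
type exec interface{ next() []int }

type tableScan struct {
    data []int
    pos  int
}

func (s *tableScan) next() []int {
    if s.pos >= len(s.data) {
        return nil
    }
    end := s.pos + 2 // fixed batch size of two rows
    if end > len(s.data) {
        end = len(s.data)
    }
    batch := s.data[s.pos:end]
    s.pos = end
    return batch
}

// topN in dummy mode forwards its child's batches untouched, which is the
// degenerate behavior the patch installs when pagingSize < limit.
type topN struct {
    child exec
    dummy bool
}

func (t *topN) next() []int {
    if t.dummy {
        return t.child.next()
    }
    // A real implementation would drain the child into a bounded heap here.
    return nil
}

func main() {
    tn := &topN{child: &tableScan{data: []int{5, 3, 8, 1}}, dummy: true}
    for batch := tn.next(); batch != nil; batch = tn.next() {
        fmt.Println(batch) // [5 3] then [8 1]: order preserved, nothing truncated
    }
}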

store/mockstore/unistore/cophandler/mpp_exec.go (15 changes: 14 additions & 1 deletion)
@@ -319,7 +319,8 @@ func (e *indexScanExec) Process(key, value []byte) error {
     if e.chk.IsFull() {
         e.chunks = append(e.chunks, e.chk)
         if e.paging != nil {
-            e.chunkLastProcessedKeys = append(e.chunkLastProcessedKeys, key)
+            lastProcessed := kv.Key(append([]byte{}, key...)) // need a deep copy to store the key
+            e.chunkLastProcessedKeys = append(e.chunkLastProcessedKeys, lastProcessed)
         }
         e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize)
     }
@@ -423,6 +424,9 @@ type topNExec struct {
     conds []expression.Expression
     row   *sortRow
     recv  []*chunk.Chunk
+
+    // When dummy is true, topNExec just copies what it reads from its child to its parent.
+    dummy bool
 }
 
 func (e *topNExec) open() error {
@@ -432,6 +436,11 @@ func (e *topNExec) open() error {
     if err != nil {
         return err
     }
+
+    if e.dummy {
+        return nil
+    }
+
     for {
         chk, err = e.children[0].next()
         if err != nil {
@@ -466,6 +475,10 @@
 }
 
 func (e *topNExec) next() (*chunk.Chunk, error) {
+    if e.dummy {
+        return e.children[0].next()
+    }
+
     chk := chunk.NewChunkWithCapacity(e.getFieldTypes(), DefaultBatchSize)
     for ; !chk.IsFull() && e.idx < e.topn && e.idx < uint64(e.heap.heapSize); e.idx++ {
         row := e.heap.rows[e.idx]