Skip to content

Commit

Permalink
executor: fix data race in topn (#53593)
Browse files Browse the repository at this point in the history
close #53538
  • Loading branch information
xzhangxian1008 authored May 28, 2024
1 parent d49a654 commit 7edf08e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
11 changes: 6 additions & 5 deletions pkg/executor/sortexec/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,18 +415,19 @@ func (e *TopNExec) executeTopNWhenSpillTriggered(ctx context.Context) error {
// Wait for the finish of all workers
workersWaiter := util.WaitGroupWrapper{}

// Fetch chunks from child and put chunks into chunkChannel
fetcherWaiter.Run(func() {
e.fetchChunksFromChild(ctx)
})

for i := range e.spillHelper.workers {
worker := e.spillHelper.workers[i]
worker.initWorker()
workersWaiter.Run(func() {
worker.run()
})
}

// Fetch chunks from child and put chunks into chunkChannel
fetcherWaiter.Run(func() {
e.fetchChunksFromChild(ctx)
})

fetcherWaiter.Wait()
workersWaiter.Wait()
return nil
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/sortexec/topn_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ func newTopNWorker(
}
}

func (t *topNWorker) fetchChunksAndProcess() {
func (t *topNWorker) initWorker() {
// Offset of heap in worker should be 0, as we need to spill all data
t.chkHeap.init(t.topn, t.memTracker, t.topn.Limit.Offset+t.topn.Limit.Count, 0, t.topn.greaterRow, t.topn.RetFieldTypes())
}

func (t *topNWorker) fetchChunksAndProcess() {
for t.fetchChunksAndProcessImpl() {
}
}
Expand Down

0 comments on commit 7edf08e

Please sign in to comment.