Skip to content

Commit

Permalink
spool: fix data race when to exit (#42129)
Browse files Browse the repository at this point in the history
close #42130
  • Loading branch information
hawkingrei authored Mar 13, 2023
1 parent c19cae6 commit 51c22cd
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
2 changes: 2 additions & 0 deletions resourcemanager/pool/spool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ go_test(
"spool_test.go",
],
embed = [":spool"],
flaky = True,
race = "on",
shard_count = 2,
deps = [
"//resourcemanager/pool",
Expand Down
23 changes: 22 additions & 1 deletion resourcemanager/pool/spool/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ import (
"go.uber.org/zap"
)

// waitInterval is the pause between polling retries, e.g. the sleep
// checkAndAddRunning takes before trying again.
const waitInterval = 5 * time.Millisecond

// Pool is a simple goroutine pool. It cannot reuse goroutines.
type Pool struct {
wg sync.WaitGroup
mu deadlock.RWMutex
options *Options
capacity int32
running atomic.Int32
waiting atomic.Int32
isStop atomic.Bool
condMu sync.Mutex
cond sync.Cond
concurrencyMetrics prometheus.Gauge
taskManager pooltask.TaskManager[any, any, any, any, pooltask.NilContext]
pool.BasePool
Expand All @@ -52,6 +57,7 @@ func NewPool(name string, size int32, component util.Component, options ...Optio
concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name),
taskManager: pooltask.NewTaskManager[any, any, any, any, pooltask.NilContext](size), // TODO: this general type
}
result.cond = *sync.NewCond(&result.condMu)
if size == 0 {
return nil, pool.ErrPoolParamsInvalid
}
Expand Down Expand Up @@ -79,6 +85,9 @@ func (p *Pool) Tune(size int32) {

// Run runs a function in the pool.
func (p *Pool) Run(fn func()) error {
p.waiting.Add(1)
defer p.cond.Signal()
defer p.waiting.Add(-1)
if p.isStop.Load() {
return pool.ErrPoolClosed
}
Expand Down Expand Up @@ -121,6 +130,9 @@ func (p *Pool) run(fn func()) {

// RunWithConcurrency runs a function in the pool with concurrency.
func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error {
p.waiting.Add(1)
defer p.cond.Signal()
defer p.waiting.Add(-1)
if p.isStop.Load() {
return pool.ErrPoolClosed
}
Expand All @@ -143,6 +155,9 @@ func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error {
// checkAndAddRunning checks whether a task can run. If it can, it adds to the running count.
func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) {
for {
if p.isStop.Load() {
return 0, false
}
p.mu.Lock()
value, run := p.checkAndAddRunningInternal(int32(concurrency))
if run {
Expand All @@ -154,7 +169,7 @@ func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) {
return 0, false
}
p.mu.Unlock()
time.Sleep(5 * time.Millisecond)
time.Sleep(waitInterval)
}
}

Expand All @@ -173,6 +188,12 @@ func (p *Pool) checkAndAddRunningInternal(concurrency int32) (conc int32, run bo
// ReleaseAndWait releases the pool and waits for all tasks to be completed.
//
// It marks the pool as stopped, waits until no Run/RunWithConcurrency call is
// still in flight (tracked by the waiting counter), waits for the pool's
// WaitGroup, and finally unregisters the pool from the instance resource
// manager.
func (p *Pool) ReleaseAndWait() {
	p.isStop.Store(true)
	// Wait for every in-flight submitter to leave.
	//
	// This polls the counter instead of blocking in p.cond.Wait(): as written,
	// Run and RunWithConcurrency decrement waiting and call cond.Signal() from
	// defers without holding cond.L. A Signal could therefore fire after the
	// counter was observed as > 0 here but before this goroutine was parked in
	// Wait, and that wakeup would be lost, blocking shutdown indefinitely.
	// Reading the atomic counter on a short interval cannot miss the update.
	for p.waiting.Load() > 0 {
		time.Sleep(waitInterval)
	}
	p.wg.Wait()
	resourcemanager.InstanceResourceManager.Unregister(p.Name())
}

0 comments on commit 51c22cd

Please sign in to comment.