fix: use sync.Cond to handle no-task blocking wait #299

Merged · 1 commit · Dec 4, 2021
taskqueue/taskqueue.go (15 additions, 15 deletions)
@@ -2,7 +2,7 @@ package taskqueue

import (
"context"
"sync/atomic"
"sync"
"time"

"github.com/ipfs/go-peertaskqueue"
@@ -33,7 +33,7 @@ type WorkerTaskQueue struct {
cancelFn func()
peerTaskQueue *peertaskqueue.PeerTaskQueue
workSignal chan struct{}
- noTaskSignal chan struct{}
+ noTaskCond *sync.Cond
ticker *time.Ticker
activeTasks int32
}
@@ -46,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
cancelFn: cancelFn,
peerTaskQueue: peertaskqueue.New(),
workSignal: make(chan struct{}, 1),
- noTaskSignal: make(chan struct{}, 1),
+ noTaskCond: sync.NewCond(&sync.Mutex{}),
ticker: time.NewTicker(thawSpeed),
}
}
@@ -93,13 +93,11 @@ func (tq *WorkerTaskQueue) Shutdown() {
}

func (tq *WorkerTaskQueue) WaitForNoActiveTasks() {
- for atomic.LoadInt32(&tq.activeTasks) > 0 {
- select {
- case <-tq.ctx.Done():
- return
- case <-tq.noTaskSignal:
- }
+ tq.noTaskCond.L.Lock()
+ for tq.activeTasks > 0 {
Member Author:

I know this looks unnecessary, but my assumption here is that there are no guarantees about who gets the mutex next when multiple parties are waiting on it, both via Wait() and Lock(). So if a second task started and got the lock before our waiting caller was woken up, we could still find tq.activeTasks > 0 even after waking up here.

At least that's how it would work in C++ and Java. Or do we have stronger guarantees in Go about who gets the lock next after a Broadcast()?

Contributor:

I think the loop is right - the sync.Cond.Wait docs use a loop as an example too.

+ tq.noTaskCond.Wait()
}
+ tq.noTaskCond.L.Unlock()
}
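For reference, the re-check loop discussed above follows the pattern shown in the sync.Cond.Wait documentation. A minimal self-contained sketch of that pattern (illustrative names, not code from this PR):

    package condexample

    import "sync"

    var (
    	mu    sync.Mutex
    	cond  = sync.NewCond(&mu)
    	ready bool // the condition being waited on
    )

    // waitUntilReady mirrors the sync.Cond.Wait docs: a wakeup is only a hint,
    // so the condition is re-checked in a loop while holding the lock.
    func waitUntilReady() {
    	cond.L.Lock()
    	for !ready {
    		cond.Wait()
    	}
    	cond.L.Unlock()
    }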

func (tq *WorkerTaskQueue) worker(executor Executor) {
@@ -118,14 +116,16 @@ func (tq *WorkerTaskQueue) worker(executor Executor) {
}
}
for _, task := range tasks {
- atomic.AddInt32(&tq.activeTasks, 1)
+ tq.noTaskCond.L.Lock()
+ tq.activeTasks = tq.activeTasks + 1
+ tq.noTaskCond.L.Unlock()
terminate := executor.ExecuteTask(tq.ctx, pid, task)
- if atomic.AddInt32(&tq.activeTasks, -1) == 0 {
- select {
- case tq.noTaskSignal <- struct{}{}:
- default:
- }
+ tq.noTaskCond.L.Lock()
+ tq.activeTasks = tq.activeTasks - 1
+ if tq.activeTasks == 0 {
+ tq.noTaskCond.Broadcast()
}
+ tq.noTaskCond.L.Unlock()
Contributor:

sync.Cond.Broadcast doesn't require holding the lock to be called, so I wonder if you could make this loop lock-free by also continuing to use sync/atomic for activeTasks.

I think that's ideally what we want, because otherwise we're adding some amount of lock contention on each worker. The locks are only held for tiny amounts of time here, but they still add their overhead, whereas atomics are comparatively very cheap.

In other words, I think we can just use sync.Cond for its broadcast feature, and continue using atomics for the counter. If our condition was more complex and we couldn't use atomics, then we'd need the shared lock for sure, but I don't think we absolutely need it here.
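For illustration, a rough sketch of that suggestion on the worker side (hypothetical code, not part of this PR; as the follow-up comments below point out, broadcasting without holding the lock can let a wakeup be missed):

    // Hypothetical lock-free variant: keep activeTasks as an atomic counter and
    // use the Cond only for its Wait/Broadcast machinery.
    if atomic.AddInt32(&tq.activeTasks, -1) == 0 {
    	// Broadcast is legal without holding tq.noTaskCond.L, but a waiter that
    	// has already checked the counter and not yet parked in Wait can miss it.
    	tq.noTaskCond.Broadcast()
    }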

Reply:

Careful: if a sync.Cond.Broadcast() is issued and there isn't anyone Wait()ing to hear it, the broadcast is discarded. You effectively need at least an implicit "lock" (by some other means) to ensure that something is already in Wait().

Contributor:

I guess this depends on when the Wait call happens. I assume we call Wait before the workers all go idle. If we may call Wait at a later time, then indeed we need something more. Perhaps Wait should first atomically check if we're already idle, and if so, return immediately. Otherwise, block until the next broadcast.

Contributor:

Wait, the current code already does this - it only Waits inside the loop after it checks that we're not already idle.

I think you're saying that replacing the lock with atomics on the broadcast side could add a race, like the following sequence of events:

  1. WaitForNoActiveTasks grabs the lock
  2. WaitForNoActiveTasks sees the counter is 1, so it enters the loop
  3. worker finishes a task, and broadcasts without grabbing the lock
  4. WaitForNoActiveTasks calls Cond.Wait, which doesn't see the broadcast as it is late

Whereas with Rod's current code:

  1. WaitForNoActiveTasks grabs the lock
  2. WaitForNoActiveTasks sees the counter is 1, so it enters the loop
  3. worker finishes a task, and tries to grab the lock to broadcast - blocking, as WaitForNoActiveTasks has the lock
  4. WaitForNoActiveTasks calls Cond.Wait, releasing the block temporarily
  5. worker grabs the lock and broadcasts
  6. WaitForNoActiveTasks sees the broadcast and finishes

So I think you're right: we don't need to hold the lock to broadcast, as the docs say, but not doing so introduces a form of logic race.

Reply:

Yup! This is what I was trying to convey, sorry for being terse!
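To make the agreed-upon ordering concrete, here is a minimal self-contained sketch (generic names, not this project's code) of why keeping the counter update and the Broadcast under the Cond's lock prevents the lost wakeup:

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	var (
    		mu     sync.Mutex
    		cond   = sync.NewCond(&mu)
    		active = 1 // pretend one task is still in flight
    	)

    	done := make(chan struct{})
    	go func() {
    		// Waiter side (the analogue of WaitForNoActiveTasks).
    		mu.Lock()
    		for active > 0 {
    			cond.Wait() // atomically releases mu and parks; re-locks on wakeup
    		}
    		mu.Unlock()
    		close(done)
    	}()

    	// Worker side: the counter update and the Broadcast happen under the same
    	// lock, so a waiter that has already seen active > 0 is guaranteed to be
    	// parked in Wait before this Broadcast runs; the wakeup cannot be lost.
    	mu.Lock()
    	active--
    	if active == 0 {
    		cond.Broadcast()
    	}
    	mu.Unlock()

    	<-done
    	fmt.Println("no active tasks")
    }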

if terminate {
return
}