Skip to content

Commit

Permalink
Use my forked semaphore implementation
Browse files Browse the repository at this point in the history
Since I found a bug in `github.com/marusama/semaphore` Issue #3 that had a potential deadlock which was avoided/worked-around. however after submitting a PR and fixing the said bug the performance was halved.

Now the `x/sync/semaphore` is much better performance-wise however it is not resizable, hence use my forked resizable semaphore.
  • Loading branch information
sherifabdlnaby committed Jan 17, 2019
1 parent a9a7bcc commit d2e742a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/sherifabdlnaby/gpool

require github.com/marusama/semaphore v0.0.0-20181027083059-edd5217b5829
require github.com/sherifabdlnaby/semaphore v0.0.0-20190117073948-fc3e9b0e3834
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
github.com/marusama/semaphore v0.0.0-20181027083059-edd5217b5829 h1:zyFWf1/F3eiNG0xM5lz83P6F6NNp8aUALaq/wg2pNRk=
github.com/marusama/semaphore v0.0.0-20181027083059-edd5217b5829/go.mod h1:TmeOqAKoDinfPfSohs14CO3VcEf7o+Bem6JiNe05yrQ=
github.com/sherifabdlnaby/semaphore v0.0.0-20190117073948-fc3e9b0e3834 h1:ypyWj+fYiu70Hwdgvc8LIe/2ZvirUwvoFash4cpcGcU=
github.com/sherifabdlnaby/semaphore v0.0.0-20190117073948-fc3e9b0e3834/go.mod h1:6q3c9bS2amkllnRxGuIHqiXCpe/z/lSJlNvfQb7tCNE=
22 changes: 7 additions & 15 deletions semaphore_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package gpool

import (
"context"
"github.com/marusama/semaphore"
"github.com/sherifabdlnaby/semaphore"
"sync"
)

// SemaphorePool is an implementation of gpool.Pool interface to bound concurrency using a Semaphore.
type SemaphorePool struct {
workerCount int
semaphore semaphore.Semaphore
semaphore *semaphore.Weighted
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
Expand All @@ -26,7 +26,7 @@ func NewSemaphorePool(size int) (Pool, error) {

newWorkerPool := SemaphorePool{
workerCount: size,
semaphore: semaphore.New(1),
semaphore: semaphore.NewWeighted(int64(1)),
mu: sync.Mutex{},
status: poolClosed,
}
Expand Down Expand Up @@ -55,7 +55,7 @@ func (w *SemaphorePool) Start() {
w.status = poolStarted
ctx := context.Background()
w.ctx, w.cancel = context.WithCancel(ctx)
w.semaphore = semaphore.New(w.workerCount)
w.semaphore = semaphore.NewWeighted(int64(w.workerCount))
}

// Stop the Pool.
Expand All @@ -75,19 +75,11 @@ func (w *SemaphorePool) Stop() {
// Send Cancellation Signal to stop all waiting work
w.cancel()

/*
// Try to Acquire the whole Semaphore ( This will block until all ACTIVE works are done )
_ = w.semaphore.Acquire(context.Background(), w.workerCount)
*/

// Try to Acquire the whole Semaphore ( This will block until all ACTIVE works are done )
// Acquire 1 by 1 because a bug I found when N > GOMAXPROCS -> https://github.com/cockroachdb/cockroach/issues/33554
for i := 0; i < w.workerCount; i++ {
_ = w.semaphore.Acquire(context.TODO(), 1)
}
_ = w.semaphore.Acquire(context.Background(), int64(w.workerCount))

// Release the Semaphore so that subsequent enqueues will not block and return ErrPoolClosed.
w.semaphore.Release(w.workerCount)
w.semaphore.Release(int64(w.workerCount))

w.status = poolClosed

Expand All @@ -109,7 +101,7 @@ func (w *SemaphorePool) Resize(newSize int) error {

// If already pool_started live resize semaphore limit.
if w.status == poolStarted {
w.semaphore.SetLimit(newSize)
w.semaphore.Resize(int64(newSize))
}

w.mu.Unlock()
Expand Down

0 comments on commit d2e742a

Please sign in to comment.