Skip to content

Commit

Permalink
Change Semaphore Implementation to a faster and resizable implementat…
Browse files Browse the repository at this point in the history
…ion.
  • Loading branch information
sherifabdlnaby committed Jan 7, 2019
1 parent b043216 commit e5c8ed3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/sherifabdlnaby/gpool

require golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
require github.com/marusama/semaphore v0.0.0-20181027083059-edd5217b5829
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
github.com/marusama/semaphore v0.0.0-20181027083059-edd5217b5829 h1:zyFWf1/F3eiNG0xM5lz83P6F6NNp8aUALaq/wg2pNRk=
github.com/marusama/semaphore v0.0.0-20181027083059-edd5217b5829/go.mod h1:TmeOqAKoDinfPfSohs14CO3VcEf7o+Bem6JiNe05yrQ=
47 changes: 22 additions & 25 deletions semaphore_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ package gpool

import (
"context"
"golang.org/x/sync/semaphore"
"github.com/marusama/semaphore"
)

// SemaphorePool is an implementation of gpool.Pool interface to bound concurrency using a Semaphore.
type SemaphorePool struct {
WorkerCount int
semaphore semaphore.Weighted
// semaphorePool is an implementation of gpool.Pool interface to bound concurrency using a Semaphore.
type semaphorePool struct {
workerCount int
semaphore semaphore.Semaphore
ctx context.Context
cancel context.CancelFunc
}

// NewSemaphorePool is SemaphorePool Constructor
func NewSemaphorePool(workerCount int) *SemaphorePool {
newWorkerPool := SemaphorePool{
WorkerCount: workerCount,
semaphore: *semaphore.NewWeighted(1),
// NewSemaphorePool is semaphorePool Constructor
func NewSemaphorePool(size int) *semaphorePool {
newWorkerPool := semaphorePool{
workerCount: size,
semaphore: semaphore.New(1),
}

// Cancel immediately - So that ErrPoolClosed will be returned by Enqueues
Expand All @@ -29,32 +29,29 @@ func NewSemaphorePool(workerCount int) *SemaphorePool {
}

// Start the Pool, otherwise it will not accept any job.
func (w *SemaphorePool) Start() {
func (w *semaphorePool) Start() error {
if w.workerCount < 1 {
return ErrPoolInvalidSize
}
ctx := context.Background()
w.ctx, w.cancel = context.WithCancel(ctx)
w.semaphore = *semaphore.NewWeighted(int64(w.WorkerCount))
return
w.semaphore = semaphore.New(w.workerCount)
return nil
}

// Stop the Pool.
// 1- ALL Blocked/Waiting jobs will return immediately.
// 2- All Jobs Processing will finish successfully
// 3- Stop() WILL Block until all running jobs is done.
func (w *SemaphorePool) Stop() {
func (w *semaphorePool) Stop() {
// Send Cancellation Signal to stop all waiting work
w.cancel()

// Try to Acquire the whole Semaphore ( This will block until all ACTIVE works are done )
_ = w.semaphore.Acquire(context.TODO(), int64(w.WorkerCount))
_ = w.semaphore.Acquire(context.TODO(), w.workerCount)

// Release the Semaphore so that subsequent enqueues will not block and return ErrPoolClosed.
w.semaphore.Release(int64(w.WorkerCount))

// This to give a breathing space for Enqueue to not wait for a Semaphore of 0 size.
// so that Enqueue won't block ( will still send ErrPoolClosed, no job will run )
if w.WorkerCount == 0 {
w.semaphore = *semaphore.NewWeighted(1)
}
w.semaphore.Release(w.workerCount)

return
}
Expand All @@ -67,7 +64,7 @@ func (w *SemaphorePool) Stop() {
// @Returns nil once the job has started.
// @Returns ErrPoolClosed if the pool is not running.
// @Returns ErrJobCanceled if the job Enqueued context was canceled before the job could be processed by the pool.
func (w *SemaphorePool) Enqueue(ctx context.Context, job func()) error {
func (w *semaphorePool) Enqueue(ctx context.Context, job func()) error {
// Acquire 1 from semaphore ( aka Acquire one worker )
err := w.semaphore.Acquire(ctx, 1)

Expand All @@ -91,7 +88,7 @@ func (w *SemaphorePool) Enqueue(ctx context.Context, job func()) error {
}*/
}()

// Run the Function
// Run the job
job()
}()
}
Expand All @@ -101,7 +98,7 @@ func (w *SemaphorePool) Enqueue(ctx context.Context, job func()) error {

// TryEnqueue will not block if the pool is full, will return true once the job has started processing or false if
// the pool is closed or full.
func (w *SemaphorePool) TryEnqueue(job func()) bool {
func (w *semaphorePool) TryEnqueue(job func()) bool {
// Acquire 1 from semaphore ( aka Acquire one worker )
if !w.semaphore.TryAcquire(1) {
return false
Expand Down

0 comments on commit e5c8ed3

Please sign in to comment.