Skip to content

Commit

Permalink
Introduce ErrPoolInvalidSize error, now start return error if size < 1
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifabdlnaby committed Jan 7, 2019
1 parent e5c8ed3 commit 687478d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 149 deletions.
7 changes: 5 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ const WorkerCount = 2
func main() {
var workerPool gpool.Pool

//workerPool = workerpooldispatch.NewWorkerPool(WORKER_COUNT)
workerPool = gpool.NewSemaphorePool(WorkerCount)

log.Println("Starting Pool...")

workerPool.Start()
err := workerPool.Start()

if err != nil {
panic(err)
}

ctx, _ := context.WithCancel(context.Background())

Expand Down
5 changes: 4 additions & 1 deletion gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// 3- The Pool is closed by pool.Stop().
type Pool interface {
// Start the Pool, otherwise it will not accept any job.
Start()
Start() error

// Stop the Pool.
// 1- ALL Blocked/Waiting jobs will return immediately.
Expand All @@ -36,6 +36,9 @@ type Pool interface {
}

var (
// ErrPoolClosed Error Returned if the Pool has not started yet, or was stopped.
ErrPoolInvalidSize = errors.New("pool size is invalid, pool size must be > 0")

// ErrPoolClosed Error Returned if the Pool has not started yet, or was stopped.
ErrPoolClosed = errors.New("pool is closed")

Expand Down
191 changes: 57 additions & 134 deletions gpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,55 @@ var implementations = []struct {
{name: "Workerpool", new: func(i int) interface{} {
return NewWorkerPool(i)
}},
{name: "SemaphorePool", new: func(i int) interface{} {
{name: "semaphorePool ", new: func(i int) interface{} {
return NewSemaphorePool(i)
}},
}

// -------------- Testing --------------

func TestPool_Start(t *testing.T) {
for _, implementation := range implementations {
// Test both < 0, 0 and > 0 size.
for size := -1; size <= 1; size++ {
t.Run(fmt.Sprintf("%sS[%d]", implementation.name, size), func(t *testing.T) {
pool := implementation.new(size).(Pool)

pool := implementation.new(1).(Pool)

t.Run(implementation.name, func(t *testing.T) {
/// Send Work before Worker Start
Err := pool.Enqueue(context.TODO(), func() {})
/// Send Work before Worker Start
Err := pool.Enqueue(context.TODO(), func() {})

if Err == nil {
t.Error("Pool Enqueued a Job before pool starts.")
}
if Err == nil {
t.Error("Pool Enqueued a Job before pool starts.")
}

if Err != ErrPoolClosed {
t.Error("Pool Sent an incorrect error type")
}
if Err != ErrPoolClosed {
t.Error("Pool Sent an incorrect error type")
}

/// Start Worker
pool.Start()
/// Start Worker
Err = pool.Start()
if size <= 0 {
if Err == nil {
t.Errorf("Pool of invalid size should return error when starting")
}
if Err != ErrPoolInvalidSize {
t.Error("returned incorrect error type")
}
return
} else {
if Err != nil {
t.Errorf("Pool failed to start, Error: %s", Err)
}
}

// Enqueue a Job
Err = pool.Enqueue(context.TODO(), func() {})
// Enqueue a Job
Err = pool.Enqueue(context.TODO(), func() {})

if Err != nil {
t.Errorf("Pool Enqueued Errored after Start. Error: %s", Err.Error())
}
})
if Err != nil {
t.Errorf("Pool Enqueued Errored after Start. Error: %s", Err.Error())
}
})
}
}
}

Expand All @@ -58,7 +75,7 @@ func TestPool_Stop(t *testing.T) {
t.Run(implementation.name, func(t *testing.T) {

/// Start Worker
pool.Start()
_ = pool.Start()
pool.Stop()

x := make(chan int)
Expand All @@ -84,7 +101,7 @@ func TestPool_Restart(t *testing.T) {

t.Run(implementation.name, func(t *testing.T) {
/// Start Worker
pool.Start()
_ = pool.Start()

/// Restarting the Pool
pool.Stop()
Expand All @@ -95,7 +112,7 @@ func TestPool_Restart(t *testing.T) {
t.Error("Enqueued a job on a stopped pool.")
}

pool.Start()
_ = pool.Start()

/// Send Work to pool that has been restarted.
Err = pool.Enqueue(context.TODO(), func() {})
Expand All @@ -114,7 +131,7 @@ func TestPool_Enqueue(t *testing.T) {

t.Run(implementation.name, func(t *testing.T) {
// Start Worker
pool.Start()
_ = pool.Start()

// Enqueue a Job
x := make(chan int, 1)
Expand Down Expand Up @@ -147,7 +164,7 @@ func TestPool_PoolBlocking(t *testing.T) {
ctx := context.TODO()

// Start Worker
pool.Start()
_ = pool.Start()

/// TEST BLOCKING WHEN POOL IS FULL
a := make(chan int)
Expand Down Expand Up @@ -210,63 +227,14 @@ func TestPool_PoolBlocking(t *testing.T) {
}
}

func TestPool_Enqueue0Worker(t *testing.T) {

for _, implementation := range implementations {

pool := implementation.new(0).(Pool)

t.Run(implementation.name, func(t *testing.T) {

Err := pool.Enqueue(context.TODO(), func() {})

if Err != ErrPoolClosed {
t.Errorf("Should Return ErrPoolClosed")
}
pool.Start()

// Enqueue job that will block because 0 workers.
signal := make(chan error)
go func() {
blockedJobErr := pool.Enqueue(context.TODO(), func() {})
signal <- blockedJobErr
}()

// check that the job actually blocked.
select {
case <-signal:
t.Errorf("Job for signal pool of 0 workers didn't block signal call!")
default:
}

// stop the bool
pool.Stop()

// check that blocked job unblocked
select {
case blockedJobErr := <-signal:
if blockedJobErr != ErrPoolClosed {
t.Errorf("Should Return ErrPoolClosed instead returned error: %s", blockedJobErr)
}
}

// Try enqueue job on closed pool again.
Err = pool.Enqueue(context.TODO(), func() {})
if Err != ErrPoolClosed {
t.Errorf("Should Return ErrPoolClosed after Stopping and Not Started Pool of 0 Workers")
}
})
}
}

func TestPool_TryEnqueue(t *testing.T) {
for _, implementation := range implementations {
pool := implementation.new(2).(Pool)
t.Run(implementation.name, func(t *testing.T) {
x := make(chan int, 1)

/// Start Worker
pool.Start()
_ = pool.Start()

success := pool.TryEnqueue(func() {
x <- 123
Expand Down Expand Up @@ -308,63 +276,32 @@ func TestPool_TryEnqueue(t *testing.T) {
}
}

func TestPool_TryEnqueue0Worker(t *testing.T) {
for _, implementation := range implementations {
pool := implementation.new(0).(Pool)
t.Run(implementation.name, func(t *testing.T) {
x := make(chan int, 1)

/// Start Worker
pool.Start()

success := pool.TryEnqueue(func() {
x <- 123
})

if success == true {
t.Errorf("TryEnqueue success on a WorkerCount=0 queue!")
}
})
}
}
// --------------------------------------

// ------------ Benchmarking ------------

func BenchmarkOneJob(b *testing.B) {
var workersCountValues = []int{10, 100, 1000, 10000}
for i := 0; i < 2; i++ {
for _, workercount := range workersCountValues {
var workerPool Pool
var name string
if i == 0 {
name = "Workerpool"
}
if i == 1 {
name = "SemaphorePool"
}

b.Run(fmt.Sprintf("[%s]W[%d]", name, workercount), func(b *testing.B) {
if i == 0 {
workerPool = NewWorkerPool(workercount)
}
if i == 1 {
workerPool = NewSemaphorePool(workercount)
}
for _, implementation := range implementations {
for _, workercount := range workersCountValues {
b.Run(fmt.Sprintf("[%s]S[%d]", implementation.name, workercount), func(b *testing.B) {
pool := implementation.new(workercount).(Pool)

workerPool.Start()
_ = pool.Start()

b.ResetTimer()

for i2 := 0; i2 < b.N; i2++ {
resultChan := make(chan int, 1)
_ = workerPool.Enqueue(context.TODO(), func() {
_ = pool.Enqueue(context.TODO(), func() {
resultChan <- 123
})
<-resultChan
}

b.StopTimer()
workerPool.Stop()
pool.Stop()
})
}
}
Expand All @@ -375,24 +312,10 @@ func BenchmarkBulkJobs(b *testing.B) {
var workAmountValues = []int{1000, 10000, 100000}
for _, workercount := range workersCountValues {
for _, work := range workAmountValues {
for i := 0; i < 2; i++ {
var workerPool Pool
var name string
if i == 0 {
name = "Workerpool"
}
if i == 1 {
name = "Semaphore "
}

b.Run(fmt.Sprintf("[%s]W[%d]J[%d]", name, workercount, work), func(b *testing.B) {
if i == 0 {
workerPool = NewWorkerPool(workercount)
}
if i == 1 {
workerPool = NewSemaphorePool(workercount)
}
workerPool.Start()
for _, implementation := range implementations {
b.Run(fmt.Sprintf("[%s]S[%d]J[%d]", implementation.name, workercount, work), func(b *testing.B) {
pool := implementation.new(workercount).(Pool)
_ = pool.Start()
b.ResetTimer()

for i2 := 0; i2 < b.N; i2++ {
Expand All @@ -401,7 +324,7 @@ func BenchmarkBulkJobs(b *testing.B) {
for i3 := 0; i3 < work; i3++ {
go func() {
resultChan := make(chan int, 1)
_ = workerPool.Enqueue(context.TODO(), func() {
_ = pool.Enqueue(context.TODO(), func() {
resultChan <- 123
})
<-resultChan
Expand All @@ -412,7 +335,7 @@ func BenchmarkBulkJobs(b *testing.B) {
}

b.StopTimer()
workerPool.Stop()
pool.Stop()
})
}
}
Expand Down
Loading

0 comments on commit 687478d

Please sign in to comment.