Skip to content

Commit

Permalink
Start Pool by default + Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifabdlnaby committed Dec 16, 2019
1 parent f54cafe commit f70a16c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 94 deletions.
83 changes: 38 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# gpool - A Generic bounded concurrency goroutine pool
# gpool - a generic context-aware resizable goroutine pool to bound concurrency.

[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go#goroutines) [![](https://godoc.org/github.com/sherifabdlnaby/gpool?status.svg)](http://godoc.org/github.com/sherifabdlnaby/gpool)

[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go#goroutines)
<a>
<img src="https://img.shields.io/github/v/tag/sherifabdlnaby/gpool?label=release&amp;sort=semver">
</a>
[![](https://godoc.org/github.com/sherifabdlnaby/gpool?status.svg)](http://godoc.org/github.com/sherifabdlnaby/gpool)
[![Go Report Card](https://goreportcard.com/badge/github.com/sherifabdlnaby/gpool)](https://goreportcard.com/report/github.com/sherifabdlnaby/gpool)
[![Build Status](https://travis-ci.org/sherifabdlnaby/gpool.svg?branch=func)](https://travis-ci.org/sherifabdlnaby/gpool)
[![codecov](https://codecov.io/gh/sherifabdlnaby/gpool/branch/func/graph/badge.svg)](https://codecov.io/gh/sherifabdlnaby/gpool)
Expand All @@ -25,9 +30,7 @@ Easily manages a resizeable pool of context aware goroutines to bound concurrenc

- The Pool can be re-sized using `Resize()` that will resize the pool in a concurrent safe-way. `Resize` can enlarge the pool and any blocked enqueue will unblock after the pool is resized; in case of shrinking the pool, `Resize` will not affect any already processing/waiting jobs.

- Enqueuing a Job will return error `nil` once a job starts, `ErrPoolClosed` if the pool is closed, or the context's error if the job's context is canceled while blocking waiting for the pool.

- The Pool will not accept any job unless `pool.Start()` is called.
- Enqueuing a Job will return error `nil` once a job starts, `ErrPoolClosed` if the pool is closed, or **the context's error if the job's context is canceled while blocking waiting for the pool.**

- Stopping the Pool using `pool.Stop()` will **wait** for all processing jobs to finish before returning, it will also unblock any **blocked** job enqueues (enqueues will return ErrPoolClosed).

Expand All @@ -43,11 +46,6 @@ further documentation at : [![](https://godoc.org/github.com/sherifabdlnaby/gpoo
```
pool, err := gpool.NewPool(concurrency)
```
- Start the pool
(otherwise the pool will not accept any jobs and returns `ErrPoolClosed` when enqueued)
```
pool.Start()
```
- Enqueue a job
```
job := func() {
Expand All @@ -72,7 +70,7 @@ further documentation at : [![](https://godoc.org/github.com/sherifabdlnaby/gpoo

- Stop() WILL Block until all running jobs are done.

- Different types of Enqueues
- Different types of "Enqueues"
- `Enqueue(ctx, job)` returns ONCE the job has started executing (not after job finishes/return)

- `EnqueueAndWait(ctx, job)` returns ONCE the job has started **and** finished executing.
Expand All @@ -91,42 +89,35 @@ $ go test -bench=. -cpu=2 -benchmem
```

```
go test -bench=. -cpu=1,2 -benchmem
go test -bench=. -cpu=2 -benchmem
goos: darwin
goarch: amd64
pkg: github.com/sherifabdlnaby/gpool
BenchmarkThroughput/PoolSize[10] 2000000 725 ns/op 159 B/op 2 allocs/op
BenchmarkThroughput/PoolSize[10]-2 5000000 347 ns/op 10 B/op 0 allocs/op
BenchmarkThroughput/PoolSize[100] 2000000 728 ns/op 159 B/op 2 allocs/op
BenchmarkThroughput/PoolSize[100]-2 5000000 271 ns/op 0 B/op 0 allocs/op
BenchmarkThroughput/PoolSize[1000] 2000000 789 ns/op 158 B/op 2 allocs/op
BenchmarkThroughput/PoolSize[1000]-2 5000000 293 ns/op 0 B/op 0 allocs/op
BenchmarkThroughput/PoolSize[10000] 1000000 1110 ns/op 147 B/op 2 allocs/op
BenchmarkThroughput/PoolSize[10000]-2 5000000 289 ns/op 0 B/op 0 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[100] 30000 43084 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[100]-2 50000 38763 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[1000] 3000 485119 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[1000]-2 5000 336313 ns/op 23 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[10000] 200 6618685 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[10000]-2 500 3821636 ns/op 27 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[100]BulkJobs[1000] 3000 499013 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[100]BulkJobs[1000]-2 5000 342215 ns/op 18 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[100]BulkJobs[10000] 200 6783276 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[100]BulkJobs[10000]-2 500 3834620 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[1000]BulkJobs[1000] 3000 498985 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[1000]BulkJobs[1000]-2 5000 340884 ns/op 17 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[1000]BulkJobs[10000] 200 6542575 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_OverLimit/PoolSize[1000]BulkJobs[10000]-2 500 3781776 ns/op 16 B/op 1 allocs/op
PASS
ok github.com/sherifabdlnaby/gpool 42.110s
BenchmarkThroughput/PoolSize[2]-2 853724 730 ns/op 125 B/op 2 allocs/op
BenchmarkThroughput/PoolSize[10]-2 3647638 329 ns/op 10 B/op 0 allocs/op
BenchmarkThroughput/PoolSize[100]-2 4869789 248 ns/op 0 B/op 0 allocs/op
BenchmarkThroughput/PoolSize[1000]-2 4320566 280 ns/op 0 B/op 0 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[2]BulkJobs[2]-2 530328 2146 ns/op 275 B/op 5 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[2]BulkJobs[100]-2 10000 112478 ns/op 12737 B/op 239 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[2]BulkJobs[1000]-2 1069 1109384 ns/op 126943 B/op 2380 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10]BulkJobs[2]-2 1417808 844 ns/op 58 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10]BulkJobs[100]-2 30556 39187 ns/op 1764 B/op 33 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10]BulkJobs[1000]-2 3012 385652 ns/op 15737 B/op 295 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[100]BulkJobs[2]-2 1986571 609 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[100]BulkJobs[100]-2 41235 29004 ns/op 37 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[100]BulkJobs[1000]-2 4080 290791 ns/op 188 B/op 4 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[1000]BulkJobs[2]-2 1963564 605 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[1000]BulkJobs[100]-2 42000 28442 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[1000]BulkJobs[1000]-2 4333 284865 ns/op 17 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[2]-2 1963168 611 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[100]-2 42238 28419 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[1000]-2 4171 283981 ns/op 29 B/op 1 allocs/op
```


**BenchmarkThroughput/PoolSize[S]** = Enqueue Async Jobs ( Will not wait for result ) in a Pool of size = `S`

**BenchmarkBulkJobs_UnderLimit/PoolSize[S]BulkJobs[J]** = Enqueue `J` Jobs In Pool of size `S` at a time where `J` < `S`

**BenchmarkBulkJobs_OverLimit/PoolSize[S]BulkJobs[J]** = Enqueue `J` Jobs In Pool of size `S` at a time where `J` > `S`
**BenchmarkBulkJobs_UnderLimit/PoolSize[S]BulkJobs[J]** = Enqueue `J` Jobs at a time In a Pool of size `S`


------------------------------------------------------
Expand All @@ -140,7 +131,6 @@ func main() {

// Create and start pool.
pool, _ := gpool.NewPool(concurrency)
pool.Start()
defer pool.Stop()

// Create JOB
Expand All @@ -153,14 +143,12 @@ func main() {

// Enqueue Job
err1 := pool.Enqueue(ctx, job)

if err1 != nil {
log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
return
}

log.Printf("Job Enqueued and started processing")

log.Printf("Job Done, Received: %v", <-resultChan1)
}
```
Expand All @@ -174,7 +162,6 @@ func main() {

// Create and start pool.
pool, _ := gpool.NewPool(concurrency)
pool.Start()
defer pool.Stop()

// Create JOB
Expand Down Expand Up @@ -212,7 +199,6 @@ const concurrency = 2

func main() {
pool, _ := gpool.NewPool(concurrency)
pool.Start()
defer pool.Stop()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -265,7 +251,6 @@ func main() {
```
#### Output
```
2019/01/08 20:15:38 Starting Pool...
2019/01/08 20:15:39 Job [0] Enqueueing
2019/01/08 20:15:39 Job [0] Enqueue-ed
2019/01/08 20:15:39 Job [1] Enqueueing
Expand Down Expand Up @@ -297,3 +282,11 @@ Sleeping for couple of seconds so canceled job have a chance to print out their
Process finished with exit code 0
```

# License
[MIT License](https://raw.githubusercontent.com/sherifabdlnaby/gpool/blob/master/LICENSE)
Copyright (c) 2019 Sherif Abdel-Naby

# Contribution

PR(s) are Open and Welcomed.
16 changes: 8 additions & 8 deletions gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ var (
// ErrPoolInvalidSize Returned if the Size of pool < 1.
ErrPoolInvalidSize = errors.New("pool size is invalid, pool size must be >= 0")

// ErrPoolClosed Error Returned if the Pool has not pool_started yet, or was stopped.
ErrPoolClosed = errors.New("pool is closed")
// ErrPoolStopped Error Returned if the Pool has not started yet, or was stopped.
ErrPoolStopped = errors.New("pool is stopped")
)

const (
Expand Down Expand Up @@ -76,7 +76,7 @@ func (w *Pool) Start() {
//
// 1- ALL Blocked/Waiting jobs will return immediately.
// 2- All Jobs Processing will finish successfully
// 3- Stop() WILL Block until all running jobs is done.
// 3- Stop() WILL Block until all running jobs are done.
// Subsequent Calls to Stop() will have no effect unless Start() is called.
func (w *Pool) Stop() {
w.mu.Lock()
Expand All @@ -92,7 +92,7 @@ func (w *Pool) Stop() {

w.status = poolStopped

// Release the Semaphore so that subsequent enqueues will not block and return ErrPoolClosed.
// Release the Semaphore so that subsequent enqueues will not block and return ErrPoolStopped.
w.semaphore.Release(int64(w.workerCount))

return
Expand Down Expand Up @@ -131,7 +131,7 @@ func (w *Pool) Resize(newSize int) {
// 2- The Job context is canceled.
// 3- The Pool is closed by `pool.Stop()`.
// @Returns nil once the job has started executing.
// @Returns ErrPoolClosed if the pool is not running.
// @Returns ErrPoolStopped if the pool is not running.
// @Returns ctx.Err() if the job Enqueued context was canceled before the job could be processed by the pool.
func (w *Pool) Enqueue(ctx context.Context, job func()) error {
// Acquire 1 from semaphore ( aka Acquire one worker )
Expand All @@ -145,7 +145,7 @@ func (w *Pool) Enqueue(ctx context.Context, job func()) error {
// Check if pool is running
// (This is safe as the semaphore in stop() will only change status if it acquired the full semaphore.).
if w.status != poolStarted {
return ErrPoolClosed
return ErrPoolStopped
}

// Run the job and return.
Expand All @@ -166,7 +166,7 @@ func (w *Pool) Enqueue(ctx context.Context, job func()) error {
// 2- The Job context is canceled.
// 3- The Pool is closed by `pool.Stop()`.
// @Returns nil once the job has executed and returned.
// @Returns ErrPoolClosed if the pool is not running.
// @Returns ErrPoolStopped if the pool is not running.
// @Returns ctx.Err() if the job Enqueued context was canceled before the job could be processed by the pool.
func (w *Pool) EnqueueAndWait(ctx context.Context, job func()) error {

Expand All @@ -181,7 +181,7 @@ func (w *Pool) EnqueueAndWait(ctx context.Context, job func()) error {
// Check if pool is running
// (This is safe as the semaphore in stop() will only change status if it acquired the full semaphore.).
if w.status != poolStarted {
return ErrPoolClosed
return ErrPoolStopped
}

// Run the job
Expand Down
49 changes: 8 additions & 41 deletions gpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestPool_Stop(t *testing.T) {
if err == nil {
t.Errorf("Accepted Job after Stopping the pool")
}
if err != gpool.ErrPoolClosed {
if err != gpool.ErrPoolStopped {
t.Errorf("Returned Incorrect Error after sending job to stopped pool")
}

Expand All @@ -85,7 +86,7 @@ func TestPool_Stop(t *testing.T) {
if err == nil {
t.Errorf("Accepted Job after Stopping the pool")
}
if err != gpool.ErrPoolClosed {
if err != gpool.ErrPoolStopped {
t.Errorf("Returned Incorrect Error after sending job to stopped pool")
}
}
Expand Down Expand Up @@ -552,7 +553,7 @@ func TestPool_Getters(t *testing.T) {
// ------------ Benchmarking ------------

func BenchmarkThroughput(b *testing.B) {
var workersCountValues = []int{10, 100, 1000, 10000}
var workersCountValues = []int{runtime.GOMAXPROCS(0), 10, 100, 1000}
for _, workercount := range workersCountValues {
b.Run(fmt.Sprintf("PoolSize[%d]", workercount), func(b *testing.B) {
pool := gpool.NewPool(workercount)
Expand All @@ -574,54 +575,20 @@ func BenchmarkThroughput(b *testing.B) {
}

func BenchmarkBulkJobs_UnderLimit(b *testing.B) {
var workersCountValues = []int{10000}
var workAmountValues = []int{100, 1000, 10000}
var workersCountValues = []int{runtime.GOMAXPROCS(0), 10, 100, 1000, 10000}
var workAmountValues = []int{runtime.GOMAXPROCS(0), 100, 1000}

for _, workercount := range workersCountValues {
for _, work := range workAmountValues {
b.Run(fmt.Sprintf("PoolSize[%d]BulkJobs[%d]", workercount, work), func(b *testing.B) {
pool := gpool.NewPool(workercount)
pool.Start()
b.ResetTimer()

for i2 := 0; i2 < b.N; i2++ {
wg := sync.WaitGroup{}
wg.Add(work)
for i3 := 0; i3 < work; i3++ {
go func() {
_ = pool.Enqueue(context.TODO(), func() {})
wg.Done()
}()
}
wg.Wait()
}

b.StopTimer()
pool.Stop()
})
}
}
}

func BenchmarkBulkJobs_OverLimit(b *testing.B) {
var workersCountValues = []int{100, 1000}
var workAmountValues = []int{1000, 10000}

for _, workercount := range workersCountValues {
for _, work := range workAmountValues {
b.Run(fmt.Sprintf("PoolSize[%d]BulkJobs[%d]", workercount, work), func(b *testing.B) {
pool := gpool.NewPool(workercount)
pool.Start()
b.ResetTimer()

for i2 := 0; i2 < b.N; i2++ {
wg := sync.WaitGroup{}
wg.Add(work)
for i3 := 0; i3 < work; i3++ {
go func() {
_ = pool.Enqueue(context.TODO(), func() {})
wg.Done()
}()
_ = pool.Enqueue(context.TODO(), func() {})
wg.Done()
}
wg.Wait()
}
Expand Down

0 comments on commit f70a16c

Please sign in to comment.