
gpool - a generic context-aware resizable goroutines pool to bound concurrency based on semaphore.





gpool - A Generic bounded concurrency goroutine pool


gpool manages a resizable pool of context-aware goroutines to bound concurrency. A job is enqueued to the pool, and at most N jobs are processed concurrently.

Enqueue returns once the job starts processing. If the pool is full, it blocks until (1) the pool has room for the job, (2) the job's context is canceled, or (3) the pool is stopped.

Stopping the pool with pool.Stop() unblocks any blocked enqueues and waits for all active jobs to finish before returning.

Enqueue returns a nil error once the job starts, gpool.ErrPoolClosed if the pool is closed, or gpool.ErrTimeoutJob if the job's context is canceled while the call is blocked waiting for the pool.

The pool can be resized with Resize(), which is safe for concurrent use. Enlarging the pool unblocks blocked enqueues as soon as Resize is called; shrinking it does not affect jobs that are already processing.
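One way to picture the resize behavior is a counting semaphore whose limit can change while callers hold or wait for slots. The sketch below is an assumption-laden illustration using a mutex and condition variable, not gpool's code; `resizableSem` and its methods are hypothetical names, and context cancellation is left out to keep it short.

```go
package main

import (
	"fmt"
	"sync"
)

// resizableSem is a hypothetical counting semaphore with an adjustable limit.
type resizableSem struct {
	mu     sync.Mutex
	cond   *sync.Cond
	limit  int // maximum number of concurrent holders
	active int // current number of holders
}

func newResizableSem(n int) *resizableSem {
	s := &resizableSem{limit: n}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire blocks until a slot is available under the current limit.
func (s *resizableSem) Acquire() {
	s.mu.Lock()
	for s.active >= s.limit {
		s.cond.Wait()
	}
	s.active++
	s.mu.Unlock()
}

// TryAcquire takes a slot only if one is free right now.
func (s *resizableSem) TryAcquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.active >= s.limit {
		return false
	}
	s.active++
	return true
}

// Release frees a slot and wakes waiters so they can re-check the limit.
func (s *resizableSem) Release() {
	s.mu.Lock()
	s.active--
	s.mu.Unlock()
	s.cond.Broadcast()
}

// Resize changes the limit. Enlarging wakes blocked Acquire calls;
// shrinking never interrupts current holders, it only delays new ones.
func (s *resizableSem) Resize(n int) {
	s.mu.Lock()
	s.limit = n
	s.mu.Unlock()
	s.cond.Broadcast()
}

func main() {
	s := newResizableSem(1)
	s.Acquire()

	unblocked := make(chan struct{})
	go func() {
		s.Acquire() // waits if it runs while the single slot is taken
		close(unblocked)
	}()

	s.Resize(2) // enlarging lets the second Acquire proceed
	<-unblocked
	fmt.Println("second acquire proceeded after Resize(2)")

	s.Resize(1) // shrinking leaves both current holders untouched
	fmt.Println("can acquire after shrink:", s.TryAcquire())
}
```

After a shrink, the holder count can temporarily exceed the limit; new acquisitions simply wait until enough holders release.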

further documentation at :


$ go get


Example 1

func main() {
  concurrency := 2
  var pool gpool.Pool = gpool.NewSemaphorePool(concurrency)
  defer pool.Stop()

  // Send JOB
  resultChan1 := make(chan int)
  err1 := pool.Enqueue(context.TODO(), func() {
    time.Sleep(2000 * time.Millisecond)
    resultChan1 <- 1337
  })
  if err1 != nil {
    log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
    return // nothing will ever be sent on resultChan1
  }

  log.Printf("Job Done, Received: %v", <-resultChan1)
}

Example 2

func main() {
  concurrency := 2
  var pool gpool.Pool = gpool.NewSemaphorePool(concurrency)
  defer pool.Stop()

  // Send JOB 1
  resultChan1 := make(chan int)
  err1 := pool.Enqueue(context.TODO(), func() {
    time.Sleep(2000 * time.Millisecond)
    resultChan1 <- 100
  })
  if err1 != nil {
    log.Printf("Job [%v] was not enqueued. [%s]", 1, err1.Error())
    return
  }

  // Send JOB 2
  resultChan2 := make(chan int)
  err2 := pool.Enqueue(context.TODO(), func() {
    time.Sleep(1000 * time.Millisecond)
    resultChan2 <- 200
  })
  if err2 != nil {
    log.Printf("Job [%v] was not enqueued. [%s]", 2, err2.Error())
    return
  }

  // Receive the two jobs, whichever comes first
  for i := 0; i < 2; i++ {
    select {
    case result := <-resultChan1:
      log.Printf("Job [%v] Done [%v]", 1, result)
    case result := <-resultChan2:
      log.Printf("Job [%v] Done [%v]", 2, result)
    }
  }
}

Example 3

// WorkerCount is the number of workers / concurrent jobs of the pool
const WorkerCount = 2

func main() {
  var workerPool gpool.Pool

  workerPool = gpool.NewSemaphorePool(WorkerCount)

  log.Println("Starting Pool...")

  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  go func() {
    log.Printf("Enqueueing 10 jobs on a seperate goroutine...")
    for i := 0; i < 10; i++ {

      // Small interval for more readable output
      time.Sleep(500 * time.Millisecond)

      go func(i int) {
        x := make(chan int, 1)

        log.Printf("Job [%v] Enqueueing", i)

        err := workerPool.Enqueue(ctx, func() {
          time.Sleep(2000 * time.Millisecond)
          x <- i
        })

        if err != nil {
          log.Printf("Job [%v] was not enqueued. [%s]", i, err.Error())
          return
        }

        log.Printf("Job [%v] Enqueue-ed ", i)

        log.Printf("Job [%v] Receieved [%v]", i, <-x)
      }(i)
    }
  }()

  // Wait 5 secs before stopping
  time.Sleep(5000 * time.Millisecond)

  workerPool.Stop()

  fmt.Println("Sleeping for couple of seconds so canceled job have a chance to print out their status")

  time.Sleep(10000 * time.Millisecond)
}


2018/12/16 05:37:03 Starting Pool...
2018/12/16 05:37:03 Enqueueing 10 jobs on a seperate goroutine...
2018/12/16 05:37:03 Job [0] Enqueueing
2018/12/16 05:37:03 Job [0] Enqueue-ed 
2018/12/16 05:37:04 Job [1] Enqueueing
2018/12/16 05:37:04 Job [1] Enqueue-ed 
2018/12/16 05:37:04 Job [2] Enqueueing
2018/12/16 05:37:05 Job [3] Enqueueing
2018/12/16 05:37:05 Job [2] Enqueue-ed 
2018/12/16 05:37:05 Job [0] Receieved [0]
2018/12/16 05:37:05 Job [4] Enqueueing
2018/12/16 05:37:06 Job [3] Enqueue-ed 
2018/12/16 05:37:06 Job [1] Receieved [1]
2018/12/16 05:37:06 Job [5] Enqueueing
2018/12/16 05:37:06 Job [6] Enqueueing
2018/12/16 05:37:07 Job [7] Enqueueing
2018/12/16 05:37:07 Job [4] Enqueue-ed 
2018/12/16 05:37:07 Job [2] Receieved [2]
2018/12/16 05:37:07 Job [8] Enqueueing
2018/12/16 05:37:08 Job [3] Receieved [3]
2018/12/16 05:37:08 Job [5] was not enqueued. [pool is closed]
2018/12/16 05:37:08 Job [6] was not enqueued. [pool is closed]
2018/12/16 05:37:08 Job [8] was not enqueued. [pool is closed]
2018/12/16 05:37:08 Job [7] was not enqueued. [pool is closed]
2018/12/16 05:37:08 Job [9] Enqueueing
2018/12/16 05:37:09 Job [4] Receieved [4]
Sleeping for couple of seconds so canceled job have a chance to print out their status
2018/12/16 05:37:09 Job [9] was not enqueued. [pool is closed]


Benchmarks for the two goroutine pool implementations, Workerpool and Semaphore. In the results below, Semaphore has the lower ns/op in every case, by roughly 1.2x to 2.3x.

$ go test -bench=. -cpu=2 -benchmem

S[n] = size of the pool, i.e. at most n jobs run concurrently.

J[n] = number of jobs.

BenchmarkOneThroughput/S[] = enqueue async jobs (without waiting for the result) in a pool of size S.

BenchmarkOneJobSync/S[] = enqueue one job at a time and wait for its result (the pool has at most one job running).

BenchmarkBulkJobs_UnderLimit/S[]J[] = enqueue J jobs at a time in a pool of size S, where J <= S.

BenchmarkBulkJobs_OverLimit/S[]J[] = enqueue J jobs at a time in a pool of size S, where J >= S.

goos: darwin
goarch: amd64
BenchmarkOneThroughput/[Semaphore]S[10]-2                5000000               258 ns/op              14 B/op          0 allocs/op
BenchmarkOneThroughput/[Semaphore]S[100]-2               5000000               264 ns/op               0 B/op          0 allocs/op
BenchmarkOneThroughput/[Semaphore]S[1000]-2              5000000               279 ns/op               0 B/op          0 allocs/op
BenchmarkOneThroughput/[Semaphore]S[10000]-2             5000000               284 ns/op               0 B/op          0 allocs/op
BenchmarkOneThroughput/[Workerpool]S[10]-2               5000000               367 ns/op               0 B/op          0 allocs/op
BenchmarkOneThroughput/[Workerpool]S[100]-2              5000000               329 ns/op               0 B/op          0 allocs/op
BenchmarkOneThroughput/[Workerpool]S[1000]-2             5000000               325 ns/op               0 B/op          0 allocs/op
BenchmarkOneThroughput/[Workerpool]S[10000]-2            3000000               534 ns/op               0 B/op          0 allocs/op
BenchmarkOneJobSync/[Semaphore]S[10]-2                   2000000               909 ns/op              16 B/op          1 allocs/op
BenchmarkOneJobSync/[Semaphore]S[100]-2                  2000000               911 ns/op              16 B/op          1 allocs/op
BenchmarkOneJobSync/[Semaphore]S[1000]-2                 2000000               914 ns/op              16 B/op          1 allocs/op
BenchmarkOneJobSync/[Semaphore]S[10000]-2                2000000               910 ns/op              16 B/op          1 allocs/op
BenchmarkOneJobSync/[Workerpool]S[10]-2                  1000000              1218 ns/op              16 B/op          1 allocs/op
BenchmarkOneJobSync/[Workerpool]S[100]-2                 1000000              1349 ns/op              16 B/op          1 allocs/op
BenchmarkOneJobSync/[Workerpool]S[1000]-2                1000000              1232 ns/op              16 B/op          1 allocs/op
BenchmarkOneJobSync/[Workerpool]S[10000]-2               1000000              2137 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_UnderLimit/[Semaphore]S[10000]J[100]-2                   30000             40378 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_UnderLimit/[Semaphore]S[10000]J[1000]-2                   5000            363141 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_UnderLimit/[Semaphore]S[10000]J[10000]-2                   300           4042157 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_UnderLimit/[Workerpool]S[10000]J[100]-2                  20000             69548 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_UnderLimit/[Workerpool]S[10000]J[1000]-2                  2000            714208 ns/op              70 B/op          1 allocs/op
BenchmarkBulkJobs_UnderLimit/[Workerpool]S[10000]J[10000]-2                  200           8868398 ns/op              21 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Semaphore]S[100]J[1000]-2                      5000            362379 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Semaphore]S[100]J[10000]-2                      300           4054593 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Semaphore]S[1000]J[1000]-2                     5000            361441 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Semaphore]S[1000]J[10000]-2                     300           4057458 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Workerpool]S[100]J[1000]-2                     3000            514161 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Workerpool]S[100]J[10000]-2                     200           6305356 ns/op              16 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Workerpool]S[1000]J[1000]-2                    3000            525189 ns/op              20 B/op          1 allocs/op
BenchmarkBulkJobs_OverLimit/[Workerpool]S[1000]J[10000]-2                    200           7027354 ns/op              30 B/op          1 allocs/op
ok 58.464s
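To put a number on the gap between the two implementations, the ns/op figures can be divided pairwise. The short program below does this for four rows copied from the output above; `speedup` is a hypothetical helper and the row selection is just a sample.

```go
package main

import "fmt"

// speedup reports how many times faster the first timing is than the second.
func speedup(fasterNs, slowerNs float64) float64 {
	return slowerNs / fasterNs
}

func main() {
	// ns/op values taken from the benchmark output above.
	rows := []struct {
		name      string
		semaphore float64
		worker    float64
	}{
		{"OneThroughput S[10]", 258, 367},
		{"OneJobSync S[10]", 909, 1218},
		{"BulkJobs_UnderLimit S[10000]J[10000]", 4042157, 8868398},
		{"BulkJobs_OverLimit S[100]J[10000]", 4054593, 6305356},
	}
	for _, r := range rows {
		fmt.Printf("%-40s Semaphore is %.2fx faster\n", r.name, speedup(r.semaphore, r.worker))
	}
}
```

These ratios come from a single run on one machine (darwin/amd64, -cpu=2), so they are indicative rather than a general guarantee.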

