Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed May 24, 2018
1 parent 08e6a26 commit ff4b7d8
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 35 deletions.
7 changes: 2 additions & 5 deletions ants_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func BenchmarkGoroutineWithFunc(b *testing.B) {
}
wg.Wait()
}
//b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB)
}

func BenchmarkAntsPoolWithFunc(b *testing.B) {
Expand All @@ -88,9 +87,8 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) {
p.Serve(loop)
}
wg.Wait()
//b.Logf("running goroutines: %d", p.Running())
b.Logf("running goroutines: %d", p.Running())
}
//b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB)
}

func BenchmarkGoroutine(b *testing.B) {
Expand All @@ -100,15 +98,14 @@ func BenchmarkGoroutine(b *testing.B) {
}
}

//b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB)
}

func BenchmarkAntsPool(b *testing.B) {
b.N = 3
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
ants.Push(demoFunc)
}
b.Logf("running goroutines: %d", ants.Running())
}
//b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB)
}
1 change: 1 addition & 0 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@ func main() {
p.Serve(str)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Println("finish all tasks!")
}
40 changes: 11 additions & 29 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"math"
"sync"
"sync/atomic"
"time"
)

type pf func(interface{}) error
Expand Down Expand Up @@ -55,10 +54,10 @@ type PoolWithFunc struct {

lock sync.Mutex

// closed is used to confirm whether this pool has been closed.
closed int32

poolFunc pf

once sync.Once

}

// NewPoolWithFunc generates a instance of ants pool with a specific function.
Expand All @@ -69,8 +68,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) {
p := &PoolWithFunc{
capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig),
closed: 0,
release: make(chan sig, 1),
poolFunc: f,
}

Expand All @@ -79,27 +77,12 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) {

//-------------------------------------------------------------------------

// scanAndClean is a goroutine who will periodically clean up
// after it is noticed that this pool is closed.
func (p *PoolWithFunc) scanAndClean() {
ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second)
go func() {
ticker.Stop()
for range ticker.C {
if atomic.LoadInt32(&p.closed) == 1 {
p.lock.Lock()
for _, w := range p.workers {
w.stop()
}
p.lock.Unlock()
}
}
}()
}

// Serve submit a task to pool
func (p *PoolWithFunc) Serve(args interface{}) error {
if atomic.LoadInt32(&p.closed) == 1 {
//if atomic.LoadInt32(&p.closed) == 1 {
// return ErrPoolClosed
//}
if len(p.release) > 0 {
return ErrPoolClosed
}
w := p.getWorker()
Expand All @@ -124,10 +107,9 @@ func (p *PoolWithFunc) Cap() int {

// Release Closed this pool
func (p *PoolWithFunc) Release() error {
p.lock.Lock()
atomic.StoreInt32(&p.closed, 1)
close(p.release)
p.lock.Unlock()
p.once.Do(func() {
p.release<- sig{}
})
return nil
}

Expand Down
22 changes: 21 additions & 1 deletion worker_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (w *WorkerWithFunc) run() {
atomic.AddInt32(&w.pool.running, 1)
go func() {
for args := range w.args {
if args == nil {
if args == nil || len(w.pool.release) > 0 {
atomic.AddInt32(&w.pool.running, -1)
return
}
Expand All @@ -53,6 +53,26 @@ func (w *WorkerWithFunc) run() {
}()
}

//func (w *WorkerWithFunc) run() {
// atomic.AddInt32(&w.pool.running, 1)
// go func() {
// for {
// select {
// case args := <-w.args:
// if args == nil {
// atomic.AddInt32(&w.pool.running, -1)
// return
// }
// w.pool.poolFunc(args)
// w.pool.putWorker(w)
// case <-w.pool.release:
// atomic.AddInt32(&w.pool.running, -1)
// return
// }
// }
// }()
//}

// stop this worker.
func (w *WorkerWithFunc) stop() {
w.args <- nil
Expand Down

0 comments on commit ff4b7d8

Please sign in to comment.