Skip to content

Commit

Permalink
pool pakcage add GoroutinePool3[T]
Browse files Browse the repository at this point in the history
  • Loading branch information
alimy committed Jul 7, 2024
1 parent 22a8cda commit dc88770
Showing 1 changed file with 180 additions and 1 deletion.
181 changes: 180 additions & 1 deletion pool/goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,23 @@ type RespFn[T any] func(req T, err error)
// RunFn[T] request handle function
type RunFn[T any] func(req T) error

// ExecFn[T] request handle function
type ExecFn[T any] func(req T)

// GoroutinePool2[T] goroutine pool interface
type GoroutinePool2[T any] interface {
Start()
Stop()
Run(T, RespFn[T])
}

// GoroutinePool3[T] goroutine pool interface
type GoroutinePool3[T any] interface {
Start()
Stop()
Exec(T)
}

// Option groutine pool option help function used to create groutine pool instance
type Option = func(opt *gorotinePoolOpt)

Expand Down Expand Up @@ -100,6 +110,23 @@ type wormPool2[T any] struct {
workerHook WorkerHook
}

type wormPool3[T any] struct {
ctx context.Context
isStarted atomic.Bool
requestCh chan T // 正式工 缓存通道
requestTempCh chan T // 临时工 缓存通道
requestBufCh chan T // 请求缓存通道
maxRequestInCh int
maxRequestInTempCh int
minWorker int // 最少正式工数
maxTempWorker int // 最大临时工数,-1表示无限制
maxIdleTime time.Duration
tempWorkerCount atomic.Int32
execFn ExecFn[T]
cancelFn context.CancelFunc
workerHook WorkerHook
}

func (p *wormPool[T, R]) Do(req T, fn ResponseFn[T, R]) {
item := &requestItem[T, R]{req, fn}
select {
Expand Down Expand Up @@ -206,6 +233,58 @@ func (p *wormPool2[T]) Run(req T, fn RespFn[T]) {
}
}

func (p *wormPool3[T]) Exec(item T) {
select {
case p.requestCh <- item:
// send request item by requestCh chan
case <-p.ctx.Done():
// do nothing
default:
select {
case p.requestTempCh <- item:
// send request item by requestTempCh chan"
default:
if p.maxTempWorker >= 0 && p.tempWorkerCount.Load() >= int32(p.maxTempWorker) {
p.requestBufCh <- item
break
}
go func() {
// update temp worker count and run worker hook
count := p.tempWorkerCount.Add(1)
if p.workerHook != nil {
p.workerHook.OnJoin(count)
}
defer func() {
count = p.tempWorkerCount.Add(-1)
if p.workerHook != nil {
p.workerHook.OnLeave(count)
}
}()
// handle the request
p.exec(item)
// watch requestTempCh to continue do work if needed.
idleTimer := time.NewTimer(p.maxIdleTime)
for {
select {
case item = <-p.requestTempCh:
p.exec(item)
case <-p.ctx.Done():
// worker exits
return
case <-idleTimer.C:
// worker exits
return
}
if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(p.maxIdleTime)
}
}()
}
}
}

func (p *wormPool[T, R]) Start() {
if !p.isStarted.Swap(true) {
p.ctx, p.cancelFn = context.WithCancel(context.Background())
Expand Down Expand Up @@ -236,6 +315,21 @@ func (p *wormPool2[T]) Start() {
}
}

func (p *wormPool3[T]) Start() {
if !p.isStarted.Swap(true) {
p.ctx, p.cancelFn = context.WithCancel(context.Background())
p.requestCh = make(chan T, p.maxRequestInCh)
p.requestTempCh = make(chan T, p.maxRequestInTempCh)
for numWorker := p.minWorker; numWorker > 0; numWorker-- {
go p.goExec()
}
if p.maxTempWorker >= 0 {
p.requestBufCh = make(chan T, 1)
go p.runBufferWorker()
}
}
}

func (p *wormPool[T, R]) runBufferWorker() {
var reqBuf []*requestItem[T, R]
for {
Expand Down Expand Up @@ -290,6 +384,33 @@ func (p *wormPool2[T]) runBufferWorker() {
}
}

func (p *wormPool3[T]) runBufferWorker() {
var reqBuf []T
for {
if latesIdx := len(reqBuf) - 1; latesIdx >= 0 {
select {
case p.requestCh <- reqBuf[0]:
reqBuf[0] = reqBuf[latesIdx]
reqBuf = reqBuf[:latesIdx]
case p.requestTempCh <- reqBuf[0]:
reqBuf[0] = reqBuf[latesIdx]
reqBuf = reqBuf[:latesIdx]
case item := <-p.requestBufCh:
reqBuf = append(reqBuf, item)
case <-p.ctx.Done():
return
}
} else {
select {
case item := <-p.requestBufCh:
reqBuf = append(reqBuf, item)
case <-p.ctx.Done():
return
}
}
}
}

func (p *wormPool[T, R]) Stop() {
if p.isStarted.Swap(false) {
p.cancelFn()
Expand All @@ -312,6 +433,17 @@ func (p *wormPool2[T]) Stop() {
}
}

func (p *wormPool3[T]) Stop() {
if p.isStarted.Swap(false) {
p.cancelFn()
close(p.requestCh)
close(p.requestTempCh)
if p.maxTempWorker >= 0 {
close(p.requestBufCh)
}
}
}

func (p *wormPool[T, R]) do(item *requestItem[T, R]) {
if item != nil {
resp, err := p.doFn(item.req)
Expand All @@ -335,6 +467,16 @@ func (p *wormPool2[T]) run(item *requestItem2[T]) {
}
}

func (p *wormPool3[T]) exec(item T) {
p.execFn(item)
defer func() {
if err := recover(); err != nil {
// TODO: add log
// do nothing
}
}()
}

func (p *wormPool[T, R]) goDo() {
For:
for {
Expand All @@ -359,6 +501,18 @@ For:
}
}

func (p *wormPool3[T]) goExec() {
For:
for {
select {
case item := <-p.requestCh:
p.exec(item)
case <-p.ctx.Done():
break For
}
}
}

// WithMinWorker set min worker count
func WithMinWorker(num int) Option {
return func(opt *gorotinePoolOpt) {
Expand Down Expand Up @@ -426,7 +580,7 @@ func NewGoroutinePool[T, R any](fn DoFn[T, R], opts ...Option) GoroutinePool[T,
return p
}

// NewGoroutinePool2[T] create a new GoroutinePool[T, R] instance
// NewGoroutinePool2[T] create a new GoroutinePool2[T] instance
func NewGoroutinePool2[T any](fn RunFn[T], opts ...Option) GoroutinePool2[T] {
opt := &gorotinePoolOpt{
minWorker: 10,
Expand All @@ -450,3 +604,28 @@ func NewGoroutinePool2[T any](fn RunFn[T], opts ...Option) GoroutinePool2[T] {
p.Start()
return p
}

// NewGoroutinePool3[T] create a new GoroutinePool3[T] instance
func NewGoroutinePool3[T any](fn ExecFn[T], opts ...Option) GoroutinePool3[T] {
opt := &gorotinePoolOpt{
minWorker: 10,
maxTempWorker: -1,
maxRequestInCh: 100,
maxRequestInTempCh: 100,
maxIdleTime: 60 * time.Second,
}
for _, optFn := range opts {
optFn(opt)
}
p := &wormPool3[T]{
maxRequestInCh: opt.maxRequestInCh,
maxRequestInTempCh: opt.maxRequestInTempCh,
minWorker: opt.minWorker,
maxTempWorker: opt.maxTempWorker,
maxIdleTime: opt.maxIdleTime,
workerHook: opt.workerHook,
execFn: fn,
}
p.Start()
return p
}

0 comments on commit dc88770

Please sign in to comment.