Skip to content

Commit

Permalink
feature:add download limit
Browse files Browse the repository at this point in the history
  • Loading branch information
geebytes committed Feb 5, 2022
1 parent 05a3064 commit a3f0c77
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
45 changes: 30 additions & 15 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package tegenaria
import (
"context"
"fmt"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
Expand Down Expand Up @@ -116,6 +117,9 @@ type SpiderEngine struct {
// It controls the readyDone, StartSpiders, and recvRequest group
mainWaitGroup *sync.WaitGroup

downloadLimit int64

currentDownload int64
// isDone is the flag indicating that all scraping tasks are done.
// It is not set to true until all channels are empty and
// goroutineRunning is 0 and startRequestFinish is true
Expand Down Expand Up @@ -180,7 +184,11 @@ Loop:
// handle error
e.waitGroup.Add(1)
go e.doError(spider, err)
case <-time.After(time.Second * 3):
case sig := <-e.quitSignal:
engineLog.Warningln("Engine recv signal ", sig)
e.isDone = true
signal.Stop(e.quitSignal)
case <-time.After(time.Second * 5):
if e.checkReadyDone() {
e.isDone = true
break Loop
Expand All @@ -190,7 +198,6 @@ Loop:

}
engineLog.Info("Scheduler is done")
e.isClosed = true
e.mainWaitGroup.Done()
}

Expand Down Expand Up @@ -220,7 +227,6 @@ func (e *SpiderEngine) listenNotify() {
if ok {
engineLog.Warningln("Engine recv signal ", s)
e.isDone = true
close(e.quitSignal)
signal.Stop(e.quitSignal)
return
} else {
Expand Down Expand Up @@ -278,9 +284,9 @@ func (e *SpiderEngine) Start(spiderName string) {
// Output the handle stats counters every 5s
go e.statsReportTicker()
engineLog.Info("Spider engine is running\n")

e.waitGroup.Wait()
e.isClosed = true

engineLog.Info("Waitting engine stop\n")

e.mainWaitGroup.Wait()
Expand All @@ -299,7 +305,9 @@ func (e *SpiderEngine) checkChanStatus() bool {
// If every status is OK, it will stop the engine and close the spider

func (e *SpiderEngine) checkReadyDone() bool {
if e.startRequestFinish && e.checkChanStatus() && e.isClosed {
engineLog.Info("Check scheduler is ready stop")

if e.startRequestFinish && e.checkChanStatus() && e.isClosed && atomic.LoadInt64(&e.currentDownload) == 0 {
engineLog.Debug("Scheduler ready done")
return true
} else {
Expand All @@ -309,7 +317,6 @@ func (e *SpiderEngine) checkReadyDone() bool {

// recvRequestHandler receives a request from cacheChan and does the download.
func (e *SpiderEngine) recvRequestHandler(req *Context) {
defer e.waitGroup.Done()
if req == nil {
return
}
Expand All @@ -335,7 +342,6 @@ func (e *SpiderEngine) writeCache(ctx *Context) {
defer func() {
e.waitGroup.Done()
}()

if e.doFilter(ctx, ctx.Request) && !e.isDone {
err := e.cache.enqueue(ctx)
if err != nil {
Expand All @@ -350,23 +356,26 @@ func (e *SpiderEngine) writeCache(ctx *Context) {
// readCache loops pulling queued items from e.cache and hands each one to
// recvRequestHandler until e.isDone is set.
// Its deferred function signals mainWaitGroup, logs that the reader closed,
// and recovers from any panic raised while the loop was running.
func (e *SpiderEngine) readCache() {
defer func() {
e.mainWaitGroup.Done()
engineLog.Debug("Close read cache\n")
engineLog.Infof("Close read cache\n")
// recover here so a panic in the loop is logged instead of propagating
if p := recover(); p != nil {
engineLog.Errorln("Read cache error \n", p)

}
}()
for {
// stop polling as soon as the engine is flagged done
if e.isDone {
return
}
// Back-pressure: skip dequeuing while the in-flight download counter is
// strictly greater than downloadLimit.
// NOTE(review): this `continue` bypasses runtime.Gosched() below, so the
// loop spins without yielding while over the limit — confirm this is intended.
if atomic.LoadInt64(&e.currentDownload) > e.downloadLimit{
continue
}
req, err := e.cache.dequeue()
if req != nil && err == nil && !e.isDone {
// the dequeued value is asserted to *Context; a mismatch panics and is
// caught by the deferred recover above
request := req.(*Context)
// NOTE(review): the request is passed to recvRequestHandler twice here —
// once in a goroutine after waitGroup.Add(1) and once synchronously. It
// looks like both an old and a new dispatch line are present; confirm
// which one should remain.
e.waitGroup.Add(1)
go e.recvRequestHandler(request)
e.recvRequestHandler(request)

}
// re-check after handling so shutdown is noticed before the next poll
if e.isDone {
return
}

// yield the processor so other goroutines can run between polls
runtime.Gosched()
}
}
Expand All @@ -389,6 +398,7 @@ func (e *SpiderEngine) doError(spider SpiderInterface, err *HandleError) {
func (e *SpiderEngine) doDownload(ctx *Context) {
defer func() {
e.waitGroup.Done()

}()
// use download middleware to handle request object
for _, middleware := range e.downloaderMiddlewares {
Expand All @@ -401,8 +411,11 @@ func (e *SpiderEngine) doDownload(ctx *Context) {
}
}
// incr request download number
atomic.AddUint64(&e.Stats.RequestDownloaded, 1)
atomic.AddInt64(&e.currentDownload, 1)
e.requestDownloader.Download(ctx, e.requestResultChan)
atomic.AddInt64(&e.currentDownload, -1)
atomic.AddUint64(&e.Stats.RequestDownloaded, 1)

}

// doFilter filters duplicate requests if filterDuplicateReq is true
Expand Down Expand Up @@ -558,7 +571,7 @@ func DefaultErrorHandler(spider SpiderInterface, err *HandleError) {
}

func NewSpiderEngine(opts ...EngineOption) *SpiderEngine {
numCPU:=runtime.NumCPU()
numCPU := runtime.NumCPU()
Engine = &SpiderEngine{
spiders: NewSpiders(),
requestsChan: make(chan *Context, 1024),
Expand All @@ -580,6 +593,8 @@ func NewSpiderEngine(opts ...EngineOption) *SpiderEngine {
engineStatus: 0,
waitGroup: &sync.WaitGroup{},
mainWaitGroup: &sync.WaitGroup{},
downloadLimit: 256,
currentDownload: 0,
isDone: false,
isRunning: false,
schedulerNum: uint(numCPU),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ require (
github.com/yireyun/go-queue v0.0.0-20210520035143-72b190eafcba
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
)
)
18 changes: 9 additions & 9 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ func EngineWithReadCacheNum(cacheReadNum uint) EngineOption {
}
}

// // EngineWithRequestNum set request channel buffer size
// // request channel buffer size default to 1024
// func EngineWithRequestNum(requestNum uint) EngineOption {
// return func(r *SpiderEngine) {
// r.cacheChan = make(chan *Context, requestNum)
// engineLog.Infoln("Set request channel buffer size ", requestNum)

// }
// }
// EngineWithRequestNum returns an EngineOption that sets the engine's
// download limit (SpiderEngine.downloadLimit), the threshold the engine
// compares its in-flight download counter against before reading more
// requests from the cache. The engine's default limit is 256.
//
// A negative requestNum is ignored (the current limit is kept and a warning
// is logged): the in-flight counter is compared with "> downloadLimit", so a
// negative limit would presumably stop the cache reader from ever dequeuing.
func EngineWithRequestNum(requestNum int64) EngineOption {
	return func(r *SpiderEngine) {
		if requestNum < 0 {
			engineLog.Warningln("Ignore negative download limit ", requestNum)
			return
		}
		r.downloadLimit = requestNum
		// The message names the download limit; this option no longer
		// resizes a request channel.
		engineLog.Infoln("Set download limit ", requestNum)
	}
}
1 change: 1 addition & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func NewRequest(url string, method string, parser Parser, opts ...Option) *Reque
request.parser = parser
request.ResponseWriter = nil
request.BodyReader = nil
request.Meta = nil
request.Header = make(map[string]string)
request.MaxRedirects = 3
request.AllowRedirects = true
Expand Down

0 comments on commit a3f0c77

Please sign in to comment.