Commit

Merge pull request #15 from wetrycode/release/fix-writecache
Release/fix writecache
geebytes authored Jan 27, 2023
2 parents 275fbb4 + 202df92 commit f916e63
Showing 13 changed files with 123 additions and 26 deletions.
22 changes: 21 additions & 1 deletion cmd.go
@@ -23,6 +23,8 @@
package tegenaria

import (
"time"

"github.com/spf13/cobra"
)

@@ -38,17 +40,35 @@ var rootCmd = &cobra.Command{
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func ExecuteCmd(engine *CrawlEngine) {
var isTicker bool = false
var timer = 0

var crawlCmd = &cobra.Command{
Use: "crawl",
Short: "Start spider by name",
// Uncomment the following line if your bare application
// has an action associated with it:

Run: func(_ *cobra.Command, args []string) {
engineLog.Infof("准备启动%s爬虫", args[0])
engine.start(args[0])

if isTicker {
if timer > 0 {
engine.interval = time.Duration(timer * int(time.Second))
}
engineLog.Infof("以定时任务方式启动,时间间隔为:%d", engine.interval/time.Second)

engine.startWithTicker(args[0])
timer = 0
} else {
engine.start(args[0])

}
},
}
rootCmd.AddCommand(crawlCmd)
crawlCmd.Flags().BoolVarP(&isTicker, "interval", "i", false, "Whether to start tasks at regular intervals,default false")
crawlCmd.Flags().IntVarP(&timer, "timer", "t", 0, "Timed task interval,default 0")

crawlCmd.Flags().BoolVarP(&engine.isMaster, "master", "m", false, "Whether to set the current node as the master node,default false")
rootCmd.Execute()
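Taken together, the cmd.go changes let a spider be launched on a schedule straight from the command line. A minimal sketch of the wiring, assuming the module path github.com/wetrycode/tegenaria and that a spider has already been registered ("myspider" is a hypothetical name):

    package main

    import "github.com/wetrycode/tegenaria"

    func main() {
        engine := tegenaria.NewEngine()
        // spider registration via engine.RegisterSpider(...) assumed
        tegenaria.ExecuteCmd(engine)
    }

Running `./app crawl myspider -i -t 30` would then re-run the spider every 30 seconds through startWithTicker, while a plain `./app crawl myspider` keeps the old one-shot behavior.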
16 changes: 16 additions & 0 deletions cmd_test.go
@@ -3,6 +3,7 @@ package tegenaria
import (
"bytes"
"testing"
"time"

"github.com/smartystreets/goconvey/convey"
)
@@ -11,9 +12,24 @@ func TestCmdStart(t *testing.T) {
convey.Convey("test cmd", t, func() {
engine := newTestEngine("testCmdSpider")
buf := new(bytes.Buffer)
defer func() {
rootCmd.ResetFlags()
rootCmd.ResetCommands()
}()
rootCmd.SetOut(buf)
rootCmd.SetErr(buf)
rootCmd.SetArgs([]string{"crawl", "testCmdSpider"})
convey.So(engine.Execute, convey.ShouldNotPanic)

})
convey.Convey("test cmd with interval", t, func() {
defer rootCmd.ResetCommands()

engine := newTestEngine("testCmdIntervalSpider", EngineWithInterval(4*time.Second), EngineWithUniqueReq(false))
rootCmd.SetArgs([]string{"crawl", "testCmdIntervalSpider", "-i"})
go engine.Execute()
time.Sleep(8 * time.Second)
convey.So(engine.statistic.GetRequestSent(), convey.ShouldAlmostEqual, 2)
convey.So(engine.statistic.GetItemScraped(), convey.ShouldAlmostEqual, 2)
})
}
3 changes: 2 additions & 1 deletion context.go
@@ -174,7 +174,7 @@ func (c *Context) setResponse(resp *Response) {
}

// setError records an error on the context
func (c *Context) setError(msg string) {
func (c *Context) setError(msg string, stack string) {
err := NewError(c, fmt.Errorf("%s", msg))
c.Error = err
// get the previous stack frame
@@ -184,6 +184,7 @@
"request_id": c.CtxId,
"func": f.Name(),
"file": fmt.Sprintf("%s:%d", file, lineNo),
"stack": stack,
}
log := engineLog.WithFields(fields)
log.Logger.SetReportCaller(false)
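The new stack parameter is supplied by the deferred recover blocks in engine.go below; as a sketch, the calling pattern is:

    defer func() {
        if p := recover(); p != nil {
            // attach the panicking goroutine's stack trace to the
            // context error instead of only the recovered message
            ctx.setError(fmt.Sprintf("crawl error %s", p), string(debug.Stack()))
        }
    }()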
3 changes: 0 additions & 3 deletions downloader.go
@@ -235,9 +235,6 @@ func (d *SpiderDownloader) CheckStatus(statusCode uint64, allowStatus []uint64)
func (d *SpiderDownloader) Download(ctx *Context) (*Response, error) {
downloadLog := log.WithField("request_id", ctx.CtxId)
defer func() {
if err := recover(); err != nil {
downloadLog.Fatalf("download panic: %v", err)
}

}()
// record the start time of request processing
3 changes: 3 additions & 0 deletions dupefilters.go
@@ -130,6 +130,9 @@ func (f *RFPDupeFilter) Fingerprint(ctx *Context) ([]byte, error) {
// DoDupeFilter deduplicates request objects with a Bloom filter
func (f *RFPDupeFilter) DoDupeFilter(ctx *Context) (bool, error) {
// Use bloom filter to do fingerprint deduplication
if ctx.Request.DoNotFilter {
return false, nil
}
data, err := f.Fingerprint(ctx)
if err != nil {
return false, err
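The new branch means a request flagged DoNotFilter is never fingerprinted, so it always passes the filter even when an identical request was seen before. A hedged in-package sketch (url, parser, and spider assumed to exist):

    filter := NewRFPDupeFilter(0.001, 1024*1024)
    req := NewRequest(url, GET, parser, RequestWithDoNotFilter(true))
    dup, _ := filter.DoDupeFilter(NewContext(req, spider))
    // dup is always false: the Bloom filter check was skipped entirely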
10 changes: 8 additions & 2 deletions dupefilters_test.go
@@ -16,24 +16,30 @@ func TestDoDupeFilter(t *testing.T) {
"Boolparams": "false",
}
spider1 := &TestSpider{
NewBaseSpider("testspider", []string{"https://www.baidu.com"}),
NewBaseSpider("testspider", []string{"https://m.s.weibo.com/ajax_topic/detail?q=%23%E9%9F%A9%E5%9B%BD%E9%98%B4%E5%8E%86%E6%96%B0%E5%B9%B4%E5%BC%95%E4%BA%89%E8%AE%AE%23"}),
}
request1 := NewRequest(server.URL+"/testHeader", GET, testParser, RequestWithRequestHeader(headers))
ctx1 := NewContext(request1, spider1)

request2 := NewRequest(server.URL+"/testHeader", GET, testParser, RequestWithRequestHeader(headers))
ctx2 := NewContext(request2, spider1)

request3 := NewRequest(server.URL+"/testHeader2", GET, testParser, RequestWithRequestHeader(headers))
request3 := NewRequest(server.URL+"/testHeader", GET, testParser, RequestWithRequestHeader(headers), RequestWithDoNotFilter(true))
ctx3 := NewContext(request3, spider1)

request4 := NewRequest(server.URL+"/testHeader2", GET, testParser, RequestWithRequestHeader(headers))
ctx4 := NewContext(request4, spider1)

duplicates := NewRFPDupeFilter(0.001, 1024*1024)
r1, _ := duplicates.DoDupeFilter(ctx1)
convey.So(r1, convey.ShouldBeFalse)
r2, _ := duplicates.DoDupeFilter(ctx2)
convey.So(r2, convey.ShouldBeTrue)
r3, _ := duplicates.DoDupeFilter(ctx3)
convey.So(r3, convey.ShouldBeFalse)

r4, _ := duplicates.DoDupeFilter(ctx4)
convey.So(r4, convey.ShouldBeFalse)
})
}

40 changes: 29 additions & 11 deletions engine.go
@@ -25,6 +25,7 @@ package tegenaria
import (
"fmt"
"runtime"
"runtime/debug"
"sort"
"sync"
"time"
@@ -97,6 +98,8 @@ type CrawlEngine struct {
// mutex is the spider startup lock;
// only one spider can be started per process
mutex sync.Mutex
// interval is the time interval between scheduled runs
interval time.Duration
}

// RegisterSpider registers a spider instance into the engine's spiders
@@ -162,6 +165,8 @@ func (e *CrawlEngine) startSpider(spiderName string) GoFunc {
// Execute starts the spider via the command line
func (e *CrawlEngine) Execute() {
ExecuteCmd(e)
defer rootCmd.ResetCommands()

}

// start is the spider launcher
@@ -189,8 +194,21 @@ func (e *CrawlEngine) start(spiderName string) StatisticInterface {
stats := e.statistic.OutputStats()
s := Map2String(stats)
engineLog.Infof(s)
e.startSpiderFinish = false
e.isStop = false
return e.statistic
}
func (e *CrawlEngine) startWithTicker(spiderName string) {
ticker := time.NewTicker(e.interval)
defer ticker.Stop()
for {
engineLog.Infof("进入下一轮")
e.start(spiderName)
engineLog.Infof("完成一轮抓取")
<-ticker.C
}

}

// EventsWatcherRunner runs the event watcher component
func (e *CrawlEngine) EventsWatcherRunner() error {
@@ -227,9 +245,6 @@ func (e *CrawlEngine) worker(ctx *Context) GoFunc {
c := ctx
return func() error {
defer func() {
if err := recover(); err != nil {
c.setError(fmt.Sprintf("crawl error %s", err))
}
if c.Error != nil {
// record a new error
e.statistic.IncrErrorCount()
@@ -259,8 +274,10 @@
// execute the work units in sequence
for _, unit := range units {
err := unit(c)
if err != nil {
c.Error = NewError(c, err)
if err != nil || c.Error != nil {
if c.Error == nil {
c.Error = NewError(c, err)
}
break
}
}
@@ -301,7 +318,7 @@ func (e *CrawlEngine) writeCache(ctx *Context) error {
var isDuplicated bool = false
defer func() {
if err := recover(); err != nil {
ctx.setError(fmt.Sprintf("write cache error %s", err))
ctx.setError(fmt.Sprintf("write cache error %s", err), string(debug.Stack()))
}
// proactively delete after writing to the distributed component
if e.useDistributed || isDuplicated {
@@ -310,7 +327,7 @@
}()
var err error = nil
// whether to enter the deduplication flow
if e.filterDuplicateReq {
if e.filterDuplicateReq && !ctx.Request.DoNotFilter {
ret, err := e.rfpDupeFilter.DoDupeFilter(ctx)
if err != nil {
isDuplicated = true
@@ -341,7 +358,7 @@ func (e *CrawlEngine) doDownload(ctx *Context) error {
var err error = nil
defer func() {
if p := recover(); p != nil {
ctx.setError(fmt.Sprintf("Download error %s", p))
ctx.setError(fmt.Sprintf("Download error %s", p), string(debug.Stack()))
}
if err != nil || ctx.Err() != nil {
e.statistic.IncrDownloadFail()
@@ -377,7 +394,7 @@
func (e *CrawlEngine) doHandleResponse(ctx *Context) error {
defer func() {
if p := recover(); p != nil {
ctx.setError(fmt.Sprintf("handle response error %s", p))
ctx.setError(fmt.Sprintf("handle response error %s", p), string(debug.Stack()))
}
}()
if ctx.Response == nil {
@@ -412,7 +429,7 @@
func (e *CrawlEngine) doParse(ctx *Context) error {
defer func() {
if err := recover(); err != nil {
ctx.setError(fmt.Sprintf("parse error %s", err))
ctx.setError(fmt.Sprintf("parse error %s", err), string(debug.Stack()))
}
}()
if ctx.Response == nil {
@@ -432,7 +449,7 @@
func (e *CrawlEngine) doPipelinesHandlers(ctx *Context) error {
defer func() {
if err := recover(); err != nil {
ctx.setError(fmt.Sprintf("pipeline error %s", err))
ctx.setError(fmt.Sprintf("pipeline error %s", err), string(debug.Stack()))
}
}()
for item := range ctx.Items {
@@ -487,6 +504,7 @@ func NewEngine(opts ...EngineOption) *CrawlEngine {
limiter: NewDefaultLimiter(32),
downloader: NewDownloader(),
hooker: NewDefaultHooks(),
interval: -1 * time.Second,
}
for _, o := range opts {
o(Engine)
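Note that startWithTicker runs a full crawl round first and only then blocks on the ticker, so a round that outlasts the interval delays the next tick read rather than overlapping rounds. A hedged in-package sketch mirroring TestTicker below (startWithTicker is unexported, so this only works inside package tegenaria; "mySpider" is a hypothetical name):

    engine := NewEngine(EngineWithInterval(4*time.Second), EngineWithUniqueReq(false))
    // spider registration assumed
    go engine.startWithTicker("mySpider") // loops forever, one round per tick
    time.Sleep(8 * time.Second)           // roughly two rounds complete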
27 changes: 23 additions & 4 deletions engine_test.go
@@ -275,7 +275,7 @@ func TestEngineStartPanic(t *testing.T) {
ctxManager.Clear()
}
engine := newTestEngine("testStartPanicSpider")
patch := gomonkey.ApplyFunc((*Statistic).OutputStats, func(_ *Statistic) map[string]uint64 {
patch := gomonkey.ApplyFunc((*CrawlEngine).start, func(_ *CrawlEngine, _ string) StatisticInterface {
panic("output panic")

})
@@ -459,9 +459,28 @@ func TestWorkerErr(t *testing.T) {
return fmt.Errorf("call PipelinesHandlers error")
})
wokerError(ctx, url, "call PipelinesHandlers error", t, patch, engine)
patch = gomonkey.ApplyFunc((*CrawlEngine).doDownload, func(_ *CrawlEngine, _ *Context) error {
panic("call doDownload panic")
patch = gomonkey.ApplyFunc((*SpiderDownloader).Download, func(_ *SpiderDownloader, _ *Context) (*Response, error) {
panic("call download panic")
})
wokerError(ctx, url, "call doDownload panic", t, patch, engine)
wokerError(ctx, url, "call download panic", t, patch, engine)

}

func TestTicker(t *testing.T) {
convey.Convey("engine ticker start", t, func() {
if ctxManager != nil {
ctxManager.Clear()
}
engine := newTestEngine("testTickerSpider", EngineWithInterval(4*time.Second), EngineWithUniqueReq(false))
go func() {
engine.startWithTicker("testTickerSpider")
}()
time.Sleep(8 * time.Second)
convey.So(engine.statistic.GetDownloadFail(), convey.ShouldAlmostEqual, 0)
convey.So(engine.statistic.GetRequestSent(), convey.ShouldAlmostEqual, 2)
convey.So(engine.statistic.GetItemScraped(), convey.ShouldAlmostEqual, 2)
convey.So(engine.statistic.GetErrorCount(), convey.ShouldAlmostEqual, 0)

// engine.Close()
})
}
2 changes: 1 addition & 1 deletion events.go
@@ -152,7 +152,7 @@ func DefaultventsWatcher(ch chan EventType, hooker EventHooksInterface) error {

}

// EventsWatcher is the event watcher for DefaultHooks
// EventsWatcher is the event watcher for DefualtHooks
func (d *DefaultHooks) EventsWatcher(ch chan EventType) error {
return DefaultventsWatcher(ch, d)

2 changes: 2 additions & 0 deletions init.go
@@ -24,6 +24,7 @@ package tegenaria

import (
"encoding/gob"
"runtime/debug"
"sync"
"time"
)
@@ -39,6 +40,7 @@
gob.Register([]interface{}{})
gob.Register(GET)
gob.Register(1 * time.Second)
debug.SetTraceback("crash")
})

}
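debug.SetTraceback("crash") raises the runtime's traceback level so that an unrecovered panic dumps all goroutine stacks and then crashes in an OS-specific way (on Unix, via SIGABRT, typically producing a core dump). This complements the recover blocks above, which now capture debug.Stack() for the panics that are handled.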
7 changes: 7 additions & 0 deletions options.go
@@ -58,6 +58,13 @@ func EngineWithUniqueReq(uniqueReq bool) EngineOption {
}
}

// EngineWithInterval sets the interval for scheduled runs
func EngineWithInterval(interval time.Duration) EngineOption {
return func(r *CrawlEngine) {
r.interval = interval
}
}

// EngineWithLimiter 引擎使用的限速器
func EngineWithLimiter(limiter LimitInterface) EngineOption {
return func(r *CrawlEngine) {
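A hedged usage sketch combining the new option with an existing one (NewEngine defaults interval to -1s, and the crawl command's -t flag can still override it at runtime):

    engine := NewEngine(
        EngineWithInterval(30*time.Second), // one crawl round every 30s
        EngineWithUniqueReq(false),         // allow identical requests across rounds
    )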