Merge into develop #18

Closed
wants to merge 2 commits into from
37 changes: 37 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,37 @@
# [v0.5.0](https://github.com/wetrycode/tegenaria/compare/v0.4.6...v0.5.0) (2023-03-23)


### Refactor

* Split the engine into components and extracted independent component interfaces, enabling user-defined components (a sketch follows this list)
* Removed the periodic polling interface
* Streamlined the engine's internal scheduling flow for individual requests
* Removed the global `Context` manager and introduced a context counter inside the engine
* Removed the `Request` and `Context` object memory pools, fixing `Request` reuse errors in multi-goroutine scenarios
* Changed the `Request` object's `Parser` type from `func(resp *Context, req chan<- *Context) error` to `string`, making `Request` objects easier to serialize
* Improved the spider-stop decision strategy by adding [component interface checks](https://github.com/wetrycode/tegenaria/blob/master/engine.go#L346), allowing users to customize the termination logic
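
A hedged sketch of the custom-component ability: `SpiderBeforeStart`, `SetCurrentSpider`, and `NewDefaultComponents()` all appear in this PR's diff, but the `DefaultComponents` type name and the embedding approach are assumptions here, with the remaining `ComponentInterface` methods inherited from the defaults.

```go
// MyComponents overrides a single hook while reusing the default components.
type MyComponents struct {
	*tegenaria.DefaultComponents // assumed type behind NewDefaultComponents()
}

// SpiderBeforeStart runs custom pre-start checks, e.g. verifying that an
// external dependency is reachable before the crawl begins.
func (c *MyComponents) SpiderBeforeStart(engine *tegenaria.CrawlEngine, spider tegenaria.SpiderInterface) error {
	return nil // returning a non-nil error aborts the start
}
```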


### Features

* Added a [distributed crawling component](https://github.com/wetrycode/tegenaria/tree/master/distributed), providing distributed deployment and crawling capabilities
* Added [gRPC and HTTP interfaces](https://github.com/wetrycode/tegenaria/tree/master/service), providing real-time remote control and status queries for spiders
* Added a [runtime status manager](https://github.com/wetrycode/tegenaria/blob/v0.5.0/stats.go#L50) inside the engine, used to start and stop spiders

* Added a `DoNotFilter` field to the `Request` object, supporting per-request deduplication control
* Added a `ToMap() (map[string]interface{}, error)` method to `Request` for serializing the object (see the sketch after this list)
* Added a `RequestFromMap(src map[string]interface{}, opts ...RequestOption) *Request` method for deserializing a `map[string]interface{}` back into a `Request` object
* Added the `RequestWithPostForm(payload url.Values) RequestOption` initialization option to `Request` for accepting `application/x-www-form-urlencoded` parameters
* Added the `RequestWithBodyReader(body io.Reader) RequestOption` initialization option to `Request` for reading binary data from memory
* Added the `RRequestWithDoNotFilter(doNotFilter bool) RequestOption` initialization option to `Request` for controlling whether a `Request` object participates in deduplication
* Added a `WriteTo(writer io.Writer) (int64, error)` interface to `Response`, allowing users to write the response into a custom `io.Writer`, e.g. a local file handle, to implement file downloads

* Added the `ComponentInterface` [component interface](https://github.com/wetrycode/tegenaria/blob/v0.5.0/components.go), allowing users to define custom components

* Added the `DistributedWorkerInterface` [distributed node control interface](https://github.com/wetrycode/tegenaria/blob/v0.5.0/distributed.go), allowing users to implement custom node-control logic
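
A minimal sketch of the serialization round-trip, using only the `ToMap` and `RequestFromMap` signatures listed above; the transport between nodes is left abstract.

```go
// shipRequest serializes a Request into a map, as might happen before pushing
// it onto a shared queue, then rebuilds it the way a remote worker would.
func shipRequest(req *tegenaria.Request) (*tegenaria.Request, error) {
	m, err := req.ToMap()
	if err != nil {
		return nil, err
	}
	// ... encode m (e.g. as JSON) and ship it to another node ...
	return tegenaria.RequestFromMap(m), nil
}
```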

### Style

* Renamed the `Header` field of `Request` and `Response` to `Headers`

* Renamed all methods of the `CacheInterface` interface to start with an uppercase letter
6 changes: 3 additions & 3 deletions README.md
@@ -39,11 +39,11 @@ import "github.com/wetrycode/tegenaria"

# Documentation

- [Getting Started](docs/Tutorial.md)
- [Getting Started](https://wetrycode.github.io/tegenaria/#/quickstart)

# TODO

- Management web API
- ~~Management web API~~


# Contribution
@@ -53,4 +53,4 @@ Send me an email directly, [email protected]

## License

[MIT](LICENSE) © geebytes
[MIT](LICENSE) © wetrycode
1 change: 1 addition & 0 deletions distributed/components.go
@@ -111,6 +111,7 @@ func (d *DistributedComponents) SetCurrentSpider(spider tegenaria.SpiderInterfac
d.spider = spider
d.worker.SetCurrentSpider(spider)
}

// SpiderBeforeStart checks the master node's status before starting the spider;
// if no master node is online, the worker node exits immediately and panics
func (d *DistributedComponents) SpiderBeforeStart(engine *tegenaria.CrawlEngine, spider tegenaria.SpiderInterface) error {
2 changes: 1 addition & 1 deletion docs/_sidebar.md
@@ -40,4 +40,4 @@
- Other
- [FAQ](faq.md)
- [Contribution](contribute.md)
- [Changelog](changelog.md)
- [Changelog](changelog)
Empty file removed docs/changelog.md
Empty file.
43 changes: 39 additions & 4 deletions docs/engine.md
@@ -46,8 +46,8 @@ type CrawlEngine struct {
runtimeStatus *RuntimeStatus
// components holds the engine's core components: deduplicator, request queue, rate limiter, statistics collector, and event watcher
components ComponentInterface
onceStart sync.Once
oncePause sync.Once
// onceClose ensures the engine's close action runs only once
onceClose sync.Once
}
func NewEngine(opts ...EngineOption) *CrawlEngine {
Engine := &CrawlEngine{
@@ -64,8 +64,9 @@ func NewEngine(opts ...EngineOption) *CrawlEngine {
currentSpider: nil,
ctxCount: 0,
reqChannelSize: 1024,
onceStart: sync.Once{},
components: NewDefaultComponents(),
onceClose: sync.Once{},

}
for _, o := range opts {
o(Engine)
@@ -76,4 +77,38 @@ func NewEngine(opts ...EngineOption) *CrawlEngine {

#### Notes

As you can see, the engine contains all of the components mentioned earlier; the engine combines them into a complete scheduling pipeline
- As you can see, the engine contains all of the components mentioned earlier; the engine combines them into a complete scheduling pipeline

- The engine provides `RuntimeStatus`, a component for controlling and querying the spider's runtime state; it is defined below, with a usage sketch after the definition

```go
// RuntimeStatus controls and reports the engine's runtime state
type RuntimeStatus struct {
	// StartAt is the first start time
	StartAt int64
	// Duration is the accumulated running time
	Duration float64
	// StopAt is the stop time
	StopAt int64
	// RestartAt is the restart time
	RestartAt int64
	// StatusOn is the engine's current status
	StatusOn StatusType
	// onceStart ensures the start action runs only once
	onceStart sync.Once
	// oncePause ensures the pause action fires only once
	oncePause *sync.Once
}

func NewRuntimeStatus() *RuntimeStatus {
	return &RuntimeStatus{
		StartAt:   0,
		Duration:  0,
		StopAt:    0,
		RestartAt: 0,
		StatusOn:  ON_STOP,
		onceStart: sync.Once{},
		oncePause: &sync.Once{},
	}
}
```
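
A minimal usage sketch, assuming the accessors exercised by this PR's tests (`GetRuntimeStatus`, `SetStatus`, and the `ON_PAUSE`/`ON_START` status values); the spider name is a placeholder.

```go
go engine.Execute("example") // run the spider in the background

engine.GetRuntimeStatus().SetStatus(ON_PAUSE) // Scheduler fires the PAUSE event once via oncePause
time.Sleep(5 * time.Second)
engine.GetRuntimeStatus().SetStatus(ON_START) // resume; a later pause can fire again
```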
4 changes: 4 additions & 0 deletions docs/index.html
Original file line number Diff line number Diff line change
@@ -21,6 +21,10 @@
loadSidebar: true,
coverpage: true,
auto2top: true,
alias:{
'.*?/changelog':
'https://raw.githubusercontent.com/wetrycode/tegenaria/master/CHANGELOG'
},
search: {
noData: "没有结果",
paths: 'auto',
4 changes: 3 additions & 1 deletion docs/response.md
@@ -46,4 +46,6 @@ type Response struct {

- ```func (r *Response) Json() (map[string]interface{}, error)``` serializes the response body as JSON data

- ```func (r *Response) String() (string, error)``` serializes the response body as a string
- ```func (r *Response) String() (string, error)``` serializes the response body as a string

- ```WriteTo(writer io.Writer) (int64, error)``` allows users to write the response into a custom writer, e.g. writing the response to a local file when downloading a file (see the sketch below)
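
A minimal download sketch. It assumes the pre-v0.5.0 parser signature shown in the changelog and a `Response` field on `Context`; both are assumptions for illustration.

```go
// downloadParser streams the response body into a local file via WriteTo.
func downloadParser(ctx *tegenaria.Context, req chan<- *tegenaria.Context) error {
	f, err := os.Create("page.html") // hypothetical target path
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = ctx.Response.WriteTo(f) // Response field name is an assumption
	return err
}
```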
2 changes: 0 additions & 2 deletions docs/start_request.md

This file was deleted.

57 changes: 27 additions & 30 deletions engine.go
@@ -88,8 +88,8 @@ type CrawlEngine struct {
runtimeStatus *RuntimeStatus
// components holds the engine's core components: deduplicator, request queue, rate limiter, statistics collector, and event watcher
components ComponentInterface
onceStart sync.Once
oncePause sync.Once
// onceClose ensures the engine's close action runs only once
onceClose sync.Once
}

// RegisterSpiders registers spider instances into the engine's spiders collection
@@ -144,22 +144,6 @@ func (e *CrawlEngine) stop() StatisticInterface {
e.runtimeStatus.SetStopAt(time.Now().Unix())
return e.components.GetStats()
}
func (e *CrawlEngine) Execute(spiderName string) StatisticInterface {
e.start(spiderName)
return e.stop()

}

// setCurrentSpider sets the current spider on the related components
func (e *CrawlEngine) setCurrentSpider(spider SpiderInterface) {
e.components.GetStats().SetCurrentSpider(spider)
e.components.GetQueue().SetCurrentSpider(spider)
e.components.GetDupefilter().SetCurrentSpider(spider)
e.components.GetEventHooks().SetCurrentSpider(spider)
e.components.GetLimiter().SetCurrentSpider(spider)
e.components.SetCurrentSpider(spider)
e.currentSpider = spider
}

// start is the spider launcher
func (e *CrawlEngine) start(spiderName string) {
@@ -182,14 +166,10 @@ func (e *CrawlEngine) start(spiderName string) {
engineLog.Errorf("SpiderBeforeStart ERROR %s", err.Error())
return
}
e.onceStart.Do(func() {
e.runtimeStatus.SetStartAt(time.Now().Unix())

})
e.runtimeStatus.SetStartAt(time.Now().Unix())
e.runtimeStatus.SetRestartAt(time.Now().Unix())
// bring up all of the engine's components
e.eventsChan <- START
e.oncePause = sync.Once{}
tasks := []GoFunc{e.recvRequest, e.Scheduler}
e.runtimeStatus.SetStatus(ON_START)
wg := &conc.WaitGroup{}
@@ -208,6 +188,21 @@
}

}
func (e *CrawlEngine) Execute(spiderName string) StatisticInterface {
e.start(spiderName)
return e.stop()
}

// setCurrentSpider sets the current spider on the related components
func (e *CrawlEngine) setCurrentSpider(spider SpiderInterface) {
e.components.GetStats().SetCurrentSpider(spider)
e.components.GetQueue().SetCurrentSpider(spider)
e.components.GetDupefilter().SetCurrentSpider(spider)
e.components.GetEventHooks().SetCurrentSpider(spider)
e.components.GetLimiter().SetCurrentSpider(spider)
e.components.SetCurrentSpider(spider)
e.currentSpider = spider
}

// EventsWatcherRunner runs the event watcher component
func (e *CrawlEngine) EventsWatcherRunner() error {
@@ -225,10 +220,11 @@ func (e *CrawlEngine) Scheduler() error {
return nil
}
if e.runtimeStatus.GetStatusOn() == ON_PAUSE {
e.oncePause.Do(func() {
e.runtimeStatus.oncePause.Do(func() {
e.eventsChan <- PAUSE
})
e.eventsChan <- HEARTBEAT
time.Sleep(time.Second)
runtime.Gosched()
continue
}
@@ -519,12 +515,13 @@ func (e *CrawlEngine) GetSpiders() *Spiders {
return e.spiders
}

// Close shuts down the engine
// close shuts down the engine
func (e *CrawlEngine) close() {
defer func() {

}()
close(e.requestsChan)
e.onceClose.Do(func() {
// guarantee the channels are closed only once
close(e.requestsChan)
close(e.eventsChan)
})
}

// GetStatic gets the StatisticInterface statistics metrics
@@ -565,7 +562,7 @@ func NewEngine(opts ...EngineOption) *CrawlEngine {
currentSpider: nil,
ctxCount: 0,
reqChannelSize: 1024,
onceStart: sync.Once{},
onceClose: sync.Once{},
components: NewDefaultComponents(),
}
for _, o := range opts {
54 changes: 54 additions & 0 deletions engine_test.go
@@ -448,3 +448,57 @@ func TestPostForm(t *testing.T) {
convey.So(content, convey.ShouldContainSubstring, "form data")
})
}

func TestPauseRestart(t *testing.T) {
convey.Convey("test pause then restart,puase should be done once", t, func() {
engine := NewTestEngine("PauseSpider01", EngineWithUniqueReq(false))
server := NewTestServer()

feedUrls := []string{
server.URL + "/testGET",
server.URL + "/testGET",
server.URL + "/testGET",
server.URL + "/testGET",
server.URL + "/testGET",
}
spider := &TestSpider{NewBaseSpider("PauseSpider02", feedUrls)}
engine.RegisterSpiders(spider)
patch := gomonkey.ApplyFunc(
(*TestSpider).StartRequest,
func(spider *TestSpider, req chan<- *Context) {
for _, url := range spider.FeedUrls {
request := NewRequest(url, GET, spider.Parser)
ctx := NewContext(request, spider)
req <- ctx
time.Sleep(time.Second)
}
})
defer patch.Reset()
count := 0
pausePatch := gomonkey.ApplyFunc((*DefaultHooks).Pause, func(_ *DefaultHooks, params ...interface{}) error {
count = count + 1
return nil
})
defer pausePatch.Reset()

go func() {
go engine.Execute("PauseSpider02")
}()
time.Sleep(time.Second)
engine.GetRuntimeStatus().SetStatus(ON_PAUSE)

engine.GetRuntimeStatus().SetStatus(ON_PAUSE)
time.Sleep(time.Second)
// expect pause to fire only once
convey.So(count, convey.ShouldAlmostEqual, 1)
// restart and reset pause status
engine.GetRuntimeStatus().SetStatus(ON_START)
time.Sleep(time.Second)

engine.GetRuntimeStatus().SetStatus(ON_PAUSE)
time.Sleep(time.Second)

convey.So(count, convey.ShouldAlmostEqual, 2)

})
}
2 changes: 0 additions & 2 deletions go.mod
@@ -42,7 +42,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/smartystreets/assertions v1.2.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 // indirect
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

@@ -56,7 +55,6 @@
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.11.1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huandu/go-tls v1.0.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect