消息总线¶
消息总线是传输层抽象机制。Frontera 提供了接口和几个实现。同一时间只能使用一种类型的消息总线,并通过 MESSAGE_BUS
设置。
爬虫进程可以使用
-
class
frontera.contrib.backends.remote.messagebus.
MessageBusBackend
(manager)¶
和消息总线进行通信。
内置消息总线参考¶
ZeroMQ¶
这是默认选项,使用轻量级的 ZeroMQ 库实现
可以使用 ZeroMQ 消息总线设置 配置。
ZeroMQ 需要按照 ZeroMQ 库,并且启动broker进程,请参考 启动集群 。
总的来说,使用 ZeroMQ 消息总线是为了用最少的部署实现 PoC (Patch Output Converter 成批输出转换程序)。因为它很容易 在组件的数据流未正确调整或启动过程中发生消息丢失,所以请参照下面的顺序启动组件:
- db worker
- strategy worker
- spiders
不幸的是,停止执行未完成抓取的爬虫时,无法避免消息丢失。如果你的爬虫程序对少量的信息丢失敏感的话,我建议你使用 Kafka。
警告!ZeroMQ消息总线不支持多个 SW worker 和 DB worker, 每种 woker 只能有一个实例。
协议¶
根据数据流,Frontera 使用几种消息类型来编码它的消息。每种消息是用 msgpack 或 JSON 序列化的 python 对象。可以使用 MESSAGE_BUS_CODEC
选择编解码器模块,并且需要导出编码器和解码器类。
以下是子类实现自己的编解码器所需的类:
-
class
frontera.core.codec.
BaseEncoder
¶ -
encode_add_seeds
(seeds)¶ Encodes add_seeds message
参数: seeds (list) – A list of frontier Request objects 返回: bytes encoded message
-
encode_page_crawled
(response)¶ Encodes a page_crawled message
参数: response (object) – A frontier Response object 返回: bytes encoded message
-
encode_request_error
(request, error)¶ Encodes a request_error message
参数: - request (object) – A frontier Request object
- error (string) – Error description
返回: bytes encoded message
-
encode_request
(request)¶ Encodes requests for spider feed stream.
参数: request (object) – Frontera Request object 返回: bytes encoded message
-
encode_update_score
(request, score, schedule)¶ Encodes update_score messages for scoring log stream.
参数: - request (object) – Frontera Request object
- score (float) – score
- schedule (bool) – True if document needs to be scheduled for download
返回: bytes encoded message
-
encode_new_job_id
(job_id)¶ Encodes changing of job_id parameter.
参数: job_id (int) – 返回: bytes encoded message
-
encode_offset
(partition_id, offset)¶ Encodes current spider offset in spider feed.
参数: - partition_id (int) –
- offset (int) –
返回: bytes encoded message
-