消息总线

消息总线是传输层抽象机制。Frontera 提供了接口和几个实现。同一时间只能使用一种类型的消息总线,并通过 MESSAGE_BUS 设置。

爬虫进程可以使用

class frontera.contrib.backends.remote.messagebus.MessageBusBackend(manager)

和消息总线进行通信。

内置消息总线参考

ZeroMQ

这是默认选项,使用轻量级的 ZeroMQ 库实现

可以使用 ZeroMQ 消息总线设置 配置。

ZeroMQ 需要按照 ZeroMQ 库,并且启动broker进程,请参考 启动集群

总的来说,使用 ZeroMQ 消息总线是为了用最少的部署实现 PoC (Patch Output Converter 成批输出转换程序)。因为它很容易 在组件的数据流未正确调整或启动过程中发生消息丢失,所以请参照下面的顺序启动组件:

  1. db worker
  2. strategy worker
  3. spiders

不幸的是,停止执行未完成抓取的爬虫时,无法避免消息丢失。如果你的爬虫程序对少量的信息丢失敏感的话,我建议你使用 Kafka。

警告!ZeroMQ消息总线不支持多个 SW worker 和 DB worker, 每种 woker 只能有一个实例。

Kafka

使用这个类

使用 Kafka 消息总线配置 配置。

需要运行 Kafka 服务,这个服务更适合大规模采集。

协议

根据数据流,Frontera 使用几种消息类型来编码它的消息。每种消息是用 msgpack 或 JSON 序列化的 python 对象。可以使用 MESSAGE_BUS_CODEC 选择编解码器模块,并且需要导出编码器和解码器类。

以下是子类实现自己的编解码器所需的类:

class frontera.core.codec.BaseEncoder
encode_add_seeds(seeds)

Encodes add_seeds message

参数:seeds (list) – A list of frontier Request objects
返回:bytes encoded message
encode_page_crawled(response)

Encodes a page_crawled message

参数:response (object) – A frontier Response object
返回:bytes encoded message
encode_request_error(request, error)

Encodes a request_error message

参数:
  • request (object) – A frontier Request object
  • error (string) – Error description
返回:

bytes encoded message

encode_request(request)

Encodes requests for spider feed stream.

参数:request (object) – Frontera Request object
返回:bytes encoded message
encode_update_score(request, score, schedule)

Encodes update_score messages for scoring log stream.

参数:
  • request (object) – Frontera Request object
  • score (float) – score
  • schedule (bool) – True if document needs to be scheduled for download
返回:

bytes encoded message

encode_new_job_id(job_id)

Encodes changing of job_id parameter.

参数:job_id (int) –
返回:bytes encoded message
encode_offset(partition_id, offset)

Encodes current spider offset in spider feed.

参数:
  • partition_id (int) –
  • offset (int) –
返回:

bytes encoded message

class frontera.core.codec.BaseDecoder
decode(buffer)

Decodes the message.

参数:buffer (bytes) – encoded message
返回:tuple of message type and related objects
decode_request(buffer)

Decodes Request objects.

参数:buffer (bytes) – serialized string
返回:object Request

可用的编解码器

MsgPack

Module: frontera.contrib.backends.remote.codecs.msgpack

JSON

A JSON codec for Frontera. Implemented using native json library.

Module: frontera.contrib.backends.remote.codecs.json