实现了Scrapy爬虫,就算爬虫是异步加多线程的,但是只能在一台主机上运行,爬取效率还是有限的。分布式爬虫则是将多台主机组合起来,共同完成一个爬取任务。

分布式爬虫理念

分布式爬虫架构

Scrapy单机爬虫中有一个本地爬取队列Queue,这个队列是利用deque模块实现的。新的Request生成就会被放到队里,随后被调度器Schedule调度,交给Downloader执行爬取。

如果两个Schedule同时从队列里面读取Request,每个Schedule都有其对应的Downloader,那么在带宽足够、正常爬取且不考虑队列的存取压力的情况下,爬取效率会翻倍。这样Schedule可以扩展多个,Downloader也可以扩展多个。而爬取队列Queue必须始终为一个,即所谓的共享爬虫队列。这样才能保证Schedule从队列里调度某个Request后,其他Schedule不会重复调度此Request,就可做到多个Schedule同步爬取了。这就是分布式爬虫的雏形。

要实现在多台主机上运行爬虫任务协同爬取,而协同爬取的前提就是共享爬取队列,这样各台主机就不需要各自维护爬取队列,从共享爬取队列存取Request就行,但各台主机有各自的Scheudle和Downloader,所以调度和下载功能分别完成。如果不考虑队列存取性能消耗,爬取效率还是会成倍提高。

维护爬取队列

爬虫队列怎样维护比较?首先需要考虑性能,什么数据库存取性能高?基于内存的Redis,这里采用Redis来维护爬取队列。Redis支持多种数据结构,而这几种数据结构各有千秋:

  • 列表数据结构有lpush、lpop等操作,可以用其实现一个先进先出的爬取队列,也可以实现一个先进后出的栈式爬取队列;
  • 集合的元素是无序且不重复的,这样可以实现一个随机排序的不重复爬取队列;
  • 有序集合带有分数表示,而Scrapy的Request也有优先级的控制,所以用有序集合我们可以实现一个带有优先级调度的队列;

去重

Scrapy有自动去重功能,它的去重使用了Python中的集合。其相应的Scrapy的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def request_fingerprint(request, include_headers=None, keep_fragments=False):
if include_headers:
include_headers = tuple(to_bytes(h.lower()) for h in sorted(include_headers))
cache = _fingerprint_cache.setdefault(request, {})
cache_key = (include_headers, keep_fragments)
if cache_key not in cache:
fp = hashlib.sha1()
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url, keep_fragments=keep_fragments)))
fp.update(request.body or b'')
if include_headers:
for hdr in include_headers:
if hdr in request.headers:
fp.update(hdr)
for v in request.headers.getlist(hdr):
fp.update(v)
cache[cache_key] = fp.hexdigest()
return cache[cache_key]

request_fingerprint就是计算指纹的方法,其方法内部使用的是hashlib的sha1方法,计算的字段包括Method、URL、Body、Headers这几部分。

对于如何判断重复?Scrapy是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class RFPDupeFilter(BaseDupeFilter):
"""Request Fingerprint duplicates filter"""

def __init__(self, path=None, debug=False):
self.file = None
self.fingerprints = set()
self.logdupes = True
self.debug = debug
self.logger = logging.getLogger(__name__)
if path:
self.file = open(os.path.join(path, 'requests.seen'), 'a+')
self.file.seek(0)
self.fingerprints.update(x.rstrip() for x in self.file)

@classmethod
def from_settings(cls, settings):
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(job_dir(settings), debug)

def request_seen(self, request):
fp = self.request_fingerprint(request)
if fp in self.fingerprints:
return True
self.fingerprints.add(fp)
if self.file:
self.file.write(fp + '\n')
...

在去重类RFPDupeFilter中,有一个request_seen方法,它的作用就是监测Request是否存在于fingerprints变量中;Scrapy的去重过程就是利用集合元素的不重复特性来实现Request的去重。

对于分布式爬虫来说,不能再用每个爬虫各自的集合去重了,多台主机如果生成了相同的Request,如果无法共享,各主机之间就无法做到去重。要实现去重,这个指纹集合也需要是共享的。Redis正好有集合的数据存储结构,可以利用Redis的集合作为指纹集合,这样去重集合也是利用Redis共享的。

防止中断

在Scrapy中,爬虫运行时的Request队列在内存中。爬虫运行中断后,这个队列的空间就被释放,此队列就销毁了。一旦爬虫运行中断,爬虫再次运行就相当于全新的爬取过程。

要做到中断后继续爬取,可以将队列中的Request保存起来,下次爬取直接读取读取保存数据即可获取上次爬取的队列。在Scrapy中指定一个爬取队列的存储路径即可,可用如下命令实现:

1
scrapy crawl spider -s JOBDIR=crawls/spider

在Scrapy中,实际是把爬取队列保存到本地,第二次直接读取并恢复队列。而在分布式架构中,不需要担心这个问题,因为爬取队列本身就是用数据库保存的,即使爬虫中断了,数据库中的Request依然存在,下次启动会接着上次中断的地方继续爬取。

架构实现

该架构的核心思想有以下几点:

  • 实现一个共享的爬取队列;
  • 实现去重的功能;
  • 重写一个Schedule,使之可以从共享的爬取队列中存取Request;

Scrapy-Redis原理和源码解析

Scrapy-Redis库已经为我们提供了Scrapy分布式的队列、调度、去重等功能,其地址如下:

参考地址:https://github.com/rmax/scrapy-redis

爬取队列

Scrapy-Redis的爬取队列源码文件为queue.py,它有3种队列的实现,首先它实现一个父类Base,提供一些基本方法和属性,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Base(object):
"""Per-spider base queue class"""

def __init__(self, server, spider, key, serializer=None):
"""Initialize per-spider redis queue.
Parameters
----------
server : StrictRedis Redis client instance.
spider : Spider Scrapy spider instance.
key: str Redis key where to put and get messages.
serializer : object Serializer object with ``loads`` and ``dumps`` methods.
"""
if serializer is None:
# Backward compatibility.
# TODO: deprecate pickle.
serializer = picklecompat
if not hasattr(serializer, 'loads'):
raise TypeError(f"serializer does not implement 'loads' function: {serializer}")
if not hasattr(serializer, 'dumps'):
raise TypeError(f"serializer does not implement 'dumps' function: {serializer}")

self.server = server
self.spider = spider
self.key = key % {'spider': spider.name}
self.serializer = serializer

# 将Request序列化成字符串再存储
def _encode_request(self, request):
"""Encode a request object"""
try:
obj = request.to_dict(spider=self.spider)
except AttributeError:
obj = request_to_dict(request, self.spider)
return self.serializer.dumps(obj)

# 将Request反序列化后再取出
def _decode_request(self, encoded_request):
"""Decode an request previously encoded"""
obj = self.serializer.loads(encoded_request)
return request_from_dict(obj, spider=self.spider)

def __len__(self):
"""Return the length of the queue"""
raise NotImplementedError

def push(self, request):
"""Push a request"""
raise NotImplementedError

def pop(self, timeout=0):
"""Pop a request"""
raise NotImplementedError

def clear(self):
"""Clear queue/stack"""
self.server.delete(self.key)

接下来需要定一些子类来继承Base类,并实现上述未实现的方法;

首先是FifoQueue,这个类继承了Base类,并重写了3个方法。Request在列表中左侧进、右侧出,这是有序的进出,即先进先出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class FifoQueue(Base):
"""Per-spider FIFO queue"""

def __len__(self):
"""Return the length of the queue"""
return self.server.llen(self.key)

def push(self, request):
"""Push a request"""
self.server.lpush(self.key, self._encode_request(request))

def pop(self, timeout=0):
"""Pop a request"""
if timeout > 0:
data = self.server.brpop(self.key, timeout)
if isinstance(data, tuple):
data = data[1]
else:
data = self.server.rpop(self.key)
if data:
return self._decode_request(data)

然后是LifoQueue,与FifoQueue相反,它的pop方法使用lpop,而push方法是lpush,即左侧进左侧出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class LifoQueue(Base):
"""Per-spider LIFO queue."""

def __len__(self):
"""Return the length of the stack"""
return self.server.llen(self.key)

def push(self, request):
"""Push a request"""
self.server.lpush(self.key, self._encode_request(request))

def pop(self, timeout=0):
"""Pop a request"""
if timeout > 0:
data = self.server.blpop(self.key, timeout)
if isinstance(data, tuple):
data = data[1]
else:
data = self.server.lpop(self.key)

if data:
return self._decode_request(data)

最后还有一个子类叫做PriorityQueue,即优先级队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class PriorityQueue(Base):
"""Per-spider priority queue abstraction using redis' sorted set"""

def __len__(self):
"""Return the length of the queue"""
return self.server.zcard(self.key)

def push(self, request):
"""Push a request"""
data = self._encode_request(request)
score = -request.priority
# We don't use zadd method as the order of arguments change depending on whether
# the class is Redis or StrictRedis, and the option of using kwargs only accepts strings, not bytes.
self.server.execute_command('ZADD', self.key, score, data)

def pop(self, timeout=0):
"""Pop a request timeout not support in this queue class"""
# use atomic range/remove using multi/exec
pipe = self.server.pipeline()
pipe.multi()
pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
results, count = pipe.execute()
if results:
return self._decode_request(results[0])

这里使用的存储结构是有序集合,在这个集合中,每个元素都可以设置一个分数,该分数表示优先级。

去重

Scrapy的去重是利用集合来实现的,而Scrapy分布式中的去重需要利用共享的集合,这里使用的是Redis中的集合数据结构。Scrapy-redis的去重源码文件为duperfilter.py,其内实现了一个RFPDupeFilter类,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class RFPDupeFilter(BaseDupeFilter):
""" Redis-based request duplicates filter.
This class can also be used with default Scrapy's scheduler.
"""
logger = logger

def __init__(self, server, key, debug=False):
"""Initialize the duplicates filter."""
self.server = server
self.key = key
self.debug = debug
self.logdupes = True

@classmethod
def from_settings(cls, settings):
"""Returns an instance from given settings."""
server = get_redis_from_settings(settings)
# XXX: This creates one-time key. needed to support to use this
# class as standalone dupefilter with scrapy's default scheduler
# if scrapy passes spider on open() method this wouldn't be needed
# TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)

@classmethod
def from_crawler(cls, crawler):
"""Returns instance from crawler."""
return cls.from_settings(crawler.settings)

def request_seen(self, request):
"""Returns True if request was already seen."""
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
added = self.server.sadd(self.key, fp)
return added == 0

def request_fingerprint(self, request):
return request_fingerprint(request)

@classmethod
def from_spider(cls, spider):
settings = spider.settings
server = get_redis_from_settings(settings)
dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
key = dupefilter_key % {'spider': spider.name}
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)

def close(self, reason=''):
self.clear()

def clear(self):
self.server.delete(self.key)

这里同样实现了一个request_seen方法,与Scrapy中的request_seen方法实现极其类似。不过这里的集合使用的是server(即redis实例)对象的sadd操作,也就是不再是一个简单数据结构了,而是直接换成了数据库的存储方式。鉴别重复的方式还是使用指纹,指纹同样依靠request_fingerprint方法获取。

调度器

Srapy-redis还实现了配合Queue、DupeFilter使用的调度器Schedule,其源文件名称为schedule.py。接下来看看两个核心的存取方法,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True

def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request

其中enqueue_request可以向队列中添加Request,核心操作就是调用Queue的push操作,还有一些统计和日志操作。next_queue就是从队列中取Request,核心操作就是调用Queue的pop操作,此时如果队列中还有Request,Request会直接取出来,爬取继续,如果队列为空,则爬取会重新开始。

小结

目前,3个分布式的问题就解决了:

  • 爬取队列的实现:这里提供了3种队列,使用Redis的列表或有序集合来维护;
  • 去重的实现:使用Redis的集合来保存Request指纹,以提供重复过滤;
  • 中断后重新爬取的实现:中断后Redis的队列没有清空,再次启动时调度器的next_request会从队列中会取下一个Request,爬取继续。

基于Scrapy-Redis的分布式爬虫配置实现

*核心配置

首先将调度器的类和去重类替换为Scrapy-Redis提供类,在settings.py中添加如下配置:

1
2
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

*Redis连接配置

Redis的连接信息有两种配置方式。

第一种是通过字符串连接配置,支持的形式如下:

1
2
3
redis://[:password]@host:port/db
rediss://[:password]@host:port/db
unix://[:password]@/path/to/socket.sock?db=db

password是密码,中括号代表此项可有可无,host是redis的地址,port是运行端口,db是数据库代号,其默认值是0。

如果通过字符串连接在settings.py中添加如下配置:

1
REDIS_URL = 'redis://192.168.2.3:6379'

第二种通过分单独配置,这样更加直观:

1
2
3
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PASSWORD = None

注意,如果配置了REDIS_URL,那么Scrapy-Redis将优先使用REDIS_URL连接,会覆盖上面的3项配置。

配置调度队列

此配置项是可选的,默认使用PriorityQueue。如需更改,可以配置SCHEDULE_QUEUE_CLASS变量,如下所示:

1
2
3
SCHEDULE_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# SCHEDULE_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'
# SCHEDULE_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'

配置持久化

此配置项是可选的,默认False。Scrpay-Redis默认会在爬取全部完成后清空爬取队列和去重指纹集合。

如果不想自动清空爬取队列和去重指纹集合,可以增加如下配置:

1
SCHEDULE_PERSIST = True

值得注意的是,如果强制中断爬虫的运行,爬取队列和去重指纹是不会自动清空的。

Pipeline配置

此配置项是可选的,默认不启动Pipeline。Scrpay-Redis实现了一个存储到Redis的Item Pipeline,如果启用了这个Pipeline,爬虫会把Item存储到Redis数据库。在数据量较大时,一般不会这么做。因为Redis是基于内存的,我们利用的是它处理速度快的特性,用它做存储未免太浪费。

基于Scrapy-Redis的分布式优化策略

基于Bloom Filter进行大规模去重

Scrapy-Redis将Request的指纹存储到了Redis集合中,每个指纹的长度为40,是一个字符串。

我们计算一下这种方式的消耗的存储空间。假设每个字符占1字节,即1B,1个指纹占用空间为40B,1万个指纹占用空间约400KB,1亿个指纹占用空间约4GB。当爬取数量达到上亿级别时,Redis的占用的内存就会变得很大,而这仅仅是指纹的存储。redis还存储了爬取队列,内存占用会进一步提高,更别说多个Scrapy项目同时爬取了。这里需要一个更加节省内存的去重算法—Bloom Filter。

Bloom Filter

Bloom Filter使用位数组表示一个待检测集合,并可以快速通过概率算法判断一个元素是否在这个集合中。

Bloom Filter算法原理:https://zhuanlan.zhihu.com/p/140545941

这里说结论,位数组的长度m需要比集合元素个数n和散列函数k的乘积还要大,且k约等于m与n的比值的0.7倍时,其误判概率最小。

Bloom Filter对接Scrapy-Redis原理

实现Bloom Fliter时,首先要保证不能破坏Scrapy-Redis分布式爬取的运行架构,所以需要修改Scrapy-Redis的源码,替换它们的去重类。同时Bloom Fliter的实现需要借助一个位数组,由于当前架构还是依赖于Redis,位数组的维护可直接使用Redis。

首先实现一个基本的散列算法,将一个值经过散列运算后映射到一个m位位数组的某一位上,代码实现如下:

1
2
3
4
5
6
7
8
9
10
class HashMap(object):
def __init__(self, m, seed):
self.m = m
self.seed = seed

def hash_demo(self, value):
ret = 0
for i in range(len(value)):
ret += self.seed*ret+ord(value[i])
return (self.m-1) & ret

这里新建了一个HashMap类,构造函数传入两个值,一个是m位数组的位数,另一个是种子值seed,不同的散列函数需要有不同的seed,这样可以保证不同散列函数的结果不会碰撞。该类的hash_demo方法实现了一个由字符串和seed来确定的散列函数。

接下来实现Bloom Filter,Bloom Filter里面需要用到k个散列函数,所以这里我们需要对这几个散列函数指定相同的m值和不同的seed值,在这里构造如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30

class BloomFilter(object):
def __init__(self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER):
"""Initialize BloomFilter
:param server: Redis Server
:param key: BloomFilter Key
:param bit: m = 2 ^ bit
:param hash_number: the number of hash function
"""
# default to 1 << 30 = 10,7374,1824 = 2^30 = 128MB, max filter 2^30/hash_number = 1,7895,6970 fingerprints
self.m = 1 << bit
self.seeds = range(hash_number)
self.server = server
self.key = key
self.maps = [HashMap(self.m, seed) for seed in self.seeds]

def exists(self, value):
"""if value exists"""
if not value:
return False
exist = True
for map in self.maps:
offset = map.hash(value)
exist = exist & self.server.getbit(self.key, offset)
return exist

def insert(self, value):
"""add value to bloom"""
for f in self.maps:
offset = f.hash(value)
self.server.setbit(self.key, offset, 1)

由于需要完成亿级别的数据的去重,即算法中n为1亿以上,散列函数的个数k大约取10左右的量级,而m>kn,所以这里m保底在10亿左右。这里用移位操作来实现,传入位数bit,定义为30。由于是位数组,所以这个位数组占用的大小就是2^30 bit = 128 MB。

随后再传入散列函数的个数,用它来生成几个不同的seed,用不同的seed来定义不同的散列函数,这里构造一个散列函数列表,遍历seed,构造带有不同seed值的HashMap对象。

接着实现比较关键的两个方法,一个是判定元素是否重复的方法exists,另一个是添加元素到集合中的方法insert。至此Bloom Filter的实现已经完成。

然后需要继续修改Scrapy-Redis的源码,将它的去重逻辑替换为Bloom Filter的逻辑,这里主要是修改RFPDupeFilter类的request_seen方法,实现如下:

1
2
3
4
5
6
7
def request_seen(self, request):
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
if self.bf.exists(fp):
return True
self.bf.insert(fp)
return False

对于Bloom Filter的初始化定义,将其__init__修改为如下,其中bit和hash_number使用from_settings方法传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def __init__(self, server, key, debug, bit, hash_number):
"""Initialize the duplicates filter.
Parameters
----------
server : redis.StrictRedis The redis server instance.
key : str Redis key Where to store fingerprints.
debug : bool, optional Whether to log filtered requests.
"""
self.server = server
self.key = key
self.debug = debug
self.bit = bit
self.hash_number = hash_number
self.logdupes = True
self.bf = BloomFilter(server, self.key, bit, hash_number)

@classmethod
def from_settings(cls, settings):
"""Returns a RFPDupeFilter instance from given settings."""
server = get_redis_from_settings(settings)
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG)
bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT)
hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)
return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)

常量BLOOMFILTER_BIT和DUPEFILTER_DEBUG统一定义在defaults.py中。至此就实现了Bloom Filter和Scrapy-Redis的对接。

Bloom Filter对接Scrapy-Redis配置

pip3 install scrapy-redis-bloomfilter

使用方法和Scrapy-Redis配置基本相似,在Scrapy-Redis配置的基础上,接入Bloom Filter需要修改如下几个配置:

1
2
3
DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"					# 去重类
BLOOMFILTER_BIT = 30 # 该字段决定位数组的位数
BLOOMFILTER_HASH_NUMBER = 6 # BloomFilter使用的散列函数个数

参考网址:

  • https://github.com/Python3WebSpider/ScrapyRedisBloomFilter
  • https://github.com/Python3WebSpider/ScrapyCompositeDemo/tree/scrapy-redis-bloomfilter

基于RabbitMQ的分布式爬虫

由于我们的爬取队列仍然是基于Redis实现的,它同样会占据非常大的内存,我们可以考虑将爬取队列进行迁移。

爬取队列类似一个消息队列,可以先进先出、先进后出、按优先级进出等,目前消息队列中间件也有很多,如RabbitMQ、RocketMQ等。这里使用RabbitMQ来实现一下Scrapy的分布式爬虫。

RabbitMQ对接Scrapy原理

RabbitMQ就是一个消息队列,Scrapy-Redis利用Redis实现了一个爬取队列,所以同样的,可以仿照Scrapy-Redis的实现,将Redis换成RabbitMQ。

首先定义一个connection对象

1
2
3
4
5
6
7
import pika
def from_settings(settings):
"""generate connection of rabbitmq"""
connection_parameters = settings.get('RABBITMQ_CONNECTION_PARAMETERS', RABBITMQ_CONNECTION_PARAMETERS)
connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_parameters))
channel = connection.channel()
return channel

另外RabbitMQ也提供了对优先级队列的支持,在声明队列的时候设置x-max-priority参数来设定最大的优先级数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class PriorityQueue(Base):
def __init__(self, server, spider, key,
max_priority=SCHEDULER_QUEUE_MAX_PRIORITY,
durable=SCHEDULER_QUEUE_DURABLE,
force_flush=SCHEDULER_QUEUE_FORCE_FLUSH,
priority_offset=SCHEDULER_QUEUE_PRIORITY_OFFSET):
"""
init rabbitmq queue
:param server: pika channel
:param spider: spider object
:param key: queue name
"""
self.inited = False
logger.debug('Queue args %s', {
'server': server,
'spider': spider,
'key': key,
'max_priority': max_priority,
'durable': durable,
'force_flush': force_flush,
'priority_offset': priority_offset
})
self.durable = durable
super(PriorityQueue, self).__init__(server, spider, key)
try:
self.queue_operator = self.server.queue_declare(queue=self.key, arguments={
'x-max-priority': max_priority
}, durable=durable)
logger.debug('Queue operator %s', self.queue_operator)
self.inited = True
except ChannelClosedByBroker as e:
logger.error("You have changed queue configuration, you "
"must delete queue manually or set `SCHEDULER_QUEUE_FORCE_FLUSH` "
"to True, error detail %s" % str(e.args), exc_info=True)
self.inited = False
self.priority_offset = priority_offset

def __len__(self):
if not hasattr(self, 'queue_operator'):
return 0
return self.queue_operator.method.message_count

def push(self, request):
"""push request to queue"""
priority = request.priority + self.priority_offset
# set min priority in queue to 0
if priority < 0:
priority = 0
delivery_mode = 2 if self.durable else None
self.server.basic_publish(
exchange='',
properties=pika.BasicProperties(
priority=priority,
delivery_mode=delivery_mode
),
routing_key=self.key,
body=self._encode_request(request)
)

def pop(self):
"""pop request from queue"""
method_frame, header, body = self.server.basic_get(queue=self.key, auto_ack=True)

if body:
return self._decode_request(body)

对于Schedule。基本原理就是将Queue对象更换为刚才声明的PriorityQueue对象,同时一些初始化的参数通过settings获取即可。

最后整理好包(pip3 install gerapy-rabbitmq)后更改如下配置:

1
2
3
SCHEDULER = "gerapy_rabbitmq.scheduler.Scheduler"							  # 调度器类
SCHEDULER_QUEUE_KEY = '%(spider)s_requests' # 队列名称,使用Spider名称+Requests的组合
RABBITMQ_CONNECTION_PARAMETERS = {'host': '192.168.2.3','port': 5672} # RabbitMQ的连接对象

注意分布式的多台主机都需要修改为同一个RabbitMQ地址,运行就可以实现协同爬取了。

参考网址:https://github.com/Python3WebSpider/ScrapyCompositeDemo/blob/gerapy-rabbitmq/scrapycompositedemo