实现了Scrapy爬虫,就算爬虫是异步加多线程的,但是只能在一台主机上运行,爬取效率还是有限的。分布式爬虫则是将多台主机组合起来,共同完成一个爬取任务。
分布式爬虫理念 分布式爬虫架构 Scrapy单机爬虫中有一个本地爬取队列Queue,这个队列是利用deque模块实现的。新的Request生成就会被放到队里,随后被调度器Schedule调度,交给Downloader执行爬取。
如果两个Schedule同时从队列里面读取Request,每个Schedule都有其对应的Downloader,那么在带宽足够、正常爬取且不考虑队列的存取压力的情况下,爬取效率会翻倍。这样Schedule可以扩展多个,Downloader也可以扩展多个。而爬取队列Queue必须始终为一个,即所谓的共享爬虫队列。这样才能保证Schedule从队列里调度某个Request后,其他Schedule不会重复调度此Request,就可做到多个Schedule同步爬取了。这就是分布式爬虫的雏形。
要实现在多台主机上运行爬虫任务协同爬取,而协同爬取的前提就是共享爬取队列,这样各台主机就不需要各自维护爬取队列,从共享爬取队列存取Request就行,但各台主机有各自的Scheudle和Downloader,所以调度和下载功能分别完成。如果不考虑队列存取性能消耗,爬取效率还是会成倍提高。
维护爬取队列 爬虫队列怎样维护比较?首先需要考虑性能,什么数据库存取性能高?基于内存的Redis,这里采用Redis来维护爬取队列。Redis支持多种数据结构,而这几种数据结构各有千秋:
列表数据结构有lpush、lpop等操作,可以用其实现一个先进先出的爬取队列,也可以实现一个先进后出的栈式爬取队列;
集合的元素是无序且不重复的,这样可以实现一个随机排序的不重复爬取队列;
有序集合带有分数表示,而Scrapy的Request也有优先级的控制,所以用有序集合我们可以实现一个带有优先级调度的队列;
去重 Scrapy有自动去重功能,它的去重使用了Python中的集合。其相应的Scrapy的源代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def request_fingerprint (request, include_headers=None , keep_fragments=False ): if include_headers: include_headers = tuple (to_bytes(h.lower()) for h in sorted (include_headers)) cache = _fingerprint_cache.setdefault(request, {}) cache_key = (include_headers, keep_fragments) if cache_key not in cache: fp = hashlib.sha1() fp.update(to_bytes(request.method)) fp.update(to_bytes(canonicalize_url(request.url, keep_fragments=keep_fragments))) fp.update(request.body or b'' ) if include_headers: for hdr in include_headers: if hdr in request.headers: fp.update(hdr) for v in request.headers.getlist(hdr): fp.update(v) cache[cache_key] = fp.hexdigest() return cache[cache_key]
request_fingerprint就是计算指纹的方法,其方法内部使用的是hashlib的sha1方法,计算的字段包括Method、URL、Body、Headers这几部分。
对于如何判断重复?Scrapy是这样实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class RFPDupeFilter (BaseDupeFilter ): """Request Fingerprint duplicates filter""" def __init__ (self, path=None , debug=False ): self.file = None self.fingerprints = set () self.logdupes = True self.debug = debug self.logger = logging.getLogger(__name__) if path: self.file = open (os.path.join(path, 'requests.seen' ), 'a+' ) self.file.seek(0 ) self.fingerprints.update(x.rstrip() for x in self.file) @classmethod def from_settings (cls, settings ): debug = settings.getbool('DUPEFILTER_DEBUG' ) return cls(job_dir(settings), debug) def request_seen (self, request ): fp = self.request_fingerprint(request) if fp in self.fingerprints: return True self.fingerprints.add(fp) if self.file: self.file.write(fp + '\n' ) ...
在去重类RFPDupeFilter中,有一个request_seen方法,它的作用就是监测Request是否存在于fingerprints变量中;Scrapy的去重过程就是利用集合元素的不重复特性来实现Request的去重。
对于分布式爬虫来说,不能再用每个爬虫各自的集合去重了,多台主机如果生成了相同的Request,如果无法共享,各主机之间就无法做到去重。要实现去重,这个指纹集合也需要是共享的。Redis正好有集合的数据存储结构,可以利用Redis的集合作为指纹集合,这样去重集合也是利用Redis共享的。
防止中断 在Scrapy中,爬虫运行时的Request队列在内存中。爬虫运行中断后,这个队列的空间就被释放,此队列就销毁了。一旦爬虫运行中断,爬虫再次运行就相当于全新的爬取过程。
要做到中断后继续爬取,可以将队列中的Request保存起来,下次爬取直接读取读取保存数据即可获取上次爬取的队列。在Scrapy中指定一个爬取队列的存储路径即可,可用如下命令实现:
1 scrapy crawl spider -s JOBDIR=crawls/spider
在Scrapy中,实际是把爬取队列保存到本地,第二次直接读取并恢复队列。而在分布式架构中,不需要担心这个问题,因为爬取队列本身就是用数据库保存的,即使爬虫中断了,数据库中的Request依然存在,下次启动会接着上次中断的地方继续爬取。
架构实现 该架构的核心思想有以下几点:
实现一个共享的爬取队列;
实现去重的功能;
重写一个Schedule,使之可以从共享的爬取队列中存取Request;
Scrapy-Redis原理和源码解析 Scrapy-Redis库已经为我们提供了Scrapy分布式的队列、调度、去重等功能,其地址如下:
参考地址:https://github.com/rmax/scrapy-redis
爬取队列 Scrapy-Redis的爬取队列源码文件为queue.py,它有3种队列的实现,首先它实现一个父类Base,提供一些基本方法和属性,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class Base (object ): """Per-spider base queue class""" def __init__ (self, server, spider, key, serializer=None ): """Initialize per-spider redis queue. Parameters ---------- server : StrictRedis Redis client instance. spider : Spider Scrapy spider instance. key: str Redis key where to put and get messages. serializer : object Serializer object with ``loads`` and ``dumps`` methods. """ if serializer is None : serializer = picklecompat if not hasattr (serializer, 'loads' ): raise TypeError(f"serializer does not implement 'loads' function: {serializer} " ) if not hasattr (serializer, 'dumps' ): raise TypeError(f"serializer does not implement 'dumps' function: {serializer} " ) self.server = server self.spider = spider self.key = key % {'spider' : spider.name} self.serializer = serializer def _encode_request (self, request ): """Encode a request object""" try : obj = request.to_dict(spider=self.spider) except AttributeError: obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request (self, encoded_request ): """Decode an request previously encoded""" obj = self.serializer.loads(encoded_request) return request_from_dict(obj, spider=self.spider) def __len__ (self ): """Return the length of the queue""" raise NotImplementedError def push (self, request ): """Push a request""" raise NotImplementedError def pop (self, timeout=0 ): """Pop a request""" raise NotImplementedError def clear (self ): """Clear queue/stack""" self.server.delete(self.key)
接下来需要定一些子类来继承Base类,并实现上述未实现的方法;
首先是FifoQueue ,这个类继承了Base类,并重写了3个方法。Request在列表中左侧进、右侧出,这是有序的进出,即先进先出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class FifoQueue (Base ): """Per-spider FIFO queue""" def __len__ (self ): """Return the length of the queue""" return self.server.llen(self.key) def push (self, request ): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop (self, timeout=0 ): """Pop a request""" if timeout > 0 : data = self.server.brpop(self.key, timeout) if isinstance (data, tuple ): data = data[1 ] else : data = self.server.rpop(self.key) if data: return self._decode_request(data)
然后是LifoQueue ,与FifoQueue相反,它的pop方法使用lpop,而push方法是lpush,即左侧进左侧出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class LifoQueue (Base ): """Per-spider LIFO queue.""" def __len__ (self ): """Return the length of the stack""" return self.server.llen(self.key) def push (self, request ): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop (self, timeout=0 ): """Pop a request""" if timeout > 0 : data = self.server.blpop(self.key, timeout) if isinstance (data, tuple ): data = data[1 ] else : data = self.server.lpop(self.key) if data: return self._decode_request(data)
最后还有一个子类叫做PriorityQueue ,即优先级队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class PriorityQueue (Base ): """Per-spider priority queue abstraction using redis' sorted set""" def __len__ (self ): """Return the length of the queue""" return self.server.zcard(self.key) def push (self, request ): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD' , self.key, score, data) def pop (self, timeout=0 ): """Pop a request timeout not support in this queue class""" pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0 , 0 ).zremrangebyrank(self.key, 0 , 0 ) results, count = pipe.execute() if results: return self._decode_request(results[0 ])
这里使用的存储结构是有序集合,在这个集合中,每个元素都可以设置一个分数,该分数表示优先级。
去重 Scrapy的去重是利用集合来实现的,而Scrapy分布式中的去重需要利用共享的集合,这里使用的是Redis中的集合数据结构。Scrapy-redis的去重源码文件为duperfilter.py,其内实现了一个RFPDupeFilter 类,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class RFPDupeFilter(BaseDupeFilter): "" " Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. " "" logger = logger def __init__(self, server, key, debug=False): "" "Initialize the duplicates filter." "" self.server = server self.key = key self.debug = debug self.logdupes = True @classmethod def from_settings(cls, settings): "" "Returns an instance from given settings." "" server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp' : int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG' ) return cls(server, key=key, debug=debug) @classmethod def from_crawler(cls, crawler): "" "Returns instance from crawler." "" return cls.from_settings(crawler.settings) def request_seen(self, request): "" "Returns True if request was already seen." "" fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp) return added == 0 def request_fingerprint(self, request): return request_fingerprint(request) @classmethod def from_spider(cls, spider): settings = spider.settings server = get_redis_from_settings(settings) dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY" , defaults.SCHEDULER_DUPEFILTER_KEY) key = dupefilter_key % {'spider' : spider.name} debug = settings.getbool('DUPEFILTER_DEBUG' ) return cls(server, key=key, debug=debug) def close(self, reason='' ): self.clear() def clear(self): self.server.delete(self.key)
这里同样实现了一个request_seen方法,与Scrapy中的request_seen方法实现极其类似。不过这里的集合使用的是server(即redis实例)对象的sadd操作,也就是不再是一个简单数据结构了,而是直接换成了数据库的存储方式。鉴别重复的方式还是使用指纹,指纹同样依靠request_fingerprint方法获取。
调度器 Srapy-redis还实现了配合Queue、DupeFilter使用的调度器Schedule,其源文件名称为schedule.py。接下来看看两个核心的存取方法,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def enqueue_request (self, request ): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis' , spider=self.spider) self.queue.push(request) return True def next_request (self ): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis' , spider=self.spider) return request
其中enqueue_request可以向队列中添加Request,核心操作就是调用Queue的push操作,还有一些统计和日志操作。next_queue就是从队列中取Request,核心操作就是调用Queue的pop操作,此时如果队列中还有Request,Request会直接取出来,爬取继续,如果队列为空,则爬取会重新开始。
小结 目前,3个分布式的问题就解决了:
爬取队列的实现:这里提供了3种队列,使用Redis的列表或有序集合来维护;
去重的实现:使用Redis的集合来保存Request指纹,以提供重复过滤;
中断后重新爬取的实现:中断后Redis的队列没有清空,再次启动时调度器的next_request会从队列中会取下一个Request,爬取继续。
基于Scrapy-Redis的分布式爬虫配置实现 *核心配置 首先将调度器的类和去重类替换为Scrapy-Redis提供类,在settings.py
中添加如下配置:
1 2 SCHEDULER = 'scrapy_redis.scheduler.Scheduler' DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
*Redis连接配置 Redis的连接信息有两种配置方式。
第一种是通过字符串连接配置,支持的形式如下:
1 2 3 redis://[:password]@host:port/db rediss://[:password]@host:port/db unix://[:password]@/path/to/socket.sock?db=db
password是密码,中括号代表此项可有可无,host是redis的地址,port是运行端口,db是数据库代号,其默认值是0。
如果通过字符串连接在settings.py
中添加如下配置:
1 REDIS_URL = 'redis://192.168.2.3:6379'
第二种通过分单独配置,这样更加直观:
1 2 3 REDIS_HOST = '127.0.0.1' REDIS_PORT = 6379 REDIS_PASSWORD = None
注意,如果配置了REDIS_URL,那么Scrapy-Redis将优先使用REDIS_URL连接,会覆盖上面的3项配置。
配置调度队列 此配置项是可选的,默认使用PriorityQueue。如需更改,可以配置SCHEDULE_QUEUE_CLASS变量,如下所示:
1 2 3 SCHEDULE_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
配置持久化 此配置项是可选的,默认False。Scrpay-Redis默认会在爬取全部完成后清空爬取队列和去重指纹集合。
如果不想自动清空爬取队列和去重指纹集合,可以增加如下配置:
值得注意的是,如果强制中断爬虫的运行,爬取队列和去重指纹是不会自动清空的。
Pipeline配置 此配置项是可选的,默认不启动Pipeline。Scrpay-Redis实现了一个存储到Redis的Item Pipeline,如果启用了这个Pipeline,爬虫会把Item存储到Redis数据库。在数据量较大时,一般不会这么做。因为Redis是基于内存的,我们利用的是它处理速度快的特性,用它做存储未免太浪费。
基于Scrapy-Redis的分布式优化策略 基于Bloom Filter进行大规模去重 Scrapy-Redis将Request的指纹存储到了Redis集合中,每个指纹的长度为40,是一个字符串。
我们计算一下这种方式的消耗的存储空间。假设每个字符占1字节,即1B,1个指纹占用空间为40B,1万个指纹占用空间约400KB,1亿个指纹占用空间约4GB。当爬取数量达到上亿级别时,Redis的占用的内存就会变得很大,而这仅仅是指纹的存储。redis还存储了爬取队列,内存占用会进一步提高,更别说多个Scrapy项目同时爬取了。这里需要一个更加节省内存的去重算法—Bloom Filter。
Bloom Filter Bloom Filter使用位数组表示一个待检测集合,并可以快速通过概率算法判断一个元素是否在这个集合中。
Bloom Filter算法原理:https://zhuanlan.zhihu.com/p/140545941
这里说结论,位数组的长度m需要比集合元素个数n和散列函数k的乘积还要大,且k约等于m与n的比值的0.7倍时,其误判概率最小。
Bloom Filter对接Scrapy-Redis原理 实现Bloom Fliter时,首先要保证不能破坏Scrapy-Redis分布式爬取的运行架构,所以需要修改Scrapy-Redis的源码 ,替换它们的去重类。同时Bloom Fliter的实现需要借助一个位数组,由于当前架构还是依赖于Redis,位数组的维护可直接使用Redis。
首先实现一个基本的散列算法 ,将一个值经过散列运算后映射到一个m位位数组的某一位上,代码实现如下:
1 2 3 4 5 6 7 8 9 10 class HashMap (object ): def __init__ (self, m, seed ): self.m = m self.seed = seed def hash_demo (self, value ): ret = 0 for i in range (len (value)): ret += self.seed*ret+ord (value[i]) return (self.m-1 ) & ret
这里新建了一个HashMap类,构造函数传入两个值,一个是m位数组的位数,另一个是种子值seed,不同的散列函数需要有不同的seed,这样可以保证不同散列函数的结果不会碰撞。该类的hash_demo方法实现了一个由字符串和seed来确定的散列函数。
接下来实现Bloom Filter ,Bloom Filter里面需要用到k个散列函数,所以这里我们需要对这几个散列函数指定相同的m值和不同的seed值,在这里构造如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 BLOOMFILTER_HASH_NUMBER = 6 BLOOMFILTER_BIT = 30 class BloomFilter (object ): def __init__ (self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER ): """Initialize BloomFilter :param server: Redis Server :param key: BloomFilter Key :param bit: m = 2 ^ bit :param hash_number: the number of hash function """ self.m = 1 << bit self.seeds = range (hash_number) self.server = server self.key = key self.maps = [HashMap(self.m, seed) for seed in self.seeds] def exists (self, value ): """if value exists""" if not value: return False exist = True for map in self.maps: offset = map .hash (value) exist = exist & self.server.getbit(self.key, offset) return exist def insert (self, value ): """add value to bloom""" for f in self.maps: offset = f.hash (value) self.server.setbit(self.key, offset, 1 )
由于需要完成亿级别的数据的去重,即算法中n为1亿以上,散列函数的个数k大约取10左右的量级,而m>kn,所以这里m保底在10亿左右。这里用移位操作来实现,传入位数bit,定义为30。由于是位数组,所以这个位数组占用的大小就是2^30 bit = 128 MB。
随后再传入散列函数的个数,用它来生成几个不同的seed,用不同的seed来定义不同的散列函数,这里构造一个散列函数列表,遍历seed,构造带有不同seed值的HashMap对象。
接着实现比较关键的两个方法,一个是判定元素是否重复的方法exists,另一个是添加元素到集合中的方法insert。至此Bloom Filter的实现已经完成。
然后需要继续修改Scrapy-Redis的源码,将它的去重逻辑替换为Bloom Filter的逻辑,这里主要是修改RFPDupeFilter类的request_seen方法,实现如下:
1 2 3 4 5 6 7 def request_seen (self, request ): fp = self.request_fingerprint(request) if self.bf.exists(fp): return True self.bf.insert(fp) return False
对于Bloom Filter的初始化定义,将其__init__
修改为如下,其中bit和hash_number使用from_settings方法传递。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def __init__ (self, server, key, debug, bit, hash_number ): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.bit = bit self.hash_number = hash_number self.logdupes = True self.bf = BloomFilter(server, self.key, bit, hash_number) @classmethod def from_settings (cls, settings ): """Returns a RFPDupeFilter instance from given settings.""" server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp' : int (time.time())} debug = settings.getbool('DUPEFILTER_DEBUG' , DUPEFILTER_DEBUG) bit = settings.getint('BLOOMFILTER_BIT' , BLOOMFILTER_BIT) hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER' , BLOOMFILTER_HASH_NUMBER) return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)
常量BLOOMFILTER_BIT和DUPEFILTER_DEBUG统一定义在defaults.py中。至此就实现了Bloom Filter和Scrapy-Redis的对接。
Bloom Filter对接Scrapy-Redis配置
pip3 install scrapy-redis-bloomfilter
使用方法和Scrapy-Redis配置基本相似,在Scrapy-Redis配置的基础上,接入Bloom Filter需要修改如下几个配置:
1 2 3 DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter" BLOOMFILTER_BIT = 30 BLOOMFILTER_HASH_NUMBER = 6
参考网址:
https://github.com/Python3WebSpider/ScrapyRedisBloomFilter
https://github.com/Python3WebSpider/ScrapyCompositeDemo/tree/scrapy-redis-bloomfilter
基于RabbitMQ的分布式爬虫 由于我们的爬取队列仍然是基于Redis实现的,它同样会占据非常大的内存,我们可以考虑将爬取队列进行迁移。
爬取队列类似一个消息队列,可以先进先出、先进后出、按优先级进出等,目前消息队列中间件也有很多,如RabbitMQ、RocketMQ等。这里使用RabbitMQ来实现一下Scrapy的分布式爬虫。
RabbitMQ对接Scrapy原理 RabbitMQ就是一个消息队列,Scrapy-Redis利用Redis实现了一个爬取队列,所以同样的,可以仿照Scrapy-Redis的实现,将Redis换成RabbitMQ。
首先定义一个connection对象
1 2 3 4 5 6 7 import pikadef from_settings (settings ): """generate connection of rabbitmq""" connection_parameters = settings.get('RABBITMQ_CONNECTION_PARAMETERS' , RABBITMQ_CONNECTION_PARAMETERS) connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_parameters)) channel = connection.channel() return channel
另外RabbitMQ也提供了对优先级队列的支持,在声明队列的时候设置x-max-priority参数来设定最大的优先级数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 class PriorityQueue (Base ): def __init__ (self, server, spider, key, max_priority=SCHEDULER_QUEUE_MAX_PRIORITY, durable=SCHEDULER_QUEUE_DURABLE, force_flush=SCHEDULER_QUEUE_FORCE_FLUSH, priority_offset=SCHEDULER_QUEUE_PRIORITY_OFFSET ): """ init rabbitmq queue :param server: pika channel :param spider: spider object :param key: queue name """ self.inited = False logger.debug('Queue args %s' , { 'server' : server, 'spider' : spider, 'key' : key, 'max_priority' : max_priority, 'durable' : durable, 'force_flush' : force_flush, 'priority_offset' : priority_offset }) self.durable = durable super (PriorityQueue, self).__init__(server, spider, key) try : self.queue_operator = self.server.queue_declare(queue=self.key, arguments={ 'x-max-priority' : max_priority }, durable=durable) logger.debug('Queue operator %s' , self.queue_operator) self.inited = True except ChannelClosedByBroker as e: logger.error("You have changed queue configuration, you " "must delete queue manually or set `SCHEDULER_QUEUE_FORCE_FLUSH` " "to True, error detail %s" % str (e.args), exc_info=True ) self.inited = False self.priority_offset = priority_offset def __len__ (self ): if not hasattr (self, 'queue_operator' ): return 0 return self.queue_operator.method.message_count def push (self, request ): """push request to queue""" priority = request.priority + self.priority_offset if priority < 0 : priority = 0 delivery_mode = 2 if self.durable else None self.server.basic_publish( exchange='' , properties=pika.BasicProperties( priority=priority, delivery_mode=delivery_mode ), routing_key=self.key, body=self._encode_request(request) ) def pop (self ): """pop request from queue""" method_frame, header, body = self.server.basic_get(queue=self.key, auto_ack=True ) if body: return self._decode_request(body)
对于Schedule。基本原理就是将Queue对象更换为刚才声明的PriorityQueue对象,同时一些初始化的参数通过settings获取即可。
最后整理好包(pip3 install gerapy-rabbitmq
)后更改如下配置:
1 2 3 SCHEDULER = "gerapy_rabbitmq.scheduler.Scheduler" SCHEDULER_QUEUE_KEY = '%(spider)s_requests' RABBITMQ_CONNECTION_PARAMETERS = {'host' : '192.168.2.3' ,'port' : 5672 }
注意分布式的多台主机都需要修改为同一个RabbitMQ地址,运行就可以实现协同爬取了。
参考网址:https://github.com/Python3WebSpider/ScrapyCompositeDemo/blob/gerapy-rabbitmq/scrapycompositedemo