08-多进程爬虫
[toc]
4.4.2 多进程爬虫
为了进一步改善性能,我们对多线程示例再度扩展,使其支持多进程。目前,爬虫队列都是存储在本地内存当中的,其他进程都无法处理这一爬虫。为了解决该问题,需要把爬虫队列转移到Redis当中。单独存储队列,意味着即使是不同服务器上的爬虫也能够协同处理同一个爬虫任务。
如果想要拥有更加健壮的队列,则需要考虑使用专用的分布式任务工具,比如Celery。不过,为了尽量减少本书中介绍的技术种类和依赖,我们在这里选择复用Redis。下面是基于Redis实现的队列代码。
# Based loosely on the Redis Cookbook FIFO Queue:
# http://www.rediscookbook.org/implement_a_fifo_queue.html
from redis import StrictRedis
class RedisQueue:
""" RedisQueue helps store urls to crawl to Redis
Initialization components:
client: a Redis client connected to the key-value database for
the web crawling cache (if not set, a localhost:6379
default connection is used).
db (int): which database to use for Redis
queue_name (str): name for queue (default: wswp)
"""
def __init__(self, client=None, db=0, queue_name='wswp'):
self.client = (StrictRedis(host='localhost', port=6379, db=db)
if client is None else client)
self.name = "queue:%s" % queue_name
self.seen_set = "seen:%s" % queue_name
self.depth = "depth:%s" % queue_name
def __len__(self):
return self.client.llen(self.name)
def push(self, element):
"""Push an element to the tail of the queue"""
if isinstance(element, list):
element = [e for e in element if not self.already_seen(e)]
self.client.lpush(self.name, *element)
self.client.sadd(self.seen_set, *element)
elif not self.client.already_seen(element):
self.client.lpush(self.name, element)
self.client.sadd(self.seen_set, element)
def pop(self):
"""Pop an element from the head of the queue"""
return self.client.rpop(self.name)
def already_seen(self, element):
""" determine if an element has already been seen """
return self.client.sismember(self.seen_set, element)
def set_depth(self, element, depth):
""" Set the seen hash and depth """
self.client.hset(self.depth, element, depth)
def get_depth(self, element):
""" Get the seen hash and depth """
return self.client.hget(self.depth, element)
可以看到在前面的 RedisQueue 类中,我们维护了几个不同的数据类型。首先是预期中的Redis列表类型,它可以通过 lpush 和 rpop 命令进行处理,其队列名称存储在 self.name 属性中。
接下来是Redis集合,其功能类似于只包含唯一成员的Python集合。集合名称存储在 self.seen_set 中,我们可以通过 sadd 和 sismember 方法进行管理(添加新键以及测试成员)。
最后,我们把深度相关的功能移至 set_depth 和 get_depth 方法中,它使用了标准的Redis哈希表,其名称存储在 self.depth 中,每个URL及其深度分别作为键值。对代码的一个有用的补充是设置域名访问的最后时间,这样我们就可以为 Downloader 类实现更有效的延时功能了。这一部分留给读者作为练习。
如果你希望队列拥有更多功能,但又有着与Redis相同的可用性,我推荐你了解 `python-rq` ,这是一个易于安装和使用的Python任务队列,它与Celery类似,但其功能和依赖更少。
继续当前的 RedisQueue 实现,我们需要对多线程爬虫进行少量更新,以支持新的队列类型,如下所示。
def threaded_crawler_rq(...):
...
# the queue of URL's that still need to be crawled
crawl_queue = RedisQueue()
crawl_queue.push(seed_url)
def process_queue():
while len(crawl_queue):
url = crawl_queue.pop()
...
第一个改动是将Python列表替换成基于Redis的新队列,这里将其命名为 RedisQueue 。由于该队列会在内部实现中处理重复URL的问题,因此不再需要 seen 变量。最后,调用 RedisQueue 的 len 方法,确定是否仍然有URL在队列中。处理深度和发现功能的进一步逻辑变更如下所示。
## inside process_queue
if no_robots or rp.can_fetch(user_agent, url):
depth = crawl_queue.get_depth(url) or 0
if depth == max_depth:
print('Skipping %s due to depth' % url)
continue
html = D(url, num_retries=num_retries)
if not html:
continue
if scraper_callback:
links = scraper_callback(url, html) or []
else:
links = []
# filter for links matching our regular expression
for link in get_links(html, link_regex) + links:
if 'http' not in link:
link = clean_link(url, domain, link)
crawl_queue.push(link)
crawl_queue.set_depth(link, depth + 1)
完整代码请参见本书源码文件的chp4文件夹中的 threaded_crawler_with_queue.py 。
更新后的多线程爬虫还可以启动多个进程,如下面的代码所示。
import multiprocessing
def mp_threaded_crawler(args, **kwargs):
num_procs = kwargs.pop('num_procs')
if not num_procs:
num_cpus = multiprocessing.cpu_count()
processes = []
for i in range(num_procs):
proc = multiprocessing.Process(
target=threaded_crawler_rq, args=args,
kwargs=kwargs)
proc.start()
processes.append(proc)
# wait for processes to complete
for proc in processes:
proc.join()
这段代码的结构看起来十分熟悉,因为多进程模块和之前使用的多线程模块接口相似。这段代码在启动脚本时,使用可用CPU的数量(我的机器上是8),或通过参数传入的 num_procs 。然后,每个处理器启动一个多线程爬虫,并等待所有处理器完成执行。
现在,让我们使用如下命令,测试多进程版本链接爬虫的性能。关于 mp_threaded_crawler 的代码可以从本书源码文件的chp4文件夹中的 threaded_crawler_aith_queue.py 获取。
$ python threaded_crawler_with_queue.py
...
Total time: 197.0864086151123s
通过脚本检测,我的机器有8个CPU(4个物理核心、4个虚拟核心),而线程的默认设置是5。如果想使用不同的组合,可以使用 -h 命令查看想要的参数,如下所示。
$ python threaded_crawler_with_queue.py -h
usage: threaded_crawler_with_queue.py [-h]
[max_threads] [num_procs] [url_pattern]
Multiprocessing threaded link crawler
positional arguments:
max_threads maximum number of threads
num_procs number of processes
url_pattern regex pattern for url matching
optional arguments:
-h, --help show this help message and exit
`-h` 命令同样也适用于测试 `threaded_crawler.py` 脚本的不同值。
在默认的8个处理器以及每个处理器5个线程的设置下,运行时间比之前只使用一个进程的多线程爬虫快了大约80%。在下一节中,我们将进一步研究这三种方式的相对性能。
如果你希望队列拥有更多功能,但又有着与Redis相同的可用性,我推荐你了解 `python-rq` ,这是一个易于安装和使用的Python任务队列,它与Celery类似,但其功能和依赖更少。
`-h` 命令同样也适用于测试 `threaded_crawler.py` 脚本的不同值。