当前位置:嗨网首页>书籍在线阅读

08-多进程爬虫

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

[toc]

4.4.2 多进程爬虫

为了进一步改善性能,我们对多线程示例再度扩展,使其支持多进程。目前,爬虫队列都是存储在本地内存当中的,其他进程都无法处理这一爬虫。为了解决该问题,需要把爬虫队列转移到Redis当中。单独存储队列,意味着即使是不同服务器上的爬虫也能够协同处理同一个爬虫任务。

如果想要拥有更加健壮的队列,则需要考虑使用专用的分布式任务工具,比如Celery。不过,为了尽量减少本书中介绍的技术种类和依赖,我们在这里选择复用Redis。下面是基于Redis实现的队列代码。

# Based loosely on the Redis Cookbook FIFO Queue:
# http://www.rediscookbook.org/implement_a_fifo_queue.html
from redis import StrictRedis
class RedisQueue:
    """ RedisQueue helps store urls to crawl to Redis
        Initialization components:
        client: a Redis client connected to the key-value database for
                the web crawling cache (if not set, a localhost:6379
                default connection is used).
        db (int): which database to use for Redis
        queue_name (str): name for queue (default: wswp)
    """
    def __init__(self, client=None, db=0, queue_name='wswp'):
        self.client = (StrictRedis(host='localhost', port=6379, db=db)
                       if client is None else client)
        self.name = "queue:%s" % queue_name
        self.seen_set = "seen:%s" % queue_name
        self.depth = "depth:%s" % queue_name
    def __len__(self):
        return self.client.llen(self.name)
    def push(self, element):
        """Push an element to the tail of the queue"""
        if isinstance(element, list):
            element = [e for e in element if not self.already_seen(e)]
            self.client.lpush(self.name, *element)
            self.client.sadd(self.seen_set, *element)
        elif not self.client.already_seen(element):
            self.client.lpush(self.name, element)
            self.client.sadd(self.seen_set, element)
    def pop(self):
        """Pop an element from the head of the queue"""
        return self.client.rpop(self.name)
    def already_seen(self, element):
        """ determine if an element has already been seen """
        return self.client.sismember(self.seen_set, element)
    def set_depth(self, element, depth):
        """ Set the seen hash and depth """
        self.client.hset(self.depth, element, depth)
    def get_depth(self, element):
        """ Get the seen hash and depth """
        return self.client.hget(self.depth, element)

可以看到在前面的 RedisQueue 类中,我们维护了几个不同的数据类型。首先是预期中的Redis列表类型,它可以通过 lpushrpop 命令进行处理,其队列名称存储在 self.name 属性中。

接下来是Redis集合,其功能类似于只包含唯一成员的Python集合。集合名称存储在 self.seen_set 中,我们可以通过 saddsismember 方法进行管理(添加新键以及测试成员)。

最后,我们把深度相关的功能移至 set_depthget_depth 方法中,它使用了标准的Redis哈希表,其名称存储在 self.depth 中,每个URL及其深度分别作为键值。对代码的一个有用的补充是设置域名访问的最后时间,这样我们就可以为 Downloader 类实现更有效的延时功能了。这一部分留给读者作为练习。

如果你希望队列拥有更多功能,但又有着与Redis相同的可用性,我推荐你了解 `python-rq` ,这是一个易于安装和使用的Python任务队列,它与Celery类似,但其功能和依赖更少。

继续当前的 RedisQueue 实现,我们需要对多线程爬虫进行少量更新,以支持新的队列类型,如下所示。

def threaded_crawler_rq(...):
    ...
    # the queue of URL's that still need to be crawled
    crawl_queue = RedisQueue()
    crawl_queue.push(seed_url)
    def process_queue():
        while len(crawl_queue):
            url = crawl_queue.pop()
        ...

第一个改动是将Python列表替换成基于Redis的新队列,这里将其命名为 RedisQueue 。由于该队列会在内部实现中处理重复URL的问题,因此不再需要 seen 变量。最后,调用 RedisQueuelen 方法,确定是否仍然有URL在队列中。处理深度和发现功能的进一步逻辑变更如下所示。

## inside process_queue
if no_robots or rp.can_fetch(user_agent, url):
    depth = crawl_queue.get_depth(url) or 0
    if depth == max_depth:
        print('Skipping %s due to depth' % url)
        continue
    html = D(url, num_retries=num_retries)
    if not html:
        continue
    if scraper_callback:
        links = scraper_callback(url, html) or []
    else:
        links = []
    # filter for links matching our regular expression
    for link in get_links(html, link_regex) + links:
        if 'http' not in link:
            link = clean_link(url, domain, link)
        crawl_queue.push(link)
        crawl_queue.set_depth(link, depth + 1)

完整代码请参见本书源码文件的chp4文件夹中的 threaded_crawler_with_queue.py

更新后的多线程爬虫还可以启动多个进程,如下面的代码所示。

import multiprocessing
def mp_threaded_crawler(args, **kwargs):
    num_procs = kwargs.pop('num_procs')
    if not num_procs:
        num_cpus = multiprocessing.cpu_count()
    processes = []
    for i in range(num_procs):
        proc = multiprocessing.Process(
            target=threaded_crawler_rq, args=args,
            kwargs=kwargs)
        proc.start()
        processes.append(proc)
    # wait for processes to complete
    for proc in processes:
        proc.join()

这段代码的结构看起来十分熟悉,因为多进程模块和之前使用的多线程模块接口相似。这段代码在启动脚本时,使用可用CPU的数量(我的机器上是8),或通过参数传入的 num_procs 。然后,每个处理器启动一个多线程爬虫,并等待所有处理器完成执行。

现在,让我们使用如下命令,测试多进程版本链接爬虫的性能。关于 mp_threaded_crawler 的代码可以从本书源码文件的chp4文件夹中的 threaded_crawler_aith_queue.py 获取。

$ python threaded_crawler_with_queue.py
...
Total time: 197.0864086151123s

通过脚本检测,我的机器有8个CPU(4个物理核心、4个虚拟核心),而线程的默认设置是5。如果想使用不同的组合,可以使用 -h 命令查看想要的参数,如下所示。

$ python threaded_crawler_with_queue.py -h
usage: threaded_crawler_with_queue.py [-h]
 [max_threads] [num_procs] [url_pattern]
Multiprocessing threaded link crawler
positional arguments:
 max_threads maximum number of threads
 num_procs number of processes
 url_pattern regex pattern for url matching
optional arguments:
 -h, --help show this help message and exit
`-h` 命令同样也适用于测试 `threaded_crawler.py` 脚本的不同值。

在默认的8个处理器以及每个处理器5个线程的设置下,运行时间比之前只使用一个进程的多线程爬虫快了大约80%。在下一节中,我们将进一步研究这三种方式的相对性能。