当前位置:嗨网首页>书籍在线阅读

10-用于读写Redis的管道

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

9.3.1 用于读写Redis的管道

Google Geocoding API是按照IP进行限制的。我们可以利用多个IP(例如使用多台服务器)进行缓解,此时需要避免重复请求其他机器上已经完成地理编码的地址。这种情况也适用于之前运行中曾见到过的地址。我们不想浪费宝贵的限额。

请与API供应商沟通,确保在他们的策略下这种做法是可行的。比如,你可能必须每隔几分钟/小时就要丢弃掉缓存记录,或者根本不允许缓存。

我们可以使用Redis的键值对缓存,从本质上说,它是一个分布式的字典。我们已经在vagrant环境中运行了一个Redis实例,可以使用 redis-cli 命令,从开发机连接它并执行基本操作。

$ redis-cli -h redis
redis:6379> info keyspace
# Keyspace
redis:6379> set key value
OK
redis:6379> info keyspace
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
redis:6379> FLUSHALL
OK
redis:6379> info keyspace
# Keyspace
redis:6379> exit

通过Google搜索"Redis Twisted",我们找到了 txredisapi 库。其本质区别是它不再是同步Python库的包装,而是适用于Twisted的库,它使用 reactor.connectTCP() 连接Redis、实现Twisted协议等。使用该库的方式与其他库类似,不过在Twisted应用中使用它时,其效率肯定会更高一些。我们在安装它时可以再附带一个工具库—— dj_redis_url ,该工具库用于解析Redis配置URL,我们可以使用 pip 进行安装( sudo pip install txredisapi dj_redis_url ),和往常一样,在我们的开发机中也已经预先安装好了这些库。

可以按如下代码初始化 RedisCache

from txredisapi import lazyConnectionPool
class RedisCache(object):
...
  def __init__(self, crawler, redis_url, redis_nm):
    self.redis_url = redis_url
    self.redis_nm = redis_nm
    args = RedisCache.parse_redis_url(redis_url)
    self.connection = lazyConnectionPool(connectTimeout=5,
                       replyTimeout=5,
                       **args)
    crawler.signals.connect(
        self.item_scraped,signal=signals.item_scraped)

该管道非常简单。为了连接Redis服务器,我们需要主机地址、端口等参数,由于这些参数是以URL格式存储的,因此需要使用 parse_redis_url() 方法解析该格式(为简洁起见已经省略)。为键设置前缀作为命名空间的行为非常常见,在本例中,我们将其存储在 redis_nm 中。然后,使用 txredisapilazyConnectionPool() ,打开到服务器的连接。

最后一行使用了一个很有意思的函数。我们的目的是将地理编码管道与该管道包装起来。如果在Redis中没有某个值,我们将不会设置该值,我们的地理编码管道将像之前那样使用API对地址进行地理编码。在该操作完成之后,需要有一种方式在Redis中缓存这些键值对,在这里是通过连接到 signals.item_scraped 信号的方式实现的。我们定义的回调( item_scraped() 方法,将很快看到)在非常靠后的位置被调用,此时坐标位置将会被设置。

本示例的完整代码位于 `ch09/properties/properties/pipelines/redis.py` 。

我们通过查找和记录每个 Item 的地址和位置,保持了缓存的简单性。这对Redis来说是很有意义的,因为它经常运行在同一个服务器当中,这使得它运行速度非常快。如果不是这种情况,那么可能需要添加一个基于字典的缓存,与我们在地理编码管道中的实现类似。下面是处理传入的Item的方法。

@defer.inlineCallbacks
def process_item(self, item, spider):
  address = item["address"][0]
  key = self.redis_nm + ":" + address
  value = yield self.connection.get(key)
  if value:
    item["location"] = json.loads(value)
  defer.returnValue(item)

和大家的期望相同。我们得到了地址,为其添加前缀,然后使用 txredisapi connectionget() 方法在Redis中查询。我们在Redis中存储的值是JSON编码的对象。如果值已经设定,则使用JSON对其进行解码,并将其设为坐标位置。

当一个 Item 到达所有管道的结尾时,我们重新捕获它,确保存储到Redis的位置值当中。下面是实现代码。

from txredisapi import ConnectionError
def item_scraped(self, item, spider):
  try:
    location = item["location"]
    value = json.dumps(location, ensure_ascii=False)
  except KeyError:
    return
  address = item["address"][0]
  key = self.redis_nm + ":" + address
  quiet = lambda failure: failure.trap(ConnectionError)
  return self.connection.set(key, value).addErrback(quiet)

这里同样没有什么惊喜。如果我们找到一个位置,就可以得到地址,为其添加前缀,并使用它们作为键值对,用于 txredisapi 连接的 set() 方法。你会发现该函数没有使用 @defer.inlineCallbacks ,这是因为在处理 signals.item_scraped 时并不支持该装饰器。这就意味着无法再对 connection.set() 使用非常便捷的 yield 操作,不过我们可以做的工作是返回延迟操作,Scrapy可以用它串联任何未来的信号进行监听。无论何种情况,如果到Redis的连接无法执行 connection.set() ,就会抛出一个异常。可以通过添加自定义错误处理到 connection.set() 返回的延迟操作中,静默忽略该异常。在该错误处理中,我们将失败作为参数传递,并告知它们对任何 ConnectionError 执行 trap() 操作。这是Twisted的延迟操作API的一个非常好用的功能。通过在预期的异常中使用 trap() ,我们能够以紧凑的方式静默忽略它们。

为了启用该管道,我们所需做的就是将其添加到 ITEM_PIPELINES 设置中,并在 settings.py 文件中提供一个 REDIS_PIPELINE_URL 。为该管道设置一个比地理编码管道更小的优先级值非常重要,否则其运行就会太迟,无法起到作用。

ITEM_PIPELINES = { ...
  'properties.pipelines.redis.RedisCache': 300,
  'properties.pipelines.geo.GeoPipeline': 400,
...
REDIS_PIPELINE_URL = 'redis://redis:6379'

我们可以像平时那样运行该爬虫。第一次运行将会和之前类似,不过接下来的每次运行都会像下面这样。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=100
...
INFO: Enabled item pipelines: TidyUp, RedisCache, GeoPipeline, 
MysqlWriter, EsWriter
...
Scraped... 0.0 items/s, avg latency: 0.00 s, time in pipelines: 0.00 s
Scraped... 21.2 items/s, avg latency: 0.78 s, time in pipelines: 0.15 s
Scraped... 24.2 items/s, avg latency: 0.82 s, time in pipelines: 0.16 s
...
INFO: Dumping Scrapy stats: {...
  'geo_pipeline/already_set': 106,
  'item_scraped_count': 106,

可以看到 GeoPipelineRedisCache 都已经启用,并且RedisCache会首先进行。另外,还可以注意到 geo_pipeline/already_set 统计值是106。这些是GeoPipeline从Redis缓存中找到的预先填充好的item,并且它们都不需要请求Google API调用。如果Redis缓存为空,你会看到一些键依然会使用Google API进行处理。在性能方面,我们注意到GeoPipeline引发的初始行为现在没有了。实际上,由于目前使用了缓存,因此绕过了每秒5个请求的API限制。当使用Redis时,还应当考虑使用过期键,使系统可以周期性地刷新缓存数据。