当前位置:嗨网首页>书籍在线阅读

13-使用二进制或脚本的管道

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

9.4.2 使用二进制或脚本的管道

对于一个遗留功能来说,最不可知的接口就是独立的可执行程序或脚本。它可能需要几秒钟时间启动(比如从数据库中加载数据),不过在这之后,它可能会在一小段延时内处理许多值。即使对于这种情况,Twisted仍然能够覆盖。我们可以使用 reactor.spawnProcess() API以及相关的 protocol.ProcessProtocol 运行任何类型的可执行程序。来看一个例子,该示例的脚本如下所示。

#!/bin/bash
trap "" SIGINT
sleep 3
while read line
do
  # 4 per second
  sleep 0.25
  awk "BEGIN {print 1.20 * $line}"
done

这是一个简单的bash脚本。当它启动后,会禁用Ctrl + C。这是为了解决Ctrl + C派生到子进程后过早终止,导致Scrapy自身无法停止,无限等待子进程返回结果的系统特性。禁用Ctrl + C后,脚本将会睡眠3秒钟,以模拟启动时间。然后脚本会从输入中读取行,等待250毫秒,再返回结果价格,该计算使用Linux的 awk 命令将原值乘以1.2倍。该脚本的最大吞吐量是每秒4个 Item 。可以使用一个简短的会话对其进行测试,如下所示。

$ properties/pipelines/legacy.sh
12 <- If you type this quickly you will wait ~3 seconds to get results
14.40
13 <- For further numbers you will notice just a slight delay
15.60

由于Ctrl + C被禁用,我们必须使用Ctrl + D终止会话。不错!那么,我们要如何在Scrapy中使用该脚本呢?仍然从一个简化的版本起步。

class CommandSlot(protocol.ProcessProtocol):
  def __init__(self, args):
    self._queue = []
    reactor.spawnProcess(self, args[0], args)
  def legacy_calculate(self, price):
    d = defer.Deferred()
    self._queue.append(d)
    self.transport.write("%f\n" % price)
    return d
  # Overriding from protocol.ProcessProtocol
  def outReceived(self, data):
    """Called when new output is received"""
    self._queue.pop(0).callback(float(data))
class Pricing(object):
  def __init__(self):
    self.slot = CommandSlot(['properties/pipelines/legacy.sh'])
  @defer.inlineCallbacks
  def process_item(self, item, spider):
    item["price"][0] = yield self.slot.legacy_calculate(item["price"][0])
    defer.returnValue(item)

我们可以在这里找到名为 CommandSlotProcessProtocol 的定义,以及 Pricing 爬虫。在 __init__() 中,我们创建了新的 CommandSlot ,其构造方法初始化了一个空队列,并使用 reactor.spawnProcess() 启动了一个新的进程。该调用将从进程中传输和接收数据的 ProcessProtocol 作为第一个参数。在本例中,该值为 self ,因为 spawnProcess() 是在 protocol 类中进行调用的。第二个参数是可执行程序的名称。第三个参数 args 将该二进制程序的所有命令行参数作为字符串列表保留。

在管道的 process_item() 中,基本上将所有工作都委托给 CommandSlotlegacy_calculate() 方法,它将返回一个延迟操作,并执行 yield 操作。 legacy_calculate() 创建了一个延迟操作,使其排队,然后使用 transport.write() 将价格写入到进程当中。 transportProcessProtocol 提供,用于让我们和进程进行通信。无论我们何时从进程中接收到数据,都会调用 outReceived() 。通过延迟操作排队,以及按顺序处理的shell脚本,我们可以从队列中只弹出最旧的延迟操作,使用接收到的值触发它。到此为止。我们可以通过在 ITEM_PIPELINES 中添加它的方式,启动该管道,并像平时那样运行。

ITEM_PIPELINES = {...
  'properties.pipelines.legacy.Pricing': 600,

如果我们运行一次,就会发现其性能非常糟糕。如我们所料,我们的处理成为瓶颈,限制了吞吐量只能达到每秒4个Item。要想增长吞吐量,我们所能做的就是对管道进行一些修改,允许该类并行运行多个,如下所示。

class Pricing(object):
  def __init__(self):
    self.concurrency = 16
    args = ['properties/pipelines/legacy.sh']
    self.slots = [CommandSlot(args)
           for i in xrange(self.concurrency)]
    self.rr = 0
  @defer.inlineCallbacks
  def process_item(self, item, spider):
    slot = self.slots[self.rr]
    self.rr = (self.rr + 1) % self.concurrency
    item["price"][0] = yield
             slot.legacy_calculate(item["price"][0])
    defer.returnValue(item)

我们将其修改为启动16个实例,并以轮询的方式为每个实例发送价格。该管道现在提供了每秒16×4 = 64个item的吞吐量。我们可以通过一个快速爬取来确认,如下所示。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000
...
Scraped... 0.0 items/s, avg latency: 0.00 s and avg time in pipelines: 
0.00 s
Scraped... 21.0 items/s, avg latency: 2.20 s and avg time in pipelines: 
1.48 s
Scraped... 24.2 items/s, avg latency: 1.16 s and avg time in pipelines: 
0.52 s

延时和预期一样,增长到250毫秒,不过吞吐量仍然是每秒25个item。

请注意,前面的方法中使用了 transport.write() 将shell脚本输入中的所有价格排入队列。对于你的应用而言,这种方式可能合适,也可能不合适,尤其是当它使用了更多的数据而不仅仅是几个数字时。本例完整代码会将所有值和回调排入队列,并且只有在前一次结果被接收后,才会向脚本发送新值。你会发现这种方式对你的遗留应用更加友好,不过也增添了一些复杂度。