13-使用二进制或脚本的管道
9.4.2 使用二进制或脚本的管道
对于一个遗留功能来说,最不可知的接口就是独立的可执行程序或脚本。它可能需要几秒钟时间启动(比如从数据库中加载数据),不过在这之后,它可能会在一小段延时内处理许多值。即使对于这种情况,Twisted仍然能够覆盖。我们可以使用 reactor.spawnProcess()
API以及相关的 protocol.ProcessProtocol
运行任何类型的可执行程序。来看一个例子,该示例的脚本如下所示。
#!/bin/bash
trap "" SIGINT
sleep 3
while read line
do
# 4 per second
sleep 0.25
awk "BEGIN {print 1.20 * $line}"
done
这是一个简单的bash脚本。当它启动后,会禁用Ctrl + C。这是为了解决Ctrl + C派生到子进程后过早终止,导致Scrapy自身无法停止,无限等待子进程返回结果的系统特性。禁用Ctrl + C后,脚本将会睡眠3秒钟,以模拟启动时间。然后脚本会从输入中读取行,等待250毫秒,再返回结果价格,该计算使用Linux的 awk
命令将原值乘以1.2倍。该脚本的最大吞吐量是每秒4个 Item
。可以使用一个简短的会话对其进行测试,如下所示。
$ properties/pipelines/legacy.sh
12 <- If you type this quickly you will wait ~3 seconds to get results
14.40
13 <- For further numbers you will notice just a slight delay
15.60
由于Ctrl + C被禁用,我们必须使用Ctrl + D终止会话。不错!那么,我们要如何在Scrapy中使用该脚本呢?仍然从一个简化的版本起步。
class CommandSlot(protocol.ProcessProtocol):
def __init__(self, args):
self._queue = []
reactor.spawnProcess(self, args[0], args)
def legacy_calculate(self, price):
d = defer.Deferred()
self._queue.append(d)
self.transport.write("%f\n" % price)
return d
# Overriding from protocol.ProcessProtocol
def outReceived(self, data):
"""Called when new output is received"""
self._queue.pop(0).callback(float(data))
class Pricing(object):
def __init__(self):
self.slot = CommandSlot(['properties/pipelines/legacy.sh'])
@defer.inlineCallbacks
def process_item(self, item, spider):
item["price"][0] = yield self.slot.legacy_calculate(item["price"][0])
defer.returnValue(item)
我们可以在这里找到名为 CommandSlot
的 ProcessProtocol
的定义,以及 Pricing
爬虫。在 __init__()
中,我们创建了新的 CommandSlot
,其构造方法初始化了一个空队列,并使用 reactor.spawnProcess()
启动了一个新的进程。该调用将从进程中传输和接收数据的 ProcessProtocol
作为第一个参数。在本例中,该值为 self
,因为 spawnProcess()
是在 protocol
类中进行调用的。第二个参数是可执行程序的名称。第三个参数 args
将该二进制程序的所有命令行参数作为字符串列表保留。
在管道的 process_item()
中,基本上将所有工作都委托给 CommandSlot
的 legacy_calculate()
方法,它将返回一个延迟操作,并执行 yield
操作。 legacy_calculate()
创建了一个延迟操作,使其排队,然后使用 transport.write()
将价格写入到进程当中。 transport
由 ProcessProtocol
提供,用于让我们和进程进行通信。无论我们何时从进程中接收到数据,都会调用 outReceived()
。通过延迟操作排队,以及按顺序处理的shell脚本,我们可以从队列中只弹出最旧的延迟操作,使用接收到的值触发它。到此为止。我们可以通过在 ITEM_PIPELINES
中添加它的方式,启动该管道,并像平时那样运行。
ITEM_PIPELINES = {...
'properties.pipelines.legacy.Pricing': 600,
如果我们运行一次,就会发现其性能非常糟糕。如我们所料,我们的处理成为瓶颈,限制了吞吐量只能达到每秒4个Item。要想增长吞吐量,我们所能做的就是对管道进行一些修改,允许该类并行运行多个,如下所示。
class Pricing(object):
def __init__(self):
self.concurrency = 16
args = ['properties/pipelines/legacy.sh']
self.slots = [CommandSlot(args)
for i in xrange(self.concurrency)]
self.rr = 0
@defer.inlineCallbacks
def process_item(self, item, spider):
slot = self.slots[self.rr]
self.rr = (self.rr + 1) % self.concurrency
item["price"][0] = yield
slot.legacy_calculate(item["price"][0])
defer.returnValue(item)
我们将其修改为启动16个实例,并以轮询的方式为每个实例发送价格。该管道现在提供了每秒16×4 = 64个item的吞吐量。我们可以通过一个快速爬取来确认,如下所示。
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000
...
Scraped... 0.0 items/s, avg latency: 0.00 s and avg time in pipelines:
0.00 s
Scraped... 21.0 items/s, avg latency: 2.20 s and avg time in pipelines:
1.48 s
Scraped... 24.2 items/s, avg latency: 1.16 s and avg time in pipelines:
0.52 s
延时和预期一样,增长到250毫秒,不过吞吐量仍然是每秒25个item。
请注意,前面的方法中使用了 transport.write()
将shell脚本输入中的所有价格排入队列。对于你的应用而言,这种方式可能合适,也可能不合适,尤其是当它使用了更多的数据而不仅仅是几个数字时。本例完整代码会将所有值和回调排入队列,并且只有在前一次结果被接收后,才会向脚本发送新值。你会发现这种方式对你的遗留应用更加友好,不过也增添了一些复杂度。