27-管道生产与消费
12.7.1 管道生产与消费
管道(Pipeline)是一种模式,是一个协程产生的无限数据流。例如,使用协程生成一个无限序列。
fun produceNumbers() = produce<Long>(CommonPool) {
var x = 1L
while (true) send(x++) // 产生从1开始的数据流
}
在接收端协程接收到数据流后,会进行一些消费处理或对接收的结果进行一些其他的处理。
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
上面的square函数式将接收到的数字进行求平方操作,如果要将整个管道连接起来,则可以定义一个测试函数。
fun testPipeline() = runBlocking {
val numbers = produceNumbers()
val squares = square(numbers)
for (i in 1..5) println(squares.receive())
println("Done!")
squares.cancel()
numbers.cancel()
}