建立在反应流之上
建立在反应流之上
原文:https://medium.com/hackernoon/building-on-top-of-reactive-streams-86dacdbf9ad
反应流
反应流规范允许我们创建以异步方式通信的应用程序组件,同时使用背压机制维护安全性。
反应流正在每一种现代编程语言中实现,比如 JavaScript 和 Scala。
让我们看一个简单的例子,它将一个 生产者 连接到一个 订户 来看看流程是如何进行的。
**val** publisher = *Subject*[Int]
**val** subscriber = publisher.subscribe(message => *println*(message))
publisher.onNext(5)
在这里我们已经创建了一个 主题【Int】所以我们可以调用 。onNext 上的将一条消息(IntT25)推送到流中。在管道的另一端,我们有一个 **订阅者 ,即 将 订阅给 发布者 ,并指定每个新消息都将使用一个函数进行处理。**
如您所见,我们需要两个部分,一个发布者和一个订阅者,我们需要将它们连接在一起。
流上的操作
有时我们不需要处理每一条消息,所以我们可以过滤管道。
**val** subscriber = publisher
.filter(_ % 2 == 0)
.subscribe(message => *println*(message))
其他时候,我们需要应用转换,所以我们在流上映射。
**case class** EvenNumber(n: Int)**val** subscriber = publisher
.filter(_ % 2 == 0)
.map(EvenNumber)
.subscribe(message => *println*(message))
到目前为止,我们可能已经意识到流的操作与我们在任何集合中发现的操作相似。这是一个强大的功能,因为我们不必学习新的 API,我们可以使用我们已经知道的东西。
一个小问题
我们的问题是,要创建一个完整的管道,我们需要将所有部分连接在一起。这意味着当创建一个订阅者时,我们需要一个已经创建的发布者,并且我们必须能够访问它。
**val** subscriber = publisher.subscribe(message => *println*(message))
订阅者需要知道发布者,所以我们需要将发布者传递到每个要创建订阅者的地方。
共同的终点
我们希望创建一种机制来避免上述问题。解决这个问题的思路一点也不复杂。它实际上来自于一个艺术系统的状态,阿帕奇卡夫卡。
Kafka 使用基于主题的方法来划分流。我们可以在主题A中放置一条消息,这样只有那些收听主题A的人才能接收和处理这条消息。
另一方面,发布者和订阅者完全解耦。他们不了解对方。他们使用一个中心位置来发布和订阅消息。
建筑迷你小屋
MiniKaf 是我们为了克服这个问题而开发的一个库。它公开了一个由最少的类型集支持的非常干净的 API。
让我们看看一些主要组件。
首先,我们为发布到系统的每个事件/消息提供了一个接口。
**trait** Event[A] {
**def** value: A
}
然后,我们有了一种机制,可以在事件被发送到管道之前将类型转换为事件。
@typeclass **trait** ToEvent[A] {
**def** event(a: A): Event[A]
}
当然,这是进行转换的默认方式。
**object** ToEvent {
**implicit def** toEvent[A]: ToEvent[A] = **new** ToEvent[A] {
**override def** event(a: A) = *E*(a)
}
}
或者我们可以通过函数使用用户特定的方式进行转换。
***object** ToEvent {
**def** event[A](a: A)(f: A => Event[A]): Event[A] = f(a)
**implicit def** toEvent[A]: ToEvent[A] = **new** ToEvent[A] {
**override def** event(a: A) = *E*(a)
}
}*
我们现在准备开始发送消息。
*val publisher = Publisher()
publisher.publish(5)
publisher.publish("hello")*
但是等等!我们如何消费这些信息?
使用消息就像订阅您想要使用的消息类型一样简单。
*val subscriber = Subscriber()
subscriber.subscribe[Int](e => println((e.topic, e.value)))*
*在这里,我们订阅了类型为Int的消息。下次通过流发送一个Int时,函数e => println((e.topic, e.value))将被执行。注意,e不是一个Int,而是一个EventRecord,其中topic是"Int",value是由 *发布者推送的实际消息。**
*这里需要注意的最重要的部分是,我们没有将 订阅者 连接到 发布者 。我们只是创建了 订户 并指定了订阅。这与 Kafka 的解耦水平相同,但我们依赖于底层实现的反应流。*
结论
然而,我们认为它们不应该被直接使用,而是应该通过我们自己的 API 来使用。
招聘广告
*MiniKaf如今正处于它的最开始阶段。我们正在运行时做一些不安全的工作,这些工作可能会被编译时机制取代(可能使用Shapeless)。此外,API 并不完整,并且缺少一些部分。任何帮助都将不胜感激。*
黑客中午是黑客如何开始他们的下午。我们是 @AMI 家庭的一员。我们现在接受投稿,并乐意讨论广告&赞助机会。
要了解更多信息,请阅读我们的“关于”页面、在脸书上点赞/给我们发消息,或者简单地说, tweet/DM @HackerNoon。