36-案例实现
案例实现
根据如下步骤实现本案例。
1.创建 Item
类,用于代表发布者发送给订阅者的信息。 Item
类有两个 String
属性( title
和 content
),以及这两个属性的 get()
和 set()
方法。这个类的源代码非常简单,故不再展示。
2.创建 Consumer1
类,用于实现 Subscriber
接口,并以 Item
作为泛型参数。我们必须要实现4个方法。首先,实现 onComplete()
方法—该方法用于简单地把信息写到控制台中:
public class Consumer1 implements Flow.Subscriber<Item> {
@Override
public void onComplete() {
System.out.printf("%s: Consumer 1: Completed\n",
Thread.currentThread().getName());
}
3.实现 onError()
方法。该方法用于简单地把错误信息写到控制台中:
@Override
public void onError(Throwable exception) {
System.out.printf("%s: Consumer 1: Error\n",
Thread.currentThread().getName());
exception.printStackTrace(System.err);
}
4.实现 onNext()
方法。该方法用于把收到的 item
写到控制台中:
@Override
public void onNext(Item item) {
System.out.printf("%s: Consumer 1: Item received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 1: %s\n",
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s: Consumer 1: %s\n",
Thread.currentThread().getName(),
item.getContent());
}
5.最后实现 onSubscribe()
方法。该方法用于把一些信息写到控制台中,不使用 Subscription
对象的 request()
方法请求任何 item:
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf("%s: Consumer 1: Subscription received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 1: No Items requested\n",
Thread.currentThread().getName());
}
6.现在来创建 Consumer2
类。它也以 Item
作为泛型参数实现了 Subscriber
接口。在这个例子中,用一个私有的 Subscription
属性来存储订阅对象。 onComplete()
和 onError()
与 Consumer1
中的相同:
public class Consumer2 implements Flow.Subscriber<Item> {
private Subscription subscription;
@Override
public void onComplete() {
System.out.printf("%s: Consumer 2: Completed\n",
Thread.currentThread().getName());
}
@Override
public void onError(Throwable exception) {
System.out.printf("%s: Consumer 2: Error\n",
Thread.currentThread().getName());
exception.printStackTrace(System.err);
}
7.与 Consumer1
中的 onNext()
相比,这里的 onNext()
增加了请求另一个元素的代码:
@Override
public void onNext(Item item) {
System.out.printf("%s: Consumer 2: Item received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 2: %s\n",
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s: Consumer 2: %s\n",
Thread.currentThread().getName(),
item.getContent());
subscription.request(1);
}
8.与 Consumer1
中的 onSubscribe()
方法相比,这里的 onSubscribe()
增加了请求第一个元素的代码:
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf("%s: Consumer 2: Subscription received\n",
Thread.currentThread().getName());
this.subscription=subscription;
subscription.request(1);
}
9.现在来创建 Consumer3
。它也以 Item
作为泛型参数实现 Subscriber
接口。其 onComplete()
和 onError()
方法与之前的都相同:
public class Consumer3 implements Flow.Subscriber<Item> {
@Override
public void onComplete() {
System.out.printf("%s: Consumer 3: Completed\n",
Thread.currentThread().getName());
}
@Override
public void onError(Throwable exception) {
System.out.printf("%s: Consumer 3: Error\n",
Thread.currentThread().getName());
exception.printStackTrace(System.err);
}
10.虽然 Consumer3
的 onNext()
方法把 item
中的一些信息写到了控制台,但是它没有请求任何元素:
@Override
public void onNext(Item item) {
System.out.printf("%s: Consumer 3: Item received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 3: %s\n",
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s: Consumer 3: %s\n",
Thread.currentThread().getName(),
item.getContent());
}
11.在 onSubscribe()
方法里,我们请求3个 item
:
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf("%s: Consumer 3: Subscription received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 3: Requested three items\n",
Thread.currentThread().getName());
subscription.request(3);
}
12.最后,实现包含 main()
方法的 Main
类。首先创建3个消费者,每个类一个:
public class Main {
public static void main(String[] args) {
Consumer1 consumer1=new Consumer1();
Consumer2 consumer2=new Consumer2();
Consumer3 consumer3=new Consumer3();
13.现在创建一个以 Item
作为泛型参数的 SubmissionPublisher
对象,然后用 subscribe()
方法添加这3个消费者:
SubmissionPublisher<Item> publisher=new SubmissionPublisher<>();
publisher.subscribe(consumer1);
publisher.subscribe(consumer2);
publisher.subscribe(consumer3);
14.现在创建10个 Item
对象,用 SubmissionPublisher
对象的 submit()
方法来发布它们。每发布一个 item
之后,等待1s:
for (int i=0; i<10; i++) {
Item item =new Item();
item.setTitle("Item "+i);
item.setContent("This is the item "+i);
publisher.submit(item);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
15.最后用 close()
方法关闭发布者:
publisher.close();
}
}