36-案例实现
案例实现
根据如下步骤实现本案例。
1.创建 Item 类,用于代表发布者发送给订阅者的信息。 Item 类有两个 String 属性( title 和 content ),以及这两个属性的 get() 和 set() 方法。这个类的源代码非常简单,故不再展示。
2.创建 Consumer1 类,用于实现 Subscriber 接口,并以 Item 作为泛型参数。我们必须要实现4个方法。首先,实现 onComplete() 方法—该方法用于简单地把信息写到控制台中:
public class Consumer1 implements Flow.Subscriber<Item> {
@Override
public void onComplete() {
System.out.printf("%s: Consumer 1: Completed\n",
Thread.currentThread().getName());
}
3.实现 onError() 方法。该方法用于简单地把错误信息写到控制台中:
@Override
public void onError(Throwable exception) {
System.out.printf("%s: Consumer 1: Error\n",
Thread.currentThread().getName());
exception.printStackTrace(System.err);
}
4.实现 onNext() 方法。该方法用于把收到的 item 写到控制台中:
@Override
public void onNext(Item item) {
System.out.printf("%s: Consumer 1: Item received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 1: %s\n",
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s: Consumer 1: %s\n",
Thread.currentThread().getName(),
item.getContent());
}
5.最后实现 onSubscribe() 方法。该方法用于把一些信息写到控制台中,不使用 Subscription 对象的 request() 方法请求任何 item:
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf("%s: Consumer 1: Subscription received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 1: No Items requested\n",
Thread.currentThread().getName());
}
6.现在来创建 Consumer2 类。它也以 Item 作为泛型参数实现了 Subscriber 接口。在这个例子中,用一个私有的 Subscription 属性来存储订阅对象。 onComplete() 和 onError() 与 Consumer1 中的相同:
public class Consumer2 implements Flow.Subscriber<Item> {
private Subscription subscription;
@Override
public void onComplete() {
System.out.printf("%s: Consumer 2: Completed\n",
Thread.currentThread().getName());
}
@Override
public void onError(Throwable exception) {
System.out.printf("%s: Consumer 2: Error\n",
Thread.currentThread().getName());
exception.printStackTrace(System.err);
}
7.与 Consumer1 中的 onNext() 相比,这里的 onNext() 增加了请求另一个元素的代码:
@Override
public void onNext(Item item) {
System.out.printf("%s: Consumer 2: Item received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 2: %s\n",
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s: Consumer 2: %s\n",
Thread.currentThread().getName(),
item.getContent());
subscription.request(1);
}
8.与 Consumer1 中的 onSubscribe() 方法相比,这里的 onSubscribe() 增加了请求第一个元素的代码:
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf("%s: Consumer 2: Subscription received\n",
Thread.currentThread().getName());
this.subscription=subscription;
subscription.request(1);
}
9.现在来创建 Consumer3 。它也以 Item 作为泛型参数实现 Subscriber 接口。其 onComplete() 和 onError() 方法与之前的都相同:
public class Consumer3 implements Flow.Subscriber<Item> {
@Override
public void onComplete() {
System.out.printf("%s: Consumer 3: Completed\n",
Thread.currentThread().getName());
}
@Override
public void onError(Throwable exception) {
System.out.printf("%s: Consumer 3: Error\n",
Thread.currentThread().getName());
exception.printStackTrace(System.err);
}
10.虽然 Consumer3 的 onNext() 方法把 item 中的一些信息写到了控制台,但是它没有请求任何元素:
@Override
public void onNext(Item item) {
System.out.printf("%s: Consumer 3: Item received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 3: %s\n",
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s: Consumer 3: %s\n",
Thread.currentThread().getName(),
item.getContent());
}
11.在 onSubscribe() 方法里,我们请求3个 item :
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.printf("%s: Consumer 3: Subscription received\n",
Thread.currentThread().getName());
System.out.printf("%s: Consumer 3: Requested three items\n",
Thread.currentThread().getName());
subscription.request(3);
}
12.最后,实现包含 main() 方法的 Main 类。首先创建3个消费者,每个类一个:
public class Main {
public static void main(String[] args) {
Consumer1 consumer1=new Consumer1();
Consumer2 consumer2=new Consumer2();
Consumer3 consumer3=new Consumer3();
13.现在创建一个以 Item 作为泛型参数的 SubmissionPublisher 对象,然后用 subscribe() 方法添加这3个消费者:
SubmissionPublisher<Item> publisher=new SubmissionPublisher<>();
publisher.subscribe(consumer1);
publisher.subscribe(consumer2);
publisher.subscribe(consumer3);
14.现在创建10个 Item 对象,用 SubmissionPublisher 对象的 submit() 方法来发布它们。每发布一个 item 之后,等待1s:
for (int i=0; i<10; i++) {
Item item =new Item();
item.setTitle("Item "+i);
item.setContent("This is the item "+i);
publisher.submit(item);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
15.最后用 close() 方法关闭发布者:
publisher.close();
}
}