43-案例实现
案例实现
根据如下步骤来实现本案例。
1.实现一个 News 类。该类实现发布者发送给订阅者的元素。它将有两个私有字符型字段,它们分别为 title 和 content ,还有一个名为 date 的 Date 字段。它也将具有获取和设置这些字段值的方法。由于这个类的源代码非常简单,因此不会在这里展示它。
2.创建一个 Consumer 类并指定它实现 Subscriber 接口。它将具有两个私有字段:一个名为 subscription 的 Subscription 对象和一个名为name的 String 字段。实现类的构造方法以初始化name字段:
public class Consumer implements Subscriber<News> {
private Subscription subscription;
private String name;
public Consumer(String name) {
this.name=name;
}
3.实现 onComplete() 方法。发布者在不发送任何其他元素时应该调用此方法。在本案例中,我们只在控制台打印一条消息:
@Override
public void onComplete() {
System.out.printf("%s - %s: Consumer - Completed\n", name,
Thread.currentThread().getName());
}
4.实现 onError() 方法。程序发生错误时应由发布者调用此方法。在本案例中,我们只在控制台打印一条消息:
@Override
public void onError(Throwable exception) {
System.out.printf("%s - %s: Consumer - Error: %s\n", name,
Thread.currentThread().getName(),
exception.getMessage());
}
5.然后,实现 onNext() 。该方法接收一个 News 对象作为参数,并且发布者在向订阅者发送数据项时应该调用它。在本案例中,我们在控制台打印 News 对象的字段值,并且用 Subscription 对象的 request() 方法请求一个额外的数据项:
@Override
public void onNext(News item) {
System.out.printf("%s - %s: Consumer - News\n", name,
Thread.currentThread().getName());
System.out.printf("%s - %s: Title: %s\n", name,
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s - %s: Content: %s\n", name,
Thread.currentThread().getName(),
item.getContent());
System.out.printf("%s - %s: Date: %s\n", name,
Thread.currentThread().getName(),
item.getDate());
subscription.request(1);
}
6.最后,实现 onSubscribe() 。该方法将由发布者调用,并且将成为由发布者调用的第一个 Subscriber 方法。它在发布者和订阅者之间接收订阅。在本案例中,我们存储了 Subscription 对象,并使用 request() 方法请求处理第一个数据项:
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
System.out.printf("%s: Consumer - Subscription\n",
Thread.currentThread().getName());
}
7.实现一个 MySubscription 类并指定它实现 Subscription 接口。它将有一个名为canceled的布尔型私有字段和一个名为requested的整型私有字段:
public class MySubscription implements Subscription {
private boolean canceled=false;
private long requested=0;
8.实现 Subscription 接口提供的 cancel() 方法来取消发布者与订阅者之间的通信。在本案例中,我们设置canceled字段为 true :
@Override
public void cancel() {
canceled=true;
}
9.实现 Subscription 接口提供的 request() 方法。订阅者使用该方法向发布者请求元素。它接收订阅者请求的元素数作为参数。在本案例中,我们增加了requested的值:
@Override
public void request(long value) {
requested+=value;
}
10.实现 isCanceled() 方法以获取canceled的字段值, getRequested() 方法获取requested的字段值,并用 decreaseRequested() 来减小requested的字段值:
public boolean isCanceled() {
return canceled;
}
public long getRequested() {
return requested;
}
public void decreaseRequested() {
requested--;
}
11.实现一个 ConsumerData 类。发布者用该类来存储每个订阅者的信息。它有一个名为 consumer 的 Consumer 私有字段和一个名为 subscription 的 MySubscription 私有字段。它也会有 get() 和 set() 方法。由于这个类的源代码非常简单,因此在这里不会展示它。
12.实现一个 PublisherTask 类并实现 Runnable 接口。它有一个名为 consumer-Data 的 ConsumerData 私有字段和一个名为 news 的 News 私有字段。实现一个构造方法来初始化这两个字段:
public class PublisherTask implements Runnable {
private ConsumerData consumerData;
private News news;
public PublisherTask(ConsumerData consumerData, News news) {
this.consumerData = consumerData;
this.news = news;
}
13.实现 run() 方法。它将获得 consumerData 字段的 MySubscription 对象。如果订阅没有取消,并且它已经请求了元素(字段值大于0),则用 onNext() 方法将 News 对象发送给订阅者,然后递减所请求的字段值:
@Override
public void run() {
MySubscription subscription = consumerData.getSubscription();
if (!(subscription.isCanceled() && (subscription.getRequested()
> 0))) {
consumerData.getConsumer().onNext(news);
subscription.decreaseRequested();
}
}
}
14.然后,实现一个 MyPublisher 类并实现 Publisher 接口。它将存储 Concurrent-LinkedDeque 类型的 consumers 私有对象和名为 executor 的 ThreadPoolExecutor 对象。实现该类的构造方法以初始化两个字段:
public class MyPublisher implements Publisher<News> {
private ConcurrentLinkedDeque<ConsumerData> consumers;
private ThreadPoolExecutor executor;
public MyPublisher() {
consumers=new ConcurrentLinkedDeque<>();
executor = (ThreadPoolExecutor)Executors.newFixedThreadPool
(Runtime.getRuntime().availableProcessors());
}
15.现在,实现 subscribe() 。该方法将接收一个 Subscriber 对象,它以参数形式接收此发布者的数据项。创建 MySubscription 和 ConsumerData 对象,将 ConsumerData 存储在 ConcurrentLinkedDeque 中,并调用订阅者的 onSubscribe() 方法将 subscription 对象发送给 Subscriber 对象:
@Override
public void subscribe(Subscriber<? super News> subscriber) {
ConsumerData consumerData=new ConsumerData();
consumerData.setConsumer((Consumer)subscriber);
MySubscription subscription=new MySubscription();
consumerData.setSubscription(subscription);
subscriber.onSubscribe(subscription);
consumers.add(consumerData);
}
16.实现 publish() 方法。该方法接收一个 News 参数并将其发送给满足所给条件的订阅者。为此,我们为每个订阅者创建了一个 PublisherTask 方法,并将这些任务发送给执行器:
public void publish(News news) {
consumers.forEach( consumerData -> {
try {
executor.execute(new PublisherTask(consumerData, news));
} catch (Exception e) {
consumerData.getConsumer().onError(e);
}
});
}
17.实现 Main 类的 main() 方法。创建一个发布者和两个订阅者并将其订阅给发布者:
public class Main {
public static void main(String[] args) {
MyPublisher publisher=new MyPublisher();
Subscriber<News> consumer1, consumer2;
consumer1=new Consumer("Consumer 1");
consumer2=new Consumer("Consumer 2");
publisher.subscribe(consumer1);
publisher.subscribe(consumer2);
18.最后,创建一个 News 对象,将其发送给发布者,暂时休眠主线程,创建另一个 News 对象,并再次将其发送给发布者:
System.out.printf("Main: Start\n");
News news=new News();
news.setTitle("My first news");
news.setContent("This is the content");
news.setDate(new Date());
publisher.publish(news);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
news=new News();
news.setTitle("My second news");
news.setContent("This is the content of the second news");
news.setDate(new Date());
publisher.publish(news);
System.out.printf("Main: End\n");
}