43-案例实现
案例实现
根据如下步骤来实现本案例。
1.实现一个 News
类。该类实现发布者发送给订阅者的元素。它将有两个私有字符型字段,它们分别为 title
和 content
,还有一个名为 date
的 Date
字段。它也将具有获取和设置这些字段值的方法。由于这个类的源代码非常简单,因此不会在这里展示它。
2.创建一个 Consumer
类并指定它实现 Subscriber
接口。它将具有两个私有字段:一个名为 subscription
的 Subscription
对象和一个名为name的 String
字段。实现类的构造方法以初始化name字段:
public class Consumer implements Subscriber<News> {
private Subscription subscription;
private String name;
public Consumer(String name) {
this.name=name;
}
3.实现 onComplete()
方法。发布者在不发送任何其他元素时应该调用此方法。在本案例中,我们只在控制台打印一条消息:
@Override
public void onComplete() {
System.out.printf("%s - %s: Consumer - Completed\n", name,
Thread.currentThread().getName());
}
4.实现 onError()
方法。程序发生错误时应由发布者调用此方法。在本案例中,我们只在控制台打印一条消息:
@Override
public void onError(Throwable exception) {
System.out.printf("%s - %s: Consumer - Error: %s\n", name,
Thread.currentThread().getName(),
exception.getMessage());
}
5.然后,实现 onNext()
。该方法接收一个 News
对象作为参数,并且发布者在向订阅者发送数据项时应该调用它。在本案例中,我们在控制台打印 News
对象的字段值,并且用 Subscription
对象的 request()
方法请求一个额外的数据项:
@Override
public void onNext(News item) {
System.out.printf("%s - %s: Consumer - News\n", name,
Thread.currentThread().getName());
System.out.printf("%s - %s: Title: %s\n", name,
Thread.currentThread().getName(),
item.getTitle());
System.out.printf("%s - %s: Content: %s\n", name,
Thread.currentThread().getName(),
item.getContent());
System.out.printf("%s - %s: Date: %s\n", name,
Thread.currentThread().getName(),
item.getDate());
subscription.request(1);
}
6.最后,实现 onSubscribe()
。该方法将由发布者调用,并且将成为由发布者调用的第一个 Subscriber
方法。它在发布者和订阅者之间接收订阅。在本案例中,我们存储了 Subscription
对象,并使用 request()
方法请求处理第一个数据项:
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
System.out.printf("%s: Consumer - Subscription\n",
Thread.currentThread().getName());
}
7.实现一个 MySubscription
类并指定它实现 Subscription
接口。它将有一个名为canceled的布尔型私有字段和一个名为requested的整型私有字段:
public class MySubscription implements Subscription {
private boolean canceled=false;
private long requested=0;
8.实现 Subscription
接口提供的 cancel()
方法来取消发布者与订阅者之间的通信。在本案例中,我们设置canceled字段为 true
:
@Override
public void cancel() {
canceled=true;
}
9.实现 Subscription
接口提供的 request()
方法。订阅者使用该方法向发布者请求元素。它接收订阅者请求的元素数作为参数。在本案例中,我们增加了requested的值:
@Override
public void request(long value) {
requested+=value;
}
10.实现 isCanceled()
方法以获取canceled的字段值, getRequested()
方法获取requested的字段值,并用 decreaseRequested()
来减小requested的字段值:
public boolean isCanceled() {
return canceled;
}
public long getRequested() {
return requested;
}
public void decreaseRequested() {
requested--;
}
11.实现一个 ConsumerData
类。发布者用该类来存储每个订阅者的信息。它有一个名为 consumer
的 Consumer
私有字段和一个名为 subscription
的 MySubscription
私有字段。它也会有 get()
和 set()
方法。由于这个类的源代码非常简单,因此在这里不会展示它。
12.实现一个 PublisherTask
类并实现 Runnable
接口。它有一个名为 consumer-Data
的 ConsumerData
私有字段和一个名为 news
的 News
私有字段。实现一个构造方法来初始化这两个字段:
public class PublisherTask implements Runnable {
private ConsumerData consumerData;
private News news;
public PublisherTask(ConsumerData consumerData, News news) {
this.consumerData = consumerData;
this.news = news;
}
13.实现 run()
方法。它将获得 consumerData
字段的 MySubscription
对象。如果订阅没有取消,并且它已经请求了元素(字段值大于0),则用 onNext()
方法将 News
对象发送给订阅者,然后递减所请求的字段值:
@Override
public void run() {
MySubscription subscription = consumerData.getSubscription();
if (!(subscription.isCanceled() && (subscription.getRequested()
> 0))) {
consumerData.getConsumer().onNext(news);
subscription.decreaseRequested();
}
}
}
14.然后,实现一个 MyPublisher
类并实现 Publisher
接口。它将存储 Concurrent-LinkedDeque
类型的 consumers
私有对象和名为 executor
的 ThreadPoolExecutor
对象。实现该类的构造方法以初始化两个字段:
public class MyPublisher implements Publisher<News> {
private ConcurrentLinkedDeque<ConsumerData> consumers;
private ThreadPoolExecutor executor;
public MyPublisher() {
consumers=new ConcurrentLinkedDeque<>();
executor = (ThreadPoolExecutor)Executors.newFixedThreadPool
(Runtime.getRuntime().availableProcessors());
}
15.现在,实现 subscribe()
。该方法将接收一个 Subscriber
对象,它以参数形式接收此发布者的数据项。创建 MySubscription
和 ConsumerData
对象,将 ConsumerData
存储在 ConcurrentLinkedDeque
中,并调用订阅者的 onSubscribe()
方法将 subscription
对象发送给 Subscriber
对象:
@Override
public void subscribe(Subscriber<? super News> subscriber) {
ConsumerData consumerData=new ConsumerData();
consumerData.setConsumer((Consumer)subscriber);
MySubscription subscription=new MySubscription();
consumerData.setSubscription(subscription);
subscriber.onSubscribe(subscription);
consumers.add(consumerData);
}
16.实现 publish()
方法。该方法接收一个 News
参数并将其发送给满足所给条件的订阅者。为此,我们为每个订阅者创建了一个 PublisherTask
方法,并将这些任务发送给执行器:
public void publish(News news) {
consumers.forEach( consumerData -> {
try {
executor.execute(new PublisherTask(consumerData, news));
} catch (Exception e) {
consumerData.getConsumer().onError(e);
}
});
}
17.实现 Main
类的 main()
方法。创建一个发布者和两个订阅者并将其订阅给发布者:
public class Main {
public static void main(String[] args) {
MyPublisher publisher=new MyPublisher();
Subscriber<News> consumer1, consumer2;
consumer1=new Consumer("Consumer 1");
consumer2=new Consumer("Consumer 2");
publisher.subscribe(consumer1);
publisher.subscribe(consumer2);
18.最后,创建一个 News
对象,将其发送给发布者,暂时休眠主线程,创建另一个 News
对象,并再次将其发送给发布者:
System.out.printf("Main: Start\n");
News news=new News();
news.setTitle("My first news");
news.setContent("This is the content");
news.setDate(new Date());
publisher.publish(news);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
news=new News();
news.setTitle("My second news");
news.setContent("This is the content of the second news");
news.setDate(new Date());
publisher.publish(news);
System.out.printf("Main: End\n");
}