当前位置:嗨网首页>书籍在线阅读

43-案例实现

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

案例实现

根据如下步骤来实现本案例。

1.实现一个 News 类。该类实现发布者发送给订阅者的元素。它将有两个私有字符型字段,它们分别为 titlecontent ,还有一个名为 dateDate 字段。它也将具有获取和设置这些字段值的方法。由于这个类的源代码非常简单,因此不会在这里展示它。

2.创建一个 Consumer 类并指定它实现 Subscriber 接口。它将具有两个私有字段:一个名为 subscriptionSubscription 对象和一个名为name的 String 字段。实现类的构造方法以初始化name字段:

public class Consumer implements Subscriber<News> {
  private Subscription subscription;
  private String name;
  public Consumer(String name) {
    this.name=name;
  }

3.实现 onComplete() 方法。发布者在不发送任何其他元素时应该调用此方法。在本案例中,我们只在控制台打印一条消息:

@Override
public void onComplete() {
  System.out.printf("%s - %s: Consumer - Completed\n", name,
                    Thread.currentThread().getName());
}

4.实现 onError() 方法。程序发生错误时应由发布者调用此方法。在本案例中,我们只在控制台打印一条消息:

@Override
public void onError(Throwable exception) {
  System.out.printf("%s - %s: Consumer - Error: %s\n", name,
                    Thread.currentThread().getName(),
                    exception.getMessage());
}

5.然后,实现 onNext() 。该方法接收一个 News 对象作为参数,并且发布者在向订阅者发送数据项时应该调用它。在本案例中,我们在控制台打印 News 对象的字段值,并且用 Subscription 对象的 request() 方法请求一个额外的数据项:

@Override
public void onNext(News item) {
  System.out.printf("%s - %s: Consumer - News\n", name,
                    Thread.currentThread().getName());
  System.out.printf("%s - %s: Title: %s\n", name,
                    Thread.currentThread().getName(),
                    item.getTitle());
  System.out.printf("%s - %s: Content: %s\n", name,
                    Thread.currentThread().getName(),
                    item.getContent());
  System.out.printf("%s - %s: Date: %s\n", name,
                    Thread.currentThread().getName(),
                    item.getDate());
  subscription.request(1);
}

6.最后,实现 onSubscribe() 。该方法将由发布者调用,并且将成为由发布者调用的第一个 Subscriber 方法。它在发布者和订阅者之间接收订阅。在本案例中,我们存储了 Subscription 对象,并使用 request() 方法请求处理第一个数据项:

@Override
public void onSubscribe(Subscription subscription) {
  this.subscription = subscription;
  subscription.request(1);
  System.out.printf("%s: Consumer - Subscription\n",
                    Thread.currentThread().getName());
}

7.实现一个 MySubscription 类并指定它实现 Subscription 接口。它将有一个名为canceled的布尔型私有字段和一个名为requested的整型私有字段:

public class MySubscription implements Subscription {
  private boolean canceled=false;
  private long requested=0;

8.实现 Subscription 接口提供的 cancel() 方法来取消发布者与订阅者之间的通信。在本案例中,我们设置canceled字段为 true

@Override
public void cancel() {
  canceled=true;
}

9.实现 Subscription 接口提供的 request() 方法。订阅者使用该方法向发布者请求元素。它接收订阅者请求的元素数作为参数。在本案例中,我们增加了requested的值:

@Override
public void request(long value) {
  requested+=value;
}

10.实现 isCanceled() 方法以获取canceled的字段值, getRequested() 方法获取requested的字段值,并用 decreaseRequested() 来减小requested的字段值:

public boolean isCanceled() {
  return canceled;
}
public long getRequested() {
  return requested;
}
public void decreaseRequested() {
  requested--;
}

11.实现一个 ConsumerData 类。发布者用该类来存储每个订阅者的信息。它有一个名为 consumerConsumer 私有字段和一个名为 subscriptionMySubscription 私有字段。它也会有 get()set() 方法。由于这个类的源代码非常简单,因此在这里不会展示它。

12.实现一个 PublisherTask 类并实现 Runnable 接口。它有一个名为 consumer-DataConsumerData 私有字段和一个名为 newsNews 私有字段。实现一个构造方法来初始化这两个字段:

public class PublisherTask implements Runnable {
private ConsumerData consumerData;
private News news;
public PublisherTask(ConsumerData consumerData, News news) {
  this.consumerData = consumerData;
  this.news = news;
}

13.实现 run() 方法。它将获得 consumerData 字段的 MySubscription 对象。如果订阅没有取消,并且它已经请求了元素(字段值大于0),则用 onNext() 方法将 News 对象发送给订阅者,然后递减所请求的字段值:

  @Override
  public void run() {
    MySubscription subscription = consumerData.getSubscription();
    if (!(subscription.isCanceled() && (subscription.getRequested()
                                                       > 0))) {
      consumerData.getConsumer().onNext(news);
      subscription.decreaseRequested();
    }
  }
}

14.然后,实现一个 MyPublisher 类并实现 Publisher 接口。它将存储 Concurrent-LinkedDeque 类型的 consumers 私有对象和名为 executorThreadPoolExecutor 对象。实现该类的构造方法以初始化两个字段:

public class MyPublisher implements Publisher<News> {
  private ConcurrentLinkedDeque<ConsumerData> consumers;
  private ThreadPoolExecutor executor;
  public MyPublisher() {
    consumers=new ConcurrentLinkedDeque<>();
    executor = (ThreadPoolExecutor)Executors.newFixedThreadPool
                  (Runtime.getRuntime().availableProcessors());
  }

15.现在,实现 subscribe() 。该方法将接收一个 Subscriber 对象,它以参数形式接收此发布者的数据项。创建 MySubscriptionConsumerData 对象,将 ConsumerData 存储在 ConcurrentLinkedDeque 中,并调用订阅者的 onSubscribe() 方法将 subscription 对象发送给 Subscriber 对象:

@Override
public void subscribe(Subscriber<? super News> subscriber) {
  ConsumerData consumerData=new ConsumerData();
  consumerData.setConsumer((Consumer)subscriber);
  MySubscription subscription=new MySubscription();
  consumerData.setSubscription(subscription);
  subscriber.onSubscribe(subscription);
  consumers.add(consumerData);
}

16.实现 publish() 方法。该方法接收一个 News 参数并将其发送给满足所给条件的订阅者。为此,我们为每个订阅者创建了一个 PublisherTask 方法,并将这些任务发送给执行器:

public void publish(News news) {
  consumers.forEach( consumerData -> {
    try {
      executor.execute(new PublisherTask(consumerData, news));
    } catch (Exception e) {
      consumerData.getConsumer().onError(e);
    }
  });
}

17.实现 Main 类的 main() 方法。创建一个发布者和两个订阅者并将其订阅给发布者:

public class Main {
  public static void main(String[] args) {
    MyPublisher publisher=new MyPublisher();
    Subscriber<News> consumer1, consumer2;
    consumer1=new Consumer("Consumer 1");
    consumer2=new Consumer("Consumer 2");
    publisher.subscribe(consumer1);
    publisher.subscribe(consumer2);

18.最后,创建一个 News 对象,将其发送给发布者,暂时休眠主线程,创建另一个 News 对象,并再次将其发送给发布者:

System.out.printf("Main: Start\n");
  News news=new News();
  news.setTitle("My first news");
  news.setContent("This is the content");
  news.setDate(new Date());
  publisher.publish(news);
  try {
    TimeUnit.SECONDS.sleep(1);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  news=new News();
  news.setTitle("My second news");
  news.setContent("This is the content of the second news");
  news.setDate(new Date());
  publisher.publish(news);
  System.out.printf("Main: End\n");
}