当前位置:嗨网首页>书籍在线阅读

36-案例实现

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

案例实现

根据如下步骤实现本案例。

1.创建 Item 类,用于代表发布者发送给订阅者的信息。 Item 类有两个 String 属性( titlecontent ),以及这两个属性的 get()set() 方法。这个类的源代码非常简单,故不再展示。

2.创建 Consumer1 类,用于实现 Subscriber 接口,并以 Item 作为泛型参数。我们必须要实现4个方法。首先,实现 onComplete() 方法—该方法用于简单地把信息写到控制台中:

public class Consumer1 implements Flow.Subscriber<Item> {
  @Override
  public void onComplete() {
    System.out.printf("%s: Consumer 1: Completed\n",
                      Thread.currentThread().getName());
  }

3.实现 onError() 方法。该方法用于简单地把错误信息写到控制台中:

@Override
public void onError(Throwable exception) {
  System.out.printf("%s: Consumer 1: Error\n",
                    Thread.currentThread().getName());
  exception.printStackTrace(System.err);
}

4.实现 onNext() 方法。该方法用于把收到的 item 写到控制台中:

@Override
public void onNext(Item item) {
  System.out.printf("%s: Consumer 1: Item received\n",
                    Thread.currentThread().getName());
  System.out.printf("%s: Consumer 1: %s\n",
                    Thread.currentThread().getName(),
                    item.getTitle());
  System.out.printf("%s: Consumer 1: %s\n",
                    Thread.currentThread().getName(),
                    item.getContent());
}

5.最后实现 onSubscribe() 方法。该方法用于把一些信息写到控制台中,不使用 Subscription 对象的 request() 方法请求任何 item:

@Override
public void onSubscribe(Flow.Subscription subscription) {
  System.out.printf("%s: Consumer 1: Subscription received\n",
                    Thread.currentThread().getName());
  System.out.printf("%s: Consumer 1: No Items requested\n",
                    Thread.currentThread().getName());
}

6.现在来创建 Consumer2 类。它也以 Item 作为泛型参数实现了 Subscriber 接口。在这个例子中,用一个私有的 Subscription 属性来存储订阅对象。 onComplete()onError()Consumer1 中的相同:

public class Consumer2 implements Flow.Subscriber<Item> {
  private Subscription subscription;
  @Override
  public void onComplete() {
    System.out.printf("%s: Consumer 2: Completed\n",
                      Thread.currentThread().getName());
  }
  @Override
  public void onError(Throwable exception) {
    System.out.printf("%s: Consumer 2: Error\n",
                      Thread.currentThread().getName());
    exception.printStackTrace(System.err);
  }

7.与 Consumer1 中的 onNext() 相比,这里的 onNext() 增加了请求另一个元素的代码:

@Override
public void onNext(Item item) {
  System.out.printf("%s: Consumer 2: Item received\n",
                    Thread.currentThread().getName());
  System.out.printf("%s: Consumer 2: %s\n",
                    Thread.currentThread().getName(),
                    item.getTitle());
  System.out.printf("%s: Consumer 2: %s\n",
                    Thread.currentThread().getName(),
                    item.getContent());
  subscription.request(1);
}

8.与 Consumer1 中的 onSubscribe() 方法相比,这里的 onSubscribe() 增加了请求第一个元素的代码:

@Override
public void onSubscribe(Flow.Subscription subscription) {
  System.out.printf("%s: Consumer 2: Subscription received\n",
                    Thread.currentThread().getName());
  this.subscription=subscription;
  subscription.request(1);
}

9.现在来创建 Consumer3 。它也以 Item 作为泛型参数实现 Subscriber 接口。其 onComplete()onError() 方法与之前的都相同:

public class Consumer3 implements Flow.Subscriber<Item> {
  @Override
  public void onComplete() {
    System.out.printf("%s: Consumer 3: Completed\n",
                      Thread.currentThread().getName());
  }
  @Override
  public void onError(Throwable exception) {
    System.out.printf("%s: Consumer 3: Error\n",
                      Thread.currentThread().getName());
    exception.printStackTrace(System.err);
  }

10.虽然 Consumer3onNext() 方法把 item 中的一些信息写到了控制台,但是它没有请求任何元素:

@Override
public void onNext(Item item) {
  System.out.printf("%s: Consumer 3: Item received\n",
                    Thread.currentThread().getName());
  System.out.printf("%s: Consumer 3: %s\n",
                    Thread.currentThread().getName(),
                    item.getTitle());
  System.out.printf("%s: Consumer 3: %s\n",
                    Thread.currentThread().getName(),
                    item.getContent());
}

11.在 onSubscribe() 方法里,我们请求3个 item

@Override
public void onSubscribe(Flow.Subscription subscription) {
  System.out.printf("%s: Consumer 3: Subscription received\n",
                    Thread.currentThread().getName());
  System.out.printf("%s: Consumer 3: Requested three items\n",
                    Thread.currentThread().getName());
  subscription.request(3);
}

12.最后,实现包含 main() 方法的 Main 类。首先创建3个消费者,每个类一个:

public class Main {
  public static void main(String[] args) {
    Consumer1 consumer1=new Consumer1();
    Consumer2 consumer2=new Consumer2();
    Consumer3 consumer3=new Consumer3();

13.现在创建一个以 Item 作为泛型参数的 SubmissionPublisher 对象,然后用 subscribe() 方法添加这3个消费者:

SubmissionPublisher<Item> publisher=new SubmissionPublisher<>();
publisher.subscribe(consumer1);
publisher.subscribe(consumer2);
publisher.subscribe(consumer3);

14.现在创建10个 Item 对象,用 SubmissionPublisher 对象的 submit() 方法来发布它们。每发布一个 item 之后,等待1s:

for (int i=0; i<10; i++) {
  Item item =new Item();
  item.setTitle("Item "+i);
  item.setContent("This is the item "+i);
  publisher.submit(item);
  try {
    TimeUnit.SECONDS.sleep(1);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

15.最后用 close() 方法关闭发布者:

    publisher.close();
  }
}