当前位置:嗨网首页>书籍在线阅读

33-案例实现

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

案例实现

根据如下步骤来实现本案例。

1.创建一个 MyPriorityTransferQueue 类,它扩展了 PriorityBlocking-Queue 类并实现了 TransferQueue 接口:

public class MyPriorityTransferQueue<E> extends 
         PriorityBlockingQueue<E> implements TransferQueue<E> {

2.声明一个 AtomicInteger 类型的 counter 私有字段来存储正在等待消费的消费者数量:

private final AtomicInteger counter;

3.声明一个 LinkedBlockingQueue 类型的 transferred 私有字段:

private final LinkedBlockingQueue<E> transferred;

4.声明一个 ReentrantLock 类型的 lock 私有字段:

private final ReentrantLock lock;

5.实现该类的构造方法来初始化其字段:

public MyPriorityTransferQueue() {
  counter=new AtomicInteger(0);
  lock=new ReentrantLock();
  transferred=new LinkedBlockingQueue<E>();
}

6.实现 tryTransfer() 方法。如果可能的话,则该方法会立即尝试将元素发送给等待的消费者。如果没有消费者在等待,则该方法返回 false

@Override
public boolean tryTransfer(E e) {
  boolean value=false;
  try {
    lock.lock();
    if (counter.get() == 0) {
      value = false;
    } else {
      put(e);
      value = true;
    }
  } finally {
    lock.unlock();
  }
  return value;
}

7.实现 transfer() 方法。如果可能,则该方法会立即尝试将元素发送给等待的消费者。如果没有消费者在等待,则该方法将该元素存储在一个特殊的队列中,以发送给第一个消费者,而该消费者会尝试获取元素并阻塞该线程,直到消费完该元素为止:

  @Override
  public void transfer(E e) throws InterruptedException {
    lock.lock();
    if (counter.get()!=0) {
      try {
        put(e);
      } finally {
        lock.unlock();
      }
    } else {
      try {
        transfered.add(e);
      } finally {
        lock.unlock();
      }
      synchronized (e) {
      e.wait();
    }
  }
}

8.实现接收3个参数的 tryTransfer() 方法:元素。如果没有消费者等待的时间,那么它用于指定等待的时间单位。如果有消费者在等待,则它会立即发送该元素。否则,它会将指定的时间转换为毫秒,并用 wait() 方法使线程进入休眠状态。当消费者接收元素时,如果线程在 wait() 方法中休眠,则需要使用 notify() 方法将其唤醒,正如稍后会看到的那样:

@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
                               throws InterruptedException {
  lock.lock();
  if (counter.get() != 0) {
    try {
      put(e);
    } finally {
      lock.unlock();
    }
    return true;
  } else {
    long newTimeout=0;
    try {
      transfered.add(e);
      newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
    } finally {
      lock.unlock();
    }
    e.wait(newTimeout);
    lock.lock();
    boolean value;
    try {
      if (transfered.contains(e)) {
        transfered.remove(e);
        value = false;
      } else {
        value = true;
      }
    } finally {
      lock.unlock();
    }
    return value;
  }
}

9.实现 hasWaitingConsumer() 方法。用计数值来计算此方法的返回值。如果计数器的值大于0,则返回 true ;否则,返回 false

@Override
public boolean hasWaitingConsumer() {
  return (counter.get()!=0);
}

10.实现 getWaitingConsumerCount() 方法。返回计数值:

@Override
public int getWaitingConsumerCount() {
  return counter.get();
}

11.实现 take() 方法。当消费者想要消费一个元素时,消费者会调用这个方法。首先,获取之前定义的锁并增加等待的消费者数量:

@Override
public E take() throws InterruptedException {
  lock.lock();
  try {
    counter.incrementAndGet();

12.如果传输队列中没有任何元素,则释放锁并尝试用 take() 元素从队列中获取元素,然后再次获取锁。如果队列中没有任何元素,则此方法将使线程进入休眠状态,直到有元素可消费为止:

E value=transfered.poll();
if (value==null) {
  lock.unlock();
  value=super.take();
  lock.lock();

13.否则,从传输队列中取出元素,并唤醒正在等待消费该元素的线程(如果有的话)。此时要考虑到正在同步从外部来到这个类的对象。必须保证该对象不会锁住程序的其他部分:

} else {
  synchronized (value) {
    value.notify();
  }
}

14.最后,给等待消费者的计数器执行减法操作并释放锁:

    counter.decrementAndGet();
  } finally {
    lock.unlock();
  }
  return value;
}

15.接下来设计一个 Event 类,并实现 Comparable<Event> 接口:

public class Event implements Comparable<Event> {

16.声明一个 String 类型的 thread 私有字段来存储创建该事件的线程名称:

private final String thread;

17.声明一个 int 类型的 priority 私有字段来存储事件的优先级:

private final int priority;

18.实现该类的构造方法以初始化其字段:

public Event(String thread, int priority){
  this.thread=thread;
  this.priority=priority;
}

19.实现一个方法来返回 thread 字段值:

public String getThread() {
  return thread;
}

20.实现一个方法来返回 priority 字段值:

public int getPriority() {
  return priority;
}

21.实现 compareTo() 方法。该方法将实际事件与收到的事件作为参数并进行比较。如果实际事件的优先级高于参数,则返回−1;如果实际事件的优先级低于参数,则返回1;如果两个事件的优先级相同,则返回0。将获得按优先级降序排列的列表。具有较高优先级的事件将首先存储在队列中:

public int compareTo(Event e) {
  return Integer.compare(e.priority, this.getPriority());
}

22.实现一个 Producer 类并实现 Runnable 接口:

public class Producer implements Runnable {

23.声明一个 MyPriorityTransferQueue<Event> 类型的 buffer 私有字段,用于存储由该生产者生成的事件:

private final MyPriorityTransferQueue<Event> buffer;

24.实现该类的构造方法以初始化其字段:

public Producer(MyPriorityTransferQueue<Event> buffer) {
  this.buffer=buffer;
}

25.实现该类的 run() 方法。使用创建时的顺序作为优先级来创建100个事件对象(最新的事件将具有最高优先级),并使用 put() 方法将它们插入到队列中:

@Override
public void run() {
  for (int i=0; i<100; i++) {
    Event event=new Event(Thread.currentThread().getName(),i);
    buffer.put(event);
  }
}

26.实现一个 Consumer 类并实现 Runnable 接口:

public class Consumer implements Runnable {

27.声明一个 MyPriorityTransferQueue 类型的 buffer 私有字段,并用它来获取此类所消费的事件:

private final MyPriorityTransferQueue<Event> buffer;

28.实现该类的构造方法以初始化其字段:

public Consumer(MyPriorityTransferQueue<Event> buffer) {
  this.buffer=buffer;
}

29.实现 run() 方法。它用 take() 方法消耗1002个事件(本例中生成的所有事件),并在控制台中打印生成该事件的线程数及优先级:

@Override
public void run() {
  for (int i=0; i<1002; i++) {
    try {
      Event value=buffer.take();
      System.out.printf("Consumer: %s: %d\n",value.getThread(),
                        value.getPriority());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

30.创建一个 Main 类和 main() 方法来实现本案例的主类:

public class Main {
  public static void main(String[] args) throws Exception {

31.创建一个 MyPriorityTransferQueue 类型的 buffer 对象:

MyPriorityTransferQueue<Event> buffer=new
                        MyPriorityTransferQueue<Event>();

32.创建一个生产者任务并启动10个线程来执行这个任务:

Producer producer=new Producer(buffer);
Thread producerThreads[]=new Thread[10];
for (int i=0; i<producerThreads.length; i++) {
  producerThreads[i]=new Thread(producer);
  producerThreads[i].start();
}

33.创建并启动消费者任务:

Consumer consumer=new Consumer(buffer);
Thread consumerThread=new Thread(consumer);
consumerThread.start();

34.在控制台打印实际的消费者数量:

System.out.printf("Main: Buffer: Consumer count: %d\n",
                  buffer.getWaitingConsumerCount());

35.用 transfer() 方法将事件传递给消费者:

Event myEvent=new Event("Core Event",0);
buffer.transfer(myEvent);
System.out.printf("Main: My Event has ben transfered.\n");

36.用 join() 方法等待生产者完成任务:

for (int i=0; i<producerThreads.length; i++) {
  try {
    producerThreads[i].join();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

37.使线程休眠1s:

TimeUnit.SECONDS.sleep(1);

38.打印实际的消费者数量:

System.out.printf("Main: Buffer: Consumer count: %d\n",
                  buffer.getWaitingConsumerCount());

39.用 transfer() 方法传递另一个事件:

myEvent=new Event("Core Event 2",0);
buffer.transfer(myEvent);

40.用 join() 方法等待消费者完成任务:

consumerThread.join();

41.打印一条表明程序结束的消息:

System.out.printf("Main: End of the program\n");