33-案例实现
案例实现
根据如下步骤来实现本案例。
1.创建一个 MyPriorityTransferQueue 类,它扩展了 PriorityBlocking-Queue 类并实现了 TransferQueue 接口:
public class MyPriorityTransferQueue<E> extends
PriorityBlockingQueue<E> implements TransferQueue<E> {
2.声明一个 AtomicInteger 类型的 counter 私有字段来存储正在等待消费的消费者数量:
private final AtomicInteger counter;
3.声明一个 LinkedBlockingQueue 类型的 transferred 私有字段:
private final LinkedBlockingQueue<E> transferred;
4.声明一个 ReentrantLock 类型的 lock 私有字段:
private final ReentrantLock lock;
5.实现该类的构造方法来初始化其字段:
public MyPriorityTransferQueue() {
counter=new AtomicInteger(0);
lock=new ReentrantLock();
transferred=new LinkedBlockingQueue<E>();
}
6.实现 tryTransfer() 方法。如果可能的话,则该方法会立即尝试将元素发送给等待的消费者。如果没有消费者在等待,则该方法返回 false :
@Override
public boolean tryTransfer(E e) {
boolean value=false;
try {
lock.lock();
if (counter.get() == 0) {
value = false;
} else {
put(e);
value = true;
}
} finally {
lock.unlock();
}
return value;
}
7.实现 transfer() 方法。如果可能,则该方法会立即尝试将元素发送给等待的消费者。如果没有消费者在等待,则该方法将该元素存储在一个特殊的队列中,以发送给第一个消费者,而该消费者会尝试获取元素并阻塞该线程,直到消费完该元素为止:
@Override
public void transfer(E e) throws InterruptedException {
lock.lock();
if (counter.get()!=0) {
try {
put(e);
} finally {
lock.unlock();
}
} else {
try {
transfered.add(e);
} finally {
lock.unlock();
}
synchronized (e) {
e.wait();
}
}
}
8.实现接收3个参数的 tryTransfer() 方法:元素。如果没有消费者等待的时间,那么它用于指定等待的时间单位。如果有消费者在等待,则它会立即发送该元素。否则,它会将指定的时间转换为毫秒,并用 wait() 方法使线程进入休眠状态。当消费者接收元素时,如果线程在 wait() 方法中休眠,则需要使用 notify() 方法将其唤醒,正如稍后会看到的那样:
@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
lock.lock();
if (counter.get() != 0) {
try {
put(e);
} finally {
lock.unlock();
}
return true;
} else {
long newTimeout=0;
try {
transfered.add(e);
newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
} finally {
lock.unlock();
}
e.wait(newTimeout);
lock.lock();
boolean value;
try {
if (transfered.contains(e)) {
transfered.remove(e);
value = false;
} else {
value = true;
}
} finally {
lock.unlock();
}
return value;
}
}
9.实现 hasWaitingConsumer() 方法。用计数值来计算此方法的返回值。如果计数器的值大于0,则返回 true ;否则,返回 false :
@Override
public boolean hasWaitingConsumer() {
return (counter.get()!=0);
}
10.实现 getWaitingConsumerCount() 方法。返回计数值:
@Override
public int getWaitingConsumerCount() {
return counter.get();
}
11.实现 take() 方法。当消费者想要消费一个元素时,消费者会调用这个方法。首先,获取之前定义的锁并增加等待的消费者数量:
@Override
public E take() throws InterruptedException {
lock.lock();
try {
counter.incrementAndGet();
12.如果传输队列中没有任何元素,则释放锁并尝试用 take() 元素从队列中获取元素,然后再次获取锁。如果队列中没有任何元素,则此方法将使线程进入休眠状态,直到有元素可消费为止:
E value=transfered.poll();
if (value==null) {
lock.unlock();
value=super.take();
lock.lock();
13.否则,从传输队列中取出元素,并唤醒正在等待消费该元素的线程(如果有的话)。此时要考虑到正在同步从外部来到这个类的对象。必须保证该对象不会锁住程序的其他部分:
} else {
synchronized (value) {
value.notify();
}
}
14.最后,给等待消费者的计数器执行减法操作并释放锁:
counter.decrementAndGet();
} finally {
lock.unlock();
}
return value;
}
15.接下来设计一个 Event 类,并实现 Comparable<Event> 接口:
public class Event implements Comparable<Event> {
16.声明一个 String 类型的 thread 私有字段来存储创建该事件的线程名称:
private final String thread;
17.声明一个 int 类型的 priority 私有字段来存储事件的优先级:
private final int priority;
18.实现该类的构造方法以初始化其字段:
public Event(String thread, int priority){
this.thread=thread;
this.priority=priority;
}
19.实现一个方法来返回 thread 字段值:
public String getThread() {
return thread;
}
20.实现一个方法来返回 priority 字段值:
public int getPriority() {
return priority;
}
21.实现 compareTo() 方法。该方法将实际事件与收到的事件作为参数并进行比较。如果实际事件的优先级高于参数,则返回−1;如果实际事件的优先级低于参数,则返回1;如果两个事件的优先级相同,则返回0。将获得按优先级降序排列的列表。具有较高优先级的事件将首先存储在队列中:
public int compareTo(Event e) {
return Integer.compare(e.priority, this.getPriority());
}
22.实现一个 Producer 类并实现 Runnable 接口:
public class Producer implements Runnable {
23.声明一个 MyPriorityTransferQueue<Event> 类型的 buffer 私有字段,用于存储由该生产者生成的事件:
private final MyPriorityTransferQueue<Event> buffer;
24.实现该类的构造方法以初始化其字段:
public Producer(MyPriorityTransferQueue<Event> buffer) {
this.buffer=buffer;
}
25.实现该类的 run() 方法。使用创建时的顺序作为优先级来创建100个事件对象(最新的事件将具有最高优先级),并使用 put() 方法将它们插入到队列中:
@Override
public void run() {
for (int i=0; i<100; i++) {
Event event=new Event(Thread.currentThread().getName(),i);
buffer.put(event);
}
}
26.实现一个 Consumer 类并实现 Runnable 接口:
public class Consumer implements Runnable {
27.声明一个 MyPriorityTransferQueue 类型的 buffer 私有字段,并用它来获取此类所消费的事件:
private final MyPriorityTransferQueue<Event> buffer;
28.实现该类的构造方法以初始化其字段:
public Consumer(MyPriorityTransferQueue<Event> buffer) {
this.buffer=buffer;
}
29.实现 run() 方法。它用 take() 方法消耗1002个事件(本例中生成的所有事件),并在控制台中打印生成该事件的线程数及优先级:
@Override
public void run() {
for (int i=0; i<1002; i++) {
try {
Event value=buffer.take();
System.out.printf("Consumer: %s: %d\n",value.getThread(),
value.getPriority());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
30.创建一个 Main 类和 main() 方法来实现本案例的主类:
public class Main {
public static void main(String[] args) throws Exception {
31.创建一个 MyPriorityTransferQueue 类型的 buffer 对象:
MyPriorityTransferQueue<Event> buffer=new
MyPriorityTransferQueue<Event>();
32.创建一个生产者任务并启动10个线程来执行这个任务:
Producer producer=new Producer(buffer);
Thread producerThreads[]=new Thread[10];
for (int i=0; i<producerThreads.length; i++) {
producerThreads[i]=new Thread(producer);
producerThreads[i].start();
}
33.创建并启动消费者任务:
Consumer consumer=new Consumer(buffer);
Thread consumerThread=new Thread(consumer);
consumerThread.start();
34.在控制台打印实际的消费者数量:
System.out.printf("Main: Buffer: Consumer count: %d\n",
buffer.getWaitingConsumerCount());
35.用 transfer() 方法将事件传递给消费者:
Event myEvent=new Event("Core Event",0);
buffer.transfer(myEvent);
System.out.printf("Main: My Event has ben transfered.\n");
36.用 join() 方法等待生产者完成任务:
for (int i=0; i<producerThreads.length; i++) {
try {
producerThreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
37.使线程休眠1s:
TimeUnit.SECONDS.sleep(1);
38.打印实际的消费者数量:
System.out.printf("Main: Buffer: Consumer count: %d\n",
buffer.getWaitingConsumerCount());
39.用 transfer() 方法传递另一个事件:
myEvent=new Event("Core Event 2",0);
buffer.transfer(myEvent);
40.用 join() 方法等待消费者完成任务:
consumerThread.join();
41.打印一条表明程序结束的消息:
System.out.printf("Main: End of the program\n");