33-案例实现
案例实现
根据如下步骤来实现本案例。
1.创建一个 MyPriorityTransferQueue
类,它扩展了 PriorityBlocking-Queue
类并实现了 TransferQueue
接口:
public class MyPriorityTransferQueue<E> extends
PriorityBlockingQueue<E> implements TransferQueue<E> {
2.声明一个 AtomicInteger
类型的 counter
私有字段来存储正在等待消费的消费者数量:
private final AtomicInteger counter;
3.声明一个 LinkedBlockingQueue
类型的 transferred
私有字段:
private final LinkedBlockingQueue<E> transferred;
4.声明一个 ReentrantLock
类型的 lock
私有字段:
private final ReentrantLock lock;
5.实现该类的构造方法来初始化其字段:
public MyPriorityTransferQueue() {
counter=new AtomicInteger(0);
lock=new ReentrantLock();
transferred=new LinkedBlockingQueue<E>();
}
6.实现 tryTransfer()
方法。如果可能的话,则该方法会立即尝试将元素发送给等待的消费者。如果没有消费者在等待,则该方法返回 false
:
@Override
public boolean tryTransfer(E e) {
boolean value=false;
try {
lock.lock();
if (counter.get() == 0) {
value = false;
} else {
put(e);
value = true;
}
} finally {
lock.unlock();
}
return value;
}
7.实现 transfer()
方法。如果可能,则该方法会立即尝试将元素发送给等待的消费者。如果没有消费者在等待,则该方法将该元素存储在一个特殊的队列中,以发送给第一个消费者,而该消费者会尝试获取元素并阻塞该线程,直到消费完该元素为止:
@Override
public void transfer(E e) throws InterruptedException {
lock.lock();
if (counter.get()!=0) {
try {
put(e);
} finally {
lock.unlock();
}
} else {
try {
transfered.add(e);
} finally {
lock.unlock();
}
synchronized (e) {
e.wait();
}
}
}
8.实现接收3个参数的 tryTransfer()
方法:元素。如果没有消费者等待的时间,那么它用于指定等待的时间单位。如果有消费者在等待,则它会立即发送该元素。否则,它会将指定的时间转换为毫秒,并用 wait()
方法使线程进入休眠状态。当消费者接收元素时,如果线程在 wait()
方法中休眠,则需要使用 notify()
方法将其唤醒,正如稍后会看到的那样:
@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
lock.lock();
if (counter.get() != 0) {
try {
put(e);
} finally {
lock.unlock();
}
return true;
} else {
long newTimeout=0;
try {
transfered.add(e);
newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
} finally {
lock.unlock();
}
e.wait(newTimeout);
lock.lock();
boolean value;
try {
if (transfered.contains(e)) {
transfered.remove(e);
value = false;
} else {
value = true;
}
} finally {
lock.unlock();
}
return value;
}
}
9.实现 hasWaitingConsumer()
方法。用计数值来计算此方法的返回值。如果计数器的值大于0,则返回 true
;否则,返回 false
:
@Override
public boolean hasWaitingConsumer() {
return (counter.get()!=0);
}
10.实现 getWaitingConsumerCount()
方法。返回计数值:
@Override
public int getWaitingConsumerCount() {
return counter.get();
}
11.实现 take()
方法。当消费者想要消费一个元素时,消费者会调用这个方法。首先,获取之前定义的锁并增加等待的消费者数量:
@Override
public E take() throws InterruptedException {
lock.lock();
try {
counter.incrementAndGet();
12.如果传输队列中没有任何元素,则释放锁并尝试用 take()
元素从队列中获取元素,然后再次获取锁。如果队列中没有任何元素,则此方法将使线程进入休眠状态,直到有元素可消费为止:
E value=transfered.poll();
if (value==null) {
lock.unlock();
value=super.take();
lock.lock();
13.否则,从传输队列中取出元素,并唤醒正在等待消费该元素的线程(如果有的话)。此时要考虑到正在同步从外部来到这个类的对象。必须保证该对象不会锁住程序的其他部分:
} else {
synchronized (value) {
value.notify();
}
}
14.最后,给等待消费者的计数器执行减法操作并释放锁:
counter.decrementAndGet();
} finally {
lock.unlock();
}
return value;
}
15.接下来设计一个 Event
类,并实现 Comparable<Event>
接口:
public class Event implements Comparable<Event> {
16.声明一个 String
类型的 thread
私有字段来存储创建该事件的线程名称:
private final String thread;
17.声明一个 int
类型的 priority
私有字段来存储事件的优先级:
private final int priority;
18.实现该类的构造方法以初始化其字段:
public Event(String thread, int priority){
this.thread=thread;
this.priority=priority;
}
19.实现一个方法来返回 thread
字段值:
public String getThread() {
return thread;
}
20.实现一个方法来返回 priority
字段值:
public int getPriority() {
return priority;
}
21.实现 compareTo()
方法。该方法将实际事件与收到的事件作为参数并进行比较。如果实际事件的优先级高于参数,则返回−1;如果实际事件的优先级低于参数,则返回1;如果两个事件的优先级相同,则返回0。将获得按优先级降序排列的列表。具有较高优先级的事件将首先存储在队列中:
public int compareTo(Event e) {
return Integer.compare(e.priority, this.getPriority());
}
22.实现一个 Producer
类并实现 Runnable
接口:
public class Producer implements Runnable {
23.声明一个 MyPriorityTransferQueue<Event>
类型的 buffer
私有字段,用于存储由该生产者生成的事件:
private final MyPriorityTransferQueue<Event> buffer;
24.实现该类的构造方法以初始化其字段:
public Producer(MyPriorityTransferQueue<Event> buffer) {
this.buffer=buffer;
}
25.实现该类的 run()
方法。使用创建时的顺序作为优先级来创建100个事件对象(最新的事件将具有最高优先级),并使用 put()
方法将它们插入到队列中:
@Override
public void run() {
for (int i=0; i<100; i++) {
Event event=new Event(Thread.currentThread().getName(),i);
buffer.put(event);
}
}
26.实现一个 Consumer
类并实现 Runnable
接口:
public class Consumer implements Runnable {
27.声明一个 MyPriorityTransferQueue
类型的 buffer
私有字段,并用它来获取此类所消费的事件:
private final MyPriorityTransferQueue<Event> buffer;
28.实现该类的构造方法以初始化其字段:
public Consumer(MyPriorityTransferQueue<Event> buffer) {
this.buffer=buffer;
}
29.实现 run()
方法。它用 take()
方法消耗1002个事件(本例中生成的所有事件),并在控制台中打印生成该事件的线程数及优先级:
@Override
public void run() {
for (int i=0; i<1002; i++) {
try {
Event value=buffer.take();
System.out.printf("Consumer: %s: %d\n",value.getThread(),
value.getPriority());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
30.创建一个 Main
类和 main()
方法来实现本案例的主类:
public class Main {
public static void main(String[] args) throws Exception {
31.创建一个 MyPriorityTransferQueue
类型的 buffer
对象:
MyPriorityTransferQueue<Event> buffer=new
MyPriorityTransferQueue<Event>();
32.创建一个生产者任务并启动10个线程来执行这个任务:
Producer producer=new Producer(buffer);
Thread producerThreads[]=new Thread[10];
for (int i=0; i<producerThreads.length; i++) {
producerThreads[i]=new Thread(producer);
producerThreads[i].start();
}
33.创建并启动消费者任务:
Consumer consumer=new Consumer(buffer);
Thread consumerThread=new Thread(consumer);
consumerThread.start();
34.在控制台打印实际的消费者数量:
System.out.printf("Main: Buffer: Consumer count: %d\n",
buffer.getWaitingConsumerCount());
35.用 transfer()
方法将事件传递给消费者:
Event myEvent=new Event("Core Event",0);
buffer.transfer(myEvent);
System.out.printf("Main: My Event has ben transfered.\n");
36.用 join()
方法等待生产者完成任务:
for (int i=0; i<producerThreads.length; i++) {
try {
producerThreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
37.使线程休眠1s:
TimeUnit.SECONDS.sleep(1);
38.打印实际的消费者数量:
System.out.printf("Main: Buffer: Consumer count: %d\n",
buffer.getWaitingConsumerCount());
39.用 transfer()
方法传递另一个事件:
myEvent=new Event("Core Event 2",0);
buffer.transfer(myEvent);
40.用 join()
方法等待消费者完成任务:
consumerThread.join();
41.打印一条表明程序结束的消息:
System.out.printf("Main: End of the program\n");