25-案例实现
案例实现
根据以下步骤实现本案例。
1.本案例将用 FileMock 类模拟文本文件。该类有两个属性: String 型的数组 content 和 int 型的属性 index ,它们分别存储文件的内容和检索模拟文件的行:
public class FileMock {
private String[] content;
private int index;
2.实现该类的构造函数,并在构造函数中用随机字符填充文件内容:
public FileMock(int size, int length){
content = new String[size];
for (int i = 0; i< size; i++){
StringBuilder buffer = new StringBuilder(length);
for (int j = 0; j < length; j++){
int randomCharacter= (int)Math.random()*255;
buffer.append((char)randomCharacter);
}
content[i] = buffer.toString();
}
index=0;
}
3.实现返回值为 boolean 类型的 hasMoreLines() 方法。如果返回值为 true ,则表示文件还有继续处理的内容;如果返回值为 false ,则表示读到了这份模拟文件的末尾:
public boolean hasMoreLines(){
return index <content.length;
}
4.实现 getLine() 方法。该方法返回文件对应行数的内容,在调用该行数后,就表示已读文件行数的计数值将自增1:
public String getLine(){
if (this.hasMoreLines()) {
System.out.println("Mock: " + (content.length-index));
return content[index++];
}
return null;
}
5.实现表示数据缓冲区的 Buffer 类,并且在生产者和消费者间共享该数据缓冲区内的数据:
public class Buffer {
6.该类包含以下几个基本属性。
- 类型为
LinkedList<String>的变量buffer,用于存放共享数据:
private final LinkedList<String> buffer;
- 存放数据缓冲区长度的
int类型变量maxSize:
private final int maxSize;
- 名为
lock的ReentrantLock类控制线程访问的代码,它可以修改数据缓冲区变量:
private final ReentrantLock lock;
- 两个
Condition类型的属性lines和space:
private final Condition lines;
private final Condition space;
- 添加
boolean标识符pendingLines——用于表示当前数据缓冲区中是否还有未读取的内容:
private boolean pendingLines;
7.在构造函数中完成上述属性的初始化工作:
public Buffer(int maxSize) {
this.maxSize = maxSize;
buffer = new LinkedList<>();
lock = new ReentrantLock();
lines = lock.newCondition();
space = lock.newCondition();
pendingLines =true;
}
8.实现参数类型为 String 的 insert() 方法,并通过该方法在数据缓冲区中插入数据。首先要获得锁的控制权,并判断当前数据缓冲区是否有可以插入数据的空闲空间。如果当前缓冲区数据已满,则通过 space 条件的 await() 方法进行等待,直到缓冲区中出现空余空间。在此之后,当有其他线程调用 space 条件的 signal() 方法或者 signalAll() 方法时,该线程将唤醒。然后,该线程将往数据缓存区中插入一行数据,并通过 lines 条件发起 signalAll() 方法唤醒其他等待读取文本内容的线程[1]。为了让代码简单一点,这里将不处理 InterruptedException 异常。在实际情况中,你可能需要处理它:
public void insert(String line) {
lock.lock();
try {
while (buffer.size() == maxSize) {
space.await();
}
buffer.offer(line);
System.out.printf("%s: Inserted Line: %d\n",
Thread.currentThread().getName(),
buffer.size());
lines.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
9.实现 get() 方法,该方法用于返回数据缓冲区中的第一个字符串。首先,需要获得锁的控制权,然后判断当前数据缓冲区中是否有可以读取的内容。如果数据缓冲区为空,则通过 lines 条件的 await() 方法等待。当有其他线程在 lines 条件上发起 signal() 和 signalAll() 方法后,该线程将唤醒。然后,在 get() 方法内部从数据缓冲区中读取第一行数据,并通过 space 条件的 signalAll() 方法唤醒其他在 space 上等待的线程。完成上述工作后,该方法将返回一个 String 类型的返回值:
public String get() {
String line = null;
lock.lock();
try {
while ((buffer.size() == 0) &&(hasPendingLines())) {
lines.await();
}
if (hasPendingLines()) {
line = buffer.poll();
System.out.printf("%s: Line Readed: %d\n",
Thread.currentThread().getName(),
buffer.size());
space.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return line;
}
10.实现可以设置 pendingLines 属性值的 setPendingLines() 方法,当生产者没有其他内容可以生产的时候,将调用该方法:
public synchronized void setPendingLines(boolean pendingLines) {
this.pendingLines = pendingLines;
}
11.实现 hasPendingLines()方 法。返回值为 true 表示当前还有可以处理的文本内容:
public synchronized boolean hasPendingLines() {
return pendingLines || buffer.size()>0;
}
12.实现 Runnable 接口的 Producer 类,该类在本例中将扮演生产者角色:
public class Producer implements Runnable {
13.在上述类中声明 FileMock 和 Buffer 两个不同类型的属性:
private FileMock mock;
private Buffer buffer;
14.在类的构造函数中完成两个属性的赋值:
public Producer (FileMock mock, Buffer buffer){
this.mock = mock;
this.buffer = buffer;
}
15.实现 run() 方法,通过该方法中读取 FileMock 里的文本并使用 insert() 方法插入数据到数据缓冲区中。读取完成后,通过 setPendingLines() 方法来标识不会输出新数据到数据缓冲区中:
@Override
public void run() {
buffer.setPendingLines(true);
while (mock.hasMoreLines()){
String line = mock.getLine();
buffer.insert(line);
}
buffer.setPendingLines(false);
}
16.实现 Runnable 接口的 Consumer 类——该类在本案例中将扮演消费者角色:
public class Consumer implements Runnable {
17.在该类中声明 Buffer 型属性,并在构造函数中完成赋值操作:
private Buffer buffer;
public Consumer (Buffer buffer) {
this.buffer = buffer;
}
18.实现 run() 方法。只要数据缓冲区中还有未读取的数据,那么该方法将不断进行读取并处理新的文本内容:
@Override
public void run() {
while (buffer.hasPendingLines()) {
String line = buffer.get();
processLine(line);
}
}
19.为了模拟处理文本的情况,程序将在辅助方法 processLine() 中休眠片刻[2]:
private void processLine(String line) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
20.实现一个主类通过类名为 Main 的主类并添加 main() 方法来:
public class Main {
public static void main(String[] args) {
21.创建类型为 FileMock 的对象实例:
FileMock mock = new FileMock(100, 10);
22.创建类型为 Buffer 的对象实例:
Buffer buffer = new Buffer(20);
23.创建 Producer 对象实例并且交由 Thread 来执行:
Producer producer = new Producer(mock, buffer);
Thread producerThread = new Thread(producer,"Producer");
24.创建 Consumer 对象实例并且交由 Thread 来执行:
Consumer consumers[] = new Consumer[3];
Thread consumersThreads[] = new Thread[3];
for (int i=0; i<3; i++){
consumers[i] = new Consumer(buffer);
consumersThreads[i] = new Thread(consumers[i],"Consumer "+i);
}
25.分别启动1个生产者线程和3个消费者线程:
producerThread.start();
for (int i = 0; i< 3; i++){
consumersThreads[i].start();
}