22-案例实现
案例实现
根据如下步骤来实现本案例。
1.创建一个 MyWorkerThread
的类来扩展 ForkJoinWorkerThread
类:
public class MyWorkerThread extends ForkJoinWorkerThread {
2.声明并创建一个 ThreadLocal
类型的 taskCounter
私有字段:
private final static ThreadLocal<Integer> taskCounter=
new ThreadLocal<Integer>();
3.实现该类一个构造方法:
protected MyWorkerThread(ForkJoinPool pool) {
super(pool);
}
4.重写 onStart()
方法。在其父类上调用该方法向控制台输出消息,并将此线程的 taskCounter
字段值设置为零:
@Override
protected void onStart() {
super.onStart();
System.out.printf("MyWorkerThread %d: Initializing task
counter.\n", getId());
taskCounter.set(0);
}
5.重写 onTermination()
方法。在控制台中打印此线程的 taskCounter
字段值:
@Override
protected void onTermination(Throwable exception) {
System.out.printf("MyWorkerThread %d: %d\n",
getId(),taskCounter.get());
super.onTermination(exception);
}
6.实现 addTask()
方法。增加 taskCounter
字段值:
public void addTask(){
taskCounter.set(taskCounter.get() + 1);;
}
7.创建一个 MyWorkerThreadFactory
类,实现 ForkJoinWorkerThreadFactory
接口,实现 newThread()
方法。创建并返回一个 MyWorkerThread
对象:
public class MyWorkerThreadFactory implements
ForkJoinWorkerThreadFactory {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new MyWorkerThread(pool);
}
}
8.创建一个 MyRecursiveTask
类,扩展 RecursiveTask
类:
public class MyRecursiveTask extends RecursiveTask<Integer> {
9.声明一个整型的 array
私有数组:
private int array[];
10.声明两个整型的 start
和 end
私有字段:
private int start, end;
11.实现该类的构造方法来初始化字段:
public Task(int array[],int start, int end) {
this.array=array;
this.start=start;
this.end=end;
}
12.实现 compute()
方法,将数组中在开始和结束位置之间的所有元素进行求和。首先,将执行任务的线程转换为 MyWorkerThread
对象,并使用 addTask()
方法递增该线程任务的计数器:
@Override
protected Integer compute() {
Integer ret;
MyWorkerThread thread=(MyWorkerThread)Thread.currentThread();
thread.addTask();
13.如果数组中的开始位置和结束位置之间的差值大于100,则计算中间位置,并分别创建两个新的 MyRecursiveTask
任务来处理第一个和第二个半部分。如果差值小于等于100,则计算开始和结束位置之间所有元素的总和:
if (end-start>100) {
int mid=(start+end)/2;
MyRecursiveTask task1=new MyRecursiveTask(array,start,mid);
MyRecursiveTask task2=new MyRecursiveTask(array,mid,end);
invokeAll(task1,task2);
ret=addResults(task1,task2);
} else {
int add=0;
for (int i=start; i<end; i++) {
add+=array[i];
}
ret=add;
}
14.让线程休眠10ms并返回任务结果:
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ret;
}
15.实现 addResults()
方法。将计算并返回的两个任务结果的总和作为参数:
private Integer addResults(Task task1, Task task2) {
int value;
try {
value = task1.get().intValue()+task2.get().intValue();
} catch (InterruptedException e) {
e.printStackTrace();
value=0;
} catch (ExecutionException e) {
e.printStackTrace();
value=0;
}
16.实现本案例的主类,创建一个 Main
类和实现一个 main()
方法:
public class Main {
public static void main(String[] args) throws Exception {
17.创建一个 MyWorkerThreadFactory
类型的 factory
对象:
MyWorkerThreadFactory factory=new MyWorkerThreadFactory();
18.创建一个 ForkJoinPool
类型的 pool
对象。将之前创建的工厂对象传递给构造方法:
ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);
19.创建一个长度为100000的整型数组。将所有元素初始化为1:
int array[]=new int[100000];
for (int i=0; i<array.length; i++){
array[i]=1;
}
20.创建一个新的任务对象来求和数组中的所有元素:
MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);
21.用 execute()
方法将任务发送到线程池:
pool.execute(task);
22.用 join()
方法等待任务结束:
task.join();
23.用 shutdown()
方法关闭线程池:
pool.shutdown();
24.用 awaitTermination()
方法等待执行器的完成:
pool.awaitTermination(1, TimeUnit.DAYS);
25.用 get()
方法在控制台中打印任务的结果:
System.out.printf("Main: Result: %d\n",task.get());
26.在控制台打印一条消息,指示本案例运行结束:
System.out.printf("Main: End of the program\n");