当前位置:嗨网首页>书籍在线阅读

22-案例实现

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

案例实现

根据如下步骤来实现本案例。

1.创建一个 MyWorkerThread 的类来扩展 ForkJoinWorkerThread 类:

public class MyWorkerThread extends ForkJoinWorkerThread {

2.声明并创建一个 ThreadLocal 类型的 taskCounter 私有字段:

private final static ThreadLocal<Integer> taskCounter=
                                 new ThreadLocal<Integer>();

3.实现该类一个构造方法:

protected MyWorkerThread(ForkJoinPool pool) {
  super(pool);
}

4.重写 onStart() 方法。在其父类上调用该方法向控制台输出消息,并将此线程的 taskCounter 字段值设置为零:

@Override
protected void onStart() {
  super.onStart();
  System.out.printf("MyWorkerThread %d: Initializing task
                     counter.\n", getId());
  taskCounter.set(0);
}

5.重写 onTermination() 方法。在控制台中打印此线程的 taskCounter 字段值:

@Override
protected void onTermination(Throwable exception) {
  System.out.printf("MyWorkerThread %d: %d\n",
                    getId(),taskCounter.get());
  super.onTermination(exception);
}

6.实现 addTask() 方法。增加 taskCounter 字段值:

public void addTask(){
  taskCounter.set(taskCounter.get() + 1);;
}

7.创建一个 MyWorkerThreadFactory 类,实现 ForkJoinWorkerThreadFactory 接口,实现 newThread() 方法。创建并返回一个 MyWorkerThread 对象:

public class MyWorkerThreadFactory implements
               ForkJoinWorkerThreadFactory {
  @Override
  public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new MyWorkerThread(pool);
  }
}

8.创建一个 MyRecursiveTask 类,扩展 RecursiveTask 类:

public class MyRecursiveTask extends RecursiveTask<Integer> {

9.声明一个整型的 array 私有数组:

private int array[];

10.声明两个整型的 startend 私有字段:

private int start, end;

11.实现该类的构造方法来初始化字段:

public Task(int array[],int start, int end) {
  this.array=array;
  this.start=start;
  this.end=end;
}

12.实现 compute() 方法,将数组中在开始和结束位置之间的所有元素进行求和。首先,将执行任务的线程转换为 MyWorkerThread 对象,并使用 addTask() 方法递增该线程任务的计数器:

@Override
protected Integer compute() {
  Integer ret;
  MyWorkerThread thread=(MyWorkerThread)Thread.currentThread();
  thread.addTask();

13.如果数组中的开始位置和结束位置之间的差值大于100,则计算中间位置,并分别创建两个新的 MyRecursiveTask 任务来处理第一个和第二个半部分。如果差值小于等于100,则计算开始和结束位置之间所有元素的总和:

if (end-start>100) {
  int mid=(start+end)/2;
  MyRecursiveTask task1=new MyRecursiveTask(array,start,mid);
  MyRecursiveTask task2=new MyRecursiveTask(array,mid,end);
  invokeAll(task1,task2);
  ret=addResults(task1,task2);
} else {
  int add=0;
  for (int i=start; i<end; i++) {
    add+=array[i];
  }
  ret=add;
}

14.让线程休眠10ms并返回任务结果:

  try {
    TimeUnit.MILLISECONDS.sleep(10);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return ret;
}

15.实现 addResults() 方法。将计算并返回的两个任务结果的总和作为参数:

private Integer addResults(Task task1, Task task2) {
  int value;
  try {
    value = task1.get().intValue()+task2.get().intValue();
  } catch (InterruptedException e) {
    e.printStackTrace();
    value=0;
  } catch (ExecutionException e) {
    e.printStackTrace();
    value=0;
  }

16.实现本案例的主类,创建一个 Main 类和实现一个 main() 方法:

public class Main {
  public static void main(String[] args) throws Exception {

17.创建一个 MyWorkerThreadFactory 类型的 factory 对象:

MyWorkerThreadFactory factory=new MyWorkerThreadFactory();

18.创建一个 ForkJoinPool 类型的 pool 对象。将之前创建的工厂对象传递给构造方法:

ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);

19.创建一个长度为100000的整型数组。将所有元素初始化为1:

int array[]=new int[100000];
for (int i=0; i<array.length; i++){
    array[i]=1;
}

20.创建一个新的任务对象来求和数组中的所有元素:

MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);

21.用 execute() 方法将任务发送到线程池:

pool.execute(task);

22.用 join() 方法等待任务结束:

task.join();

23.用 shutdown() 方法关闭线程池:

pool.shutdown();

24.用 awaitTermination() 方法等待执行器的完成:

pool.awaitTermination(1, TimeUnit.DAYS);

25.用 get() 方法在控制台中打印任务的结果:

System.out.printf("Main: Result: %d\n",task.get());

26.在控制台打印一条消息,指示本案例运行结束:

System.out.printf("Main: End of the program\n");