news 2026/6/10 1:52:08

Java多线程(十)ForkJoinPool

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java多线程(十)ForkJoinPool

简介

Java7引入的线程池实现,适合于计算可以被递归执行的任务,并且这些任务都是是计算密集型的,不会有IO阻塞。

ForkJoinPool中有两个关键点:

  • 分治法:将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立、并且与原问题性质相同,求出子问题的解后,将这些解合并,就可以得出原问题的解,例如二分法。
  • 工作窃取:当某个线程的任务队列中没有任务时,从其他线程的任务队列中获取任务来执行,以充分利用工作线程的窃取能力,减少由于线程获取不到任务而造成的空闲浪费。在工作窃取机制中,每个线程都有自己的双端队列存储任务,线程从自身队列的尾部获取任务,此时是后进先出,如果当前线程空闲,会从其他线程的任务队列的头部获取任务,此时是先进先出,获取任务均是通过CAS操作实现的。

ForkJoinPool基于工作窃取算法,能高效利用多核处理器,提升并发性能。

入门案例

案例1:求start到end之间的和

需求:

/** * 求x到y之间的和 */publicclassSumTaskextendsRecursiveTask<BigInteger>{privatestaticfinalLongTHRESHOLD=102400000L;privatefinalIntegerstart;privatefinalIntegerend;publicSumTask(Integerstart,Integerend){this.start=start;this.end=end;}@OverrideprotectedBigIntegercompute(){if(end-start<THRESHOLD){BigIntegerresult=BigInteger.ZERO;longlen=end-start+1;longs=start;for(longi=0;i<len;i++){result=result.add(BigInteger.valueOf(s));s++;}returnresult;}else{intmid=start+(end-start)/2;SumTaskt1=newSumTask(start,mid);SumTaskt2=newSumTask(mid+1,end);invokeAll(t1,t2);BigIntegerjoin=t1.join();BigIntegerjoin1=t2.join();returnjoin.add(join1);}}}publicclassSumTaskTest{publicstaticvoidmain(String[]args){longstartTime=System.currentTimeMillis();ForkJoinPoolcommonPool=ForkJoinPool.commonPool();BigIntegerresult=commonPool.invoke(newSumTask(1,Integer.MAX_VALUE));longexecuteTime=System.currentTimeMillis()-startTime;System.out.println("结果 = "+result.toString()+" 耗时 = "+executeTime+"ms");}}
案例2:斐波那契数列

斐波那契数列,第一项是0还是1? 第一项为0和第一项为1,都符合定义,都可以正确计算。最初斐波那契在《计算之书》(1202年)中以兔子繁殖为例提出数列时,起始项为 1、1。 随着数学体系的发展,第一项为0被引入进来。通常如果没有特殊说明,第一项通常是0

importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.RecursiveTask;publicclassFibonacciTaskextendsRecursiveTask<Long>{privatestaticfinalIntegerTHRESHOLD=10;privatefinallongn;publicFibonacciTask(Longn){if(n==null){thrownewIllegalArgumentException("参数 n 不可为null");}if(n<0){thrownewIllegalArgumentException("参数 n 不可小于0");}this.n=n;}@OverrideprotectedLongcompute(){if(n<=THRESHOLD){returncomputeSequentially();}List<FibonacciTask>list=newArrayList<>();list.add(newFibonacciTask(n-1));list.add(newFibonacciTask(n-2));invokeAll(list);returnlist.get(0).join()+list.get(1).join();}/** * 计算斐波那契数列 * @return 成员变量n对应的数列值 */privateLongcomputeSequentially(){if(n<=1){returnn;}longa=0,b=1;for(longi=2;i<n+1;i++){longtmp=b;b=a+b;a=tmp;}returnb;}}importjava.util.concurrent.ForkJoinPool;publicclassFibonacciTaskTest{publicstaticvoidmain(String[]args){ForkJoinPoolcommonPool=ForkJoinPool.commonPool();longstartTime=System.currentTimeMillis();FibonacciTaskfibonacciTask=newFibonacciTask(100L);Longresult=commonPool.invoke(fibonacciTask);longexecuteTime=System.currentTimeMillis()-startTime;System.out.println(executeTime+"ms, result = "+result);}}
案例3:无返回值的异步任务,归并排序
publicclassMergeSortTask2<TextendsComparable<T>>extendsRecursiveAction{privatestaticfinalIntegerTHRESHOLD=100000;privatefinalT[]arr;privatefinalintstart;privatefinalintend;publicMergeSortTask2(T[]arr){this.arr=arr;start=0;end=arr.length-1;}privateMergeSortTask2(T[]arr,intstart,intend){this.arr=arr;this.start=start;this.end=end;}@Overrideprotectedvoidcompute(){if(end-start<=THRESHOLD){mergeSort(arr,start,end);}else{intmid=calMid(start,end);MergeSortTask2<T>task1=newMergeSortTask2<>(arr,start,mid);MergeSortTask2<T>task2=newMergeSortTask2<>(arr,mid+1,end);// 先提交task1到线程池,再在当前线程计算task2,然后再阻塞地等待task1的结果task1.fork();task2.compute();task1.join();merge(arr,start,mid,end);}}/** * 计算start和end的中间值 */privateintcalMid(intstart,intend){returnstart+(end-start)/2;}// 归并排序privatevoidmergeSort(T[]arr,intleft,intright){if(left==right){return;}intmid=calMid(left,right);mergeSort(arr,left,mid);mergeSort(arr,mid+1,right);merge(arr,left,mid,right);}@SuppressWarnings("unchecked")privatevoidmerge(T[]arr,intstart,intmid,intend){intlen=end-start+1;T[]tmpArr=(T[])newComparable[len];inti=start;intj=mid+1;intk=0;while(i<=mid&&j<=end){if(arr[i].compareTo(arr[j])<=0){tmpArr[k++]=arr[i++];}else{tmpArr[k++]=arr[j++];}}while(i<=mid){tmpArr[k++]=arr[i++];}while(j<=end){tmpArr[k++]=arr[j++];}System.arraycopy(tmpArr,0,arr,start,len);}}

测试:

@Testpublicvoidtest3(){Integer[]arr=buildIntegerArr(n);// 100000000longstartTime=System.currentTimeMillis();MergeSortTask2<Integer>integerMergeSortTask=newMergeSortTask2<>(arr);integerMergeSortTask.invoke();// 1亿大小的数组,使用ForkJoinPool排序,花费 98438ms// System.out.println("arr = " + Arrays.toString(arr));System.out.println("result = "+(System.currentTimeMillis()-startTime)+"ms");}/** * 构建待排序的数组 */privatestaticInteger[]buildIntegerArr(intlimit){longstartTime=System.currentTimeMillis();Integer[]arr=newInteger[limit];Randomrandom=newRandom();for(inti=0;i<limit;i++){arr[i]=random.nextInt(limit*10);}System.out.println("构建测试"+limit+"大小的数组,花费 "+(System.currentTimeMillis()-startTime)+"ms");returnarr;}
案例4:ForkJoinPool 状态监控
/** * 打印ForkJoinPool的状态 */publicstaticvoidprintCommonPoolStatus(ForkJoinPoolcommonPool){if(commonPool==null){return;}System.out.println("----------------------");System.out.println("并行度 = "+commonPool.getParallelism()+"\n"+"工作线程数 = "+commonPool.getPoolSize()+"\n"+"活跃的线程数 = "+commonPool.getActiveThreadCount()+"\n"+"运行中的线程数 = "+commonPool.getRunningThreadCount()+"\n"+"排队提交的数量 = "+commonPool.getQueuedSubmissionCount()+"\n"+"排队任务的数量 = "+commonPool.getQueuedTaskCount()+"\n"+"当前所有线程是否空闲中 = "+commonPool.isQuiescent()+"\n"+"偷窃任务数 = "+commonPool.getStealCount()+"\n"+"是否异步模式 = "+commonPool.getAsyncMode()+"\n"+"是否还有未执行的任务 = "+commonPool.hasQueuedSubmissions()+"\n"+"线程工厂 = "+commonPool.getFactory()+"\n"+"异常处理器 = "+commonPool.getUncaughtExceptionHandler());System.out.println("======================");}

提交任务后,另起一个线程,循环调用这个方法,打印线程池的状态

基本使用

ForkJoinPool,自带一个公共线程池,线程数是CPU核数减1,如果用户向ForkJoinPool中提交任务时,没有指定线程池,就是案例中的那种写法,就使用默认线程池。

向ForkJoinPool中提交任务时指定线程池:

ForkJoinPoolforkJoinPool=newForkJoinPool(4);forkJoinPool.execute(integerMergeSortTask);

ForkJoinPool中执行的任务,需要继承RecursiveTask或RecursiveAction,一个是有返回值的任务,一个是无返回值的任务,任务本身负责自身的拆分,直到拆分到一定阈值再执行。这里就是把单线程执行的计算任务拆分为多线程可以执行的,最后再合并它的结果。同时基于工作窃取机制,确保执行任务时所有线程都不会空闲,除非数据倾斜导致某些任务的计算量比较大,否则所有线程都可以保证一个很好的效率,好的一点是如何拆分数据取决于用户,他可以在拆分时避免数据倾斜。

向线程池中提交task,或者调用task本身的invoke方法,都可以把任务提交到线程池。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 23:46:18

No Man‘s Sky存档编辑终极教程:NomNom完全使用指南

No Mans Sky存档编辑终极教程&#xff1a;NomNom完全使用指南 【免费下载链接】NomNom NomNom is the most complete savegame editor for NMS but also shows additional information around the data youre about to change. You can also easily look up each item individu…

作者头像 李华
网站建设 2026/6/6 10:26:00

Arduino Nano核心解析:ATmega328P架构深度剖析

深入ATmega328P&#xff1a;揭开Arduino Nano的底层硬核逻辑你有没有遇到过这种情况——用delay(1)想延时1毫秒&#xff0c;结果实际停了1.05毫秒&#xff1f;或者在读取传感器时发现数据跳动剧烈&#xff0c;怀疑是ADC采样不准&#xff1f;又或者想让MCU休眠以省电&#xff0c…

作者头像 李华
网站建设 2026/6/8 3:00:20

3分钟彻底解决Windows强制Edge浏览器劫持问题

3分钟彻底解决Windows强制Edge浏览器劫持问题 【免费下载链接】EdgeDeflector A tiny helper application to force Windows 10 to use your preferred web browser instead of ignoring the setting to promote Microsoft Edge. Only runs for a microsecond when needed. 项…

作者头像 李华
网站建设 2026/6/9 19:48:51

Venera跨平台漫画阅读终极指南:一站式解决你的所有阅读需求

Venera跨平台漫画阅读终极指南&#xff1a;一站式解决你的所有阅读需求 【免费下载链接】venera A comic app 项目地址: https://gitcode.com/gh_mirrors/ve/venera 还在为不同设备间的漫画阅读体验不一致而烦恼吗&#xff1f;手机上的阅读进度无法同步到平板&#xff0…

作者头像 李华
网站建设 2026/6/9 19:48:59

ShawzinBot终极指南:免费自动化音乐演奏工具快速上手

ShawzinBot终极指南&#xff1a;免费自动化音乐演奏工具快速上手 【免费下载链接】ShawzinBot Convert a MIDI input to a series of key presses for the Shawzin 项目地址: https://gitcode.com/gh_mirrors/sh/ShawzinBot ShawzinBot是一款革命性的Warframe游戏音乐创…

作者头像 李华
网站建设 2026/6/9 19:02:10

HTML5-QRCode:高效二维码扫描解决方案的7大核心优势

HTML5-QRCode&#xff1a;高效二维码扫描解决方案的7大核心优势 【免费下载链接】html5-qrcode A cross platform HTML5 QR code reader. See end to end implementation at: https://scanapp.org 项目地址: https://gitcode.com/gh_mirrors/ht/html5-qrcode HTML5-QRCo…

作者头像 李华