1.ForkjoinPool -1
2.Java:Java中的源码Fork/Join框架的并行编程基础
3.java中Fork-Join框架原理及应用
4.Java并发编程:Fork/Join框架解释
5.java的fork/join任务,你写对了吗?
6.fork/join 全面剖析,你可以不用,源码但是源码不能不懂!
ForkjoinPool -1
ForkJoinæ¯ç¨äºå¹¶è¡æ§è¡ä»»å¡çæ¡æ¶ï¼ æ¯ä¸ä¸ªæ大任å¡åå²æè¥å¹²ä¸ªå°ä»»å¡ï¼æç»æ±æ»æ¯ä¸ªå°ä»»å¡ç»æåå¾å°å¤§ä»»å¡ç»æçæ¡æ¶ãForkå°±æ¯æä¸ä¸ªå¤§ä»»å¡åå为è¥å¹²åä»»å¡å¹¶è¡çæ§è¡ï¼Joinå°±æ¯å并è¿äºåä»»å¡çæ§è¡ç»æï¼æåå¾å°è¿ä¸ªå¤§ä»»å¡çç»æãä¸é¢æ¯ä¸ä¸ªæ¯ä¸ä¸ªç®åçJoin/Fork计ç®è¿ç¨ï¼å°1âæ°åç¸å
é常è¿æ ·ä¸ªæ¨¡åï¼ä½ 们ä¼æ³å°ä»ä¹ï¼
Release Framework ï¼ å¸¸è§çå¤ç模åæ¯ä»ä¹ï¼ task pool - worker poolç模åã ä½æ¯Forkjoinpool éåäºå®å ¨ä¸åç模åã
ForkJoinPoolä¸ç§ExecutorServiceçå®ç°ï¼è¿è¡ForkJoinTaskä»»å¡ãForkJoinPoolåºå«äºå ¶å®ExecutorServiceï¼ä¸»è¦æ¯å 为å®éç¨äºä¸ç§å·¥ä½çªå(work-stealing)çæºå¶ãææ被ForkJoinPool管çç线ç¨å°è¯çªåæ交å°æ± åéçä»»å¡æ¥æ§è¡ï¼æ§è¡ä¸åå¯äº§çåä»»å¡æ交å°æ± åä¸ã
ForkJoinPoolç»´æ¤äºä¸ä¸ªWorkQueueçæ°ç»(æ°ç»é¿åº¦æ¯2çæ´æ°æ¬¡æ¹ï¼èªå¨å¢é¿)ãæ¯ä¸ªworkQueueé½æä»»å¡éå(ForkJoinTaskçæ°ç»)ï¼å¹¶ä¸ç¨baseãtopæåä»»å¡éåéå°¾åé头ãwork-stealingæºå¶å°±æ¯å·¥ä½çº¿ç¨æ¨ä¸ªæ«æä»»å¡éåï¼å¦æéåä¸ä¸ºç©ºååéå°¾çä»»å¡å¹¶æ§è¡ã示æå¾å¦ä¸
æµç¨å¾ï¼
poolå±æ§
workQueuesæ¯poolçå±æ§ï¼å®æ¯WorkQueueç±»åçæ°ç»ãexternalPushåexternalSubmitæå建çworkQueue没æowner(å³ä¸æ¯worker)ï¼ä¸ä¼è¢«æ¾å°workQueuesçå¶æ°ä½ç½®ï¼ècreateWorkerå建çworkQueueï¼å³workerï¼æownerï¼ä¸ä¼è¢«æ¾å°workQueuesçå¥æ°ä½ç½®ã
WorkQueueçå 个éè¦æååé说æå¦ä¸ï¼
è¿æ¯WorkQueueçconfigï¼é«ä½è·poolçconfigå¼ä¿æä¸è´ï¼èä½ä½åæ¯workQueueå¨workQueuesæ°ç»çä½ç½®ã
ä»workQueueså±æ§çä»ç»ä¸ï¼æ们ç¥éï¼ä¸æ¯ææworkQueueé½æworkerï¼æ²¡æworkerçworkQueueç§°ä¸ºå ¬å ±éåï¼shared queueï¼ï¼configç第ä½å°±æ¯ç¨æ¥å¤ææ¯å¦æ¯å ¬å ±éåçãå¨externalSubmitå建工ä½éåæ¶ï¼æï¼
q.config = k | SHARED_QUEUE;
å ¶ä¸qæ¯æ°å建çworkQueueï¼kå°±æ¯qå¨workQueuesæ°ç»ä¸çä½ç½®ï¼SHARED_QUEUE=1<<ï¼æ³¨æè¿éconfig没æä¿çmodeçä¿¡æ¯ã
èå¨registerWorkerä¸ï¼åæ¯è¿æ ·ç»workQueueçconfigèµå¼çï¼
w.config = i | mode;
wæ¯æ°å建çworkQueueï¼iæ¯å ¶å¨workQueuesæ°ç»ä¸çä½ç½®ï¼æ²¡æ设置SHARED_QUEUEæ è®°ä½
scanStateæ¯workQueueçå±æ§ï¼æ¯intç±»åçãscanStateçä½ä½å¯ä»¥ç¨æ¥å®ä½å½åworkerå¤äºworkQueuesæ°ç»çåªä¸ªä½ç½®ãæ¯ä¸ªworkerå¨è¢«å建æ¶ä¼å¨å ¶æé å½æ°ä¸è°ç¨poolçregisterWorkerï¼èregisterWorkerä¼ç»scanStateèµä¸ä¸ªåå§å¼ï¼è¿ä¸ªå¼æ¯å¥æ°ï¼å 为workeræ¯ç±createWorkerå建ï¼å¹¶ä¼è¢«æ¾å°WorkQueuesçå¥æ°ä½ç½®ï¼ècreateWorkerå建workeræ¶ä¼è°ç¨registerWorkerã
ç®è¨ä¹ï¼workerçscanStateåå§å¼æ¯å¥æ°ï¼éworkerçscanstateåå§å¼=INACTIVE=1<<ï¼å°äº0ï¼éworkerçworkQueueå¨externalSubmitä¸å建ï¼ã
å½æ¯æ¬¡è°ç¨signalWorkï¼ætryReleaseï¼å¤éworkeræ¶ï¼workerçé«ä½å°±ä¼å 1
å¦å¤ï¼scanState<0表示workeræªæ¿æ´»ï¼å½workerè°ç¨runtaskæ§è¡ä»»å¡æ¶ï¼scanStateä¼è¢«ç½®ä¸ºå¶æ°ï¼å³è®¾ç½®scanStateçæå³è¾¹ä¸ä½ä¸º0ã
workerä¼ç æ¶ï¼æ¯è¿æ ·åå¨ç
workerçå¤é类似è¿æ ·ï¼
å¨workerä¼ç ç4è¡ä¼ªç ä¸ï¼è®©ctlçä½ä½çå¼å为worker.scanStateï¼è¿æ ·ä¸æ¬¡å°±å¯ä»¥éè¿scanStateå¤é该workerãå¤é该workeræ¶ï¼æ该workerçpreStack设置为ctlä½ä½çå¼ï¼è¿æ ·ä¸ä¸æ¬¡å¤éçworkerå°±æ¯scanStateçäºè¯¥preStackçworkerã
è¿ééè¿preStackä¿åä¸ä¸ä¸ªworkerï¼è¿ä¸ªworkeræ¯å½åworkeræ´æ©å°å¨çå¾ ï¼æ以形æä¸ä¸ªåè¿å åºçæ ã
runStateæ¯intç±»åçå¼ï¼æ§å¶æ´ä¸ªpoolçè¿è¡ç¶æåçå½å¨æï¼æä¸é¢å 个å¼ï¼å¯ä»¥å¥½å 个å¼åæ¶åå¨ï¼ï¼
å¦ærunStateå¼ä¸º0ï¼è¡¨ç¤ºpoolå°æªåå§åã
RSLOCK表示éå®poolï¼å½æ·»å workeråpoolç»æ¢æ¶ï¼å°±è¦ä½¿ç¨RSLOCKéå®æ´ä¸ªpoolãå¦æç±äºrunState被éå®ï¼å¯¼è´å ¶ä»æä½çå¾ runState解éï¼é常ç¨waitè¿è¡çå¾ ï¼ï¼å½runState设置äºRSIGNALï¼è¡¨ç¤ºrunState解éï¼å¹¶éç¥ï¼notifyAllï¼çå¾ çæä½ã
å©ä¸4个å¼é½è·runStateçå½å¨ææå ³ï¼é½å¯ä»¥é¡¾åæä¹ï¼
å½éè¦åæ¢æ¶ï¼è®¾ç½®runStateçSTOPå¼ï¼è¡¨ç¤ºåå¤å ³éï¼è¿æ ·å ¶ä»æä½çå°è¿ä¸ªæ è®°ä½ï¼å°±ä¸ä¼ç»§ç»æä½ï¼æ¯å¦tryAddWorkerçå°STOPå°±ä¸ä¼åå建workerï¼
ètryTerminate对è¿äºçå½å¨æç¶æçå¤çåæ¯è¿æ ·çï¼
å½åtopåbaseçåå§å¼ä¸º INITIAL_QUEUE_CAPACITY >>>1= (1 << )>>>1 = /2ãç¶åpushä¸ä¸ªtaskä¹åï¼top+=1ï¼ä¹å°±æ¯è¯´ï¼top对åºçä½ç½®æ¯æ²¡ætaskçï¼æè¿pushè¿æ¥çtaskå¨top-1çä½ç½®ãèbaseçä½ç½®åè½å¯¹åºå°taskï¼base对åºæå æ¾è¿éåçtaskï¼top-1对åºæåæ¾è¿éåçtaskã
qlockå¼å«ä¹ï¼1: locked,源码 < 0: terminate; else 0
å³å½qlockå¼ä½0æ¶ï¼å¯ä»¥æ£å¸¸æä½ï¼å¼=1æ¶ï¼è¡¨ç¤ºéå®
int SQMASK=0xeï¼åä»»ä½æ´æ°è·SQMASKä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ã
è¯æï¼
注æè¿éå为äºè¿å¶æ¯ ï¼å°¤å ¶æ³¨ææå³è¾¹ç¬¬ä¸ä½æ¯0ï¼ä»»ä½æ°è·æå³è¾¹ç¬¬ä¸ä½æ¯0çæ°ä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ï¼å 为ä½ä¸ä¹åï¼ç¬¬ä¸ä½å°±æ¯0ï¼æ¯å¦s=A&SQMASKï¼Aå¯ä»¥æ¯ä»»ææ´æ°ï¼ç¶åæsæäºè¿å¶è¿è¡å¤é¡¹å¼å±å¼ï¼åæs=2 n1+2 n2 â¦â¦+2^nnï¼è¿énâ¥1ï¼æ以så¯ä»¥è¢«2æ´é¤ï¼å³sæ¯å¶æ°ã
æ以ä¸ä¸ªæ°æ¯å¥æ°è¿æ¯å¶æ°ï¼çå ¶æå³è¾¹ç¬¬ä¸ä½å³å¯ã
æ们ç¥éworkQueueæexternalPushå建çåcreateWorkerå建çworkerï¼ä¸¤ç§æ¹å¼å建çworkQueueï¼å ¶æ¾ç½®å°workQueuesçä½ç½®æ¯ä¸åçï¼åè æ¾å°workQueueçå¶æ°ä½ç½®ï¼èåè åæ¾å°å¥æ°ä½ç½®ãä¸åworkQueueæ¾å°èªå·±å¨workQueuesçä½ç½®çç®æ³æç¹ä¸åã
ä¸é¢çä¸ä¸forkjoinæ¡æ¶è·åworkQueuesä¸çå¶æ°ä½ç½®çworkQueueçç®æ³ï¼
è¿æ ·å°±è½è·åworkQueuesçå¶æ°ä½ç½®çworkQueueãmä¿è¯m & r & SQMASKè¿æ´ä¸ªè¿ç®ç»æä¸ä¼è¶ åºworkQueuesçä¸æ ï¼SQMASKä¿è¯åå°çæ¯å¶æ°ä½ç½®çworkQueueãè¿éæä¸ä¸ªæ趣çç°è±¡ï¼å设0å°workQueues.length-1ä¹é´æn个å¶æ°ï¼m & r & SQMASKæ¯æ¬¡é½è½åå°å ¶ä¸ä¸ä¸ªå¶æ°ï¼èä¸è¿ç»n次åå°çå¶æ°ä¸ä¼åºç°éå¤å¼ï¼æ£åæ§é常好ãèä¸æ¯å¾ªç¯çï¼å³1å°n次ån个ä¸åå¶æ°ï¼n+1å°2nä¹æ¯ån次ä¸åå¶æ°ï¼æ¤æ¶n个å¶æ°æ¯ä¸ªé½è¢«éæ°åä¸æ¬¡ãä¸é¢åæä¸rå¼æä»ä¹ç§å¯ï¼ä¸ºä½è½ä¿è¯è¿æ ·çæ£åæ§
ThreadLocalRandomå æä¸å¸¸éPROBE_INCREMENT = 0x9eb9ï¼ä»¥åä¸ä¸ªéæçprobeGenerator =new AtomicInteger() ï¼ç¶åæ¯ä¸ªçº¿ç¨çprobe= probeGenerator.addAndGet(PROBE_INCREMENT)æ以第ä¸ä¸ªçº¿ç¨çprobeå¼æ¯0x9eb9ï¼ç¬¬äºä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9ï¼ç¬¬ä¸ä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9+0x9eb9以æ¤ç±»æ¨ï¼æ´ä¸ªå¼æ¯çº¿æ§çï¼å¯ä»¥ç¨y=kx表示ï¼å ¶ä¸k=0x9eb9ï¼x表示第å 个线ç¨ãè¿æ ·æ¯ä¸ªçº¿ç¨çprobeå¯ä»¥ä¿è¯ä¸ä¸æ ·ï¼èä¸å ·æå¾å¥½ç离æ£æ§ã
å®é ä¸ï¼å¯ä»¥ä¸ç¨0x9eb9è¿ä¸ªå¼ï¼ç¨ä»»æä¸ä¸ªå¥æ°é½æ¯å¯ä»¥çï¼æ¯å¦1ãå¦æç¨1çè¯ï¼probe+=1ï¼è¿æ ·æ¯ä¸ªçº¿ç¨çprobeå°±é½æ¯ä¸åçï¼èä¸å ·æå¾å¥½ç离æ£æ§ãä¹å°±æ¯è¯´ï¼å设æéå¶æ¡ä»¶probe<nï¼è¶ è¿nå产ç溢åºãåprobeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼n次åprobeæ¯æ¬¡èªå çå¼é½ä¸åãå®é ä¸ç¨ä»»æä¸ä¸ªå¥æ°ï¼é½å¯ä»¥ä¿è¯probeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼æå ´è¶£å¯çæ¬ææåéå½é¨åãç±äºå¥æ°ç离æ£æ§ï¼æ以åªè¦çº¿ç¨æ°å°äºmæè SQMASK两è ä¸çæå°å¼ï¼åæ¯ä¸ªçº¿ç¨é½è½å¯ä¸å°å æ®ä¸ä¸ªwsä¸çä¸ä¸ªä½ç½®
å½ä¸ä¸ªæä½æ¯å¨éForkjoinThreadç线ç¨ä¸è¿è¡çï¼å称该æä½ä¸ºå¤é¨æä½ãæ¯å¦æ们åé¢æ§è¡pool.invokeï¼invokeå åæ§è¡externalPushãç±äºinvokeæ¯å¨éForkjoinThread线ç¨ä¸è¿è¡çï¼è¿éæ¯å¨main线ç¨ä¸è¿è¡ï¼ï¼æ以æ¯ä¸ä¸ªå¤é¨æä½ï¼è°ç¨çæ¯externalPushãä¹åtaskçæ§è¡æ¯éè¿ForkJoinThreadæ¥æ§è¡çï¼æ以taskä¸çforkå°±æ¯å é¨æä½ï¼è°ç¨çæ¯pushï¼æä»»å¡æ交å°å·¥ä½éåãå ¶å®forkçå®ç°æ¯ç±»ä¼¼ä¸é¢è¿æ ·çï¼
å³forkä¼æ ¹æ®æ§è¡èªèº«ç线ç¨æ¯å¦æ¯ForkJoinThreadçå®ä¾æ¥å¤ææ¯å¤äºå¤é¨è¿æ¯å é¨ãé£ä¸ºä½è¦åºåå å¤é¨ï¼
ä»»ä½çº¿ç¨é½å¯ä»¥ä½¿ç¨ForkJoinæ¡æ¶ï¼ä½æ¯å¯¹äºéForkJoinThreadç线ç¨ï¼å®å°åºæ¯ææ ·çï¼ForkJoinæ æ³æ§å¶ï¼ä¹æ æ³å¯¹å ¶ä¼åãå æ¤åºååºå å¤é¨ï¼è¿æ ·æ¹ä¾¿ForkJoinæ¡æ¶å¯¹ä»»å¡çæ§è¡è¿è¡æ§å¶åä¼å
forkJoinPool.invoke(task)æ¯æä»»å¡æ¾å ¥å·¥ä½éåï¼å¹¶çå¾ ä»»å¡æ§è¡ãæºç å¦ä¸
è¿éexternalPushè´è´£ä»»å¡æ交ï¼externalPushæºç å¦ä¸ï¼
Java:Java中的Fork/Join框架的并行编程基础
并行编程,是源码多核 CPU 技术出现后,充分利用处理资源的源码小学源码编辑兴趣班计划重要方式。它允许程序中的源码多个进程并发执行,从而极大提升性能与效率。源码Java 并发 API 中的源码 Fork/Join 框架,就是源码实现并行化算法的强大工具。本文将探索使用 Java 中的源码 Fork/Join 框架进行并行编程的概念。
并行编程的源码核心在于,使用多个处理器完成任务,源码这与多线程有相似之处。源码然而,源码它们在实质上大不相同。多线程提供了一种错觉上的并行处理,实际上是通过时间共享机制在竞争线程间分配 CPU 时间。而并行编程则意味着程序员可以并行使用多个专用 CPU,这需要优化以匹配内存速度、处理能力以及其他硬件附件,适用于多核 CPU 环境。
在并行编程中,任务是独立的,执行顺序无关紧要。vncserver源码它们可以是功能并行(每个处理器处理其部分问题)或数据并行(处理器处理其部分数据)。适合大型问题库,或问题规模太大以至于无法在合理时间内解决。这种编程方式在多处理器系统中能够快速获得结果。
Fork/Join 框架是 Java 并发 API 的一部分,包含支持并行编程的类和接口。它简化了多线程创建与使用过程,并自动化了进程间的数据分配。与多线程相比,Fork/Join 框架针对多个处理器环境优化,采用递归分治策略实现并行处理。
该框架包含四个核心类:ForkJoinTask、ForkJoinPool、RecursiveAction 和 RecursiveTask。ForkJoinTask 是抽象任务类,用于定义并行任务,ForkJoinPool 是任务执行的公共池,RecursiveAction 和 RecursiveTask 分别用于创建不返回结果或具有结果的任务。
Fork/Join 框架采用递归分治策略,将任务拆分至更小部分,直至每个单元问题可由多核处理器并行执行。这种方式与非并行环境下的顺序处理形成鲜明对比,显著提升效率。然而,Bigdicmal 源码并非所有问题都适合并行处理,但许多数据数组、集合和分组问题通常与并行编程策略兼容。
综上所述,Fork/Join 框架在 Java 中提供了实现并行编程的强大支持。正确使用并行编程技术,可以有效提升程序性能,但在实际应用中需要考虑负载平衡、任务通信等复杂因素。正确选择并行编程策略与 API,可以实现最佳性能。
java中Fork-Join框架原理及应用
在处理大数据量任务时,使用Java中的Fork-Join框架能大幅提升效率。
一、使用场景
当面对大规模任务,如对大量元素数组进行排序或者需要大量资源同步执行的复杂操作,Fork-Join框架能够将任务拆分成较小部分,并行处理,最后整合结果。以数组排序为例,任务被分解为多个较小的排序任务,这些任务由多个线程并行执行,大幅提高了处理效率。
二、notepad 源码基本思想
Fork-Join框架基于分治算法原理。它将大规模任务递归分解为更小的子任务,子任务之间并行执行,最后将结果合并,实现快速有效解决大型任务。
三、工作逻辑
每个工作线程内部维护双端队列存储任务。任务通过fork产生并加入队尾,线程在处理本队列同时尝试窃取其他线程任务。此过程确保任务被动态分配给工作线程,且通过并发执行提高效率。
ForkJoin包含三个关键方法:fork(启动新线程执行任务),join(等待子任务完成),compute(拆解和执行任务)。通过这三种操作,ForkJoin框架实现高效并行任务执行。
代码实现上通常包括QTask.java模板,展示了任务执行逻辑。
四、是否使用fork vs invokeAll
Fork-Join框架相较于仅使用fork操作,引入invokeAll方法更方便同步子任务,简化了任务执行流程。
五、ForkJoin与线程池区别
相较于通用线程池模型,swarm 源码Fork-Join框架设计更为高效和灵活。它自动管理和分配任务,无需手动初始化和关闭线程池,减轻了编码复杂度。同时,Fork-Join框架动态任务分配能力使其实现了更为智能的任务并行。
综上所述,Fork-Join框架提供了简单且高效的并行任务执行方法,尤其适用于大规模数据处理和复杂同步操作场景,其动态任务分配机制与线程池相比,提升了代码简洁性和执行效率。
Java并发编程:Fork/Join框架解释
分治算法是一种策略,将复杂问题分解为较小、相似的子问题,递归解决后合并解。步骤包括分解、解决子问题与合并。
Fork/Join框架在Java中实现分治思想,用以高效执行并行任务。传统线程池存在效率瓶颈,Fork/Join框架提供了解决方案。
ForkJoin框架的核心是ForkJoinTask抽象类,它用于定义任务。此框架主要特点包括任务的分解、并行执行与结果合并。
以查找最大数组值为例,该过程可直观展示Fork/Join框架的运用。
方法流程如下:首先使用fork方法将任务分解,然后调用join方法等待结果,invoke方法则代表fork与join的结合,即先分解后等待结果。
具体步骤:invoke方法等同于fork后调用join,join负责检查任务是否完成,若有结果立即返回,否则阻塞至任务完成。若不先执行fork直接join,则任务将无限阻塞。
java的fork/join任务,你写对了吗?
从 JDK 1.7 开始,Java引入了一种新的 Fork/Join 线程池框架,旨在将大任务拆分为多个小任务并行执行,最后汇总结果。比如计算一个大数组的和,传统的单线程循环执行效率较低。通过将数组拆分为四部分并行计算,最后汇总结果,执行效率明显提升。更进一步,如果部分仍过大,继续拆分至满足最小颗粒度后进行计算,这种反复裂变形成一系列小任务,便是 Fork/Join 的工作原理。
Fork/Join 采用分而治之的思想,将复杂任务分解为多个简单小任务,各小任务执行结果汇总后得到最终结果。这一思想在大数据领域广泛应用。接下来,让我们具体了解 Fork/Join 的用法。
以计算 个数字组成的数组并行求和为例,使用 Fork/Join 框架进行操作。结果表明,使用 Fork/Join 方式汇总计算与传统的循环方式结果一致。为了提高效率,最小任务数组最大容量设置为,Fork/Join 对数组进行三次拆分,执行过程清晰。
数组量越大时,采用 Fork/Join 方式计算,程序执行效率优势明显。Fork/Join 框架的核心类包括 ForkJoinPool 和 ForkJoinTask,它们协同工作,分解大任务并汇总结果。值得注意的是,ForkJoinPool 线程池与 ThreadPoolExecutor 线程池在实现原理上有显著区别,ForkJoinPool 允许线程创建新任务并挂起当前任务,从任务队列中选择子任务执行,以充分利用并行计算。
ForkJoinPool 是负责任务执行的线程池,构造方法提供了默认无参和使用 Executors 工具类创建两种方式。ForkJoinPool 实现了 Executor 和 ExecutorService 接口,支持通过多种方法提交任务。尽管 ForkJoinPool 和 ThreadPoolExecutor 在实现上不同,但二者均能有效提升线程并发执行性能。
ForkJoinTask 是负责任务分解和合并计算的抽象类,它实现了 Future 接口,可以直接提交到线程池。ForkJoinTask 包含 fork() 和 join() 方法,分别表示任务的分拆与合并。使用 ForkJoinTask 的三个常用子类,如 RecursiveTask,通常用于有返回值的任务计算。
综上,ForkJoinPool 提供了一种补充线程池,通过存放任务队列和并行计算,进一步提升性能。ForkJoinTask 与 ForkJoinPool 搭配使用,将大计算任务拆分成互不干扰的小任务提交给线程池计算,最后汇总结果,实现与单线程执行相同的结果。当任务量越大,Fork/Join 框架的执行效率优势越明显。然而,并非所有任务都适合使用 Fork/Join 框架,例如 IO 密集型任务。
fork/join 全面剖析,你可以不用,但是不能不懂!
fork/join框架在Java并发包中扮演着重要角色,尤其在Java 8的并行流中。本文将深入剖析其设计思路、核心角色和实现机制。
首先,fork/join的工作原理是将大任务分解成小任务,并利用多核处理。其特殊之处在于运用了work-stealing算法,通过双端队列分配任务,即使线程处理完一个任务,也能从其他未完成的任务中“窃取”以提高效率。
核心角色包括ForkJoinPool,作为任务的管理者和线程容器,负责任务的提交和workerThread的管理。ForkJoinWorkerThread则是实际执行任务的“工人”,处理队列中的任务,并通过work-stealing机制优化资源利用。WorkQueue是存放任务的双端队列,ForkJoinTask则定义了任务类型,分为有返回值和无返回值两种。
在初始化阶段,ForkJoinPool通过ForkJoinWorkerThreadFactory创建线程,任务的提交逻辑分为首次提交和任务切分后提交。首次提交会确保队列的创建和加锁,任务切分则在workerThread中进行。任务的消费则由workerThread或非workerThread线程根据任务状态进行处理。
至于任务的窃取,工作线程在run()方法中通过scan(WorkQueue, int r)函数实现,不断尝试从队列中“窃取”任务,直到找到或者遍历完所有队列。
尽管文章只是概述,深入研究fork/join的源码是理解其内在机制的关键,这将有助于在实际开发中更有效地利用并发框架。