深入理解 RxJava2:Scheduler(2)
欢迎来到深入理解 RxJava2 系列第二篇,本文基于 RxJava 2.2.0 正式版源码,码分将探讨 Scheduler 与 Worker 的码分概念及其实现原理。
Scheduler 与 Worker 在 RxJava2 中扮演着至关重要的码分角色,它们是码分线程调度的核心与基石。虽然 Scheduler 的码分影视站代理源码作用较为熟悉,但 Worker 的码分概念了解的人可能较少。为何在已有 Scheduler 的码分情况下,还要引入 Worker 的码分概念呢?让我们继续探讨。
首先,码分Scheduler 的码分核心定义是调度 Runnable,支持立即、码分延时和周期性调用。码分而 Worker 是码分任务的最小单元的载体。在 RxJava2 内部实现中,码分通常一个或多个 Worker 对应一个 ScheduledThreadPoolExecutor 对象,这里暂不深入探讨。
在 RxJava 1.x 中,Scheduler 没有 scheduleDirect/schedulePeriodicallyDirect 方法,只能先创建 Worker,再通过 Worker 来调度任务。这些方法是对 Worker 调度的简化,可以理解为创建一个只能调度一次任务的 Worker 并立即调度该任务。在 Scheduler 基类的网挣源码源码中,默认实现是直接创建 Worker 并创建对应的 Task(虽然在部分 Scheduler 的覆盖实现上并没有创建 Worker,但可以认为存在虚拟的 Worker)。
一个 Scheduler 可以创建多个 Worker,这两者是一对多的关系,而 Worker 与 Task 也是一对多的关系。Worker 的存在旨在确保两件事:统一调度 Runnable 和统一取消任务。例如,在 observeOn 操作符中,可以通过 Worker 来统一调度和取消一系列的 Runnable。
RxJava2 默认内置了多种 Scheduler 实现,适用于不同场景,这些 Scheduler 都可以在 Schedulers 类中直接获得。以下是两个常用 Scheduler 的源码分析:computation 和 io。
NewThreadWorker 在 computation、io 和 newThread 中都有涉及,下面简单了解一下这个类。NewThreadWorker 与 ScheduledThreadPoolExecutor 之间是一对一的关系,在构造函数中通过工厂方法创建一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor 对象并持有。
ScheduledThreadPoolExecutor 从 JDK1.5 开始存在,这个类继承于 ThreadPoolExecutor,支持立即、延时和周期性任务。但是jquery hide 源码注意,在 ScheduledThreadPoolExecutor 中,maximumPoolSize 参数是无效的,corePoolSize 表示最大线程数,且它的队列是无界的。这里不再深入探讨该类,否则会涉及太多内容。
有了这个类,RxJava2 在实现 Worker 时就站在了巨人的肩膀上,线程调度可以直接使用该类解决,唯一的麻烦之处就是封装一层 Disposable 的逻辑。
ComputationScheduler 是计算密集型的 Scheduler,其线程数与 CPU 核心数密切相关。当线程数远超过 CPU 核心数目时,CPU 的时间更多地损耗在了线程的上下文切换。因此,保持最大线程数与 CPU 核心数一致是比较通用的方式。
FixedSchedulerPool 可以看作是固定数量的真正 Worker 的缓存池。确定了 MAX_THREADS 后,在 ComputationScheduler 的构造函数中会创建 FixedSchedulerPool 对象,FixedSchedulerPool 内部会直接创建一个长度为 MAX_THREADS 的 PoolWorker 数组。PoolWorker 继承自 NewThreadWorker,但没有任何额外的代码。
PoolWorker 的种花游戏源码使用方法是从池子里取一个 PoolWorker 并返回。但是需要注意,每个 Worker 是独立的,每个 Worker 内部的任务是绑定在这个 Worker 中的。如果按照上述方法暴露 PoolWorker,会出现两个问题:
为了解决上述问题,需要在 PoolWorker 外再包一层 EventLoopWorker。EventLoopWorker 是一个代理对象,它会将 Runnable 代理给 FixedSchedulerPool 中取到的 PoolWorker 来调度,并负责管理通过它创建的任务。当自身被取消时,会将创建的任务全部取消。
与 ComputationScheduler 恰恰相反,IoScheduler 的线程数是无上限的。这是因为 IO 设备的速度远低于 CPU 速度,在等待 IO 操作时,CPU 往往是闲置的。因此,应该创建更多的线程让 CPU 尽可能地利用。当然,并不是线程越多越好,线程数目膨胀到一定程度会影响 CPU 的效率,也会消耗大量的内存。在 IoScheduler 中,在线投保源码每个 Worker 在空置一段时间后就会被清除以控制线程的数目。
CachedWorkerPool 是一个变长并定期清理的 ThreadWorker 的缓存池,内部通过一个 ConcurrentLinkedQueue 维护。和 PoolWorker 类似,ThreadWorker 也是继承自 NewThreadWorker。仅仅是增加了一个 expirationTime 字段,用来标识这个 ThreadWorker 的超时时间。
在 CachedWorkerPool 初始化时,会传入 Worker 的超时时间,目前是写死的 秒。这个超时时间表示 ThreadWorker 闲置后最大存活时间(实际中不保证 秒时被回收)。
IoScheduler 中也存在一个 EventLoopWorker 类,它和 ComputationScheduler 中的作用类似。因为 CachedWorkerPool 是每隔 秒清理一次队列的,所以 ThreadWorker 的存活时间取决于入队的时机。如果一直没有被再次取出,其被实际清理的延迟在 - 秒之间。
熟悉线程的读者会发现,ComputationScheduler 与 IoScheduler 很像某些参数下的 ThreadPoolExecutor。它们对线程的控制外在表现很相似,但实际的线程执行对象不一样。这两者的对比有助于我们更深刻地理解 Scheduler 设计的内在逻辑。
Scheduler 是 RxJava 线程的核心概念,RxJava 基于此屏蔽了 Thread 相关的概念,只与 Scheduler/Worker/Runnable 打交道。
本来计划继续基于 Scheduler 和大家一起探讨 subscribeOn 与 observeOn,但考虑到篇幅问题,这些留待下篇分享。
感谢大家的阅读,欢迎关注笔者的公众号,可以第一时间获取更新,同时欢迎留言沟通。
硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理
深入剖析JUC线程池ThreadPoolExecutor的执行核心 早有计划详尽解读ThreadPoolExecutor的源码,因事务繁忙未能及时整理。在之前的文章中,我们曾提及Doug Lea设计的Executor接口,其顶层方法execute()是线程池扩展的基础。本文将重点关注ThreadPoolExecutor#execute()的实现,结合简化示例,逐步解析。 ThreadPoolExecutor的核心功能包括固定的核心线程、额外的非核心线程、任务队列和拒绝策略。它的设计巧妙地运用了JUC同步器框架AbstractQueuedSynchronizer(AQS),以及位操作和CAS技术。以核心线程为例,设计上允许它们在任务队列满时阻塞,或者在超时后轮询,而非核心线程则在必要时创建。 创建ThreadPoolExecutor时,我们需要指定核心线程数、最大线程数、任务队列类型等。当核心线程和任务队列满载时,会尝试添加额外线程处理新任务。线程池的状态控制至关重要,通过整型变量ctl进行管理和状态转换,如RUNNING、SHUTDOWN、STOP等,状态控制机制包括工作线程上限数量的位操作。 接下来,我们深入剖析execute()方法。首先,方法会检查线程池状态和工作线程数量,确保在需要时添加新线程。这里涉及一个疑惑:为何需要二次检查?这主要是为了处理任务队列变化和线程池状态切换。任务提交流程中,addWorker()方法负责创建工作线程,其内部逻辑复杂,包含线程中断和适配器Worker的创建。 Worker内部类是线程池核心,它继承自AQS,实现Runnable接口。Worker的构造和run()方法共同确保任务的执行,同时处理线程中断和生命周期的终结。getTask()方法是工作线程获取任务的关键,它会检查任务队列状态和线程池大小,确保资源的有效利用。 线程池关闭操作通过shutdown()、shutdownNow()和awaitTermination()方法实现,它们涉及线程中断、任务队列清理和状态更新等步骤,以确保线程池的有序退出。在这些方法中,可重入锁mainLock和条件变量termination起到了关键作用,保证了线程安全。 ThreadPoolExecutor还提供了钩子方法,允许开发者在特定时刻执行自定义操作。除此之外,它还包含了监控统计、任务队列操作等实用功能,每个功能的实现都是对execute()核心逻辑的扩展和优化。 总的来说,ThreadPoolExecutor的execute()方法是整个线程池的核心,它的实现原理复杂而精细。后续将陆续分析ExecutorService和ScheduledThreadPoolExecutor的源码,深入探讨线程池的扩展和调度机制。敬请关注,期待下文的详细解析。Java原理系列ScheduledThreadPoolExecutor原理用法示例源码详解
ScheduledThreadPoolExecutor是Java中实现定时任务与周期性执行任务的高效工具。它继承自ThreadPoolExecutor类,能够提供比常规Timer类更强大的灵活性与功能,特别是在需要多个工作线程或有特殊调度需求的场景下。
该类主要功能包含但不限于提交在指定延迟后执行的任务,以及按照固定间隔周期执行的任务。它实现了ScheduledExecutorService接口,进而提供了丰富的API以实现任务的调度与管理。其中包括now()、getDelay()、compareTo()等方法,帮助开发者更精确地处理任务调度与延迟。
在实际应用中,ScheduledThreadPoolExecutor的使用案例广泛。比如,初始化一个ScheduledThreadPoolExecutor实例,设置核心线程数,从而为定时任务提供资源保障。提交延迟任务,例如在5秒后执行特定操作,并输出相关信息。此外,提交周期性任务,如每隔2秒执行一次特定操作,用于实时监控或数据更新。最后,通过调用shutdown()与shutdownNow()方法来关闭执行器并等待所有任务完成,确保系统资源的合理释放与任务的有序结束。
总的来说,ScheduledThreadPoolExecutor在处理需要精确时间控制的任务时展现出了强大的功能与灵活性,是Java开发者在实现定时与周期性任务时的首选工具。
å¸¦ä½ å¦ä¼åºåScheduledThreadPoolExecutorä¸Timer
æè¦ï¼æ¬æç®åä»ç»ä¸ScheduledThreadPoolExecutorç±»ä¸Timerç±»çåºå«ï¼ScheduledThreadPoolExecutorç±»ç¸æ¯äºTimerç±»æ¥è¯´ï¼ç©¶ç«æåªäºä¼å¿ï¼ä»¥åäºè åå«å®ç°ä»»å¡è°åº¦çç®å示ä¾ãJDK1.5å¼å§æä¾ScheduledThreadPoolExecutorç±»ï¼ScheduledThreadPoolExecutor类继æ¿ThreadPoolExecutorç±»éç¨çº¿ç¨æ± å®ç°äºä»»å¡çå¨ææ§è°åº¦åè½ãå¨JDK1.5ä¹åï¼å®ç°ä»»å¡çå¨ææ§è°åº¦ä¸»è¦ä½¿ç¨çæ¯Timerç±»åTimerTaskç±»ãæ¬æï¼å°±ç®åä»ç»ä¸ScheduledThreadPoolExecutorç±»ä¸Timerç±»çåºå«ï¼ScheduledThreadPoolExecutorç±»ç¸æ¯äºTimerç±»æ¥è¯´ï¼ç©¶ç«æåªäºä¼å¿ï¼ä»¥åäºè åå«å®ç°ä»»å¡è°åº¦çç®å示ä¾ã
äºè çåºå«çº¿ç¨è§åº¦Timeræ¯å线ç¨æ¨¡å¼ï¼å¦ææ个TimerTaskä»»å¡çæ§è¡æ¶é´æ¯è¾ä¹ ï¼ä¼å½±åå°å ¶ä»ä»»å¡çè°åº¦æ§è¡ã
ScheduledThreadPoolExecutoræ¯å¤çº¿ç¨æ¨¡å¼ï¼å¹¶ä¸éç¨çº¿ç¨æ± ï¼æ个ScheduledFutureTaskä»»å¡æ§è¡çæ¶é´æ¯è¾ä¹ ï¼ä¸ä¼å½±åå°å ¶ä»ä»»å¡çè°åº¦æ§è¡ã
ç³»ç»æ¶é´ææ度Timerè°åº¦æ¯åºäºæä½ç³»ç»çç»å¯¹æ¶é´çï¼å¯¹æä½ç³»ç»çæ¶é´ææï¼ä¸æ¦æä½ç³»ç»çæ¶é´æ¹åï¼åTimerçè°åº¦ä¸å精确ã
ScheduledThreadPoolExecutorè°åº¦æ¯åºäºç¸å¯¹æ¶é´çï¼ä¸åæä½ç³»ç»æ¶é´æ¹åçå½±åã
æ¯å¦æè·å¼å¸¸Timerä¸ä¼æè·TimerTaskæåºçå¼å¸¸ï¼å ä¸Timeråæ¯å线ç¨çãä¸æ¦æ个è°åº¦ä»»å¡åºç°å¼å¸¸ï¼åæ´ä¸ªçº¿ç¨å°±ä¼ç»æ¢ï¼å ¶ä»éè¦è°åº¦çä»»å¡ä¹ä¸åæ§è¡ã
ScheduledThreadPoolExecutoråºäºçº¿ç¨æ± æ¥å®ç°è°åº¦åè½ï¼æ个任å¡æåºå¼å¸¸åï¼å ¶ä»ä»»å¡ä»è½æ£å¸¸æ§è¡ã
ä»»å¡æ¯å¦å ·å¤ä¼å 级Timerä¸æ§è¡çTimerTaskä»»å¡æ´ä½ä¸æ²¡æä¼å 级çæ¦å¿µï¼åªæ¯æç §ç³»ç»çç»å¯¹æ¶é´æ¥æ§è¡ä»»å¡ã
ScheduledThreadPoolExecutorä¸æ§è¡çScheduledFutureTaskç±»å®ç°äºjava.lang.Comparableæ¥å£åjava.util.concurrent.Delayedæ¥å£ï¼è¿ä¹å°±è¯´æäºScheduledFutureTaskç±»ä¸å®ç°äºä¸¤ä¸ªé常éè¦çæ¹æ³ï¼ä¸ä¸ªæ¯java.lang.Comparableæ¥å£çcompareToæ¹æ³ï¼ä¸ä¸ªæ¯java.util.concurrent.Delayedæ¥å£çgetDelayæ¹æ³ãå¨ScheduledFutureTaskç±»ä¸compareToæ¹æ³å®ç°äºä»»å¡çæ¯è¾ï¼è·ç¦»ä¸æ¬¡æ§è¡çæ¶é´é´éççä»»å¡ä¼æå¨åé¢ï¼ä¹å°±æ¯è¯´ï¼è·ç¦»ä¸æ¬¡æ§è¡çæ¶é´é´éççä»»å¡çä¼å 级æ¯è¾é«ãègetDelayæ¹æ³åè½å¤è¿åè·ç¦»ä¸æ¬¡ä»»å¡æ§è¡çæ¶é´é´éã
æ¯å¦æ¯æ对任å¡æåºTimerä¸æ¯æ对任å¡çæåºã
ScheduledThreadPoolExecutorç±»ä¸å®ä¹äºä¸ä¸ªéæå é¨ç±»DelayedWorkQueueï¼DelayedWorkQueueç±»æ¬è´¨ä¸æ¯ä¸ä¸ªæåºéåï¼ä¸ºéè¦è°åº¦çæ¯ä¸ªä»»å¡æç §è·ç¦»ä¸æ¬¡æ§è¡æ¶é´é´éç大å°æ¥æåº
è½å¦è·åè¿åçç»æTimerä¸æ§è¡çTimerTaskç±»åªæ¯å®ç°äºjava.lang.Runnableæ¥å£ï¼æ æ³ä»TimerTaskä¸è·åè¿åçç»æã
ScheduledThreadPoolExecutorä¸æ§è¡çScheduledFutureTask类继æ¿äºFutureTaskç±»ï¼è½å¤éè¿Futureæ¥è·åè¿åçç»æã
éè¿ä»¥ä¸å¯¹ScheduledThreadPoolExecutorç±»åTimerç±»çåæ对æ¯ï¼ç¸ä¿¡å¨JDK1.5ä¹åï¼å°±æ²¡æ使ç¨Timeræ¥å®ç°å®æ¶ä»»å¡è°åº¦çå¿ è¦äºã
äºè ç®åç示ä¾è¿éï¼ç»åºä½¿ç¨TimeråScheduledThreadPoolExecutorå®ç°å®æ¶è°åº¦çç®å示ä¾ï¼ä¸ºäºç®ä¾¿ï¼æè¿éå°±ç´æ¥ä½¿ç¨å¿åå é¨ç±»çå½¢å¼æ¥æ交任å¡ã
Timerç±»ç®å示ä¾æºä»£ç 示ä¾å¦ä¸æ示ã
packageio.binghe.concurrent.lab;importjava.util.Timer;importjava.util.TimerTask;/***@authorbinghe*@version1.0.0*@descriptionæµè¯Timer*/publicclassTimerTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ Timertimer=newTimer();timer.scheduleAtFixedRate(newTimerTask(){ @Overridepublicvoidrun(){ System.out.println("æµè¯Timerç±»");}},,);Thread.sleep();timer.cancel();}}è¿è¡ç»æå¦ä¸æ示ã
æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»æµè¯Timerç±»ScheduledThreadPoolExecutorç±»ç®å示ä¾æºä»£ç 示ä¾å¦ä¸æ示ã
packageio.binghe.concurrent.lab;importjava.util.concurrent.*;/***@authorbinghe*@version1.0.0*@descriptionæµè¯ScheduledThreadPoolExecutor*/publicclassScheduledThreadPoolExecutorTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(3);scheduledExecutorService.scheduleAtFixedRate(newRunnable(){ @Overridepublicvoidrun(){ System.out.println("æµè¯æµè¯ScheduledThreadPoolExecutor");}},1,1,TimeUnit.SECONDS);//主线ç¨ä¼ç ç§Thread.sleep();System.out.println("æ£å¨å ³é线ç¨æ± ...");//å ³é线ç¨æ± scheduledExecutorService.shutdown();booleanisClosed;//çå¾ çº¿ç¨æ± ç»æ¢do{ isClosed=scheduledExecutorService.awaitTermination(1,TimeUnit.DAYS);System.out.println("æ£å¨çå¾ çº¿ç¨æ± ä¸çä»»å¡æ§è¡å®æ");}while(!isClosed);System.out.println("ææ线ç¨æ§è¡ç»æï¼çº¿ç¨æ± å ³é");}}è¿è¡ç»æå¦ä¸æ示ã
æµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræµè¯æµè¯ScheduledThreadPoolExecutoræ£å¨å ³é线ç¨æ± ...æµè¯æµè¯ScheduledThreadPoolExecutoræ£å¨çå¾ çº¿ç¨æ± ä¸çä»»å¡æ§è¡å®æææ线ç¨æ§è¡ç»æï¼çº¿ç¨æ± å ³é注æï¼å ³äºTimeråScheduledThreadPoolExecutorè¿æå ¶ä»ç使ç¨æ¹æ³ï¼è¿éï¼æå°±ç®åååºä»¥ä¸ä¸¤ä¸ªä½¿ç¨ç¤ºä¾ï¼æ´å¤ç使ç¨æ¹æ³å¤§å®¶å¯ä»¥èªè¡å®ç°ã
æ¬æå享èªå为äºç¤¾åºããé«å¹¶åãScheduledThreadPoolExecutorä¸Timerçåºå«åç®å示ä¾ãï¼ä½è ï¼å°æ²³ã
2024-12-26 15:46
2024-12-26 14:21
2024-12-26 14:09
2024-12-26 14:03
2024-12-26 13:59