皮皮网
皮皮网

【源码花卷有几个角色】【督查源码】【暴跌源码】workqueue源码实现

来源:考研论坛源码gitee 发表时间:2025-01-13 21:35:44

1.线程池中空闲的码实线程处于什么状态?
2.Java线程池拒绝策略
3.Linux内核 kthread_worker 和 kthread_work 机制
4.fork/join 全面剖析,你可以不用,码实但是码实不能不懂!
5.线程池调优之动态参数配置
6.Linux 中断( IRQ / softirq )基础:原理及内核实现

workqueue源码实现

线程池中空闲的码实线程处于什么状态?

       一:阻塞状态,线程并没有销毁,也没有得到CPU时间片执行;

       源码追踪:

       for (;;) {

       ...

        workQueue.take();

       ...

       }

       public E take()...{

       ...

       while (count.get() == 0) { / /这里就是任务队列中的消息数量

       notEmpty.await();

       }

       ...

       }

       public final void await()...{

       ...

       LockSupport.park(this);

       ...

       }

       继续往下:

       public static void park(Object blocker) {

       Thread t = Thread.currentThread();

       setBlocker(t, blocker);

       U.park(false, 0L);

       setBlocker(t, null);

       }

       private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();

       //线程调用该方法,线程将一直阻塞直到超时,码实或者是码实源码花卷有几个角色中断条件出现。

       public native void park(boolean isAbsolute,码实 long time);

       上面就是java线程池中阻塞的源码追踪;

       二.对比object的wait()方法:

       @FastNative

       public final native void wait(long timeout, int nanos) throws InterruptedException;

       还有Thread的sleep() 方法:

       @FastNative

       private static native void sleep(Object lock, long millis, int nanos)throws...;

       可见,线程池中使用的阻塞方式并不是Object中的wait(),也不是Thread.sleep() ;

       这3个方法最终实现都是通过c&c++实现的native方法.

       三.在<<Java虚拟机(第二版)>>中,对线程状态有以下介绍:

       .4.3 状态转换

       Java语言定义了5种线程状态,在任意一个时间点,码实一个线程只能有且只有其中的码实一种

       状态,这5种状态分别如下。码实

       1)新建(New):创建后尚未启动的码实线程处于这种状态。

       2)运行(Runable):Runable包括了操作系统线程状态中的码实Running和Ready,也就是码实处于此

       状态的线程有可能正在执行,也有可能正在等待着CPU为它分配执行时间。码实

       3)无限期等待(Waiting):处于这种状态的码实线程不会被分配CPU执行时间,它们要等待被

       其他线程显式地唤醒。以下方法会让线程陷入无限期的等待状态:

       ●没有设置Timeout参数的Object.wait()方法。

       ●没有设置Timeout参数的Thread.join()方法。

       ●LockSupport.park()方法。

       4)限期等待(Timed Waiting):处于这种状态的线程也不会被分配CPU执行时间,不过无

       须等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程

       进入限期等待状态:

       ●Thread.sleep()方法。

       ●设置了Timeout参数的Object.wait()方法。

       ●设置了Timeout参数的Thread.join()方法。

       ●LockSupport.parkNanos()方法。

       ●LockSupport.parkUntil()方法。

       5)阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是:“阻塞状态”在等

       待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状

       态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将

       进入这种状态。

       结束(Terminated):已终止线程的线程状态,线程已经结束执行。

Java线程池拒绝策略

       Java线程池在处理超过最大容量时,会采用预定义或自定义的拒绝策略。默认情况下,ThreadPoolExecutor提供了四种策略:

DiscardPolicy:任务被拒绝时不采取任何操作,直接丢弃,源码中表现为一个空的rejectedExecution方法。

AbortPolicy:拒绝时抛出RejectedExecutionException,督查源码中断执行流程,线程会捕获这个异常。

CallerRunsPolicy:由提交任务的线程直接执行被拒绝的任务,workQueue中的任务在该线程中运行。

DiscardOldestPolicy:丢弃最旧的任务后尝试重新执行当前任务,workQueue为空或线程池关闭时,当前任务会被丢弃。

       要自定义拒绝策略,可以实现RejectedExecutionHandler接口,如示例中所示,将自定义的handler传递给ThreadPoolExecutor。这样,当线程池无法执行新任务时,就会调用自定义的rejectedExecution方法来处理策略。

Linux内核 kthread_worker 和 kthread_work 机制

       探究 Linux 内核中的 kthread_worker 和 kthread_work 机制,始于我在研究最新版 Linux Spi 驱动时对这部分工作流程的深入了解。kthread_worker 和 kthread_work 实际上是内核线程管理和使用的一种方式,与 work_struct 和 workqueue_struct 机制类似。接下来,让我们从数据结构、使用方式,以及具体实现入手,对 kthread_worker 和 kthread_work 进行深入分析。

       1、数据结构

       定义 kthread_worker 和 kthread_work 的数据结构位于 include/linux/kthread.h 中。观察结构体定义,可以看出它们之间的紧密联系。

       2、使用方式

       kthread_worker 作为核心组件,理解其使用方法至关重要。首先,定义并初始化 kthread_worker。接着,为 kthread_worker 创建一个内核线程,用于处理工作。

       2.1 准备 kthread_worker

       定义 kthread_worker 并初始化它。注意,初始化完成后,需要为 kthread_worker 创建一个内核线程。

       2.2 准备 kthread_work

       定义 kthread_work 并初始化。为它指定工作函数。暴跌源码

       2.3 启动工作

       准备好了 worker 和 work 后,如有工作需要处理,将工作挂接到 worker 上。

       2.4 执行指定 worker 上的所有工作

       将指定 worker 上的所有工作全部执行。

       2.5 停止当前线程

       了解 Linux 内核源码学习资源。

       3、实现源码

       分析源码的步骤如下:

       3.1 kthread_init_worker

       初始化 kthread_worker。设置成员变量为零,并初始化工作列表。

       3.2 执行线程 kthread_worker_fn

       定义并初始化 kthread_worker 后,调用 kthread_worker_fn 函数,传入 worker 指针。代码逻辑简单,主要涉及状态设置、工作执行等。

       3.3 kthread_init_work

       清零 kthread_work 类型的工作,并初始化链表元素,最后挂接工作执行函数的指针。

       3.4 kthread_queue_work

       将初始化完成的 kthread_worker 和 kthread_work 推进执行。调用 kthread_insert_work 将工作添加至列表中,唤醒沉睡的执行线程。

       4、总结

       kthread_worker 和 kthread_work 机制为 Linux Kernel 提供了一种高效管理内核线程的手段。它们使得驱动等模块开发者能够简便地实现内核线程的使用。

fork/join 全面剖析,你可以不用,但是不能不懂!

       fork/join框架在Java并发包中扮演着重要角色,尤其在Java 8的并行流中。本文将深入剖析其设计思路、核心角色和实现机制。

       首先,fork/join的工作原理是将大任务分解成小任务,并利用多核处理。其特殊之处在于运用了work-stealing算法,通过双端队列分配任务,即使线程处理完一个任务,也能从其他未完成的任务中“窃取”以提高效率。

       核心角色包括ForkJoinPool,作为任务的管理者和线程容器,负责任务的106源码提交和workerThread的管理。ForkJoinWorkerThread则是实际执行任务的“工人”,处理队列中的任务,并通过work-stealing机制优化资源利用。WorkQueue是存放任务的双端队列,ForkJoinTask则定义了任务类型,分为有返回值和无返回值两种。

       在初始化阶段,ForkJoinPool通过ForkJoinWorkerThreadFactory创建线程,任务的提交逻辑分为首次提交和任务切分后提交。首次提交会确保队列的创建和加锁,任务切分则在workerThread中进行。任务的消费则由workerThread或非workerThread线程根据任务状态进行处理。

       至于任务的窃取,工作线程在run()方法中通过scan(WorkQueue, int r)函数实现,不断尝试从队列中“窃取”任务,直到找到或者遍历完所有队列。

       尽管文章只是概述,深入研究fork/join的源码是理解其内在机制的关键,这将有助于在实际开发中更有效地利用并发框架。

线程池调优之动态参数配置

       前言线程池的核心参数配置在网上有一大堆的文章介绍,这次结合个人理解写一篇文章记录一下,以便加深印象和后续查阅。线程池配置参数

       corePoolSize:线程池核心线程数

       maximumPoolSize:线程池最大线程数

       keepAliveTime:允许线程空闲时间(对非核心工作线程的回收)

       TimeUnit:线程空闲时间单位

       workQueue:线程队列(当核心线程数满了,新的任务就会放入这个队列中)

       threadFactory:线程工厂(用于创建工作线程,自定义线程工厂可以指定线程名称)

       handler:线程池拒绝策略(当线程队列满了且最大线程数也满了,就会执行任务拒绝策略,默认有4种)

       allowCoreThreadTimeOut:控制核心工作线程是否需要被回收

常规线程池参数配置

       -首先提问一个面试题:现有个任务,台服务器,每台机器都是4核,在任务不丢弃情况下,线程池参数该怎么配置最合理呢?

       -把这个问题拆分一下,个任务,台机器,那么每台机器就负责个任务(常规轮训负载均衡模式,不考虑其他额外情况),每台机器都是4核,那么就可以设置核心线程数和最大线程数为4,线程队列大小为即可。

       -当然也可以把核心和最大线程数设置为5(n+1)个,线程队列大小为,这样是源码面膜为了防止线程偶尔由于页缺失故障或者其他原因暂停,出多来的一个线程也能确保CPU的调度时钟周期不会被浪费,相当于备用线程。

       如果任务是CPU密集型配置:工作线程=cpu核心数+1;

       如果任务是IO密集型场景:工作线程=cpu核心数*2;

       所以上面例子中就是基于CPU密集型任务配置线程池。而且网上大部分文章描述线程池配置也是基于这两点来分析的。

       可惜理想很丰满,现实很骨感。在实际工作场景中,其实没那么容易区分线程中执行的任务是CPU密集还是IO密集,而且服务器上还会有其他应用线程抢占CPU资源,就算还有一些其他的公式计算配置线程池参数,那也是基于理想场景情况下进行配置的,所以上述配置更多的还是应用于面试中。

动态配置线程池参数

       上述中既然不能一次定义适配所有场景的线程池参数,那么如果可以根据不同业务场景动态配置线程池参数,通过人工干预介入来适配大部分场景也行的

       正好在JDK的自定义线程池ThreadPoolExecutor里,提供了动态扩展线程池核心参数的方法

       可以在运行期间的线程池使用此方法可以覆盖原来配置的值:

ThreadPoolExecutor线程池提供了5种配置参数可供动态更新:核心线程池,最大线程数,线程工厂,线程空闲时间,拒绝策略。

       这里主要讨论的是核心线程池和最大线程池两种参数配置:

/****@Author:ZRH*@Date://:*/@Slf4jpublicclassExecutorTest{ publicstaticvoidmain(String[]args)throwsException{ finalThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,3,,TimeUnit.SECONDS,newLinkedBlockingQueue<>(7),newThreadPoolExecutor.DiscardPolicy());for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}logExecutorInfo(threadPoolExecutor);threadPoolExecutor.setCorePoolSize(5);threadPoolExecutor.setMaximumPoolSize(5);logExecutorInfo(threadPoolExecutor);Thread.currentThread().join();}privatestaticvoidlogExecutorInfo(ThreadPoolExecutorexecutor){ log.info("线程池核心线程数="+executor.getCorePoolSize()+",线程池最大线程数="+executor.getMaximumPoolSize()+",线程池队列剩余任务="+executor.getQueue().size()+",线程池活跃线程数="+executor.getActiveCount()+",线程池任务完成数"+executor.getCompletedTaskCount());}}

       看执行结果:刚开始线程池里核心线程数2个、最大线程数3个、剩下7放队列。活跃的线程也只有3个。

       然后更改核心线程和最大线程数为5后,线程池里对应的核心线程数和最大线程数也增加至5个,活跃的工作线程也是5个。说明更改配置成功。

       注:更新线程池参数时,核心线程数不能超过最大线程数配置。否则配置最后不会生效。

publicstaticvoidmain(String[]args)throwsException{ finalThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,3,,TimeUnit.SECONDS,newLinkedBlockingQueue<>(7),newThreadPoolExecutor.DiscardPolicy());for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}logExecutorInfo(threadPoolExecutor);threadPoolExecutor.setCorePoolSize(5);//threadPoolExecutor.setMaximumPoolSize(5);logExecutorInfo(threadPoolExecutor);Thread.currentThread().join();}

       上图中把核心线程数更新为5,最大线程数不改动任为3。最后看执行结果,最终的活跃线程还是3个,说明配置没有生效,具体源码在ThreadPoolExecutor类的getTask()方法里,感兴趣的同学可以去看一下...

动态更新线程队列

       ThreadPoolExecutor线程池并没有动态配置线程池队列大小的方法

       想自己操作一下也是很简单的,只需要自定义实现一个队列,可以直接把LinkedBlockingQueue复制一份,并把capacity参数设定为可更改

publicstaticvoidmain(String[]args)throwsException{ finalThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,3,,TimeUnit.SECONDS,newCustomLinkedBlockingQueue<>(7),newThreadPoolExecutor.DiscardPolicy());for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}logExecutorInfo(threadPoolExecutor);threadPoolExecutor.setCorePoolSize(5);threadPoolExecutor.setMaximumPoolSize(5);CustomLinkedBlockingQueuequeue=(CustomLinkedBlockingQueue)threadPoolExecutor.getQueue();queue.setCapacity();for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}Thread.currentThread().join();}

       看结果,后续添加的任务会放入队列中,并且队列大小也超过第一次设置大小,说明配置成功

最后

       参考:Java线程池实现原理及其在美团业务中的实践

       虚心学习,共同进步-_-

Linux 中断( IRQ / softirq )基础:原理及内核实现

       中断(IRQ),尤其是软中断(softirq)的广泛用途之一是网络数据包的接收与发送,但其应用场景并非单一。本文将全面整理中断(IRQ)与软中断(softirq)的基础知识,这些内容与网络数据包处理虽无直接联系,但整理本文旨在更深入地理解网络数据包处理机制。

       什么是中断?

       CPU 通过时分复用处理多任务,其中包括硬件任务,如磁盘读写、键盘输入,以及软件任务,如网络数据包处理。CPU 在任何时刻只能执行一个任务。当某个硬件或软件任务当前未被执行,但希望CPU立即处理时,会向CPU发送中断请求——希望CPU暂停手头工作,优先服务“我”。中断以事件形式通知CPU,因此常看到“在XX条件下会触发XX中断事件”的表述。

       中断分为两类:

       管理中断的设备:Advanced Programmable Interrupt Controller(APIC)。

       硬中断的中断处理流程

       中断随时发生,处理流程如下:

       Maskable and non-maskable

       Maskable interrupts 在x_上可以通过sti/cli指令来屏蔽(关闭)和恢复:

       在屏蔽期间,这种类型的中断不会触发新的中断事件。大部分IRQ都属于这种类型。例如,网卡的收发包硬件中断。

       Non-maskable interrupts 不可屏蔽,因此属于更高优先级的类型。

       问题:执行速度与逻辑复杂性之间的矛盾

       IRQ处理器的两个特点如下:

       存在内在矛盾。

       解决方式:中断的推迟处理(deferred interrupt handling)

       传统解决方式是将中断处理分为两部分:

       这种方式称为中断的推迟处理或延后处理。现在已是一个通用术语,涵盖各种推迟执行中断处理的方式。中断分为两部分处理:

       在Linux中,有三种推迟中断(deferred interrupts):

       具体细节将在后续介绍。

       软中断与软中断子系统

       软中断是内核子系统的一部分:

       每个CPU上会初始化一个ksoftirqd内核线程,负责处理各种类型的softirq中断事件;

       使用cgroup ls或ps -ef都能看到:

       软中断事件的handler提前注册到softirq子系统,注册方式为open_softirq(softirq_id, handler)

       例如,注册网卡收发包(RX/TX)软中断处理函数:

       软中断占用了CPU的总开销:可以使用top查看,第三行倒数第二个指标是系统的软中断开销(si字段):

       Linux内核源码分析学习地址:ke.qq.com/course/...

       文章福利小编推荐自己的Linux内核源码分析交流群:点击加入整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!

       主处理

       smpboot.c类似于事件驱动的循环,会调度ksoftirqd线程执行pending的软中断。ksoftirqd内部会进一步调用到__do_softirq,

       避免软中断占用过多CPU

       软中断的潜在影响:推迟执行的部分(如softirq)可能会占用较长时间,在这段时间内,用户空间线程只能等待。反映在top中,si占比。

       不过softirq调度循环对此有所改进,通过budget机制来避免softirq占用过多CPU时间。

       硬中断-软中断调用栈

       softirq是一种推迟中断处理机制,将IRQ的大部分处理逻辑推迟在这里执行。有两条路径都会执行到softirq主处理逻辑__do_softirq():

       1、CPU调度到ksoftirqd线程时,会执行到__do_softirq();

       2、每次IRQ handler退出时:do_IRQ()->...

       do_IRQ是内核中主要的IRQ处理方式。它执行结束时,会调用exiting_irq(),这会展开成irq_exit()。后者会检查是否pending有softirq,如果有,则唤醒:

       进而会使CPU执行到__do_softirq。

       软中断触发执行的步骤

       总结,每个软中断会经过以下阶段:

       以收包软中断为例,IRQ handler并不执行NAPI,只是触发它,在内部会执行到raiseNET_RX_SOFTIRQ;真正的执行在softirq,会调用网卡的poll()方法收包。IRQ handler中会调用napi_schedule(),然后启动NAPI poll()。

       需要注意的是,虽然IRQ handler所做的工作很少,但处理这个包的softirq和IRQ在同一CPU上运行。这意味着,如果大量的包都放在同一个RX队列,虽然IRQ开销可能不多,但该CPU仍然会非常繁忙,都花费在softirq上。解决方式:RPS。它不会降低延迟,只是将包重新分配:RXQ->CPU。

       三种推迟执行方式(softirq/tasklet/workqueue)

       提到,Linux中的三种推迟中断执行方式:

       其中:

       前面已经看到,Linux在每个CPU上创建了一个ksoftirqd内核线程。

       softirqs是在Linux内核编译时确定的,例如网络收包对应的NET_RX_SOFTIRQ软中断。因此是一种静态机制。如果想添加一种新softirq类型,需要修改并重新编译内核。

       内部组织

       内部由一个数组(或称为向量)管理,每个软中断号对应一个softirq handler。数组与注册:

       在5.中所有类型的softirq:

       也就是在cat /proc/softirqs看到的哪些。

       触发(唤醒)softirq

       以收包软中断为例,IRQ handler并不执行NAPI,只是触发它,在内部会执行到raiseNET_RX_SOFTIRQ;真正的执行在softirq,会调用网卡的poll()方法收包。IRQ handler中会调用napi_schedule(),然后启动NAPI poll()。

       如果对内核源码有一定了解,会发现softirq使用非常有限,原因之一是它是静态编译的,依赖内置的ksoftirqd线程来调度内置的9种softirq。如果想添加一种新功能,就得修改并重新编译内核,开发成本很高。

       实际上,实现推迟执行的更常用方式是tasklet。它构建在softirq机制之上,具体来说就是使用了两种softirq:

       换句话说,tasklet是在运行时(runtime)创建和初始化的softirq,

       内核软中断子系统初始化了两个per-cpu变量:

       tasklet再执行针对list的循环:

       tasklet在内核中的使用非常广泛。不过,后面又出现了第三种方式:workqueue。

       这也是一种推迟执行机制,与tasklet有些相似,但有显著不同。

       使用场景

       简而言之,workqueue子系统提供了一个接口,通过该接口可以创建内核线程来处理从其他地方enqueue过来的任务。这些内核线程称为worker threads,内置的per-cpu worker threads:

       结构体

       kworker线程调度workqueues,原理与ksoftirqd线程调度softirqs类似。然而,我们可以为workqueue创建新的线程,而softirq则不行。

       参考资料引用链接

       [1]

       中断与中断处理:0xax.gitbooks.io/linux-...

       作者:赵亚楠 原文:arthurchiao.art/blog/li...来源:云原生实验室

深入理解k8s -- workqueue

       深入理解k8s -- workqueue

       在探讨k8s中的informer组件时,workqueue是一个关键角色。在前文的Controller源码分析中,workqueue的使用已经有所提及。工作队列是k8s中用于处理资源变更事件和调度任务的高效机制。它支持三种类型的队列:简单的FIFO队列、延时队列以及限速队列。

       工作队列通过一个名为Type的底层数据结构来实现,它实现了workqueue.Interface接口。Type结构体包含queue、dirty和processing三个重要字段,以及一个golang原生的条件锁cond。queue用于存储待处理的任务,dirty和processing用于管理任务的添加和完成状态。cond用于控制多个协程的同步操作。

       接下来,我们通过源码深入Type的方法实现,如Add、Get和Done。Add方法简单地将任务添加到queue、dirty和processing中。Get方法包含删除逻辑,同时会检查dirty中是否已有数据,若无,则从queue中取出任务。Done方法用于清理processing状态,确保任务正确处理并移出队列。Get和Done方法之间的配合保证了任务的正确执行和管理。

       在处理资源变更事件时,工作队列的作用尤为明显。在事件触发后,队列将资源变更事件加入到队列中,由Controller进行处理。Controller通过工作队列的Get方法获取待处理的任务,执行处理逻辑,然后调用Done方法将任务标记为完成。这种机制保证了资源变更事件能够被及时且有序地处理。

       除了基础的FIFO队列,k8s还提供了更高级的队列类型,如延时队列和限速队列。延时队列允许用户指定任务的延迟时间,即在特定时间后才将任务加入队列。这有助于优化资源的处理顺序和负载均衡。限速队列则进一步增强了队列功能,通过限速器动态调整任务的处理速率,避免系统过载或资源浪费。

       限速队列基于延时队列实现,通过引入限速器来控制任务的处理速率。常见的限速器包括BucketRateLimiter、ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter和MaxOfRateLimiter。这些限速器可以根据不同需求灵活配置,实现资源的高效管理和优化。

       总结而言,工作队列是k8s中实现资源变更事件处理和任务调度的核心组件,通过简单、延时和限速队列的不同组合,可以满足各种复杂场景的需求,实现资源管理的高效、有序和灵活。

相关栏目:百科