1.Kafka Logcleaner源码分析
2.Kafka源码分析(五) - Server端 - 基于时间轮的码视延时组件
3.源码解析kafka删除topic
4.Kafka消费者源码:重平衡(1)-初始化与FIND_COORDINATOR
Kafka Logcleaner源码分析
Kafka日志保留策略包括按时间/大小和compact两种。Logcleaner遵循compact策略清理日志,码视只保留最新的码视消息,当多个消息具有相同key时,码视只保留最新的码视一个。
每个日志由两部分组成:clean和dirty。码视交友源码支付dirty部分可以进一步划分为cleanable和uncleanable。码视uncleanable部分不允许清理,码视包括活跃段和未达到compact延迟时间的码视段。
清理过程由后台线程定期执行,码视选择最脏的码视日志进行清理,脏度由dirty部分字节数与总字节数的码视比例决定。清理前,码视Logcleaner构建一个key->last_offset映射,码视包含dirty部分的码视所有消息。清理后,日志文件过滤掉过期消息,并合并较小的连续段为较大文件。
payload为null的消息被Logcleaner删除,这类消息在topic配置的时间内保留,然后被清理。清理过程需与幂等性和事务性生产者兼容,小吃网站php源码保留活跃生产者最后一批消息,直到产生新消息或生产者不活跃。只清理提交或终止事物中的消息,未提交事物中的消息不清理。
Logcleaner通过cleanOrSleep方法启动清理,选择最脏日志,调用clean清理并合并段。在清理前计算tombstone的移除时间,确保在clean部分驻留一定时间后移除。清理过程包括构建offset映射,分组段文件并清理合并。
Logcleaner的清理逻辑确保了高效和一致的日志管理,助力Kafka系统稳定运行。
Kafka源码分析(五) - Server端 - 基于时间轮的延时组件
Kafka内部处理大量的延时操作,例如,在接收到PRODUCE请求后,副本可以等待一个timeout的时间再响应客户端。下面我们来探讨一个问题:为什么Kafka要自己实现一个延时任务组件,而不是直接使用Java的java.util.concurrent.DelayQueue呢?我们可以从以下两个方面来分析这个问题。
1.1 DelayQueue的能力
DelayQueue相关的接口/类如下所示:
相应地,DelayQueue提供的魔法守卫战源码能力如下:
1.2 Kafka的业务场景
Kafka的业务背景具有以下特点:
相应地,Kafka对延时任务组件有以下两点要求:
这两点要求都无法通过直接应用DelayQueue的方式得到满足。
二. 组件接口
让我们来看看Kafka的延时任务组件对外提供的接口,从而了解其提供的能力和使用方式。
如下所示:
左边的两个类定义了"延时操作",右边的DelayedOperationPurgatory类定义了一个维护DelayOperaton的容器,其核心操作如下:
三. 实现
以下是关于"延时"实现方式的介绍。
3.1 业务模型
时间轮延时组件的思路如下:
接下来,通过一个具体的例子来说明这种映射逻辑:
首先关注上图中①号时间轮。圆环中的每一个单元格表示一个TimerTaskList。单元格有其关联的时间跨度;下方的"1s x "表示时间轮上共有个单元格,每个单元格的时间跨度为1秒。有一个指针指向了"当前时间"所对应的单元格。顺时针方向为时间流动方向。
当收到一个延迟时间在0-1s的TimerTask时,会将其追加到①号时间轮的橙色单元格中。当收到一个延迟时间在3-4s的TimerTask时,会将其追加到①号时间轮的**单元格中。以此类推。
现在有一个问题:①号时间轮能表示的最大延迟时间是秒,那如果收到了延迟秒的任务该怎么办?这时该用到②号时间轮了,我们称②号为①号的盛游科技源码"溢出时间轮"。②号时间轮的特点如下:
如此,延迟时间在-s的TimerTask会被追加到②号的紫色单元格,延迟时间在-s的TimerTask会被追加到②号的绿色单元格中。③号时间轮同理。
刚刚是按①->②->③的顺序来分析时间轮的逻辑,反过来也可以得到有用的想象手里有一个"放大镜",其实③号时间轮的蓝色单元格"放大"后是②号时间轮;②号时间轮的蓝色单元格"放大"后是①号时间轮;蓝色单元格并不实际存储TimerTask。
3.2 数据结构
DelayedOperationPurgatory有一个Timer类型的timeoutTimer属性,用于维护延时任务。实际使用的是Timer的实现类:SystemTimer。该类用于维护延时任务的核心属性有两个:delayQueue和timingWheel。TimingWheel表示单个时间轮,接下来我们来看看其类图:
各属性含义如下:
3.3 算法
3.3.1 添加任务
添加任务的入口是DelayedOperationPurgatory.tryCompleteElseWatch,其核心逻辑分为如下两步:
SystemTimer.add直接调用了addTimerTaskEntry方法,后者逻辑如下:
TimingWheel.add的逻辑也很清晰,分如下4种场景处理:
3.3.2 尝试提前触发任务
入口是DelayedOperationPurgatory.checkAndComplete:
接下来看Watchers.tryCompleteWatched方法的内容:
DelayedOperation.maybeTryComplete方法最终调用了DelayedOperation.tryComplete;
DelayedOperation的子类需要在后者中实现自己的"触发条件"检查逻辑;若满足了提前触发的条件,则调用forceComplete方法执行事件触发场景下的业务逻辑。
3.3.3 任务到期自动执行
DelayedOperationPurgatory中维护了一个expirationReaper线程,其职责就是循环调用kafka.utils.timer.SystemTimer#advanceClock来从时间轮中获取已超时的任务,并更新时间轮的"当前时间"指针。
四. 总结
才疏学浅,发卡源码建分站未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。
另外,也可以在目录中找到同系列的其他文章:
感谢阅读。
源码解析kafka删除topic
本文以kafka0.8.2.2为例,解析如何删除一个topic以及其背后的关键技术和源码实现过程。
删除一个topic涉及两个关键点:配置删除参数以及执行删除操作。
首先,配置参数`delete.topic.enable`为`True`,这是Broker级别的配置,用于指示kafka是否允许执行topic删除操作。
其次,执行命令`bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name`,此命令指示kafka删除指定的topic。
若未配置`delete.topic.enable`为`True`,topic仅被标记为删除状态,而非立即清除。此时,通常的做法是手动删除Zookeeper中的topic信息和日志,但这仅会清除Zookeeper的数据,并不会真正清除kafkaBroker内存中的topic数据。因此,最佳做法是配置`delete.topic.enable`为`True`,然后重启kafka。
接下来,我们介绍几个关键类和它们在删除topic过程中的作用。
1. **PartitionStateMachine**:该类代表分区的状态机,决定分区的当前状态及其转移。状态包括:NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition。
2. **ReplicaManager**:负责管理当前机器的所有副本,处理读写、删除等具体操作。读写操作流程包括获取partition对象,再获取Replica对象,接着获取Log对象,并通过其管理的Segment对象将数据写入、读出。
3. **ReplicaStateMachine**:副本的状态机,决定副本的当前状态和状态之间的转移。状态包括:NewReplica、OnlineReplica、OfflineReplica、ReplicaDeletionStarted、ReplicaDeletionSuccessful、ReplicaDeletionIneligible、NonExistentReplica。
4. **TopicDeletionManager**:管理topic删除的状态机,包括发布删除命令、监听并开始删除topic、以及执行删除操作。
在删除topic的过程中,分为四个阶段:客户端执行删除命令、未配置`delete.topic.enable`的流水、配置了`delete.topic.enable`的流水、以及手动删除Zookeeper上topic信息和磁盘数据。
客户端执行删除命令时,会在"/admin/delete_topics"目录下创建topicName节点。
未配置`delete.topic.enable`时,topic删除流程涉及监听topic删除命令、判断`delete.topic.enable`状态、标记topic为不可删除、以及队列删除topic任务。
配置了`delete.topic.enable`时,额外步骤包括停止删除topic、检查特定条件、更新删除topic集合、激活删除线程、执行删除操作,如解除分区变动监听、清除内存数据结构、删除副本数据、删除Zookeeper节点信息等。
关于手动删除Zookeeper上topic信息和磁盘数据,通常做法是删除Zookeeper的topic相关信息及磁盘数据,但这可能导致部分内存数据未清除。是否会有隐患,需要进一步测试。
总结而言,kafka的topic删除流程基于Zookeeper实现,通过配置参数、执行命令、管理状态机以及清理相关数据,以实现topic的有序删除。正确配置`delete.topic.enable`并执行删除操作是确保topic完全清除的关键步骤。
Kafka消费者源码:重平衡(1)-初始化与FIND_COORDINATOR
在Kafka 2.5.2的消费者组中,重平衡是关键,它定义了消费者如何根据订阅关系调整对Topic分区的分配。当消费者数量、订阅的Topic或GroupCoordinator所在的Broker发生变更时,会触发重平衡。
消费者组状态由GroupState类管理,共有五个状态:Empty(无成员)、PreparingRebalance(加入中)、CompletingRebalance(等待分配)、Stable(已平衡)和Dead(元数据已删除)。状态间的转换基于预先定义的前置状态。例如,从Empty到PreparingRebalance,预示着重平衡的开始。
重平衡过程分为几个步骤,首先是消费者和Broker之间的协调。服务端启动时,GroupCoordinator组件即已就绪,而Consumer通过ConsumerCoordinator与之通信。在启动时,消费者首先会通过FindCoordinatorRequest找到GroupCoordinator,通过最小负载节点发送请求,然后服务端确定哪个Broker负责协调,如groupId的hash值对consumer_offsets分区数取模确定。
一旦找到GroupCoordinator,消费者会发送JoinGroupRequest。后续步骤如SYNC_GROUP和HEARTBEAT确保消费者与协调器保持同步。这部分详细内容在后续的文章中会进一步探讨。
2025-01-13 20:501902人浏览
2025-01-13 20:471405人浏览
2025-01-13 20:452825人浏览
2025-01-13 20:001369人浏览
2025-01-13 19:07506人浏览
2025-01-13 18:521320人浏览
據新華社北京11月17日消息,記者從退役軍人事務部獲悉,日前,退役軍人事務部與韓國國防部就第十批在韓中國人民志願軍烈士遺骸交接工作達成一致,雙方將於今年11月22日在韓國共同舉行中國人民志願軍烈士遺骸
據央視新聞報道,在國家主席習近平對俄羅斯進行國事訪問期間,中宣部副部長、中央廣播電視總台台長兼總編輯慎海雄與全俄國家電視廣播公司總裁多布羅傑耶夫3月21日在莫斯科簽署了《中國中央廣播電視總台與俄羅斯全
57歲香港女星周海媚病逝,歷年來演過無數古裝劇美人角色,尤其是《倚天屠龍記》中的「周芷若」一角深植人心,無論是外在容貌、裝扮、內心的感情戲,都宛如周芷若活生生的從書中走出來,到現在依然是他人無法超越的