文章摘要(AI生成)
ZookeeperKafka 通过 zookeeper 来存储集群的meta元数据信息。Zookeeper负责维护和协调broker,负责Broker Controller的选举。BrokerKafka集群中的每⼀台服务器。一个broker上有多个partition组成。当每个 broker 启动时
Zookeeper
Kafka 通过 zookeeper 来存储集群的meta元数据信息。Zookeeper负责维护和协调broker,负责Broker Controller的选举。
Broker
Kafka集群中的每⼀台服务器。一个broker上有多个partition组成。
当每个 broker 启动时,会在 ZooKeeper 中的 /brokers/ids
路径下创建⼀个节点来注册⾃⼰, 节点 ID 为配置⽂件中的 broker.id
参数,后注册的 broker 会报 NodeExists
的错。
每个 broker 除了注册⾃身之外,还会监听 /brokers/ids
这个节点,当这个节点下增加或删除⼦ 节点时,ZooKeeper 会通知监听了的 broker。每个 broker 创建的节点都是临时节点,如果 broker 下 线, /brokers/ids
下对应的节点就会被删除。
Controller
每个 broker 除了注册⾃身之外,还会监听 /brokers/ids
这个节点,当这个节点下增加或删除⼦ 节点时,ZooKeeper 会通知监听了的 broker。每个 broker 创建的节点都是临时节点,如果 broker 下线, /brokers/ids
下对应的节点就会被删除。
broker controller
负责管理分区与副本,broker controller
由 zk 负责选举。 Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller
节点。第⼀个成功创建 /controller
节点的 Broker 会被指定为控制器。
Controller的作用
- 选举Leader和ISR:从分区副本列表中选出⼀个作为该分区的leader,并将该分区对应所有副本置于ISR列表
- 同步元数据信息包括broker和分区的元数据信息:控制器架⼦ZK的
/brokers/ids
以及上⼀个步骤得到的topic下各分区leader和ISR将这些元数据信息同步到集群每个broker。 当有broker或者分区发⽣变更时及时更新到集群保证集群每⼀台broker缓存的是最新元数据。 - broker增删监听与处理:控制器启动时就起⼀个监视器监视ZK
/brokers/ids/
⼦节点。当新增或下线broker时,ZK会先进行broker的变更。控制器的监视器发现这种变化后,控制器开始执⾏broker变更的相关流程并更新元数据信息到集群。 - topic变化监听与处理:控制器启动时就起⼀个监视器监视ZK
/brokers/topics/
⼦节点。当通过脚本或者请求增加/删除⼀个topic后,该topic变更会写⼊该⽬录下的⼀个⼦节点。控制器的监视器发现这种变化后,如果为新增topic,则开始执⾏topic创建的相关流程包括leader选举和ISR并同步元数据信息到集群,且新增⼀个监视器监视ZK/brokers/topics/<新增topic⼦节点内容>
防⽌该topic内容变化;如果时删除topic,则开始执⾏topic删除的相关流程包括通知该topic所有分区的所有副本停⽌运⾏;通知所有分区所有副本删除数据;删除ZK/admin/delete_topics/<待删除topic⼦节点>
。 - 分区变化监听与变化处理:分区重分配通过KAFKA管理员脚本执⾏完成⼀个topic下分区的副本重新分配broker。
- Broker优雅退出:控制器接收到退出请求后,执⾏leader重选举和ISR后响应broker。broker接收后退出。
- 数据服务:是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从⽽更新其内存中的缓存数据。
故障转移:当运⾏中的控制器突然宕机或意外终⽌时,Kafka 能够快速地感知到,并⽴即启⽤备⽤控制器来代替之前失败的控制器。
topic
在Kafka中,使⽤⼀个类别属性来划分消息的所属类,划分消息的这个类称为topic。topic相当于消 息的分类标签,是⼀个逻辑概念。
partition
topic中的消息被分割为⼀个或多个partition,其是⼀个物理概念,对应到系统上就是⼀个或若⼲个⽬录。Kafka分配的单位是partition。每个Partition是⼀个FIFO队列,其中的消息是有序的。但 Partition之间的顺序是⽆序的。
Kafka 的消息组织⽅式实际上是三级结构:主题 (逻辑)- 分区(物理) - 消息。主题下的每条消息只会保存在某⼀个分区中。
partition leader和partition follower就是broker Controller 选举出来的,ISR中的follower采⽤的是轮流坐庄的⽅式。
partition的副本
同⼀个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从⽽能够对抗部分 Broker 宕机带来的数据不可⽤。
副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举⼀个副本,称为领导者副本,其余的副本⾃动称为追随者副本。
追随者副本是不对外提供服务的。这就是说,任何⼀个追随者副本都不能响应消费者和⽣产者的读 写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在 的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯⼀的任务就是从领导者副本 异步拉取消息,并写⼊到⾃⼰的提交⽇志中,从⽽实现与领导者副本的同步。
segment
Kafka将partition进⼀步细分为了若⼲的segment,每个segment⽂件的最⼤⼤⼩相等。一个segment文件由一个.log
的消息存储文件和一个 .index
的消息索引文件构成。文件名称的序号表示了该segment前有多少条消息,例如00000000000000368769.index
⽂件,表示其前⾯有368769条消息。
⼀个 Segment 中消息的存放是顺序存放的。
index⽂件中并没有为数据⽂件中的每条消息都建⽴索引,⽽是采⽤了稀疏存储的⽅式, 每隔⼀定字节的数据建⽴⼀条索引。 这样避免了索引⽂件占⽤过多的空间,从⽽可以将索引⽂件保留在内存中。
日志清理策略
kafka log的清理策略有两种:delete,compact,默认是delete,这个对应了kafka中每个topic对于 record的管理模式
delete:⼀般是使⽤按照时间保留的策略,当不活跃的segment的时间戳是⼤于设置的时间的时候,当前segment就会被删除
compact: ⽇志不会被删除,会被去重清理,这种模式要求每个record都必须有key,然后kafka会按照⼀定的时机清理segment中的key,对于同⼀个key只保留最新的那个key.同样的,compact也只针对不活跃的segment
offset
消费者通过指定的offset来定位下⼀条要读取的消息。而broker通过offset来维护各partition副本之间的消息同步。
kafka通过创建了一个名为
__consumer_offsets
的topic来存储不同消费者提交的位移信息。该主题的partition默认有50个,数据的有效期为1天。key由【group.id+topic+分区号】构成,value即为当前offset的值。
副本同步机制
AR:Assigned Replicas,Kafka中某个partition的所有的副本统称为Assigned Replica,指定的副本。
ISR:In-Sync Replicas,是指副本同步列表。
OSR:Out-Sync Replicas,移除的副本。
Kafka 判断 Follower 是否与 Leader 同步的标准,是 Broker 端参数 replica.lag.time.max.ms
参数值。这个参数的含义是 Follower 副本能够落后 Leader 副本的最⻓时间间隔,当前默认值是 10 秒。这 就是说,只要⼀个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
HW-高水位机制
⾼⽔位,表示Consumer可以消费到的最⾼partition偏移量。
该机制要求,对于partition leader 新写⼊的消息,由于各partition副本之间的LEO(副本最大offset)不同,consumer 不能⽴刻消费。leader 会等待该消息被所有 ISR 中的 partition follower 同步后才会更新 HW,此时该消息才能被 consumer 消费。
这种机制会造成消息丢失和消息不一致问题,为了解决消息不一致的问题,Kafka通过HW截断机制,会在恢复故障follower时先将同步到HW,再将其恢复;在恢复故障leader时先将其回退到HW后重新同步,再将其恢复。
消息写入broker
- producer向broker集群提交连接请求,其所连接上的任意broker都会向其发送broker controller的 通信URL,即broker controller主机配置⽂件中的listeners地址
- 当producer指定了要⽣产消息的topic后,其会向broker controller发送请求,请求当前topic中所有 partition的leader列表地址
- broker controller在接收到请求后,会从zk中查找到指定topic的所有partition的leader,并返回给producer
- producer在接收到leader列表地址后,根据消息路由策略找到当前要发送消息所要发送的partition leader,然后将消息发送给该leader
- leader将消息写⼊本地log,并通知ISR中的followers
- ISR中的followers从leader中同步消息后向leader发送ACK
- leader收到所有ISR中的followers的ACK后,增加HW,表示消费者已经可以消费到该位置了
- 若leader在等待的followers的ACK超时了,发现还有follower没有发送ACK,则会将该follower从ISR 中清除,然后增加HW。
Producer
Producer负责将消息发送到Kafka集群的某⼀个topic中。同时Producer发送消息时能够指定 partition号,从⽽将消息持久化到特定的partition中。
- 如果没有指定具体的partition号,那么Kafka Producer可以通过⼀定的算法计算出对应的 partition号。
- 如果消息指定了key,则对key进⾏hash,然后映射到对应的partition号
- 如果消息没有指定key,则使⽤Round Robin轮询算法来确定partition号,这样可以保证数据在所有的partition上平均分配。
另外,Kafka Producer也⽀持⾃定义的partition分配⽅式。客户端提供⼀个实现了 org.apache.kafka.clients.producer.Partitioner
的类,然后将此实现类配置到Producer中即可。
消息路由策略
- 若指定了partition,则直接写⼊到指定的partition;
- 若未指定partition但指定了key,则通过对key的hash值与partition数量取模,该取模结果就是要选出的partition索引;
- 若partition和key都未指定,则使⽤轮询策略选出⼀个partition
Consumer&Consumer group
在kafka中,⼀个group是⼀个“订阅者”,⼀个topic中的每个partions只会被⼀个“订阅者”中的⼀个 consumer消费,不过⼀个consumer可以消费多个partitions中的消息
consumer group是kafka提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者,它们共享⼀个公共的ID,即group ID。 每⼀条消息只会被同⼀个消费组⾥的⼀个消费者实例消费,不同的消费组可以同时消费同⼀条消息。 kafka的设计理念之⼀就是同时提供离线处理和实时处理。
提交offset
Consumer消费消息时,通过指定的offset来定位下⼀条要读取的消息。值得注意的是,offset的维护是由Consumer全权控制的。而offset保存在kafka集群上,consumer每次消费消息需要提交offset来完成offset更新。
当consumer从partition中消费了消息后,consumer会将其消费的消息的offset提交给broker,表 示当前partition已经消费到了该offset所标识的消息。
Consumer从partition中取出⼀批消息写⼊到buffer对其进⾏消费,在规定时间内消费完消息后,会⾃动将其消费消息的offset提交给broker,以让broker记录下哪些消息是消费过的。当然,若在时限内没有消费完毕,是不会提交offset的。
Consumer提交Offset的方式有两种:自动提交和手动提交,其优缺点如下:
提交方式 | 优点 | 缺点 |
---|---|---|
自动提交(enable.auto.commit=true ) |
简单 | 消息重复 |
手动同步提交(commitSync() ) |
灵活 | 阻塞 |
手动异步提交(commitAsync() ) |
不会重试 |
重置offset
通过设置消费者的auto.offset.reset
属性,来在丢失offset时,重置消费者的offset。该属性提供了两种重置策略:earliest
和latest
也可以通过kakfa提供的指令集,在集群上对相应的consumer group重置offset。该指令提供了以下重置策略:
维度 | 策略 | 含义 |
---|---|---|
位移维度 | earliest | 调整到当前最早位移处 |
位移维度 | latest | 调整到当前最新位移处 |
位移维度 | current | 调整到当前最新提交位移处 |
位移维度 | Specified-Offset | 调整到指定位移处 |
位移维度 | Shift-By-N | 调整到当前位移+N处 |
时间维度 | DateTime | 调整到大于给定时间的最小位移处 |
时间维度 | Duration | 调整到距离当前时间指定间隔的位移处 |
消费分区分配算法
Kafka中提供了多重分区分配算法(PartitionAssignor)的实现: RangeAssignor、RoundRobinAssignor、StickyAssignor
RangeAssignor策略的原理是按照消费者总数和分区总数进⾏整除运算来获得⼀个跨度,然 后将分区按照跨度进⾏平均分配,以保证分区尽可能均匀地分配给所有的消费者。
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽 量均衡的分配(RangeAssignor是针对单个Topic的分区进⾏排序分配的)。
StickyAssignor的分配策略则是在增加和减少Consumer时,在分区的分配尽量均衡的条件下, 每⼀次重分配的结果尽量与上⼀次分配结果保持⼀致。
Group Coordinator
主要⽤于Consumer Group中的各个成员的offset位移管理和Rebalance。Group Coordinator 同时管理着当前broker的所有消费者组。
Coordinator的作⽤
每个consumer group都会选择⼀个broker作为⾃⼰的coordinator,他是负责监控这个消费组⾥的各个消费者的⼼跳,以及判断是否宕机,然后开启rebalance。
根据内部的⼀个选择机制,会挑选⼀个对应的Broker,Kafka总会把各个消费组均匀分配给各个 Broker作为coordinator来进⾏管理的。
consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的 coordinator所在的broker进⾏通信,然后由coordinator分配分区给你的这个consumer来进⾏消费。 coordinator会尽可能均匀的分配分区给各个consumer来消费。
评论区