Kafka核心原理第二弹——更新中
架构原理
一、高吞吐机制:Batch打包、缓冲区、acks
1. Kafka Producer怎么把消息发送给Broker集群的?
需要指定把消息发送到哪个topic去
首先需要选择一个topic的分区,默认是轮询来负载均衡,但是如果指定了一个分区key,那么根据这个key的hash值来分发到指定的分区,这样可以让相同的key分发到同一个分区里去,还可以自定义partitioner来实现分区策略
producer.send(msg); // 用类似这样的方式去发送消息,就会把消息给你均匀的分布到各个分区上去
producer.send(key, msg); // 订单id,或者是用户id,他会根据这个key的hash值去分发到某个分区上去,他可以保证相同的key会路由分发到同一个分区上去
知道要发送到哪个分区之后,还得找到这个分区的leader副本所在的机器,然后跟那个机器上的Broker通过Socket建立连接来进行通信,发送Kafka自定义协议格式的请求过去,把消息就带过去了
如果找到了partition的leader所在的broker之后,就可以通过socket跟那台broker建立连接,接着发送消息过去
Producer(生产者客户端),起码要知道两个元数据,每个topic有几个分区,每个分区的leader是在哪台broker上,会自己从broker上拉取kafka集群的元数据,缓存在自己client本地客户端上
kafka使用者的层面来考虑一下,我如果要把数据写入kafka集群,应该如何来做,怎么把数据写入kafka集群,以及他背后的一些原理还有使用过程中需要设置的一些参数,到底应该怎么来弄
2. 用一张图告诉你Producer发送消息的内部实现原理
每次发送消息都必须先把数据封装成一个ProducerRecord对象,里面包含了要发送的topic,具体在哪个分区,分区key,消息内容,timestamp时间戳,然后这个对象交给序列化器,变成自定义协议格式的数据
接着把数据交给partitioner分区器,对这个数据选择合适的分区,默认就轮询所有分区,或者根据key来hash路由到某个分区,这个topic的分区信息,都是在客户端会有缓存的,当然会提前跟broker去获取
接着这个数据会被发送到producer内部的一块缓冲区里
然后producer内部有一个Sender线程,会从缓冲区里提取消息封装成一个一个的batch,然后每个batch发送给分区的leader副本所在的broker
3. 基于Java API写一个Kafka Producer发送消息的代码示例
package com.zhss.demo.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class ProducerDemo {public static void main(String[] args) throws Exception {Properties props = new Properties();// 这里可以配置几台broker即可,他会自动从broker去拉取元数据进行缓存props.put("bootstrap.servers", "hadoop03:9092,hadoop04:9092,hadoop05:9092"); // 这个就是负责把发送的key从字符串序列化为字节数组props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 这个就是负责把你发送的实际的message从字符串序列化为字节数组props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);// 创建一个Producer实例:线程资源,跟各个broker建立socket连接资源KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");// 这是异步发送的模式producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null) {// 消息发送成功System.out.println("消息发送成功"); } else {// 消息发送失败,需要重新发送}}});Thread.sleep(10 * 1000); // 这是同步发送的模式
// producer.send(record).get(); // 你要一直等待人家后续一系列的步骤都做完,发送消息之后// 有了消息的回应返回给你,你这个方法才会退出来producer.close();}}
4. 发送消息给Broker时遇到的各种异常该如何处理?
之前我们看到不管是异步还是同步,都可能让你处理异常,常见的异常如下:
LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可
如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException
NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可
NetworkException:网络异常,重试即可
我们之前配置了一个参数,retries,他会自动重试的,但是如果重试几次之后还是不行,就会提供Exception给我们来处理了
5. 发送消息的缓冲区应该如何优化来提升发送的吞吐量?
buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB
如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住
compression.type,默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销
6. 消息批量发送的核心参数batch.size是如何优化吞吐量?
batch.size,设置meigebatch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里
默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下
还有一个参数,linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去
但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力
7. 如何根据业务场景对消息大小以及请求超时进行合理的设置?
max.request.size:这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb,这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些
你发送出去的一条大数据,超大的JSON串,超过1MB,就不让你发了
request.timeout.ms:这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒,如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理
8. 基于Kafka内核架构原理深入分析acks参数到底是干嘛的
acks参数,其实是控制发送出去的消息的持久化机制的
如果acks=0,那么producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了,你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的
会让你的发送吞吐量会提升很多,你发送弄一个batch出,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图
acks=all,或者acks=-1:这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知
min.insync.replicas = 2,ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,不能只有一个leader存活,连一个follower都没有了
acks = -1,每次写成功一定是leader和follower都成功才可以算做成功,leader挂了,follower上是一定有这条数据,不会丢失
retries = Integer.MAX_VALUE,无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
acks=1:只要leader写入成功,就认为消息成功了,默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader
9. 针对瞬间异常的消息重试参数有哪些需要考虑的点
有的时候一些leader切换之类的问题,需要进行重试,设置retries即可,而且还可以跟消息不丢失结合起来,但是消息重试会导致重复发送的问题,比如说网络抖动一下导致他以为没成功,就重试了,其实人家都成功了
所以消息重试导致的消费重复,需要你在下游consumer做幂等性处理,但是kafka已经支持了一次且仅一次的消息语义
另外一个,消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息
两次重试的间隔默认是100毫秒,用“retry.backoff.ms”来进行设置
一般来说,某台broker重启导致的leader切换,是最常见的异常,所以尽可能把重试次数和间隔,设置的可以cover住新leader切换过来
10. Kafka Producer高阶用法(一):自定义分区
public class HotDataPartitioner implements Partitioner {private Random random;@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitionInfoList.size();
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}}props.put(“partitioner.class”, “com.zhss.HotDataPartitioner”);测试发送bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test-topic
11. Kafka Producer高阶用法(二):自定义序列化
12. Kafka Producer高阶用法(三):自定义拦截器
二、Kafka Consumer选举与Rebalance实现原理
1. 一张图画清Kafka基于Consumer Group的消费者组的模型
每个consumer都要属于一个consumer.group,就是一个消费组,topic的一个分区只会分配给一个消费组下的一个consumer来处理,每个consumer可能会分配多个分区,也有可能某个consumer没有分配到任何分区
分区内的数据是保证顺序性的
group.id = “membership-consumer-group”
如果你希望实现一个广播的效果,你的每台机器都要消费到所有的数据,每台机器启动的时候,group.id可以是一个随机生成的UUID也可以,你只要让不同的机器的KafkaConsumer的group.id是不一样的
如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他,这个就是所谓的消费者rebalance的过程
2. 消费者offset的记录方式以及基于内部topic的提交模式
每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储
所以后来就是提交offset发送给内部topic:__consumer_offsets,提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact
也就是每个group.id+topic+分区号就保留最新的那条数据即可
而且因为这个__consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多
3. 基于Java API写一个Kafka Consumer消费消息的代码示例
String topicName = “test-topic”;
String groupId = “test-group”;Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“group.id”, “groupId”);
props.put(“enable.auto.commit”, “true”);
props.put(“auto.commit.ineterval.ms”, “1000”);
// 每次重启都是从最早的offset开始读取,不是接着上一次
props.put(“auto.offset.reset”, “earliest”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.SttringDeserializer”);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 超时时间
for(ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + “, ” + record.key() + “, ” + record.value());
}
}
} catch(Exception e) {}
4. Kafka感知消费者故障是通过哪三个参数来实现的?
heartbeat.interval.ms:consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作
session.timeout.ms:kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒
max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了
5. 对消息进行消费时有哪几个参数需要注意以及设置呢?
fetch.max.bytes:获取一条消息最大的字节数,一般建议设置大一些
max.poll.records:一次poll返回消息的最大条数,默认是500条
connection.max.idle.ms:consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收
6. 消费者offset相关的参数设置会对运行产生什么样的影响?
auto.offset.reset:这个参数的意思是,如果下次重启,发现要消费的offset不在分区的范围内,就会重头开始消费;但是如果正常情况下会接着上次的offset继续消费的
enable.auto.commit:这个就是开启自动提交位移
7. Group Coordinator是什么以及主要负责什么?
每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance的,那么这个如何选择呢?
就是根据group.id来进行选择,他有内部的一个选择机制,会给你挑选一个对应的Broker,总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的
他负责的事情只要就是rebalance,说白了你的consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费
coordinator会尽可能均匀的分配分区给各个consumer来消费
8. 为消费者选择Coordinator的算法是如何实现的?
首先对groupId进行hash,接着对__consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到__consumer_offsets的哪个分区
比如说:groupId,“membership-consumer-group” -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,大家可以找到__consumer_offsets的一个分区
__consumer_offset的分区的副本数量默认来说1,只有一个leader
然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,接着就会维护一个Socket连接跟这个Broker进行通信
9. Coordinator和Consume Leader如何协作制定分区方案?
每个consumer都发送JoinGroup请求到Coordinator,然后Coordinator从一个consumer group中选择一个consumer作为leader,把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案,通过SyncGroup发给Coordinator
接着Coordinator就把分区方案下发给各个consumer,他们会从指定的分区的leader broker开始进行socket连接以及消费消息
10. rebalance的三种策略分别有哪些优劣势?
这里有三种rebalance的策略:range、round-robin、sticky
0~8
order-topic-0
order-topic-1
order-topic-2
range策略就是按照partiton的序号范围,比如partitioin02给一个consumer,partition35给一个consumer,partition6~8给一个consumer,默认就是这个策略;
round-robin策略,就是轮询分配,比如partiton0、3、6给一个consumer,partition1、4、7给一个consumer,partition2、5、8给一个consumer
但是上述的问题就在于说,可能在rebalance的时候会导致分区被频繁的重新分配,比如说挂了一个consumer,然后就会导致partition04分配给第一个consumer,partition58分配给第二个consumer
这样的话,原本是第二个consumer消费的partition3~4就给了第一个consumer,实际上来说未必就很好
最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略
consumer1:0~2 + 6~7
consumer2:3~5 + 8
11. Consumer内部单线程处理一切事务的核心设计思想
其实就是在一个while循环里不停的去调用poll()方法,其实是我们自己的一个线程,就是我们自己的这个线程就是唯一的KafkaConsumer的工作线程,新版本的kafka api,简化,减少了线程数量
Consumer自己内部就一个后台线程,定时发送心跳给broker;但是其实负责进行拉取消息、缓存消息、在内存里更新offset、每隔一段时间提交offset、执行rebalance这些任务的就一个线程,其实就是我们调用Consumer.poll()方法的那个线程
就一个线程调用进去,会负责把所有的事情都干了
为什么叫做poll呢?因为就是你可以监听N多个Topic的消息,此时会跟集群里很多Kafka Broker维护一个Socket连接,然后每一次线程调用poll(),就会监听多个socket是否有消息传递过来
可能一个consumer会消费很多个partition,每个partition其实都是leader可能在不同的broker上,那么如果consumer要拉取多个partition的数据,就需要跟多个broker进行通信,维护socket
每个socket就会跟一个broker进行通信
每个Consumer内部会维护多个Socket,负责跟多个Broker进行通信,我们就一个工作线程每次调用poll()的时候,他其实会监听多个socket跟broker的通信,是否有新的数据可以去拉取
12. 消费过程中的各种offset之间的关系是什么?
上一次提交offset,当前offset(还未提交),高水位offset,LEO
内存里记录这么几个东西:上一次提交offset,当前消费到的offset,你不断的在消费消息,不停的在拉取新的消息,不停的更新当前消费的offset,HW offset,你拉取的时候,是只能看到HW他前面的数据
LEO,leader partition已经更新到了一个offset了,但是HW在前面,你只能拉取到HW的数据,HW后面的数据,意味着不是所有的follower都写入进去了,所以不能去读取的
13. 自动提交offset的语义以及导致消息丢失和重复消费的问题
默认是自动提交
auto.commit.inetrval.ms:5000,默认是5秒提交一次
如果你提交了消费到的offset之后,人家kafka broker就可以感知到了,比如你消费到了offset = 56987,下次你的consumer再次重启的时候,就会自动从kafka broker感知到说自己上一次消费到的offset = 56987
这次重启之后,就继续从offset = 56987这个位置继续往后去消费就可以了
他的语义是一旦消息给你poll到了之后,这些消息就认为处理完了,后续就可以提交了,所以这里有两种问题:
第一,消息丢失,如果你刚poll到消息,然后还没来得及处理,结果人家已经提交你的offset了,此时你如果consumer宕机,再次重启,数据丢失,因为上一次消费的那批数据其实你没处理,结果人家认为你处理了
poll到了一批数据,offset = 65510~65532,人家刚好就是到了时间提交了offset,offset = 65532这个地方已经提交给了kafka broker,接着你准备对这批数据进行消费,但是不巧的是,你刚要消费就直接宕机了
其实你消费到的数据是没处理的,但是消费offset已经提交给kafka了,下次你重启的时候,offset = 65533这个位置开始消费的,之前的一批数据就丢失了
第二,重复消费,如果你poll到消息,都处理完毕了,此时还没来得及提交offset,你的consumer就宕机了,再次重启会重新消费到这一批消息,再次处理一遍,那么就是有消息重复消费的问题
poll到了一批数据,offset = 65510~65532,你很快的处理完了,都写入数据库了,结果还没来得及提交offset就宕机了,上一次提交的offset = 65509,重启,他会再次让你消费offset = 65510~65532,一样的数据再次重复消费了一遍,写入数据库
重启kafka consumer,修改了他的代码
14. 如何实现Consumer Group的状态机流转机制?
刚开始Consumer Group状态是:Empty
接着如果部分consumer发送了JoinGroup请求,会进入:PreparingRebalance的状态,等待一段时间其他成员加入,这个时间现在默认就是max.poll.interval.ms来指定的,所以这个时间间隔一般可以稍微大一点
接着如果所有成员都加入组了,就会进入AwaitingSync状态,这个时候就不能允许任何一个consumer提交offset了,因为马上要rebalance了,进行重新分配了,这个时候就会选择一个leader consumer,由他来制定分区方案
然后leader consumer制定好了分区方案,SyncGroup请求发送给coordinator,他再下发方案给所有的consumer成员,此时进入stable状态,都可以正常基于poll来消费了
所以如果说在stable状态下,有consumer进入组或者离开崩溃了,那么都会重新进入PreparingRebalance状态,重新看看当前组里有谁,如果剩下的组员都在,那么就进入AwaitingSync状态
leader consumer重新制定方案,然后再下发
15. 最新设计的rebalance分代机制可以有什么作用?
大家设想一个场景,在rebalance的时候,可能你本来消费了partition3的数据,结果有些数据消费了还没提交offset,结果此时rebalance,把partition3分配给了另外一个cnosumer了,此时你如果提交partition3的数据的offset,能行吗?
必然不行,所以每次rebalance会触发一次consumer group generation,分代,每次分代会加1,然后你提交上一个分代的offset是不行的,那个partiton可能已经不属于你了,大家全部按照新的partiton分配方案重新消费数据
consumer group generation = 1
consumer group generation = 2
16. Consumer端的自定义反序列化器是什么?
17. 自行指定每个Consumer要消费哪些分区有用吗?
List partitions = consumer.partitionsFor(“order-topic”);
new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
consumer.assign(partitions); //指定每个consumer要消费哪些分区,你就不是依靠consumer的自动的分区分配方案来做了
18. 老版本的high-level consumer的实现原理是什么?
producer和consumer api原理,都是新版本的kafka api
老版本的kafka consumer api分成两种,high-level和low-level,都是基于zk实现的,只不过前者有consumer group的概念,后者没有
high-level的api,比如说consumer启动就是在zk里写一个临时节点,但是如果自己宕机了,那么zk临时节点就没了,别人就会发现,然后就会开启rebalance
然后在消费的时候,可以指定多个线程取消费一个topic,比如说你和这个consumer分配到了5个分区,那么你可以指定最多5个线程,每个线程消费一个分区的数据,但是新版本的就一个线程负责消费所有分区
在提交offset,就是向zk写入对某个分区现在消费到了哪个offset了,默认60秒才提交一次
新版本的api就不基于zk来实现了呢,zk主要是做轻量级的分布式协调,元数据存储,并不适合高并发大量连接的场景,cnosumer可能有成百上千个,成千上万个,zk来做的,连接的压力,高并发的读写
broker内部基于zk来进行协调
19. 老版本的low-level consumer的实现原理是什么?
老版本的low-level消费者,是可以自己控制offset的,实现很底层的一些控制,但是需要自己去提交offset,还要自己找到某个分区对应的leader broker,跟他进行连接获取消息,如果leader变化了,也得自己处理,非常的麻烦
比如说storm-kafka这个插件,在storm消费kafka数据的时候,就是使用的low-level api,自己获取offset,提交写入zk中自己指定的znode中,但是在未来基本上老版本的会越来越少使用
三、Kafka的时间轮延时调度机制与架构原理总结
1. Producer的缓冲区内部数据结构是什么样子的?
producer会创建一个accumulator缓冲区,他里面是一个HashMap数据结构,每个分区都会对应一个batch队列,因为你打包成出来的batch,那必须是这个batch都是发往同一个分区的,这样才能发送一个batch到这个分区的leader broker
{
“order-topic-0” -> [batch1, batch2],
“order-topic-1” -> [batch3]
}
batch.size
每个batch包含三个东西,一个是compressor,这是负责追加写入batch的组件;第二个是batch缓冲区,就是写入数据的地方;第三个是thunks,就是每个消息都有一个回调Callback匿名内部类的对象,对应batch里每个消息的回调函数
每次写入一条数据都对应一个Callback回调函数的
2. 消息缓冲区满的时候是阻塞住还是抛出异常?
max.block.ms,其实就是说如果写缓冲区满了,此时是阻塞住一段时间,然后什么时候抛异常,默认是60000,也就是60秒
3. 负责IO请求的Sender线程是如何基于缓冲区发送数据的?
Sender线程会不停的轮询缓冲区内的HashMap,看batch是否满了,或者是看linger.ms时间是不是到了,然后就得发送数据去,发送的时候会根据各个batch的目标leader broker来进行分组
因为可能不同的batch是对应不同的分区,但是不同的分区的Leader是在一个broker上的,<Node, List>,接着会进一步封装为<Node, Request>,每个broker一次就是一个请求,但是这里可能包含很多个batch,接着就是将分组好的batch发送给leader broker,并且处理response,来反过来调用每个batch的callback函数
发送出去的Request会被放入InFlighRequests里面去保存,Map<NodeId, Deque>,这里就代表了发送出去的请求,但是还没接收到响应的
4. 同时可以接受有几个发送到Broker的请求没收到响应?
Map<NodeId, Deque> => 给这个broker发送了哪些请求过去了?
max.in.flight.requests.per.connection:5
这个参数默认值是5,默认情况下,每个Broker最多只能有5个请求是发送出去但是还没接收到响应的,所以这种情况下是有可能导致顺序错乱的,大家一定要搞清楚这一点,先发送的请求可能后续要重发
5. Kafka自定义的基于TCP的二进制协议深入探秘一番(一)
kafka自定义了一组二进制的协议,现在一共是包含了43种协议类型,每种协议都有对应的请求和响应,Request和Response,其实说白了,如果大家现在看咱们的那个自研分布式存储系统的课,里面用到了gRPC
你大概可以认为就是定义了43种接口,每个接口就是一种协议,然后每个接口都有自己对应的Request和Response,就这个意思
每个协议的Request都有相同的请求头(RequestHeader),也有不同的请求体(RequestBody),请求头包含了:api_key、api_version、correlation_id、client_id,这里的api_key就类似于“PRODUCE”、“FETCH”,你可以认为是接口的名字吧
“PRODUCE”就是发送消息的接口,“FETCH”就是拉取消息的接口,就这个意思
api_version,就是这个API的版本号
correlation_id,就是类似客户端生成的一次请求的唯一标志位,唯一标识一次请求
client_id,就是客户端的id
每个协议的Response也有相同的响应头,就是一个correlation_id,就是对某个请求的响应
6. Kafka自定义的基于TCP的二进制协议深入探秘一番(二)
比如说发送消息,就是ProduceRequest和ProduceResponse,代表“PRODUCE”这个接口的请求和响应,api_key=0,其实就是“PRODUCE”接口的代表
他的RequestBody,包含了:transactional_id,acks,timeout,topic_data(topic,data(partition,record_set)),acks就是客户端自己指定的acks参数,这个会指示leader和follower副本的写入方式,timeout就是超时时间,默认就是30秒,request.timeout.ms
然后就是要写入哪个topic,哪个分区,以及对应数据集合,里面是多个batch
ProduceResponse,ResponseBody,包含了responses(topic,partition_responses(partition,error_code,base_offset,log_append_time,log_start_offset)),throttle_time_ms,简单来说就是当前响应是对哪个topic写入的响应
包含了每个topic的各个分区的响应,每个partition的写入响应,包括error_code错误码,base_offset是消息集合的起始offset,log_append_time是写入broker端的时间,log_start_offset是分区的起始offset
其实各种接口大体上来说就是如此,所以现在大家就知道了,协议就是一种规定,你发送过来的请求是什么格式的,他可能有请求头还有请求体,分别包含哪些字段,按什么格式放数据,响应也是一样的
然后大家就可以按一样的协议来发送请求和接收响应
7. 盘点一下在Broker内部有哪些不同场景下会有延时任务?
比如说acks=-1,那么必须等待leader和follower都写完才能返回响应,而且有一个超时时间,默认是30秒,也就是request.timeout.ms,那么在写入一条数据到leader磁盘之后,就必须有一个延时任务,到期时间是30秒
延时任务会被放到DelayedOperationPurgatory,延时操作管理器中
这个延时任务如果因为所有follower都写入副本到本地磁盘了,那么就会被自动触发苏醒,那么就可以返回响应结果给客户端了,否则的话,这个延时任务自己指定了最多是30秒到期,如果到了超时时间都没等到,那么就直接超时返回异常了
还有一种是延时拉取任务,也就是说follower往leader拉取消息的时候,如果发现是空的,那么此时会创建一个延时拉取任务,然后延时时间到了之后,就会再次读取一次消息,如果过程中leader写入了消息那么也会自动执行这个拉取任务
8. Kafka的时间轮延时调度机制(一):O(1)时间复杂度
Kafka内部有很多延时任务,没有基于JDK Timer来实现,那个插入和删除任务的时间复杂度是O(nlogn),而是基于了自己写的时间轮来实现的,时间复杂度是O(1),其实Netty、ZooKeeper、Quartz很多中间件都会实现时间轮
延时任务是很多很多的,大量的发送消息以及拉取消息,都会涉及到延时任务,任务数量很多,如果基于传统的JDK Timer把大量的延时任务频繁的插入和删除,时间复杂度是O(nlogn)性能比较低的
时间轮的机制,延时任务插入和删除,O(1)
简单来说,一个时间轮(TimerWheel)就是一个数组实现的存放定时任务的环形队列,数组每个元素都是一个定时任务列表(TimerTaskList),这个TimerTaskList是一个环形双向链表,链表里的每个元素都是定时任务(TimerTask)
时间轮是有很多个时间格的,一个时间格就是时间轮的时间跨度tickMs,wheelSize就是时间格的数量,时间轮的总时间跨度就是tickMs * wheelSize(interval),然后还有一个表盘指针(currentTime),就是时间轮当前所处的时间
currentTime指向的时间格就是到期,需要执行里面的定时任务
比如说tickMs = 1ms,wheelSize = 20,那么时间轮跨度(inetrval)就是20ms,刚开始currentTime = 0,这个时候如果有一个延时2ms之后执行的任务插入进来,就会基于数组的index直接定位到时间轮底层数组的第三个元素
因为tickMs = 1ms,所以第一个元素代表的是0ms,第二个元素代表的是1ms的地方,第三个元素代表的就是2ms的地方,直接基于数组来定位就是O(1)是吧,然后到数组之后把这个任务插入其中的双向链表,这个时间复杂度也是O(1)
所以这个插入定时任务的时间复杂度就是O(1)
然后currentTime会随着时间不断的推移,1ms之后会指向第二个时间格,2ms之后会指向第三个时间格,这个时候就会执行第三个时间格里刚才插入进来要在2ms之后执行的那个任务了
这个时候如果插入进来一个8ms之后要执行的任务,那么就会放到第11个时间格上去,相比于currentTime刚好是8ms之后,对吧,就是个意思,然后如果是插入一个19ms之后执行的呢?那就会放在第二个时间格
每个插入进来的任务,他都会依据当前的currentTime来放,最后正好要让currentTime转动这么多时间之后,正好可以执行那个时间格里的任务
9. Kafka的时间轮延时调度机制(二):多层级时间轮
接着上一讲的内容,那如果这个时候来一个350毫秒之后执行的定时任务呢?已经超出当前这个时间轮的范围了,那么就放到上层时间轮,上层时间轮的tickMs就是下层时间轮的interval,也就是20ms
wheelSize是固定的,都是20,那么上层时间轮的inetrval周期就是400ms,如果再上一层的时间轮他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一层时间轮的tickMs是8s,interval就是160s,也就是好几分钟了,以此类推即可
反正有很多层级的时间轮,一个时间轮不够,就往上开辟一个新的时间轮出来,每个时间轮的tickMs是下级时间轮的interval,而且currentTime就跟时钟的指针一样是不停的转动的,你只要根据定时周期把他放入对应的轮子即可
每个轮子插入的时候根据currentTime,放到对应时间之后的时间格即可
比如定时350ms后执行的任务,就可以放到interval位400ms的时间轮内,currentTime自然会转动到那个时间格来执行他
10. Kafka的时间轮延时调度机制(三):时间轮层级的下滑
接着上一讲的内容,那如果这个时候来一个350毫秒之后执行的定时任务呢?已经超出当前这个时间轮的范围了,那么就放到上层时间轮,上层时间轮的tickMs就是下层时间轮的interval,也就是20ms
wheelSize是固定的,都是20,那么上层时间轮的inetrval周期就是400ms,如果再上一层的时间轮他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一层时间轮的tickMs是8s,interval就是160s,也就是好几分钟了,以此类推即可
反正有很多层级的时间轮,一个时间轮不够,就往上开辟一个新的时间轮出来,每个时间轮的tickMs是下级时间轮的interval,而且currentTime就跟时钟的指针一样是不停的转动的,你只要根据定时周期把他放入对应的轮子即可
每个轮子插入的时候根据currentTime,放到对应时间之后的时间格即可
比如定时350ms后执行的任务,就可以放到interval位400ms的时间轮内,currentTime自然会转动到那个时间格来执行他
11. Kafka的时间轮延时调度机制(四):基于DelayQueue推动
基于数组和双向链表来O(1)时间度可以插入任务
但是推进时间轮怎么做呢?搞一个线程不停的空循环判断是否进入下一个时间格吗?那样很浪费CPU资源,所以采取的是DelayQueue
每个时间轮里的TimerTaskList作为这个时间格的任务列表,都会插入DelayQueue中,设置一个延时出队时间,DelayQueue会自动把过期时间最短的排在队头,然后专门有一个线程来从DelayQueue里获取到期任务列表
某个时间格对应的TimerTaskList到期之后,就会被线程获取到,这种方式就可以实现时间轮推进的效果,推进时间轮基于DelayQueue,时间复杂度也是O(1),因为只要从队头获取即可
相关文章:

Kafka核心原理第二弹——更新中
架构原理 一、高吞吐机制:Batch打包、缓冲区、acks 1. Kafka Producer怎么把消息发送给Broker集群的? 需要指定把消息发送到哪个topic去 首先需要选择一个topic的分区,默认是轮询来负载均衡,但是如果指定了一个分区key&#x…...

巨人互动|游戏出海H5游戏出海规模如何?
H5游戏出海是指将H5游戏推广和运营扩展到国外市场的行为,它的规模受到多个因素的影响。本文小编讲一些关于H5游戏出海规模的详细介绍。 1、市场规模 H5游戏出海的规模首先取决于目标市场的规模。不同国家和地区的游戏市场规模差异很大,有些市场庞大而成…...

【爬虫】实验项目三:验证码处理与识别
目录 一、实验目的 二、实验预习提示 三、实验内容 实验要求 基本要求: 改进要求A: 改进要求B: 四、实验过程 基本要求 五、源码如下 六、资料 一、实验目的 部分网站可能会使用验证机制来阻止用户无效登录或者是验证用户不是用程…...

广东成人高考报名将于9月14日开始!
截图来自广东省教育考试院官网* 今年的广东成人高考正式报名时间终于确定了! 报名时间:2023年 9 月14—20日 准考证打印时间:考前一周左右 考试时间:2023年10月21—22日 录取时间:2023年12 月中上旬 报名条件: …...
pytorch中文文档学习笔记
先贴上链接 torch - PyTorch中文文档 首先我们需要安装拥有pytorch的环境 conda指令 虚拟环境的一些指令 查看所有虚拟环境 conda info -e 创建新的虚拟环境 conda create -n env_name python3.6 删除已有环境 conda env remove -n env_name 激活某个虚拟环境 activate env…...
element-ui全局导入与按需引入
全局引入 npm i element-ui -S 安装好depencencies里面可以看到安装的element-ui版本 然后 在 main.js 中写入以下内容: import Vue from vue; import ElementUI from element-ui; import element-ui/lib/theme-chalk/index.css; import App from ./App.vue;Vue.…...
go 地址 生成唯一索引v2 --chatGPT
问:golang 函数 getIndex(n,addr,Hlen,Tlen) 返回index。参数n为index的上限,addr为包含大小写字母数字的字符串,Hlen为截取addr头部的长度,Tlen为截取addr尾部的长度 gpt: 你可以编写一个函数来计算根据给定的参数 n、addr、Hlen 和 Tlen …...
JSON XML
JSON(JavaScript Object Notation)和XML(eXtensible Markup Language)是两种常用的数据交换格式,用于在不同系统之间传输和存储数据。 JSON是一种轻量级的数据交换格式,它使用易于理解的键值对的形式表示数…...
2023年MySQL实战核心技术第四篇
七 . 吃透索引:...

cmake编译(qtcreator)mingw下使用的osg3.6.5
官网下载osg3.6.5源码,先不使用依赖库,直接进行编译 如果generate后报错,显示找不到boost必须库,则手动增加路径。然后先在命令行中使用mingw32-make,如果显示不存在,则需要去环境变量里配置一下这个工具的…...

Python钢筋混凝土结构计算.pdf-混凝土强度设计值
计算原理: 需要注意的是,根据不同的规范和设计要求,上述公式可能会有所差异。因此,在进行混凝土强度设计值的计算时,请参考相应的规范和设计手册,以确保计算结果的准确性和合规性。 代码实现: …...

elasticsearch的索引库操作
索引库就类似数据库表,mapping映射就类似表的结构。我们要向es中存储数据,必须先创建“库”和“表”。 mapping映射属性 mapping是对索引库中文档的约束,常见的mapping属性包括: type:字段数据类型,常见的…...

把握市场潮流,溯源一流品质:在抖in新风潮 国货品牌驶过万重山
好原料、好设计、好品质、好服务……这个2023,“国货”二字再度成为服饰行业的发展关键词。以消费热潮为翼,越来越多代表性品类、头部品牌展现出独特价值,迎风而上,在抖音电商掀起一轮轮生意风潮。 一个设问是:在抖音…...
【网络教程】Python如何优雅的分割URL
文章目录 URL分割方法是一种用于解析URL字符串的方法,它可以将URL分解成不同的组成部分,如协议、域名、端口、路径等。在Python中,我们可以使用urllib.parse模块中的urlsplit方法来实现URL分割。 使用方法 下面是一个简单的示例代码,演示了如何使用urlsplit方法解析URL字符…...

1998-2014年工业企业数据库和绿色专利匹配
1998-2014年工业企业数据库绿色专利匹配 1、时间:1998-2014年 2、样本量:470万 3、来源:工业企业数据库、国家知识产权局、WIPO 4、指标: 企业匹配唯一标识码、组织机构代码、企业名称、年份、法定代表人、法定代表人职务、行…...

Python基于Mirai开发的QQ机器人保姆式教程(亲测可用)
在本教程中,我们将使用Python和Mirai来开发一个QQ机器人,本文提供了三个教学视频,包教包会,本文也很贴心贴了代码和相关文件。话不多说,直接开始教学。 目录 一、安装配置MIrai 图片验证码报错: 二、机器…...
算法笔记:堆
【如无特别说明,皆为最小二叉堆】 1 介绍 2 特性 结构性:符合完全二叉树的结构有序性:满足父节点小于子节点(最小化堆)或父节点大于子节点(最大化堆) 3 二叉堆的存储 顺序存储 二叉堆的有序…...

vue3 判断包含某个字符
<img v-if"node.level 1 && checkIfIncludeSubStr(node.label, 人口)"src"/assets/images/icon-convention-01.png" width"16"class"inlineBlock Vmiddle" style"margin-right: 8px;"/>const data reactive…...
MySQL的故事——查询性能优化
查询性能优化 文章目录 查询性能优化一、查询优化器的提示(hint)二、优化特定类型的查询 一、查询优化器的提示(hint) HIGH_PRIORITY和LOW_PRIORITY 这个提示告诉MySQL,当多个语句同时访问某一个表时,哪些语句的优先级相对高些,哪些相对低些…...

在外SSH远程连接macOS服务器【cpolar内网穿透】
文章目录 前言1. macOS打开远程登录2. 局域网内测试ssh远程3. 公网ssh远程连接macOS3.1 macOS安装配置cpolar3.2 获取ssh隧道公网地址3.3 测试公网ssh远程连接macOS 4. 配置公网固定TCP地址4.1 保留一个固定TCP端口地址4.2 配置固定TCP端口地址 5. 使用固定TCP端口地址ssh远程 …...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...

大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务
通过akshare库,获取股票数据,并生成TabPFN这个模型 可以识别、处理的格式,写一个完整的预处理示例,并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务,进行预测并输…...