kafka入门到精通
文章目录
- 一、kafka概述?
- 1.定义
- 1.2消息队列
- 1.2.1 传统消息队列的使用场景
- 1.2.2 消息队列好处
- 1.2.3 消息队列两种模式
- 1.3 kafka基础架构
- 二、kafka快速入门
- 1.1使用docker-compose安装kafka
- 1.2测试访问kafka-manager
- 1.3 查看kafka版本号
- 1.4 查看zookeeper版本号
- 1.5 扩展kafka的broker
- 1.6 使用kafka
- 1.7 测试生产者和消费者
- 1.8 到zk 中查看节点信息,如下
- 1.9 kafka群起脚本
- 三、kafka架构深入
- 1.kafka数据文件的存储
- 2.kafka生产者
- 2.1 分区策略
- 2.2 数据可靠性保证
- 2.2.1.**副本数据同步策略**
- 2.2.2.ISR队列
- 2.2.3 ack应答机制
- 2.2.4 故障处理细节
- 2.3Exactly Once语义(精准一次)
- 3.kafka消费者
- 3.1消费方式
- 3.2分区分配策略(针对group)
- 1.round robin
- 2.range(默认分区分配策略)
- 3.3offet的维护
- 3.4消费者组案例
- 4.kafka高效读写数据
- 5.zk在kafka中的作用
- 6.kafka事务
- 6.1producer事务
- 6.2comsumer事务(很少聊)
- 四、kafka的API
- 1.producer API
- 1.1消息发送流程
- 1.2.异步发送api
- 1.3.同步发送api
- 2.consumer API
- 3.自定义interceptor
- 五、kafka监控
- **kafka eagle**
- 六、flume对接kafka
- 七、kafka面试题
文章整理自:尚硅谷kafka教程
一、kafka概述?
1.定义
kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
1.2消息队列
1.2.1 传统消息队列的使用场景

1.2.2 消息队列好处
1.解耦
允许程序独立的扩展或两边的处理过程,只要确保它们遵守相同的接口约束
2.可恢复性
*系统一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个消息处理进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
3.缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息远大于消费消息处理速度不一致的问题
4.灵活性(kafka服务扩缩容)&削峰处理
在访问量突增的情况下,应用仍然要继续发挥作用,但这样的突发流量并不常见。
如果因为以处理这类峰值的标准来部署应用则会造成巨大的资源浪费。使用消息队列能够使关键应用顶住突发访问压力,而不会因为突发的超负荷请求使整个服务崩溃。
5.异步通信
很多情况,用户也不需要立即处理消息。消息队列提供了异步处理机制,允许程序把一个消息放入队列,但并不需要立即处理。向消息队列中放入大量消息,在需要的时候程序再去进行处理
1.2.3 消息队列两种模式
点对点模式(一对一,消费者主动拉取数据,消息收到后清除消息)
消息生产者发送消息到queue中,消息消费者从queue中取出消息并消费消息。
消息被消费后,queue中不再存储该消息,所以消费者不可能消费到已经被消费的消息。
queue支持存在多个消费者,但对于一个消息,只会有一个消费者进行消费。

发布/订阅模式(一对多,消费者消费数据后不会清除消息)
消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。
和点对点模式不同,发布到topic的消息会被所有订阅者消费

发布订阅模式也有两种:
1.消费者主动拉取;
缺点:消费者不知道topic中是否有消息,需要定时去轮询,比较浪费资源,所以出现了主动推送的模式
2.消息队列主动推送(如微信公众号)
1.3 kafka基础架构

topic:主题,对消息进行分类
partition:分区,主要提高kafka集群负载均衡,同时也提高了并发量
leader:针对于分区,消费者连接kafa只会连接leader
follower:针对于分区,仅仅数据备份,同一个partition的leader和follower一定不在同一台kafka服务上
comsumer goup:消费组,提高消费能力。多个消费者在同一个group时,消费消息时,一个分区的消息只能被同一个消费组的同一个消费者消费;消费者group中消费者的个数等于topic的partition数时,消费能力最合理,group中消费者数量大于partition分区数时,会造成资源浪费,多余消费者依然无法消费到消息。
zookeeper:管理kafa集群信息,存储消费位置信息(0.9版本前);
如:消费者A需要消费topic-partition0的10条消息,在消费到第5条时挂了,这时候消费进度的信息保存在zk里和内存中;
0.9版本后,offset存储在kafka集群的系统级topic中(默认存储磁盘7天),有kafka集群维护,主动拉取时高并发情况下对zk访问压力较大。
二、kafka快速入门
kafka的jar下载
1.1使用docker-compose安装kafka
version: '3'
services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.58.100 # 不能通过hostname来配置(消费者和生产者识别不到)KAFKA_CREATE_TOPICS: "test:1:1"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sockkafka-manager:image: sheepkiller/kafka-managerenvironment:ZK_HOSTS: zookeeper:2181ports:- "19000:9000"
1.2测试访问kafka-manager
192.168.253.100:19000

出现以上问题 一般是docker-compose.yml文件中的kafka配置ip地址有误
添加kafka节点

1.3 查看kafka版本号
docker exec kafka_compose-kafka-1 find / -name *kafka_* | head -1 | grep -o ‘\kafka[^\n]*’

1.4 查看zookeeper版本号
docker exec kafka_compose-zookeeper-1 pwd

1.5 扩展kafka的broker
启动时指定kafka的broker数量
#端口一致所以只能启动一个broker,可能需要配置swarm网络。未亲测
docker-compose up --scale kafka=3 -d
查看kafka扩展后的容器名称
docker ps;CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0b32ee5a3744 wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32777->9092/tcp, :::32777->9092/tcp kafka_compose-kafka-4
0a655887b672 wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32778->9092/tcp, :::32778->9092/tcp kafka_compose-kafka-1
e8fcd6cafa2c sheepkiller/kafka-manager "./start-kafka-manag…" 9 minutes ago Up 9 minutes 0.0.0.0:19000->9000/tcp, :::19000->9000/tcp kafka_compose-kafka-manager-1
402829709dc9 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 9 minutes ago Up 9 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp kafka_compose-zookeeper-1
fbee5d80662b wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32779->9092/tcp, :::32779->9092/tcp kafka_compose-kafka-2
8c78ffde4538 wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32780->9092/tcp, :::32780->9092/tcp kafka_compose-kafka-3
1.6 使用kafka
需要进入一个容器
docker exec -it kafka_compose-kafka-1 /bin/sh
创建topic:
kafka-topics.sh --create --topic topic001 --partitions 4 --zookeeper zookeeper:2181 --replication-factor 2
查看topic
#查看一个topic
kafka-topics.sh --list --zookeeper zookeeper:2181 topic001
#查看所有topic
kafka-topics.sh --list --zookeeper zookeeper:2181
删除topic
# 需要配置server.properties的delete.topic.enable=true
kafka-topics.sh --delete --topic topic002 --zookeeper zookeeper:2181
查看刚刚创建的topic的情况,borker和副本情况
kafka-topics.sh --describe --zookeeper zookeeper:2181 topic001
1.7 测试生产者和消费者
(1) 启动消费者客户端
kafka-console-consumer.sh --topic topic001 --bootstrap-server kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092
# --from-beginning 参数会从topic开始的位置消费,如果不指定,不会消费当前时刻之前的信息
启动后控制台不会打印消息,因为没有生产者生产消息。
(2) 启动生产者并且发送消息客户端
kafka-console-producer.sh --topic topic001 --broker-list kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092
现在已经进入了生产消息的命令行模式,输入一些字符串然后回车,再去消费消息的控制台窗口看看,已经有消息打印出来,说明消息的生产和消费都成功了。
创建kafka集群时,需要勾选启动JMX PORT,broker通讯需要用到

1.8 到zk 中查看节点信息,如下
1.安装prettyZoo可视化软件
prettyZoo可视化安装
2.连接并查看zookeeper情况

1.9 kafka群起脚本
#/bin/bash
case #1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho "***********************$1***************************"ssh $1 "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/mudole/kafka/config/server.properties" done
};;"stop"){for i in hadoop102 hadoop103 hadoop104doecho "***********************$1***************************"ssh $1 "/opt/module/kafka/bin/kafka-server-stop.sh " done
};;esac
kafka无法启动时,主要是看server.log日志
三、kafka架构深入
生产者生产消息时,如主题不存在,默认会创建一个分区和一个副本。在server.properties中进行修改
kafka集群可以保证分区有序,不能保证全局数据有序
1.kafka数据文件的存储
.log文件和.index文件
kafka分区的两个重要文件
0000000000000000.log只存储数据
0000000000000000.index记录消费数据offet
1.log文件默认存储大小1g,超过1个g了,生成新文件讲如何命名?
.index 和 .log 文件的命名规则就是当前文件的最小offset值(偏移量值)
2.怎么快速定位到想要消费的位置?
引入分片和索引机制。
分片规则:log文件1g后会新增分片,每片段都包含一个对应的.log和.index文件
索引机制:index文件中,存储了每条消息的id、起始偏移量和消息的大小。
文件索引原理:index文件通过二分查找找到是哪个消息,通过消息去log文件中找到消息内容
2.kafka生产者
2.1 分区策略
1.分区原因
方便在集群扩展
提高并发,可以以partition为单位进行读写
2.分区原则(3种)

1).在指定partition分区时,直接将数据存放在指定的分区中
2).未指定partition分区时,根据key的值hash后取余topic的分区数,得到存放数据的partition
3).未指定partition分区和key时,第一次调用时随机生成一个整数(后面如果没有指定partition和key时,会在这个整数的基础上自增),根据这个值与topic的partion数量取余得到存放数据的partition。
这是round-robin算法。
2.2 数据可靠性保证
topic的每个partition收到producer发送的数据后,都会向producer发送akc(acknowledgment确认收到),如果producer收到ack,就会发送下一轮数据,否则重复发送
kafka何时发送akc方案:

2.2.1.副本数据同步策略

全同步策略:
5台机器最多容忍4台机器故障,topicA分区下的5个副本(leader+follower)分别位于5台机器上且都是全量数据,5个副本中只要保证1个正常,那么kafka数据就是稳定的。则需要n+1个副本
超半数同步策略:
同样如果要容忍4台机器故障且kafka可用,则至少有一个副本在4台故障后依然是有topicA的全量数据,最坏情况下,这4台故障机器都是半数已经同步的机器,那么为了保证kafka数据稳定且可用需要4*2+1个副本。
kafka最终选择的方案:所有fowoller同步完成后发送ack
1.优点:
解决数据冗余:同样为了容忍n台节点故障,超半数同步策略需要2n+1个副本,而全同步策略只需要n+1个副本,针对kafka的使用场景,每个分区都有大量的数据,第一种方案会造成大量数据冗余
2.缺点:
网络延迟:虽然全量同步方案网络延迟高,但对kafka的影响较小
2.2.2.ISR队列
针对kafka选择全量同步方案时,网络延迟较高的问题,kafka做了优化
ISR队列(in-sync replicats):作用1在生产消息时快速响应、2在leader选举时做了优化
在0.9版本以前:ISR队列中存储的副本取决于两个因素,即同步数据快的follower、同步数据多的follower将会优先存储在ISR队列中,以备leader选举时进行挑选,那些同步慢、同步数据小的副本将会被剔除,不被考虑作为下一任leader
在0.9版本及以后:ISR队列只考同步快的副本进入队列
- 为什么0.9版本后ISR队列取消了follower同步数量的条件?
producer批量发送消息时,如果这个batch数量大于ISR同步的数量,那么久会造成延迟且数据不在范围内,可能就会从ISR中批量剔除副本,会造成频繁的ISR队列进出和zk的写入
现任版本:Leader维护了一个动态的ISR队列,意为和leader保持同步的follower集合。当ISR中的leader存储完producer的消息后,leader会给follower发送ack,如果follower长时间没有从leader同步数据,将会被剔除ISR队列(该时间阈值由replicat.lag.time.max.ms参数设定),一旦收到了ISR队列中所有follower的ACK,该消息就被确认commit了,leader将增加HW并且想producer发送ACK。
leader故障后,会从ISR队列中重新选区leader。
2.2.3 ack应答机制
对于某些不太重要的数据,对数据可靠性不是很高,能够容忍少量数据丢失,所以没有必要等ISR中所有follower全部接收成功。
kafka提供了3中可靠性的级别,用户根据可靠性和延迟的要求进行权衡进行选择
acks参数配置:
acks=0时,producer不等待broker的ack,这种操作延迟最低,broker接收到消息还没写入磁盘就返回,当broker发生故障时有可能丢失数据
acks=1时,producer等待broker的ack,partition的leader落盘成功后返回ack,如果follower在同步成功前leader发生故障有可能丢失数据
acks=-1(all)时,producer等待broker的ack,partition的leader和follower全部落盘成功后才会返回ack。但是如果follower同步完成后,broker返回producer的ack之前,leader发生故障,那么会造成数据重复;当ISR中没有可用的follower时,leader返回ack后此时leader宕机并未将消息同步给其他ISR之外的follower时,就会丢失数据
2.2.4 故障处理细节

LEO:每个副本最大的offset
HW:高水位,是指消费者能见到的最大offset,ISR队列中最小的LEO;保证消费者获取数据一致性;保证副本之间的数据一致性
1.follower故障
follower发生故障后会被临时剔除ISR,等待follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件中高于HW的部分截取掉,从HW开始想leader同步。等该follower的LEO大于等于该topic的HW,即follower追上leader后,就可以重新加入ISR
2.leader故障
leader发生故障后,会从ISR中选举一个新的leader,之后为保证副本之前的一致性,其余的follower会先将各自的log文件中高于HW部分截取掉,然后重新想leader同步数据
这里只能保证副本之间数据的一致性。并不能保证数据不丢失或重复(ack决定)。
2.3Exactly Once语义(精准一次)
将kafkaserver的ack级别设置为-1,可以保证producer到server之间不会丢失数据,即是at least once语义;
相对的将ack级别设置为0,可以保证生产者每条消息只会发送一次,即at most once语义;
at least once可以保证数据不丢失,但是不能保证数据不重复;
at most once可以保证数据不重复,但是不能保证数据不丢失;
但对于一些非常重要的消息,比如说交易数据,下游的消费者要求数据即不能重复也不能丢失,即是Exactly Once语义;
kafka在0.11以前,对此是无能为力,只能保证数据不丢失,在消费者对数据做全局去重。对于多个下游应用的情况下,每个应用都要单独去重,这对性能造成了很大影响。
0.11之后,引入了幂等性;指的是producer无论向server发送多少次重复数据,server端都只会持久化一条。幂等性+at least once = exactly once
3.kafka消费者
3.1消费方式
1.comsumer采用pull的方式从broker拉去数据;pull可以根据消费能力调整消费速率。
2.broker向comsumer push的方式很难适应comsumer,因为push的频率是由broker决定的。push频率过快会comsumer来不及消费,表现为拒绝服务或网络阻塞。
pull不足:kafka没有消息被消费时,会陷入循环中,一直返回空数据,会造成资源浪费。kafka引入了timeout参数,comsumer pull返回空后,在timeout时间之后再去拉去。
3.2分区分配策略(针对group)
一个消费组有多个comsumer,一个topic有多个partition,消费时会涉及到partiion分配问题。
1.round robin
按照消费组来进行轮训分配
使用前提:消费组消费的是一个topic
2.range(默认分区分配策略)
根据topic来进行范围分配,多个主题被消费时,消费者消费数据不对等问题
当comsumer个数发生变化时,都会触发分配策略,重新分配消费分区。
当comsumer个数大于partition个数时,也会触发重新分配,会有多余的comsumer分配不到partition。
3.3offet的维护
comsumer group+topic+partition确定一个offset
3.4消费者组案例
4.kafka高效读写数据
1.顺序写磁盘
2.零拷贝
5.zk在kafka中的作用
controller:broker抢占zk中的controller,谁先来谁就是controller,controller是哪个broker无所谓,kafka的数据都是共享的,只是由这个身份为controller的broker来维护与zk的信息

6.kafka事务
kafka从0.11版本开始支持事务。事务可以保证kafka在exactly once语义的基础上,生产者和消费者可以跨分区和会话,要么全部成功,要么全部失败。
6.1producer事务
为了实现跨分区会话的事务,需要引入一个全局的transactionID(producer客户端自己生成),并且producer的pid和tid绑定,这样当producer重启后就可以通过正在进行的tid来获得原来的pid;
为了管理transaction,kafka引入了一个新的组件transaction coordinator。producer就是通过transaction coordinator交互获得TID对应的任务状态;coordinator还负责将事务所有写入kafka内部的一个topic,这样即使整个服务重启,由于事务状态可以保存,进行中的事务状态可以恢复,从而继续运行。
6.2comsumer事务(很少聊)
上述的事务机制主要是从producer方面去考虑,对于comsumer而言,事务的保证相对较弱,尤其是无法保证commit的信息被精确消费。这是由于comsumer可以通过offset访问任何信息,而且segmentFile的生命周期不同,同一事务的消息可能会出现重启后被删除的情况。(如segmnetFile01刚好7天过期,consumer批量消费时,恰好跨了两个segmentFile,重启后发现segmentFile01刚好过期,则无法读取到过期的数据)
四、kafka的API
1.producer API
1.1消息发送流程
kafka的producer消息时异步发送的。在消息发送过程中涉及到了两个线程main和sender,以及线程共享的一个变量RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉去并发送到broker。

相关参数:
batch.size: 只有数据累计到这个值的时候,sender才会发送数据
linger.ms:当batch.size迟迟没有累计到这个值,到了linger.ms时间是,sender也会发送
1.2.异步发送api
1.导入依赖

2.编写代码
需要用到的类:
KafkaProducer:创建一个生产者对象用来发送数据
ProducerConfig:获取所需的一系列参数
ProducerRecord:每一条消息封装成为一个对象
1.3.同步发送api
2.consumer API
3.自定义interceptor
五、kafka监控
kafka eagle
六、flume对接kafka
七、kafka面试题

1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?
ISR:副本同步队列,速率和leader相差小于10秒的follower集合
OSR:非副本同步队列,速率和leader相差大于10秒的follower集合
AR:所有分区的follower,AR=ISR+OSR
2.Kafka 中的 HW、LEO 等分别代表什么?
HW:高水位,根据同一分区中,最低的LEO所决定,是消费者能见可消费的数据
LEO: 每个副本最高的offset
在leader、follower节点故障后,partition会对HW和每个副本的LEO进行调整
3.Kafka的用途有哪些?使用场景如何?
1.用户追踪:根据用户在web或app上的操作,将这些操作记录到topic中,消费者订阅这些消息做实时的分析和数据挖掘
2.日志收集:通过kafka对各个服务的日志进行收集,再开放给comsumer
3.系统消息:缓存消息
运营指标:记录运营监控数据,搜集操作应用数据的集中反馈,如报错和报告
4.Kafka中是怎么体现消息顺序性的?
每个分区内,每条消息都有offset,所以只能在同一分区内有序;不同的分区无法做到消息的顺序性
5.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
是。超过分区数的消费者是接受不到数据的,只要有消费者接入 就会触发分区分配策略
6.有哪些情形会造成重复消费?或丢失信息?
重复消费:先处理业务后提交offset,可能会造成重复消费
丢失信息:先提交offset再处理业务,可能会造成信息丢失
7.Kafka 分区的目的?
对kafka集群来说,分区做到负载均衡;对于消费者来说,可以提高并发度,提高读取效率
8.Kafka 的高可靠性是怎么实现的?
为了实现高可靠性,kafka使用了订阅模式,并且使用ISR和ack应答机制
能进入ISR中的follower和leadeer之间同步速率相差小于10秒
当ack=0时, producr不等待broker的ack,不管数据有没有写成,都不再重发这个数据
当ack=1时,broker会等leader写完数据后想producer发送ack,但不会等follower同步数据;在follower数据未同步前,leader挂掉,producer会再次发送新的消息到新的leader中,old的leader未同步的消息就会丢失
当ack=all(-1)时,broker会等到leader和isr中的所有follower都同步完成后再想producer发送ack;当follower数据同步完成返回producer ack前,leader挂掉,producer数据重发就会造成数据重复。
9.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
10.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不可以,现有的分区数据难以处理
11.简述Kafka的日志目录结构?
每一个分区对应一个文件夹,命名为topic-0或topic-1,,每个文件夹内有.index文件和.log文件;
.log文件存储数据
.index文件存储.log文件的数据id,起始偏移量和数据大小;通过index
12.如何解决消费者速率低的问题?
增加topic分区的数量、增加消费者个数
13.Kafka的那些设计让它有如此高的性能??
1.kafka是分布式的消息队列
2.对log文件进行了segment,并对segment文件进行索引
3.对于单节点使用了顺序读写,速度可达600M/S
4.引入了零拷贝,在os系统上就能完成读写操作,无需进入kafka应用(用户态)
14.kafka启动不起来的原因?
在关闭kafka时,先关闭了zk,zk中保留了kafka的id信息,会导致kafka下一次启动时报节点已经存在
把zk中的zkdata/version-2的文件删除就可以了
15.聊一聊Kafka Controller的作用?
负责kafka集群上下线工作,所有topic的副本分区分配和leader选举
16.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
1.kafka的controller选举,先到先得
2.partitioin的leader选举,从isr中随机选取
17.失效副本是指什么?有那些应对措施?
失效副本:同步速率比leader相差大于10秒的副本
将失效的副本剔除ISR队列,进入OSR队列
失效副本等于leader同步速率小于10秒后从新进入ISR队列
18.Kafka消息是采用Pull模式,还是Push模式?
在producer阶段,采用的是push模式
在comsumer阶段,采用的是pull模式
comsumer在pull模式下:
优点:
comsumer可以根据自己消费能力调整消费速率,避免了broker push到comsumer时消费不及时而导致的崩溃问题;
缺点:consumer要时不时的去询问broker是否有新数据,容易发生死循环,内存溢出
解决办法:拉去不到数据时,增加下次拉去的时间,有api
19.Kafka创建Topic时如何将分区放置到不同的Broker中?
1.副本数不能超过broker数量
2.第一个分区是controller从broker中随机选取一个,然后其他分区相对0号分区依次向后移,第一个分区是用nextReplicatShift决定,而这个数也是随机产生
20.Kafka中的事务是怎么实现的?☆☆☆☆☆
kafka有两种事务:producer事务和consumer事务
producer事务是为了解决kafka跨分区跨会话的问题
kafka早起版本不能跨分区是因为producer的pid是kafka server根据producer生成的
为了解决这个问题,在java代码中给producer指定id,也就是transaction id,简称TID
我们将TID和PID进行绑定,在producer带着TID和PID第一次想broker注册时,broker会记录TID,并生成一个新的组件_transaction_state用来保存TID的事务状态信息
当producer重启后,就会带着TID和新的PID想broker发起请求,当发现TID一致时,producer就会获取之前的PID,将新的PID覆盖掉,并且去上一次事务的状态信息,从而继续上次的工作;
consumer事务相对于producer的事务相对弱一点,需要先确保consumer的消费和提交位置为一致且具有事务功能,才能保证数据的完成,不然就会造成数据的丢失或重复
21.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
拦截器>序列化器>分区器
拦截器拦截处理无效信息
序列化加密数据
分区分配原则
22.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?

使用了2个线程:main线程和sender线程
main线程会依次经过拦截器、序列化器、分区器、将数据发送到RecordAccumlator(x线程共享变量),再有sender线程从RecordAccumlate中拉取数据并发送到kafka broker,
batch.size:只有数据累计到batch.size后,sender才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender线程等待linger.ms之后发送数据
23.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
答案是:offset+1;测试证明:
生产者发送数据offset是从0开始的:如下

消费者消费的数据offset是从offset+1开始的:如下

24.当你使用 kafka-topics.sh 创建(删除)了一个 topic 之后,Kafka 背后会执行什么逻辑?
1)会在 zookeeper 中的/brokers/topics 节点下创建一个新的 topic 节点,如:/brokers/topics/first
2)触发 Controller 的监听程序
3)kafka Controller 负责 topic 的创建工作,并更新 metadata cache
25.Kafka 有内部的 topic 吗?如果有是什么?有什么所用?
有 __consumer_offsets,保存消费者offset
相关文章:
kafka入门到精通
文章目录一、kafka概述?1.定义1.2消息队列1.2.1 传统消息队列的使用场景1.2.2 消息队列好处1.2.3 消息队列两种模式1.3 kafka基础架构二、kafka快速入门1.1使用docker-compose安装kafka1.2测试访问kafka-manager1.3 查看kafka版本号1.4 查看zookeeper版本号1.5 扩展…...
es-09模糊查询
模糊查询 前缀搜索:prefix 概念:以xx开头的搜索,不计算相关度评分。 注意: 前缀搜索匹配的是term,而不是field。前缀搜索的性能很差前缀搜索没有缓存前缀搜索尽可能把前缀长度设置的更长 语法: GET <ind…...
57 - 深入解析任务调度
---- 整理自狄泰软件唐佐林老师课程 文章目录1. 问题1.1 思考1.2 实例分析:问题分析及解决2. 深入讨论2.1 任务调度的定义2.2 关于调度算法的分类2.3 什么时候进行任务调度2.4 任务的分类2.5 关于优先级调度2.6 问题2.7 调度算法的终极目标2.8 课后扩展1. 问题 系统…...
CAN总线开发一本全(3) - 微控制器集成的FlexCAN外设
CAN总线开发一本全(3) - 微控制器集成的FlexCAN外设 苏勇,2023年2月 文章目录CAN总线开发一本全(3) - 微控制器集成的FlexCAN外设引言硬件外设模块系统概要总线接口单元 - 寄存器清单数据结构 - 消息缓冲区MB初始化过…...
Elasticsearch7.8.0版本进阶——段合并
目录一、段的概述1.1、段的概念1.2、段的缺点1.3、如何解决段数量暴增问题二、段合并的流程三、段合并的注意事项一、段的概述 1.1、段的概念 每一 段 本身都是一个倒排索引。 1.2、段的缺点 由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量…...
Java版贪食蛇游戏
技术:Java等摘要:近年来Java作为一种新的编程语言,以其简单性、可移植性和平台无关性等优点,得到了广泛地应用,特别是Java与万维网的完美结合,使其成为网络编程和嵌入式编程领域的首选编程语言。MyEclipse是…...
2023年度数学建模竞赛汇总
本人7年数学建模竞赛经验,历史获奖率百分之百。团队成员都是拿过全国一等奖的硕博,有需要数模竞赛帮助的可以私信我。 下面主要列几年一些比较有含金量的数学建模竞赛(按比赛时间顺序) 1. 美国大学生数学建模竞赛 报名时间&…...
了解Python语言和版本
1.1 任务1了解Python语言和版本 Python 语言的名字来自于一个著名的电视剧"Monty Pythons Flying Cireus",Python之父 Guido van Rossum是这部电视剧的狂热爱好者,所以把他设计的语言命名为Python。 Python 是一门跨平台、开源、免费的解释型高级动态编…...
nvm (node版本管理工具)安装的详细步骤,并解决安装过程中遇到的问题
1、下载NVM,跳转下载链接后,如下图,下载红框后解压文件 2、安装 注意:双击安装之后,会有两个地址选择, 1、地址中不能存在空格 2、不要放在C盘中,后面需要改个设置文件,安装到C盘的…...
朴素贝叶斯笔记
贝叶斯公式在A 条件成立下,B的概率等于B的概率*在B条件成立下,A的概率/A的概率,推导假设一个学校中男生占总数的60%,女生占总数的40%。并且男生总是穿长裤,女生则一半穿长裤、一半穿裙子。1.正向概率。随机选取一个学生…...
【GUI】用于电动助力车性能分析的GUI(Matlab代码实现)
👨🎓个人主页:研学社的博客💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密…...
Android:反编译apk踩坑/apktool/dex2jar/JDGUI
需求描述 想要反编译apk文件,搜到了这篇博客:Android APK反编译就这么简单 详解(附图),非常有参考价值~但其中的工具下载链接都已404,而本杂鱼实际操作的过程中也出现了亿点点点点点点的问题,于…...
React 跨域的配置
1、为什么会出现跨域? 浏览器遵循同源政策(同源策略三要素:协议相同、域名相同、端口相同) 2、配置跨域代理 使用中间件 http-proxy-middleware(安装依赖) npm install http-proxy-middleware 创建setupP…...
Elasticsearch7.8.0版本进阶——持久化变更
目录一、持久化变更的概述二、事务日志(translog)三、持久化变更完整流程四、事务日志(translog)的作用五、事务日志(translog)的目的一、持久化变更的概述 没有用 fsync 把数据从文件系统缓存刷ÿ…...
CF Edu 127 A-E vp补题
CF Edu 127 A-D vp补题 继续每日一vp,今天晚上有课,时间不太多,回去就直接vp。前三题比较简单,过了之后排名rk2000,然后就去洗澡了。d题没怎么认真思考,其实也可做。最后rk4000。发挥还行,b题罚…...
剑指 Offer 05. 替换空格
摘要 剑指 Offer 05. 替换空格 一、字符替换 由于每次替换从1个字符变成3个字符,使用字符数组可方便地进行替换。建立字符数组地长度为 s 的长度的3倍,这样可保证字符数组可以容纳所有替换后的字符。 获得 s 的长度 length创建字符数组 array&#x…...
通过操作Cortex-A7核,串口输入相应的命令,控制LED灯进行工作
1.通过操作Cortex-A7核,串口输入相应的命令,控制LED灯进行工作 例如在串口输入led1on,开饭led1灯点亮 2.例如在串口输入led1off,开饭led1灯熄灭 3.例如在串口输入led2on,开饭led2灯点亮 4.例如在串口输入led2off,开饭led2灯熄灭 5.例如在串口输入led…...
Python实现某du文库vip内容下载,保存成PDF
前言 是谁,是谁在网页上搜索往年考试卷题答案的时候只能阅读前两页的选择题,是谁在搜几千字的文档资料只能看25%,是谁在百度文库找七找八的时候所有的东西都要付费才能继续看… 我先说 是我自己 我又不经常用,只有偶尔需要看看…...
vue3.0 模板语法
文章目录前言:1. 内容渲染指令1.1 v-text1.2 {{ }}插值表达式1.3 v-html2. 双向绑定指令2.1 v-model2.2 v-model的修饰符3. 属性绑定指令3.1 动态绑定多个属性值3.2 绑定class和style属性4.条件渲染指令4.1 v-if、v-else-if、v-else4.2 v-show4.3 v-if与v-show的区别…...
【GlobalMapper精品教程】054:标签(标注)功能案例详解
同ArcGIS标注一样,globalmapper提供了动态标注的功能,称为标签,本文详解标签的使用方法。 文章目录 一、标签配置二、创建标签图层三、标签图层选项1. 标签字段2. 标签样式3. 标签格式4. 标签语言5. 标签优先级一、标签配置 在配置页面的【矢量显示】→标签选项卡下,有标签…...
wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)
漏洞概览 漏洞名称:Apache Flink REST API 任意文件读取漏洞CVE编号:CVE-2020-17519CVSS评分:7.5影响版本:Apache Flink 1.11.0、1.11.1、1.11.2修复版本:≥ 1.11.3 或 ≥ 1.12.0漏洞类型:路径遍历&#x…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !
我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...
2025年低延迟业务DDoS防护全攻略:高可用架构与实战方案
一、延迟敏感行业面临的DDoS攻击新挑战 2025年,金融交易、实时竞技游戏、工业物联网等低延迟业务成为DDoS攻击的首要目标。攻击呈现三大特征: AI驱动的自适应攻击:攻击流量模拟真实用户行为,差异率低至0.5%,传统规则引…...
「Java基本语法」变量的使用
变量定义 变量是程序中存储数据的容器,用于保存可变的数据值。在Java中,变量必须先声明后使用,声明时需指定变量的数据类型和变量名。 语法 数据类型 变量名 [ 初始值]; 示例:声明与初始化 public class VariableDemo {publi…...
前端打包工具简单介绍
前端打包工具简单介绍 一、Webpack 架构与插件机制 1. Webpack 架构核心组成 Entry(入口) 指定应用的起点文件,比如 src/index.js。 Module(模块) Webpack 把项目当作模块图,模块可以是 JS、CSS、图片等…...
多模态学习路线(2)——DL基础系列
目录 前言 一、归一化 1. Layer Normalization (LN) 2. Batch Normalization (BN) 3. Instance Normalization (IN) 4. Group Normalization (GN) 5. Root Mean Square Normalization(RMSNorm) 二、激活函数 1. Sigmoid激活函数(二分类&…...
