大数据开发之kafka(完整版)
第 1 章:Kafka概述
1.1 定义
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
1.2 消息队列
目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大多数场景主要采用Kafka作为消息队列
在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ
1.2.1 传统消息队列的应用场景
1、传统的消费队列的主要应用场景有:缓存/削峰(缓冲)、解耦(少依赖)、异步通信(不必要及时处理)
1)缓存/削峰(缓冲):有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

2)解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵循同样的接口约束。

3)异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后再需要的时候再去处理它们。

1.2.2 消息队列的两种模式
消息队列主要分为两种模式:点对点模式(一个生产者对口一个消费者)和发布/订阅模式(一对多)

1.3 Kafka基础框架

1、Producer:消息生产者,就是向Kafka broker发消息的客户端
2、Consumer:消息消费者,向kafka broker获取消息的客户端
3、Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个broker可以由多个不同的topic,一个topic下的一个分区只能被一个消费者组内的一个消费者所消费;消费者之间不受影响。消费者组是逻辑上的一个订阅者。
4、Broker:一个kafka服务器就是一个broker。一个broker可以容纳多个不同topic
5、Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic
6、Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列
7、Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个发你去都有若干个副本,一个leader和若干个follower
8、leader:每个分区副本中的”主“,生产者发送数据的对象,以及消费者消费数据的对象都是leader
9、followeer:每个分区副本中的“从”,实现于leader副本保持同步,在leader发送故障时,称为新的leader
第 2 章:Kafka快速入门
2.1 安装部署
2.1.1 集群部署
2.1.2 集群部署
1、官方下载地址:http://kafka.apache.org/downloads.html
2、上传安装包到102的/opt/software目录下:
[atguigu@hadoop102 software]$ ll
-rw-rw-r--. 1 atguigu atguigu 86486610 3月 10 12:33 kafka_2.12-3.0.0.tgz
3、解压安装包到/opt/module/目录下
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
4、进入到/opt/module目录下,修改解压包名为kafka
[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0 kafka
5、修改config目录下的配置文件server.properties内容如下
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=102
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
6、配置环境变量
[atguigu@hadoop102 kafka]$ sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop102 kafka]$ source /etc/profile
7、分发环境变量文件并source
[atguigu@hadoop102 kafka]$ xsync /etc/profile.d/my_env.sh
==================== hadoop102 ====================
sending incremental file listsent 47 bytes received 12 bytes 39.33 bytes/sec
total size is 371 speedup is 6.29
==================== hadoop103 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.Sd7MUA" failed: Permission denied (13)sent 465 bytes received 126 bytes 394.00 bytes/sectotal size is 371 speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2]
==================== hadoop104 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.vb8jRj" failed: Permission denied (13)sent 465 bytes received 126 bytes 1,182.00 bytes/sec
total size is 371 speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2],
# 这时你觉得适用sudo就可以了,但是真的是这样吗?
[atguigu@hadoop102 kafka]$ sudo xsync /etc/profile.d/my_env.sh
sudo: xsync:找不到命令
# 这时需要将xsync的命令文件,copy到/usr/bin/下,sudo(root)才能找到xsync命令
[atguigu@hadoop102 kafka]$ sudo cp /home/atguigu/bin/xsync /usr/bin/
[atguigu@hadoop102 kafka]$ sudo xsync /etc/profile.d/my_env.sh
# 在每个节点上执行source命令,如何你没有xcall脚本,就手动在三台节点上执行source命令。
[atguigu@hadoop102 kafka]$ xcall source /etc/profile
8、分发安装包
[atguigu@hadoop102 module]$ xsync kafka/
9、修改配置文件的brokerid
分别在hadoop103和104上修改配置文件server.properties中的broker.id=103、broker.id=104
注:broker.id不得重复
[atguigu@hadoop103 kafka]$ vim config/server.properties
broker.id=103
[atguigu@hadoop104 kafka]$ vim config/server.properties
broker.id=104
10、启动集群
1)先启动Zookeeper集群
[atguigu@hadoop102 kafka]$ zk.sh start
2)一次在102、103、104节点启动kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
11、关闭集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
2.1.4 kafka群起脚本
1、脚本编写
在/home/atguigu/bin目录下创建文件kafka.sh脚本文件:
#! /bin/bash
if (($#==0)); thenecho -e "请输入参数:\n start 启动kafka集群;\n stop 停止kafka集群;\n" && exit
ficase $1 in"start")for host in hadoop103 hadoop102 hadoop104doecho "---------- $1 $host 的kafka ----------"ssh $host "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done;;"stop")for host in hadoop103 hadoop102 hadoop104doecho "---------- $1 $host 的kafka ----------"ssh $host "/opt/module/kafka/bin/kafka-server-stop.sh"done;;*)echo -e "---------- 请输入正确的参数 ----------\n"echo -e "start 启动kafka集群;\n stop 停止kafka集群;\n" && exit;;
esac
2、脚本文件添加权限
[atguigu@hadoop102 bin]$ chmod +x kafka.sh
注意:
停止Kafka集群时,一定要等kafka所有节点进程全部停止后再停止Zookeeper集群。
因为Zookeeper集群当中记录着kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
2.2 Kafka命令行操作

2.2.1 主题命令行操作
1、查看操作主题命令需要的参数
2、重要的参数如下
| 参数 | 描述 |
|---|---|
| –bootstrap-server | 连接kafka Broker主机名称和端口号 |
| –topic | 操作的topic名称 |
| –create | 创建主题 |
| –delete | 删除主题 |
| –alter | 修改主题 |
| –list | 查看所有主题 |
| –describe | 查看主题详细描述 |
| –partitions | 设置主题分区数 |
| –replication-factor | 设置主题分区副本 |
| –config | 更新系统默认的配置 |
3、查看当前服务器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
4、创建一个主题名称为first的topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic first
5、查看topic的详情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104
6、修改分区数(注意:分区数只能增加,不能减少)
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
7、再次查看Topic的详情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104Topic: first Partition: 1 Leader: 103 Replicas: 103,104,102 Isr: 103,104,102Topic: first Partition: 2 Leader: 104 Replicas: 104,102,103 Isr: 104,102,103
8、删除topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
2.2.2 生产者命令行操作
1、查看命令行生产者的参数
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
2、重要的参数如下:
| 参数 | 描述 |
|---|---|
| –bootstrap-server | 连接kafka Broker主机名称和端口号 |
| –topic | 操作的topic名称 |
| 3、生产消息 |
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu
2.2.3 消费者命令行操作
1、查看命令行消费者的参数
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
2、重要的参数如下:
| 参数 | 描述 |
|---|---|
| –bootstrap-server | 连接kafka Broker主机名称和端口号 |
| –topic | 操作的topic名称 |
| –from-beginning | 从头开始消费 |
| –group | 指定消费者组名称 |
| 3、消费消息 |
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
4、从头开始消费
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
思考:再次查看当前kafka中的topic列表,发现了什么?为什么?
第 3 章:Kafka生产者
3.1 生产者消息发送流程
3.1.1 发送原理
Kafka的Producer发送消息采用的是异步发送的方式。
在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator。
1、main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。
2、Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

batch.size:只有数据积累到batch size之后,sender才会发送数据。默认16k
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值0ms,表示没有延迟。
0:生产者发送过来的数据,不需要等数据磁盘应答。
1:生产者发送过来的数据,Leader收到数据后应答。
2:-l(all):生产者发送过来的数据,Leader和SR队列里面的所有节点收起数据后应答。-l和all等价。
3.1.2 生产者重要参数列表
| 参数名称 | 描述 |
|---|---|
| bootstrap.servers | 生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其它broker信息。 |
| key.serializer、value.serializer | 指定发送消息的key和value的序列化类型。要写全类名。(反射获取) |
| buffer.memory | RecordAccumulator缓冲区大小,默认32m。 |
| batch.size | 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
| linger.ms | 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。 |
| acks | 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader数据落盘后应答。-1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1 |
| max.in.flight.requests.per.connection | 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是1-5的数字。 |
| Retries(重试) | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_flight_requests_per_connection=1否则在重试此失败消息的时候,其它的消息可能发送成功了。 |
| retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
| enable.idempotence | 是否开启幂等性,默认true,开启幂等性。 |
| compression.type | 生产者发送的所有数据的压缩方式。默认是none,不压缩。支持压缩类型:none、gzip、snappy、lz4和zstd。 |
3.2 异步发送API
3.2.1 普通异步发送
1、需求:创建Kafka生产者,采用异步的方式发送到Kafka broker
2、异步发送流程如下:

3、代码编写
1)创建工程kafka-demo
2)导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>
3)创建包名:com.atguigu.kafka.producer
4)编写代码:不带回调函数的API
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class CustomProducer {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put("bootstrap.servers","hadoop102:9092");// key,value序列化properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));}// 5. 关闭资源kafkaProducer.close();}
}
5)测试:
在hadoop102上开启kafka消费者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
在IDEA中执行上述代码,观察hadoop102消费者输出
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
kafka3
……
3.2.2 带回调函数的异步发送
1、回调函数callback()会在producer受到ack时调用,为异步屌用。
该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。
1)如果Exception为null,说明消息发送成功。
2)如果Exception不为null,说明消息发送不成功。
2、带回掉函数的异步调用发送流程

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
3、编写代码:带回调函数的生产者
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put("bootstrap.servers", "hadoop102:9092");properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须)properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i), new Callback() {// 该方法在Producer收到ack时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) // 没有异常,输出信息到控制台System.out.println("主题"+recordMetadata.topic() +", 分区:"+recordMetadata.partition()+", 偏移量:"+recordMetadata.offset());}});}// 5. 关闭资源kafkaProducer.close();}
}
4、测试
1)在hadoop102上开启kafka消费者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
2)在IDEA中执行代码,观察hadoop102消费者输出
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……
3)在IDEA控制台观察回调函数
主题first, 分区:0, 偏移量:10
主题first, 分区:0, 偏移量:11
主题first, 分区:0, 偏移量:12
主题first, 分区:0, 偏移量:13
主题first, 分区:0, 偏移量:14
主题first, 分区:0, 偏移量:15
主题first, 分区:0, 偏移量:16
主题first, 分区:0, 偏移量:17
主题first, 分区:0, 偏移量:18
主题first, 分区:0, 偏移量:19
……
3.3 同步发送API
1、同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需要调用Future对象的get方法即可。
2、同步发送流程示意图如下:

3、编写代码:同步发送消息的生产者
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class ConsumerProducerSync {public static void main(String[] args) throws InterruptedException, ExecutionException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息//properties.put("bootstrap.servers","hadoop102:9092");properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value序列化(必须)properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 同步发送kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
4、测试
1)在hadoop102上开启kafka消费者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
2)在IDEA中执行代码,观察102消费者的消费情况
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……
3.4 生产者分区
3.4.1 分区的原因
1、便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块的数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

3.4.2 生产者分区策略
1、默认分区器DefaultPartitioner
The default partitioning strategy:
·If a partition is specified in the record, use it
·If no partition is specified but a key is present choose a partition based on a hash of the key
·If no partition or key is present choose the sticky partition that changes when the batch is full.
public class DefaultPartitioner implements Partitioner {
… …
}
2、使用:
1)我们需要将producer发送的数据封装成一个ProducerRecord对象。
2)上述的分区策略,我们在ProducerRecord对象中进行配置。



3)策略实现
| 代码 | 解释 |
|---|---|
| ProducerRecord(topic,partition_num,…) | 指明partition的情况下直接发往指定的分区,key的分配方式将无效 |
| ProducerRecord(topic,key,value) | 没有指明partition值但有key的情况下:将key的hash值与topic的partition个数进行取余得到分区号 |
| ProducerRecord(topic,value) | 既没有partition值又没有key值得情况下:kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区(绝对不会是上一个)进行使用。 |
| 3、案例: | |
| 1)案例1:将数据发送到指定partition的情况下,如:将所有消息发送到分区1中。 |
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 4. 造数据for (int i = 0; i < 5; i++) {// 指定数据发送到1号分区,key为空(IDEA中ctrl + p查看参数)kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e == null){System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}
2)测试:
(1)在hadoop102上开启kafka消费者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
(2)在IDEA中执行代码,观察hadoop102上的消费者消费情况
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu1
kafka2
……
(3)观察IDEA中控制台输出
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
3)案例2:没有指明partition但是有key的情况下的消费者分区分配
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class CustomProducerCallbackKey {public static void main(String[] args) {// 1. 创建配置对象Properties properties = new Properties();// 2. 配置属性properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 4. 造数据for (int i = 1; i < 11; i++) {// 创建producerRecord对象final ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", i + "",// 依次指定key值为i"atguigu " + i);kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e == null){System.out.println("消息:"+producerRecord.value()+", 主题:" + metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}
4)测试
观察IDEA中控制台输出
消息:atguigu 1, 主题:first->分区:0
消息:atguigu 5, 主题:first->分区:0
消息:atguigu 7, 主题:first->分区:0
消息:atguigu 8, 主题:first->分区:0
消息:atguigu 2, 主题:first->分区:2
消息:atguigu 3, 主题:first->分区:2
消息:atguigu 9, 主题:first->分区:2
消息:atguigu 4, 主题:first->分区:1
消息:atguigu 6, 主题:first->分区:1
消息:atguigu 10, 主题:first->分区:1
3.4.3 自定义分区器
1、生产环境中,我们往往需要更加自由的分区需求,我们可以自定义分区器。
2、需求:在上面的根据key分区案例中,我们发现与我们知道的hash分区结果不同。那么我们就实现一个。
3、实现步骤:
1)定义类,实现Partitioner接口
2)重写partition()方法
4、代码实现
package com.atguigu.kafka.partitioner;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;/*** @author leon* @create 2020-12-11 10:43* 1. 实现接口Partitioner* 2. 实现3个方法:partition,close,configure* 3. 编写partition方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 分区方法**/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 1. 获取keyString keyStr = key.toString();// 2. 创建分区号,返回的结果int partNum;// 3. 计算key的hash值int keyStrHash = keyStr.hashCode();// 4. 获取topic的分区个数int partitionNumber = cluster.partitionCountForTopic(topic);// 5. 计算分区号partNum = Math.abs(keyStrHash) % partitionNumber;// 4. 返回分区号return partNum;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}
5、测试
在生产者代码中,通过配置对象,添加自定义分区器
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG," com.atguigu.kafka.partitioner.MyPartitioner ");
在hadoop102上启动kafka消费者
[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
在IDEA中观察回调信息
消息:atguigu 2, 主题:first->分区:2
消息:atguigu 5, 主题:first->分区:2
消息:atguigu 8, 主题:first->分区:2
消息:atguigu 1, 主题:first->分区:1
消息:atguigu 4, 主题:first->分区:1
消息:atguigu 7, 主题:first->分区:1
消息:atguigu 10, 主题:first->分区:1
消息:atguigu 3, 主题:first->分区:0
消息:atguigu 6, 主题:first->分区:0
消息:atguigu 9, 主题:first->分区:0
3.5 生产经验-生产者如何提高吞吐量
3.5.1 吞吐量

3.5.2 实例
1、编写代码
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu" + i));}// 5. 关闭资源kafkaProducer.close();}
}
2、测试:
1)在hadoop102上开启kafka消费者
[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
2)在IDEA中执行代码,观察hadoop102上的消费者消费情况
[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu0
atguigu0
……
3.6 生产经验-数据可靠性
1、回顾消费发送流程

2、ack应答机制

3、ack应答级别

背景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Kafka提供的解决方案:ISR队列
1)Leader维护了一个动态的in-sync replica set(ISR)和leader保持同步的follower集合。
2)当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。
3)如果follower长时间(replica.lag.time.max.ms)未向leader同步数据,则该follower将被提出ISR。
Leader发生故障之后,就会从ISR中选举新的leader。
ack应答级别
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
| acks=0 | 这一操作提供了一个最低的延迟,partition的leader副本接收到消息还没有写入磁盘就已经返回ack,当leader故障时有可能丢失数据 |
|---|---|
| acks=1 | partition的leader副本落盘后返回ack,如果在follower副本同步数据之前leader故障,那么将对丢失数据 |
| acks=-1 | partition的leader和follower副本全部落盘成功后才返回ack。但是如果在follower副本同步完成后,leader副本所在节点发送ack之前,leader副本发送故障,那么会造成数据重复 |
4、ack应答机制

5、案例
代码编写:
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数retries,默认是int最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}
3.7 生产经验-数据去重
3.7.1 数据传递语义
至少一次(At Least Once)=ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
最多一次(At More Once)=ACK级别设置为0
总结:
1)At Least Once可以保证数据不丢失,但是不能保证数据不重复
2)At More Once可以保证数据不重复,但是不能保证数据不丢失
精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
3.7.2 幂等性
1、幂等性原理:
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once)=幂等性+至少一次(ack=-1+分区副本数>=2+ISR最小副本数量>=2)。
重复数据的判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是producer每次重启都会分配一个新的:Partition表示分区号;SequenceNumber是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。

2、开启幂等性
在producer的配置对象中,添加参数enable.idempotence,参数值默认为true,设置为false就关闭了。
3.7.3 生产者事务
1、kafka事务原理

2、事务代码流程
// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
3.8 生产经验-数据有序

3.9生产经验-数据乱序
1、kafka在1.x版本之前保证单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
2、kafka在1.x及以后版本保证数据单分区有序,条件如下:
1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1
2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务器会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

第 4 章:Kafka Broker
4.1 Kafka Broker工作流程
4.1.1 Zookeeper存储的Kafka的信息
1、查看zookeeper中的kafka节点所存储的信息
启动Zookeeper客户端
[atguigu@hadoop104 zookeeper-3.5.7]$ bin/zkCli.sh
通过ls命令列出kafka节点内容
[zk: localhost:2181(CONNECTED) 2] ls /kafka
2、zookeeper中存储的kafka信息
在zookeeper的服务端存储的Kafka相关信息:
1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器
2)/kafka/brokers/topics/first/partitions/0/state {“leader”.1,“isr”:[1,0,2]} 记录谁是leader,有哪些服务器可用
3)/kafka/controller {“brokerid”:0} 辅助选举Leader

4.1.2 Kafka Broker总体工作流程
1、Kafka Broker工作流程图示

2、案例
1)案例内容:模拟kafka上下线,查看zookeeper中数据变化
2)查看kafka节点相关信息:
(1)查看zookeeper上的kafka集群节点信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 103, 104]
(2)查看当前kafka集群节点中的controller信息
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller
{"version":1,"brokerid":103,"timestamp":"1637292471777"}
(3)查看kafka中的first主题的0号分区的状态
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,103,104]}
3)模拟kafka下线:停止hadoop103上的kafka
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
4)查看kafka相关节点信息
(1)查看zookeeper上的kafka集群节点信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 104]
(2)查看当前kafka集群节点中的controller信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/controller
{"version":1,"brokerid":102,"timestamp":"1637292471777"}
(3)查看kafka中的first主题的0号分区的状态
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,104]}
5)重新启动hadoop103上的kafka服务
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
6)再次查看上述节点,观察区别变化
4.1.3 Broker重要参数
| 参数名称 | 描述 |
|---|---|
| replica.lag.time.max.ms | ISR中的Follower超过该事件阈值(默认30s)未向leader发送同步数据,则该Follower将被提出ISR |
| auto.leader.rebalance.enable | 默认是true。自动Leader Partition平衡 |
| leader.imbalance.per.broker.percentage | 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡 |
| leader.imbalance.check.interval.seconds | 默认值300秒。检查leader负载是否平衡的间隔时间 |
| log.index.interval.bytes | 默认4kb,kafka里面每当写入了4kv大小的日志(.log),然后就往index文件里面记录一个索引 |
| log.retention.hours | Kafka中数据保存的时间,默认7天 |
| log.retention.minutes | Kafka中数据保存的时间,分钟级别,默认关闭 |
| log.retention.ms | Kafka中数据保存的时间,毫秒级别,默认关闭(优先级最高) |
| log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是5分钟 |
| log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment |
| log.cleanup.policy | 默认是delete,表示所有数据启用删除策略;如果设置值未compact,表示所有数据启用压缩策略 |
| num.io.threads | 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50% |
| num.replica.fetchers | 副本拉取线程数,这个参数占总核数的50%的1/3 |
| num.network.threads | 默认是3。数据传输线程数,这个参数占总核数的50%的1/3 |
| log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是Max(long)(9223372036854775807)。一般交给系统管理 |
| log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理 |
4.2 Kafka 副本
4.2.1 kafka副本的基本信息
| kafka副本作用 | 提供数据可靠性 |
|---|---|
| kafka副本个数 | 默认1个,生产环境中一般配置为2个,保证数据可靠性;但是过多的副本会增加磁盘存储空间、增加网络数据传输、降低kafka效率 |
| kafka副本角色 | 副本角色分为leader和follower。kafka生产者只会把数据发送给leader,follower会主动从leader上同步数据 |
| kafka中的ar | 是所有副本的统称(Assigned repllicas),ar=isr+osr。isr:表示和leader保持同步(默认30s)的follower集合。osr:表示follower与leader副本同步时,延迟过多的副本 |
4.2.2 leader选举过程
1、Kafka controller:
kafka集群中有一个broker的Controller会被选举为Controller Leader,负责管理集群broker的上下线、所有topic的分区副本分配和Leader选举等工作。Controller的信息同步工作是依赖于Zookeeper的。
2、kafka分区副本的选举流程

1)各个broker启动并注册到zookeeper的/brokers/ids路径下
2)选择一个controller,controller是kafka集群中的一个broker,负责管理partition leader的选举
3)controller监控broker的上下线状态
4)当controller检测到leader broker宕机时,会触发leader选举
5)controller更新zookeeper中的/brokers/topics/~/state路径,写入新的leader信息
6)其它broker检测到controller的信息变更,更新本地的元数据
7)新的leaderbroker开始服务,接收读写请求
8)controller同时更新isr,即与leader数据保持同步的副本信息
9)如果controller自身宕机,将会有新的broker被选举为controller
10)选举过程中会考虑isr列表中的broker,确保数据的一致性和可靠性
11)完成leader选举和isr更新后,集群恢复到正常状态
3、案例
1)查看first的详细信息,注意观察副本分布情况
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: aUFTM5wES7eSBiuSKT0UpA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 103,102,104Topic: first Partition: 2 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102
2)停掉103上的kafka进程
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
3)再次查看first的相应信息,观察副本分布
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: aUFTM5wES7eSBiuSKT0UpA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 102 Replicas: 102,104,103 Isr: 102,104Topic: first Partition: 1 Leader: 102 Replicas: 103,102,104 Isr: 102,104Topic: first Partition: 2 Leader: 104 Replicas: 104,103,102 Isr: 104,102
4)处理分区leader分布不均匀问题
[atguigu@hadoop102 kafka]$ bin/kafka-leader-election.sh --bootstrap-server hadoop102:9092 --topic first --election-type preferred --partition 0
[atguigu@hadoop102 kafka]$ bin/kafka-leader-election.sh --bootstrap-server hadoop102:9092 --topic first --election-type preferred --partition 1
[atguigu@hadoop102 kafka]$ bin/kafka-leader-election.sh --bootstrap-server hadoop102:9092 --topic first --election-type preferred --partition 2
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: aUFTM5wES7eSBiuSKT0UpA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 102,104,103Topic: first Partition: 2 Leader: 104 Replicas: 104,103,102 Isr: 104,102,103
###4.2.3 leader和follower故障处理细节
1、follower故障处理细节(被踢-重连-追上hw-连接成功)

1)Follower发生故障后会被临时踢出isr
2)这个期间leader和follower继续接收数据
3)待该follower恢复后,follower会读取本地磁盘记录上次的hw,并将log文件高于hw的部分截取掉,从hw开始向leader进行同步
4)等该follower的leo大于等于该partition的hw,即follower追上leader之后,就可以重新加入isr了
2、leader故障处理细节(从isr队列读取ar中靠前的节点选为leader,新leader短则follower“剪”,反之则向leader同步)

1)leader发生故障后,会从isr中选出一个新的leader
2)为保证对各副本之间的数据一致性,其余的follower会先将各自的log文件高于hw的部分截掉,然后从新的leader同步数据
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
关键词:
leo:指的是每个副本最大的offset
hw:指的是消费者能见到的最大的offset,isr队列中最小的leo
4.3 文件存储
4.3.1 文件存储机制
1、topic数据的存储机制
2、查看文件存储
1)查看hadoop102的kafka文件存储
[atguigu@hadoop104 first-1]$ ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata
2)直接查看log日志,是乱码需要使用工具查看
[atguigu@hadoop104 first-1]$ cat 00000000000000000092.log
\CYnF|©|©ÿÿÿÿÿÿÿÿÿÿÿÿÿÿ"hello world
3)通过工具查看
查看index文件
[atguigu@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 3 position: 152
查看log文件
[atguigu@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping datas/first-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1636338440962 size: 75 magic: 2 compresscodec: none crc: 2745337109 isvalid: true
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 75 CreateTime: 1636351749089 size: 77 magic: 2 compresscodec: none crc: 273943004 isvalid: true
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 152 CreateTime: 1636351749119 size: 77 magic: 2 compresscodec: none crc: 106207379 isvalid: true
baseOffset: 4 lastOffset: 8 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 229 CreateTime: 1636353061435 size: 141 magic: 2 compresscodec: none crc: 157376877 isvalid: true
baseOffset: 9 lastOffset: 13 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 370 CreateTime: 1636353204051 size: 146 magic: 2 compresscodec: none crc: 4058582827 isvalid: true
3、index文件和log文件详解
1)详解

segment文件:kafka的日志被分割为多个segment,每个segment由一组连续的消息组成。例如,segment-0包含偏移量0到521的消息,segment-1包含偏移量522到1004的消息。
索引文件(.index):每个segment都有一个索引文件,用于快速查找特定偏移量的消息。这个文件包含一系列的索引项,每个索引项包含一个偏移量和对应日志文件中的位置(字节数)。
日志文件(.log):实际存储消息记录的文件。每条消息都有一个基准偏移量(baseOffset)和最后一个偏移量(lastOffset),以及它在文件中的位置(position)
2)日志存储参数配置
| 参数 | 描述 |
|---|---|
| log.segment.bytes | kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小,默认值1G |
| log.index.interval.bytes | 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。稀疏索引 |
4.3.2 文件清理策略
1、kafka数据文件保持时间:默认是7天
2、kafka数据文件保持可通过如下参数修改
1)log.retention.hours:最低优先级小时,默认7天
2)log.retention.minutes:分钟
3)log.retention.ms:最高优先级毫秒
4)log.retention.check.interval.ms:负责设置检查周期,默认5分钟
3、那么一旦查过了设置的时间就会采取清理策略,清理策略有两种:delete和compact
1)delete策略
delete日志删除:将过去数据删除。
配置:log.cleanup.policy=delete
基于时间:默认打开,以segment中所有记录中的最大时间戳作为文件时间戳
基于大小:默认关闭,超过设置的所有日志大小,删除最早的segment
log.retention.bytes,默认等于-1,表示无穷大
2、compact日志策略

4.4 高效读写数据
1、kafka本身是分布式集群,可以采用分区技术,并行度高
2、读数据采用稀疏索引,可以快速定位要消费的数据
3、顺序写磁盘
kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官方有数据表明,同样的磁盘,顺序写能到600MB/s,而随机写只有100K/s。中间省去大量磁头寻址的时间。

4、页存储+零拷贝技术
零拷贝:Kafka Broker应用层不关心存储的数据,所以就不走应用层,传输效率高。
PageCache页缓存:当上层有写操作的时候,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。

第 5 章:Kafka消费者
5.1 Kafka消费方式
Kafka的consumer采用pull(拉)模式从broker中读取数据
| push(推)模式 | 很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞 |
|---|---|
| pull模式 | 可以根据consumer的消费能力以适当的速率消费消息。不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,kafka的消费者在消费数据时会传入一个时常参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时常即为timeout |
5.2 Kafka消费者工作流程
5.2.1 消费者总体工作流程

5.2.2 消费者组原理
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
1)消费者组内每个消费者负责消费不同分区的数据,一个分区由一个组内消费者消费。
2)消费者者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

注意:
如果向消费者组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置。不会接收任何消息。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者组初始化流程

消费者组详细消费流程

5.2.3 消费者重要参数
| 参数名称 | 描述 |
|---|---|
| bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 |
| key.deserializer、value.deserializer | 指定接收消息的key和value的反序列化类型。要写全类名。 |
| group.id | 标记消费者所属的消费者组。 |
| enable.auto.commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
| auto.commit.interval.ms | 若enable.auto.commit=true, 表示消费者提交偏移量频率,默认5s。 |
| auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
| offsets.topic.num.partitions | __consumer_offsets的分区数,默认是50个分区。 |
| heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 |
| session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
| max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
| fetch.min.bytes | 默认1个字节。消费者获取服务器端一批消息最小的字节数。 |
| etch.max.wait.ms | 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
| fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
| max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条。 |
5.3 消费者API
5.3.1 独立消费者案例(订阅主题)

1、需求:创建一个独立消费者,消费first主题中的数据
注意:在消费者API代码中必须配置消费者组id
2、案例
1)创建包名:com.atguigu.kafka.consumer
2)编写代码
package com.atguigu.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置消费者组 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 3. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 4. 订阅主题ArrayList<String> topics= new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 5. 拉取数据打印while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 6. 遍历并输出消费到的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
3)测试
(1)在IDEA中执行消费者程序
(2)hadoop102中创建kafka生产者,并输入数据
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello
(3)在IDEA中观察接收到的数据
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 3, offset = 0, CreateTime = 1629160841112, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
5.3.2 消费者组案例
1、需求:观察消费者组消费数据时的如下特点
1)消费者组中的消费者消费不同分区的数据
2)消费者数量大于Topic的分区数时,会在消费者消费不到数据
2)每当消费者组内成员数发生变化时,就会进行主题分区对消费者的重分配
2、案例
1)添加日志框架jar
配置日志框架,可以将kafka的运行日志打印在控制台
<dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.12.0</version>
</dependency>
2)创建主题
创建一个只有两个分区的topic,topic的名字为“groupTest01”
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 2 --topic groupTest01
3)编辑消费者
复制两份基础消费者的代码,消费者组改为“group01”,订阅主题修改为“groupTest01”,在idea中同时启动,即开启同一个消费者组中的两个消费者。
package com.atguigu.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置消费者组 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");// 3. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 4. 订阅主题ArrayList<String> topics= new ArrayList<>();topics.add("groupTest01");consumer.subscribe(topics);// 5. 拉取数据打印while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 6. 遍历并输出消费到的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
4)启动生产者后,观察控制台输出,先启动一个消费者customconsumer01后,观察控制台输出
package com.atguigu.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置消费者组 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");// 3. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 4. 订阅主题ArrayList<String> topics= new ArrayList<>();topics.add("groupTest01");consumer.subscribe(topics);// 5. 拉取数据打印while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 6. 遍历并输出消费到的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
5)再启动一个消费者customconsumer02后,观察控制台输出
// CustomConsumer01控制台输出
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group01-1-44ba7ef2-e39b-4d3d-a78f-0c1afeceb871', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0])
Adding newly assigned partitions: groupTest01-0// CustomConsumer02控制台输出
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group01-1-acc35e3e-371b-4702-8b9b-376c6144e676', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-1])
Adding newly assigned partitions: groupTest01-1
5.4 生产经验-分区分配策略及再平衡
分区的分配以及再平衡
1、一个consumer group中有多个consumer,一个topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。
2、kafka有四种主流的分区分配策略:range、roundrobin、sticky、cooperativesticky
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是range+cooperativesticky。kafka可以同时使用多个分区分配策略。

| 参数名称 | 描述 |
|---|---|
| heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的1/3。 |
| session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
| max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
| partition.assignment.strategy | 消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky |
5.4.1 生产者分区分配之Range及再平衡
1、range分区策略原理
range是对每个topic而言的。
首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假如现在又7个分区,3个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排完后将会是c0、c1、c2。
通过partitions数/consumer数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区。
例如,7/3=2余1,除不尽,那么消费者c0便会多消费1个分区。
注意:如果只是针对1个topic而言,c0消费者多消费1个分区影响不是很大。但是如果在n多个topic,那么针对每个topic,消费者c0都将多消费1个分区,topic越多,c0消费的分区会比其它消费者明显多消费n个分区。

2、range分区分配策略及再平衡案例
1)准备
(1)修改主题grouptest01的分区为7个分区
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic groupTest01--partitions 7
(2)创建3个消费者,并组成消费者组group02,消费主题grouptest01
(3)观察分区分配情况
启动第一个消费者,观察分区分配情况
// consumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
启动第二个消费者,观察分区分配情况
// consumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0
// consumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group02-1-60afd984-6916-4101-8e72-ae52fa8ded6c', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5, groupTest01-4
启动第三个消费者,观察分区分配情况
// consumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-1, groupTest01-0
// consumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-60afd984-6916-4101-8e72-ae52fa8ded6c', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-4])
Adding newly assigned partitions: groupTest01-3, groupTest01-4
// consumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-fd15f50f-0a33-4e3a-8b8c-237252a41f4d', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5
5.4.2 生产者分区分配之roundrobin策略及再平衡
1、roundrobin分区分配策略原理
roundrobin针对集群中所有topic而言。
roundrobin轮询分区策略,是把所有partition和所有consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者。

2、roundrobin分区分配策略及再平衡案例
案例:
修改消费者代码,消费者组都是group03.
修改分区分配策略为roundobin
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
依次启动3个消费者,观察控制台输出
启动消费者customconsumer01,观察控制台输出
// customConsumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
启动消费者customconsumer02,观察控制台输出
// customConsumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-2, groupTest01-4, groupTest01-6])
Adding newly assigned partitions: groupTest01-2, groupTest01-0, groupTest01-6, groupTest01-4
// customConsumter02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group03-1-5771a594-4e99-47e8-9df6-ca820af6698b', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-1, groupTest01-3, groupTest01-5])
Adding newly assigned partitions: groupTest01-3, groupTest01-1, groupTest01-5
启动消费者customconsumer03,观察控制台输出
// customConsumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-3, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-0, groupTest01-6
// customConsumter02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-5771a594-4e99-47e8-9df6-ca820af6698b', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-1, groupTest01-4])
Adding newly assigned partitions: groupTest01-1, groupTest01-4
// customConsumter03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-d493b011-d6ea-4c36-8ae5-3597db635219', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-2, groupTest01-5])
Adding newly assigned partitions: groupTest01-2, groupTest01-5
5.4.3 生产者分区分配之sticky及再平衡
1、粘性分区定义:
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。粘性分区是kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
2、sticky分区分配策略及再平衡案例
1)修改分区分配策略,修改消费者组为grouptest03
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
2)分别启动三个消费者后,查看第三次分配如下
启动消费者customconsumer01,观察控制台输出
// customConsumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
启动消费者customconsumer02,观察控制台输出
// customConsumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0
// customConsumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5, groupTest01-4
启动消费者customconsumer03,观察控制台输出
// customConsumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-1, groupTest01-0
// customConsumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5])
Adding newly assigned partitions: groupTest01-5, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-eb0ac20c-5d94-43a7-b7c1-96a561513995', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-6
3)杀死消费者~01,观察控制台输出
// customConsumer02
Successfully synced group in generation Generation{generationId=4, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-0, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-0, groupTest01-5, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=4, memberId='consumer-group06-1-eb0ac20c-5d94-43a7-b7c1-96a561513995', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-6, groupTest01-1])
Adding newly assigned partitions: groupTest01-3, groupTest01-1, groupTest01-6
5.5 offset位移
5.5.1 offset的默认维护位置

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
kafka0.9版本之前,consumer默认将offset保存在zookeeper,从0.9版本开始,consumer默认将offset保存在kafka一个内置的topic中,该topic为__consumer_offsets。
在consumer_offsets主题里面采用key+value的方式存储数据。key是groupid+topic+分区号,value是当前offset的值。每个一段时间,kafka内部就会对这个topic进行compact(压实),即每个groupid+topic+分区号就保留最新的数据。
1、消费offset案例
1)设计思想:
__consumer_offsets为kafka中的topic,那就可以通过消费者进行消费。
2)在配置文件config/consumer.properties中添加配置exclude.internal.topics=false,默认就是true,表示不能消费系统主题。我们为了查看系统主题数据,需要将参数修改为false。
[atguigu@hadoop102 kafka]$ vim /opt/module/kafka/config/consumer.properties
exclude.internal.topics=false
3)在命令行创建一个新的topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --topic atguigu --bootstrap-server hadoop102:9092 --partitions 2 --replication-factor 2
4)启动生产者向主题atguigu中生产数据
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092
5)启动消费者消费主题atguigu中生产数据
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic atguigu --group test
注意:指定消费者组的名称,能够更好的观察数据存储位置(key->groupid+topic+分区号)
6)启动消费者消费主题__consumer_offsets
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
7)消费到的数据
[test,atguigu,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
[test,atguigu,0]::OffsetAndMetadata(offset=8, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
5.5.2 自动提交offset
1、自动提交offset图示
为了使我们能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
1)enable.auto.commit:是否开启自动提交offset功能,默认是true
2)auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

2、编写代码
1)需要用到的类
kafkaconsumer:需要创建一个消费者对象,用来消费数据
consumerconfig:获取所需的一系列配置参数
consumerrecord:每条数据都要封装成一个consumerrecord对象
为了使我们能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能。
2)消费者自动提交offset
package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {// 1. 创建kafka消费者配置类Properties properties = new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 是否自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 提交offset的时间周期,默认5s,properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// 3. 创建kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList("first"));// 5. 消费数据while (true){// 6. 读取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 7. 输出消息for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}}
}
5.5.3 手动提交offset
1、手动提交offset图示

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此kafka还提供了手动提交offset的api。
手动提交offset的方法有两种:分别是commitsync(同步提交)和commitasync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前进程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
1)commitsync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
2)commitasync(异步提交):发送完提交offset请求后,就开始消费下一批数据。
同步提交offset
由于同步提交offset有失败重试机制,故更加可靠
package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;public class CustomConsumerByHand {public static void main(String[] args) {// 1. 创建kafka消费者配置类Properties properties = new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 是否自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 提交offset的时间周期properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// 3. 创建kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList("first"));// 5. 消费数据while (true){// 6. 读取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 7. 输出消息for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}
// 同步提交offsetconsumer.commitSync();}}
}
异步提交offset
同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;public class CustomConsumerByHand {public static void main(String[] args) {// 1. 创建kafka消费者配置类Properties properties = new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 是否自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 提交offset的时间周期properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// 3. 创建kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList("first"));// 5. 消费数据while (true){// 6. 读取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 7. 输出消息for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}// 异步提交offsetconsumer.commitAsync(new OffsetCommitCallback() {/*** 回调函数输出* @param offsets offset信息* @param exception 异常*/@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {// 如果出现异常打印if (exception != null ){System.err.println("Commit failed for " + offsets);}}});}}
}
5.5.4 指定offset消费

当kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
1、earliest:自动将偏移量重置为最早的偏移量
2、latest(默认值):自动将偏移量重置为最新偏移量
3、none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
5.5.5 数据漏消费和重复消费分析
1、问题:无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费
2、漏消费:先提交offset后消费,有可能造成数据的漏消费
3、重复消费:而先消费后提交offset,有可能会造成数据的重复消费

5.6 生产经验之Consumer事务
1、消费者事务
如果想完成Consumer端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如mysql)。者部分知识会再后续项目部分涉及。

5.7 数据积压(消费者如何提高吞吐量)
1、如果是kafka消费能力不足,则可以考虑增加topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。

2、如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数量过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

| 参数名称 | 描述 |
|---|---|
| fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
| max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条 |
第 6 章:kafka-eagle监控
kafka-eagle框架可以监控kafka集群的整体运行情况,再生产环境中经常使用。
6.1 kafka准备
1、关闭kafka集群
[atguigu@hadoop102 kafka]$ kafka.sh stop
2、修改kafka启动命令
[atguigu@hadoop102 kafka]$ vim /opt/module/kafka/bin/kafka-server-start.sh
……
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"export JMX_PORT="9999"#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
注意:修改之后在启动kafka之前要分发之其它节点
6.2 安装kafka-eagle
1、上传压缩包kafka-~bin.tar.gz到集群/opt/software目录
[atguigu@hadoop102 software]$ ll
-rw-rw-r--. 1 atguigu atguigu 81074069 4月 19 20:07 kafka-eagle-bin-2.0.8.tar.gz
2、将jar包解压到本地
[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
3、进入刚才解压的目录,再次将jar包解压到/opt/module/目录下
[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/
4、修改kafka-eagle名称为eagle
[atguigu@hadoop102 module]$ mv efak-web-2.0.8/ efak
6.3 配置eagle
1、修改eagle的配置文件system-config.properties
[atguigu@hadoop102 efak]$ vim conf/system-config.properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123######################################
# broker size online list
######################################
cluster1.efak.broker.size=20######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32######################################
# EFAK webui port
######################################
efak.webui.port=8048######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=######################################
# kafka sqlite jdbc driver address
######################################
# 配置mysql连接
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=你的密码######################################
# kafka mysql jdbc driver address
######################################
#efak.driver=com.mysql.cj.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=你的密码
6.4 添加环境变量
[atguigu@hadoop102 kafka]$ sudo vim /etc/profile.d/my_env.sh# kafkaEfak
export KE_HOME=/opt/module/efak
export PATH=$PATH:$KE_HOME/bin
注意:source /etc/profile
[atguigu@hadoop102 kafka]$ source /etc/profile
6.5 启动eagle
[atguigu@hadoop102 efak]$ bin/ke.sh start
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.168.202.102:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
注意:启动之前需要先启动zk以及kafka



第 7 章:kafka-kraft模式
7.1 kafka-kraft架构

左图为kafka现有架构,元数据在zookeeper中,运行时动态选举controller,由controller进行kafka集群管理。右图为kraft模式架构(实验性),不再依赖zookeeper集群,而是用三台controller节点代替zookeeper,元数据保存在controller中,由controller直接进行kafka集群管理。
好处是:
1、kafka不再依赖外部框架,而是能够独立运行
2、controller管理集群时,不再需要从zookeeper中先读取数据,集群性能上升
3、由于不依赖zookeeper,集群扩展时不再受到zookeeper读写能力限制
4、controller不再动态选举,而是由配置文件规定。这样我们可以有针对性地加强controller节点的配置,而不是像以前一样对随机controller节点的高负载束手无策。
7.2 kafka-kraft集群部署
1、再次解压一份kafka安装包
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
2、重命名为kafka2
[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka2
3、在hadoop102上修改/opt/module/~/server.properties配置文件
[atguigu@hadoop102 kraft]$ vim server.properties
#kafka的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能)
process.roles=broker, controller
#节点ID
node.id=2
#controller服务协议别名
controller.listener.names=CONTROLLER
#全Controller列表
controller.quorum.voters=2@hadoop102:9093,3@hadoop103:9093,4@hadoop104:9093
#不同服务器绑定的端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#broker服务协议别名
inter.broker.listener.name=PLAINTEXT
#broker对外暴露的地址
advertised.Listeners=PLAINTEXT://hadoop102:9092
#协议别名到安全协议的映射
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#kafka数据存储目录
log.dirs=/opt/module/kafka2/data
4)分发kafka2
4、分发kafka2
atguigu@hadoop102 module]$ xsync kafka2/
注意:
① 在hadoop103和hadoop104上需要对node.id相应改变,值需要和controller.quorum.voters对应。
② 在hadoop103和hadoop104上需要根据各自的主机名称,修改相应的advertised.Listeners地址。
5、初始化集群数据目录
1)首先生成存储目录唯一ID
[atguigu@hadoop102 kafka2]$ bin/kafka-storage.sh random-uuid
J7s9e8PPTKOO47PxzI39VA
2)用该id格式化kafka存储目录(三台节点)
[atguigu@hadoop102 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties
[atguigu@hadoop103 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties
[atguigu@hadoop104 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties
6、启动kafka集群
[atguigu@hadoop102 kafka2]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties
[atguigu@hadoop103 kafka2]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties
[atguigu@hadoop104 kafka2]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties
7、停止kafka集群
[atguigu@hadoop102 kafka2]$ bin/kafka-server-stop.sh
[atguigu@hadoop103 kafka2]$ bin/kafka-server-stop.sh
[atguigu@hadoop104 kafka2]$ bin/kafka-server-stop.sh
7.3 kafka-kraft集群启动停止脚本
1、在/home/atguigu/bin目录下创建文件kf2.sh脚本文件
[atguigu@hadoop102 bin]$ vim kf2.sh
脚本如下:
#! /bin/bash
case $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "done
};;
esac
2、添加执行权限
[atguigu@hadoop102 bin]$ chmod +x kf2.sh
3、启动集群命令
[atguigu@hadoop102 ~]$ kf2.sh start
4、停止集群命令
[atguigu@hadoop102 ~]$ kf2.sh stop
相关文章:
大数据开发之kafka(完整版)
第 1 章:Kafka概述 1.1 定义 Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只…...
单体架构、微服务和无服务器架构
前言 在这篇文章中,我将演示在决定使用单体架构、微服务架构和无服务器架构时的权衡的简化心智模型。目标是突显每种风格的固有优势和缺陷,并提供关于何时选择哪种架构风格的指导。 单体架构 对于小团队或项目来说是理想的入门架构。它简单易上手&…...
Github仓库使用方式
主要参考: 「详细教程」使用git将本地项目上传至Github仓库(MacOS为例)_github上传代码到仓库-CSDN博客 新建文件夹参考: GitHub使用指南——建立仓库、建立文件夹、上传图片详细教程-CSDN博客 一、新建一个 github 仓库&#…...
Harmony Ble蓝牙App(四)描述符
Harmony Ble蓝牙App(四)描述符 前言正文一、优化二、描述① 概念② 描述提供者③ 显示描述符 三、源码 前言 上一篇中了解了特性和属性,同时显示设备蓝牙服务下的特性和属性,本文中就需要来使用这些特性和属性来完成一些功能。 正…...
C# 实现单线程异步互斥锁
文章目录 前言一、异步互斥锁的作用是什么?示例一、创建和销毁 二、如何实现?1、标识(1)标识是否锁住(2)加锁(3)解锁 2、异步通知(1)创建对象(2&a…...
Java设计模式中策略模式可以解决许多if-else的代码结构吗? 是否能满足开闭原则?
Java设计模式中策略模式可以解决许多if-else的代码结构吗? 是否能满足开闭原则? 是的,策略模式可以帮助解决许多if-else的代码结构。通过将不同的算法封装成不同的策略类,然后在需要的时候动态地切换策略,可以避免使…...
[C#]C# winform部署yolov8目标检测的openvino模型
【官方框架地址】 https://github.com/ultralytics/ultralytics 【openvino介绍】 OpenVINO(Open Visual Inference & Neural Network Optimization)是由Intel推出的,用于加速深度学习模型推理的工具套件。它旨在提高计算机视觉和深度学…...
力扣刷MySQL-第五弹(详细讲解)
🎉欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克🍹 ✨博客主页:小小恶斯法克的博客 🎈该系列文章专栏:力扣刷题讲解-MySQL 🍹文章作者技术和水平很有限,如果文中出…...
用C语言实现简单的三子棋游戏
目录 1 -> 模块简介 2 -> test.c 3 -> game.c 4 -> game.h 1 -> 模块简介 test.c:测试游戏逻辑 game.c: 函数的实现 game.h:函数的声明 2 -> test.c #define _CRT_SECURE_NO_WARNINGS 1#include "game.h";void menu() {printf("****…...
Yaklang 中的类型和变量
Yaklang 的类型其实非常简单,我们仅需要记住如下类型即可 string 字符串类型,用以快速构建一个字符串int 整数类型:在 64 位机中,int 和 int64 是一样的float 浮点类型,用来定义和表示浮点数byte 本质上等同于 uint8u…...
C语言从入门到实战——编译和链接
编译和链接 前言一、 翻译环境和运行环境二、 翻译环境2.1 预处理(预编译)2.2 编译2.2.1 词法分析2.2.2 语法分析2.2.3 语义分析 2.3 汇编2.4 链接 三、 运行环境 前言 在C语言中,编译和链接是将源代码转换为可执行文件的两个主要步骤。 编…...
【实战教程】ThinkPHP6分页功能轻松实现,让你的网站更高效!
ThinkPHP是一款非常流行的PHP开发框架,其最新版本ThinkPHP6在性能和易用性方面都得到了很大的改善。分页功能是网页开发中非常常见的功能,而ThinkPHP6也提供了非常方便的分页方法。本文将介绍如何实现ThinkPHP6的分页功能。 一、了解分页功能 在Web应用…...
专业130+总分380+哈尔滨工程大学810信号与系统考研经验水声电子信息与通信
今年专业课810信号与系统130,总分380顺利考上哈尔滨工程大学,一年的努力终于换来最后的录取,期中复习有得有失,以下总结一下自己的复习经历,希望对大家有帮助,天道酬勤,加油!专业课&…...
旅游项目day08
1. 旅游日记(游记) 后端:实体类,列表,查看,审核 前端:目的地明细中-游记->带范围条件查询,游记首页,【扩展】游记添加/编辑,【扩展】添加游记时间没登录时…...
蓝桥杯真题(Python)每日练Day2
题目 题目分析 对于本题首先确定其数据结构为优先队列,即邮费最小的衣服优先寄,算法符合贪心算法。可以直接使用queue库的PriorityQueue方法实现优先队列。关于PriorityQueue的使用方法主要有: import queue q queue.Queue()# 队列 pq qu…...
IntelliJ IDEA 拉取gitlab项目
一、准备好Gitlab服务器及项目 http://192.168.31.104/root/com.saas.swaggerdemogit 二、打开 IntelliJ IDEA安装插件 打开GitLab上的项目,输入项目地址 http://192.168.31.104/root/com.saas.swaggerdemogit 弹出输入登录用户名密码,完成。 操作Comm…...
RHCSA上课笔记(前半部分)
第一部分 网络服务 第一章 例行性工作 1.单一执行的例行性工作 单一执行的例行性工作(就像某一个时间点 的闹钟):仅处理执行一次 1.1 at命令:定时任务信息 [rhellocalhost ~]$ rpm -qa |grep -w at at-spi2-core-2.40.3-1.el9.x…...
C++代码入门05 字符串容器
图源:文心一言 上机题目练习整理,本篇作为字符串容器的代码,提供了常规解法及其详细解释,供小伙伴们参考~🥝🥝 第1版:在力扣新手村刷题的记录~🧩🧩 方法:常…...
vue3 项目中 arguments 对象获取失败问题
问题 在 vue3 项目中 获取到的 arguments 对象与传入实参不符,打印出函数中的 arguments 对象显示如下: 原因 作者仔细回看代码才发现,自己一直用的是 vue3 的组合式写法,函数都是箭头函数,而箭头函数不存在 argumen…...
12.线程同步
12.线程同步 1. 为什么需要线程同步2. 互斥锁2.1 互斥锁初始化2.1.1 PTHREAD_MUTEX_INITIALIZER 宏初始化2.1.2 使用函数初始化 2.2 加锁和解锁2.3 pthread_mutex_trylock()2.4 销毁互斥锁2.5 互斥锁死锁2.6 互斥锁的属性 3. 条件变量3.1 条件变量初始化3.2 通知和等待条件变量…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...
【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
