kafka整理
kafka整理
一、kafka概述
kafka是apache旗下一款开源的顶级的消息队列的系统, 最早是来源于领英, 后期将其贡献给apache, 采用语言是scala.基于zookeeper, 启动kafka集群需要先启动zookeeper集群, 同时在zookeeper记录kafka相关的元数据
kafka本质上就是消息队列的中间件产品 ,kafka中消息数据是直接存储在磁盘上
kafka的特点:
- 可靠性
- 可扩展性
- 耐用性
- 高性能
二、kafka的架构图
kafka cluster :kafka的集群
broker:kafka的节点
producer:生产者
consumer:消费者
topic:主题,一个逻辑容器
shard:分片,分片的数量
replicas:副本,受节点的限制,副本<=节点数
zookeeper:对kafka集群进行管理,保存kafka的元数据信息
三、安装
3.1解压
[pxj@pxj62 /opt/software]$tar -zxvf kafka_2.12-2.4.1.tgz -C /opt/app/
3.2建软连接
[pxj@pxj62 /opt/app]$ln -s kafka_2.12-2.4.1 kafka
3.3修改 server.properties
[pxj@pxj62 /opt/app/kafka/config]$vim server.properties
3.4启动与停止
前台启动: ./kafka-server-start.sh ../config/server.properties
后台启动: nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
注意: 第一次启动, 建议先前台启动, 观察是否可以正常启动, 如果OK, ctrl +C 退出, 然后挂载到后台
启动: ./start-kafka.sh
四、shell命令操作
4.1创建top
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --create --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01 --partitions 3 --replication-factor 2
Created topic test01.
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --create --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test02 --partitions 3 --replication-factor 3
Created topic test02.
4.2 查看当前有那些topic
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --list --zookeeper pxj62:2181,pxj63:2181,pxj64:2181
test01
test02
4.3 如何查看某一个topic的详细信息
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Topic: test01 PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: test01 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: test01 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: test01 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test02
Topic: test02 PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: test02 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0Topic: test02 Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1Topic: test02 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --create --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test03 --partitions 3 --replication-factor 1
Created topic test03.
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test03
Topic: test03 PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: test03 Partition: 0 Leader: 1 Replicas: 1 Isr: 1Topic: test03 Partition: 1 Leader: 2 Replicas: 2 Isr: 2Topic: test03 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
4.4修改topic
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --alter --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01 --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Topic: test01 PartitionCount: 5 ReplicationFactor: 2 Configs: Topic: test01 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: test01 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: test01 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 2,1Topic: test01 Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: test01 Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2
[pxj@pxj62 /opt/app/kafka/bin]$
注意:只能调大分片的数量, 无法调小以及无法调整副本数量
4.5删除topic
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --delete --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Topic test01 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Error while executing topic command : Topic 'test01' does not exist as expected
[2023-04-09 22:36:54,129] ERROR java.lang.IllegalArgumentException: Topic 'test01' does not exist as expectedat kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:484)at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:390)at kafka.admin.TopicCommand$.main(TopicCommand.scala:67)at kafka.admin.TopicCommand.main(TopicCommand.scala)(kafka.admin.TopicCommand$)
[pxj@pxj62 /opt/app/kafka/bin]$
4.6模拟一个生产者. 用于生产数据到topic中
[pxj@pxj62 /opt/app/kafka/bin]$./kafka-console-producer.sh --broker-list pxj62:9092,pxj63:9092,pxj64:9092 --topic test02
>pxj
>pxj
>jps
>ll
4.7消费者接收
[pxj@pxj63 /opt/app/kafka/bin]$./kafka-console-consumer.sh --bootstrap-server pxj62:9092,pxj63:9092,pxj64:9092 --topic test02 --from-beginning
pxj
pxj
jps
ll
五、kafkaAPI
5.1生产者
package com.ccj.pxj.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;import java.util.Properties;public class KafkaProducerTest {public static void main(String[] args) {// 1- 创建 生产者对象// 1.1 设置生产者相关的配置Properties props = new Properties();props.put("bootstrap.servpackage com.ccj.pxj.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest {public static void main(String[] args) {// 1. 创建 kafka的消费者对象//1.1: 设置消费者的配置信息Properties props = new Properties();props.setProperty("bootstrap.servers", "pxj62:9092,pxj63:9092,pxj64:9092"); // 指定 kafka地址props.setProperty("group.id", "test"); // 指定消费组 idprops.setProperty("enable.auto.commit", "true"); // 是否开启自动提交数据的偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交的间隔时间props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 设置key反序列类props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 设置value反序列化类//1.2: 创建kafka消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//2.设置消费者监听那些Topicconsumer.subscribe(Arrays.asList("test02"));//3. 消费数据: 一直在消费, 只要有数据,立马进行处理操作while (true) {//3.1: 获取消息数据, 参数表示等待(超时)的时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {long offset = record.offset(); // 偏移量信息String key = record.key(); // 获取keyString value = record.value(); // 获取valueint partition = record.partition();// 从哪个分区读取的数据System.out.println("偏移量:"+ offset +"; key值:"+key +";value值:"+ value +"; 分区:"+partition);}}}
}
ers", "pxj62:9092,pxj63:9092,pxj64:9092"); // 指定kafka的地址props.put("acks", "all"); // 指定消息确认方案props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// key序列化类props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化类//1.2: 构建生产者Producer<String, String> producer = new KafkaProducer<>(props);//2. 发送数据for (int i = 0; i < 10; i++) {//2.1 构建 数据的承载对象ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test02",Integer.toString(i));producer.send(producerRecord);}//3. 释放资源producer.close();}
}
5.2 消费者
package com.ccj.pxj.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest {public static void main(String[] args) {// 1. 创建 kafka的消费者对象//1.1: 设置消费者的配置信息Properties props = new Properties();props.setProperty("bootstrap.servers", "pxj62:9092,pxj63:9092,pxj64:9092"); // 指定 kafka地址props.setProperty("group.id", "test"); // 指定消费组 idprops.setProperty("enable.auto.commit", "true"); // 是否开启自动提交数据的偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交的间隔时间props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 设置key反序列类props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 设置value反序列化类//1.2: 创建kafka消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//2.设置消费者监听那些Topicconsumer.subscribe(Arrays.asList("test02"));//3. 消费数据: 一直在消费, 只要有数据,立马进行处理操作while (true) {//3.1: 获取消息数据, 参数表示等待(超时)的时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {long offset = record.offset(); // 偏移量信息String key = record.key(); // 获取keyString value = record.value(); // 获取valueint partition = record.partition();// 从哪个分区读取的数据System.out.println("偏移量:"+ offset +"; key值:"+key +";value值:"+ value +"; 分区:"+partition);}}}
}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
偏移量:1; key值:null;value值:0; 分区:1
偏移量:2; key值:null;value值:1; 分区:1
偏移量:3; key值:null;value值:2; 分区:1
偏移量:4; key值:null;value值:3; 分区:1
偏移量:5; key值:null;value值:4; 分区:1
偏移量:6; key值:null;value值:5; 分区:1
偏移量:7; key值:null;value值:6; 分区:1
偏移量:8; key值:null;value值:7; 分区:1
偏移量:9; key值:null;value值:8; 分区:1
偏移量:10; key值:null;value值:9; 分区:1
六、kafka的核心原理
6.1kafka的分区和副本
分区:
topic可以理解为是一个大的容器(逻辑), 分片相当于将topic划分为多个小容器, 将这些小容器分布在不同的broker上, 进行分布式存储, 分片的数量不受节点数量限制作用:1- 提升吞吐量, 前提 kafka节点充足下2- 解决单台节点存储有限的问题, 可以通过分片实现分布式存储3- 提高并发能力
副本:
对topic中每一个分片构建多个副本, 从而保证数据不能丢失, 副本的数量最多与节点数量是相等, 一般来说副本为 1~3个
作用:提升数据可靠性, 防止数据丢失
6.2kafka数据传输过程
三阶段:
第一阶段:生产者将数据生产到集群的broket端
第二阶段:broker将数据存储
第三阶段:消费者从broker端消费数据
6.3生产者如何保证数据不丢失
对于kafka,主要采用ack认证机制处理的
0:生产者只管发送到broket端,不管broker的响应
1:生产者只管发送到broket端,需要等待对应接受分片的主副本接收到数据后,给予响应,认为数据发送成功
-1:ALL;生产者只管发送到broket端,需要等待对应接受分片所有的 副本接收到数据后,给予响应认为数据发送成功
效率:0>1>-1
安全:-1>1>0
ack模式的选择:根据生产需求确定,
props.put(“acks”,''all'')
6.3如果broker端迟迟没有给予响应,如何解决
采用先等待(超时时间)再重试的策略,一般重试3次,如果重试后依然没有给予响应,此时让程序直接报错。通知相关人员处理即可
6.4宽带占用如何解决
可以引入缓存池,采用异步发送方案,生产者将数据在发送数据时候,底层会将这个数据保存到缓存池中,当池子中数据达到一批数据大小后,将达一批数据直接发送到broker,此时broker针对这一批数据给予一次性响应即可(批量发送数据)
6.5 采用批量发送数据,如果发送一批数据到broker端,broker端又没有给予响应,此时缓存池中数据满了,如何解决呢?
解决方案:
1.丢弃缓存池中数据,报异常(适用于数据不重要,或者可以重读的消息总数据)
2.在写入缓冲池的时候,需要将数据在其他的地方也持久存储一份,发送成功一批数据,将持久化地方数据删除一部分,以保证在出现此问题后,数据依然存在,下次启动的时候,优先从持久化容器中读取即可
七、安装 kafka-eagle
7.1.解压
7.2环境变量
[pxj@pxj62 /home/pxj]$vim .bashrc
export PS1='[\u@\h `pwd`]\$'
export JAVA_HOME=/usr/java/jdk1.8.0_141
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_HOME=/opt/app/hadoop
export ZOOKEEPER_HOME=/opt/app/zookeeper
export KAFKA_HOME=/opt/app/kafka
export KE_HOME=/opt/app/kafka-eagle
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZOOKEEPER_HOME}/bin:${KAFKA_HOME}/bin:${KE_HOME}/bin:$PATH[pxj@pxj62 /home/pxj]$source .bashrc
7.3配置 kafka_eagle。
使用vi打开conf目录下的system-config.propertie
[pxj@pxj62 /opt/app/kafka-eagle/conf]$vim system-config.properties
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=pxj62:2181,pxj63:2181,pxj64:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
# kafka metrics, 30 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://pxj63:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=I LOVE PXJ
7.4配置JAVA_HOME
在24行加入
export JAVA_HOME=/usr/java/jdk1.8.0_141
7.5授权运行
[pxj@pxj62 /opt/app/kafka-eagle/bin]$chmod +x ke.sh
7.6启动
[pxj@pxj62 /opt/app/kafka-eagle/bin]$./ke.sh start
7.7访问web
http://pxj62:8048/ke
八、同步发送
package com.ccj.pxj.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerSyncTest {public static void main(String[] args) {Properties props=new Properties();props.put("bootstrap.servers", "pxj62:9092,pxj63:9092,pxj64:9092"); // 指定kafka的地址props.put("acks", "all"); // 指定消息确认方案props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// key序列化类props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化类//构造生产者KafkaProducer<String,String> producer = new KafkaProducer<>(props);
// 2.发送数据for (int i = 0; i <10 ; i++) {
// 构建 数据承载对象ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01", i+"_02");// 使用get 其实就是同步方式, 会当发送后, 会一直等待响应, 如果长时间没有响应, 就会重试, 如果依然没有, 直接报错// get支持自定义超时的时间try{producer.send(producerRecord).get();}catch (Exception e){e.printStackTrace();}}producer.close();}
}
九、异步发送
package com.ccj.pxj.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerAsyncTest {public static void main(String[] args) {// 1- 创建 生产者对象// 1.1 设置生产者相关的配置Properties props = new Properties();props.put("bootstrap.servers", "pxj62:9092,pxj63:9092,pxj64:9092"); // 指定kafka的地址props.put("acks", "all"); // 指定消息确认方案props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// key序列化类props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化类//1.2: 构建生产者Producer<String, String> producer = new KafkaProducer<>(props);
// 2.发送数据for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01", i+"_22");producer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {// 此方法为回调函数的方式, 当进行异步发送的时候, 不管最终是成功了还是失败了, 都会回调此函数if(e!=null){// 说明有异常, 发送失败了// 在此处, 编写发送失败的处理业务逻辑代码System.err.println("发送消息失败:" +e.getStackTrace());}if(metadata!=null){if (metadata != null) {System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}}}});}//3. 释放资源producer.close();}
}
十、消费者异步
package com.ccj.pxj.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaConsumerTest02 {public static void main(String[] args) {Properties props=new Properties();// 1. 创建 kafka的消费者对象//1.1: 设置消费者的配置信息props.setProperty("bootstrap.servers", "pxj62:9092,pxj63:9092,pxj64:9092"); // 指定 kafka地址props.setProperty("group.id", "test"); // 指定消费组 idprops.setProperty("enable.auto.commit", "false"); // 是否开启自动提交数据的偏移量props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 设置key反序列类props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 设置value反序列化类
//创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test01"));while(true){ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {long offset = record.offset(); // 偏移量信息String key = record.key(); // 获取keyString value = record.value(); // 获取valueint partition = record.partition();// 从哪个分区读取的数据System.out.println("偏移量:"+ offset +"; key值:"+key +";value值:"+ value +"; 分区:"+partition);// 当消息消费完成后, 提交偏移量信息 : 一定不要丢失提交偏移量的代码. 否则 会造成大量的重复消费问题consumer.commitSync(); // 同步提交consumer.commitAsync(); // 异步提交}}}
}
十一、broker端如何保证数据不丢失
broker主要将消息数据存储下来, 那么如何保证数据不丢失呢?
多副本机制 + 生产者的ack为 -1
消费偏移量数据是存储在哪里呢? 在kafka的老版本(kafka 0.8x下)是存储在zookeeper中, 在新版本中消费者消息偏移量信息是存储在broker端, 通过一个topic来存储的: __consumer_offset此topic具有50个分区, 1个副本
如何修改默认的过期时间呢?
# server.properties的103行位置: 默认值为 168小时
log.retention.hours=168# 设置一个log文件的大小, 默认为: 1073741824 (1GB)
log.segment.bytes=1073741824
十二、kafka的数据查询机制
查询过程
- 先确定这条消息在那个segment片段中
- 到对应片段中找index文件, 根据offset查询消息数据在log文件的那个物理偏移量位置
- 根据从index查询到的偏移量信息, 到 log文件顺序查询(磁盘查询方式)到对应范围下数据即可
磁盘的读写分为两种读写方式: 顺序读写 和 随机读写
顺序读写效率远远高于随机读写
十三、kafka中生产者的数据分发策略
kafka生产者数据分发策略: 指的生产者在生产数据到达broker指定topic中, 最终这条数据被topic中哪一个分片接收到了, 这就是生产者分发机制
思考: 常见的分发策略
1) hash策略
2) 轮询策略
3) 指定分区策略
4) 确定每个分区范围分发那么kafka支持那些分发策略呢?
1) 粘性分区策略(老版本(2.4以前): 轮询)
2) hash取模策略
3) 指定分区策略
4) 自定义分区如何设置分发策略呢? 与 ProducerRecord 和 DefaultPartitioner关系很大1) 粘性分区策略(老版本(2.4以前): 轮询)# 当生成数据时候, 使用这个只需要传递value发送方案, 底层走的 粘性分区策略(老版本(2.4以前): 轮询)public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);}# 为什么这么说呢? 原因是 DefaultPartitionerpublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {# 当 key为null的时候, 执行 stickyPartitionCache (粘性分区)if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);} List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}2) hash取模策略# 当发送数据的时候, 如果传递 k 和 v , 默认使用 hash取模分区方案, 根据key进行hash取模public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}# 为什么这么说呢? 原因是 DefaultPartitionerpublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {# 当 key为null的时候, 执行 stickyPartitionCache (粘性分区)if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);} # 当key不为null的时候, 获取topic的所有分区, 然后根据key进行hash取模List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}3) 指定分区策略# 当发送数据的时候, 需要明确指出给那个partition发送数据 : ProducerRecord构造# 分片是从0开始的, 如果是三个分片: 0 1 2public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, null, key, value, null);}此时这种分发策略 与 defaultPartitions 没有关系了4) 自定义分区策略: (抄. 官方源码DefaultPartitioner)4.1) 创建一个类, 实现Partitioner 接口4.2) 重写接口中的partition方法, 返回值表示分区的编号4.3) 按照业务逻辑实现方法中分区方案4.4) 告知给kafka, 使用新的分区方案当生产者开始发送数据, 如果只传递了value的数据, 此时kafka会采用粘性分区策略, 首先会先随机的选择一个分区, 然后尽可能的黏上这个分区, 将这一批数据全部写入到这一个分区中, 当下次请求再来的时候, 重新在随机选择一个分区(如果间隔时间比较短, 大概率会黏住上一个分区), 再黏住这个分区, 将数据写入到这个分区下, 这种分区方案称为粘性分区策略粘性分区是kafka2.4.x及以上版本支持的一种全新的分区策略 在2.4以下的版本中, 采用的轮询方案老版本轮询:当生产者准备好一批数据后, 将这一批数据写入到某一个topic中, 如果采用轮询方案, 需要将这一批数据分为多个小批次, 分别对应不同的分片,将各个小批次的数据发送给对应的分片下即可, 而这种操作需要额外在一批数据上再次进行分批处理, 导致生产效率下降, 所以说在新版本中, 将其替换为粘性分区参数: partitioner.class :默认值: org.apache.kafka.clients.producer.internals.DefaultPartitioner通过生产者的properties对象, 重新设置一下partitioner.class 参数即可
什么是粘性分区策略:
当生产者开始发送数据, 如果只传递了value的数据, 此时kafka会采用粘性分区策略, 首先会先随机的选择一个分区, 然后尽可能的黏上这个分区, 将这一批数据全部写入到这一个分区中, 当下次请求再来的时候, 重新在随机选择一个分区(如果间隔时间比较短, 大概率会黏住上一个分区), 再黏住这个分区, 将数据写入到这个分区下, 这种分区方案称为粘性分区策略粘性分区是kafka2.4.x及以上版本支持的一种全新的分区策略 在2.4以下的版本中, 采用的轮询方案老版本轮询:当生产者准备好一批数据后, 将这一批数据写入到某一个topic中, 如果采用轮询方案, 需要将这一批数据分为多个小批次, 分别对应不同的分片,将各个小批次的数据发送给对应的分片下即可, 而这种操作需要额外在一批数据上再次进行分批处理, 导致生产效率下降, 所以说在新版本中, 将其替换为粘性分区
十四、kafka的负载均衡机制
如果使用kafka模拟点对点 和 发布订阅 方式点对点: 一个消费只能被一个消费者所接收让所有监听这个topic的消费者都属于同一个消费者组内即可发布订阅: 一个消息可以被多个消费者所接收让所有监听这个topic的消费者都属于不同的消费者组内即可
作者:潘陈(pxj)
日期:2023-04-30
相关文章:

kafka整理
kafka整理 一、kafka概述 kafka是apache旗下一款开源的顶级的消息队列的系统, 最早是来源于领英, 后期将其贡献给apache, 采用语言是scala.基于zookeeper, 启动kafka集群需要先启动zookeeper集群, 同时在zookeeper记录kafka相关的元数据 kafka本质上就是消息队列的中间件产品…...

为什么有些情况下需要重写equals()和hashCode()方法?
目录 方法作用实战案例 方法作用 equals():判断对象是否相等,比如判断是否能放入Set集合中 情况1:没有重写equals()方法:由于所有类的默认基类都是Object类,所以默认使用Object类的equals()方法,那就是对象…...

14-Vue技术栈之Vue3快速上手
目录 1.Vue3简介2. Vue3带来了什么2.1 性能的提升2.2 源码的升级2.3 拥抱TypeScript2.4 新的特性 1、海贼王,我当定了!——路飞 2、人,最重要的是“心”啊!——山治 3、如果放弃,我将终身遗憾。——路飞 4、人的梦想是…...

JavaScript高级三、深入面向对象
零、文章目录 JavaScript高级三、深入面向对象 1、编程思想 (1)面向过程介绍 面向过程:分析出解决问题所需要的步骤,然后用函数把这些步骤一步一步实现,使用的时候再一个一个的依次调用就可以了。 (2&…...
static
1. 静态局部变量 : 用于函数体的内部修饰变量,这种变量的生存期长于该函数。 2. 静态全局变量: 定义在函数体外,用于修饰全局变量,表示该变量只在本文件可见。 3. 静态函数: 准确的说,静态函数跟…...

zabbix动作执行失败 No media defined for user.
问题 zabbix动作执行失败 No media defined for user. 详细问题 解决方案 1(导航栏)用户 → \rightarrow →报警媒介 → \rightarrow →添加 2 选择类型 → \rightarrow →收件人 → \rightarrow →添加 3 更新 解决原因 笔者由于未点击更新钮导…...
JavaScript this 关键字
在JavaScript中,this关键字是一个特殊的关键字,它在函数内部使用,用于引用当前执行上下文中的对象。 this的值是在函数调用时动态确定的,它取决于函数的调用方式。下面列举了几种常见的调用方式和this的取值: 1. 全局…...
ubuntu基本信息查询
查询CPU信息 cat /proc/cpuinfo cat /proc/stat top lscpu 查询内存 free -m Options: -b, --bytes show output in bytes -k, --kilo show output in kilobytes -m, --mega show output in megabytes -g, --giga show output in gigab…...

Revit问题:创建牛腿柱和快速生成圈梁
一、Revit中如何用体量创建牛腿柱 牛腿:悬臂体系的挂梁与悬臂间必然出现搁置构造,通常就将悬臂端和挂梁端的局部构造,又称梁托。牛腿的作用是衔接悬臂梁与挂梁, 并传递来自挂梁的荷载。牛腿柱可以用于桥梁、厂房的搭建,…...
k8s节点删除
1.设置该节点为不可调度状态 kubectl cordon k8s-node01 2.驱逐该节点上的pod kubectl drain k8s-node01 --ignore-daemonsets --delete-local-data 若是有pod删除不掉则加上--force参数强制驱逐 3.从集群中删除该node节点 kubectl delete node k8s-node01 4.在k8s-node…...
45°装备系统
45装备系统,规则:1、45 脚后剧情,场景地面出现,个体视角,非群体。2、产生寒暖对立,衣饰自动改变。3、地图下方块蛇,脚步顺逆差,让衣饰自动改变后出现形态特效。(形成进入…...

逻辑漏洞学习-身份验证漏洞
逻辑漏洞就是程序在实现业务逻辑上存在的错误,辑漏洞的出现通常是因为程序在设计业务逻辑时考虑不够全面,或者程序员的思维过程存在瑕疵,没有充分考虑到各种可能的情况 大部分程序员在设计的时候,目标是实现功能需求,…...

【ChatGPT】ChatGPT自动生成思维导图
参考视频:https://edu.csdn.net/learn/38346/613917 应用场景:自学,“研一学生如何学习机器学习”的思维导图 问:写一个“研一学生如何学习机器学习”的思维导图内容,以markdown代码块格式输出 # 研一学生如何学习…...
cf1200构造15道
最近做构造,想对比下先做后看答案归纳,留下思路之后直接看答案归纳,然后再统一检测,还有直接看答案,归纳,检测三种方法哪种效率高些,于是先做个十五题试试第一个方法,花3天写了15道构…...

【JavaSE】Java基础语法(十七)
文章目录 1. final2. 代码块2.1 代码块概述2.2 代码块分类 1. final fianl关键字的作用 final代表最终的意思,可以修饰成员方法,成员变量,类 final修饰类、方法、变量的效果 fianl修饰类:该类不能被继承(不能有子类&a…...

《Spring Guides系列学习》guide11 - guide15
要想全面快速学习Spring的内容,最好的方法肯定是先去Spring官网去查阅文档,在Spring官网中找到了适合新手了解的官网Guides,一共68篇,打算全部过一遍,能尽量全面的了解Spring框架的每个特性和功能。 接着上篇看过的gu…...

软件测试面试了一个00后,让我见识到了什么是内卷届的天花板
公司前段缺人,也面了不少测试,结果竟然没有一个合适的。一开始瞄准的就是中级的水准,也没指望来大牛,提供的薪资也不低,面试的人很多,但平均水平很让人失望。令我印象最深的是一个00后测试员,他…...
JAVA BigDecimal 比较大小 、计算
1:比较大小 注意:使用compareTo()方法比较大小时 参与比较的两个值 必须有值 不能为空 BigDecimal a new BigDecimal("3"); BigDecimal b new BigDecimal("4"); if (a.compareTo(b) < 0) { System.…...
并发编程Bug的根源
并发编程Bug的根源 并发编程Bug是指在多线程编程中出现的错误。并发编程需要考虑多个线程同时执行的情况,因此需要特别小心,以避免出现各种问题。在本文中,我们将探讨并发编程Bug的根源,并提供一些例子,以帮助读者更好…...

从零搭建微服务-认证中心(二)
写在最前 如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。 源码地址:https://gitee.com/csps/mingyue 文档地址:https://gitee.com/csps/mingyue/wikis 创建新项目 MingYue Idea 创建 maven 项目这…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...

初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
OD 算法题 B卷【正整数到Excel编号之间的转换】
文章目录 正整数到Excel编号之间的转换 正整数到Excel编号之间的转换 excel的列编号是这样的:a b c … z aa ab ac… az ba bb bc…yz za zb zc …zz aaa aab aac…; 分别代表以下的编号1 2 3 … 26 27 28 29… 52 53 54 55… 676 677 678 679 … 702 703 704 705;…...

ABAP设计模式之---“Tell, Don’t Ask原则”
“Tell, Don’t Ask”是一种重要的面向对象编程设计原则,它强调的是对象之间如何有效地交流和协作。 1. 什么是 Tell, Don’t Ask 原则? 这个原则的核心思想是: “告诉一个对象该做什么,而不是询问一个对象的状态再对它作出决策。…...