kafka生产者专题(原理+拦截器+序列化+分区+数据可靠+数据去重+事务)
目录
- 生产者
- 发送数据原理
- 参数说明
- 代码示例(同步发送数据)
- 代码示例(异步)
- 异步和同步的区别
- 同步发送
- 定义与流程
- 特点
- 异步发送
- 定义与流程
- 特点
- 异步回调
- 描述
- 代码示例
- 拦截器
- 描述
- 代码示例
- 消息序列化
- 描述
- 代码示例(自定义序列化)
- 分区
- 描述
- 分区策略
- 代码示例
- 写入默认分区(0号分区)
- 自定义分区机制
- 消息丢失
- 消息绝对不丢失的条件
- 数据去重
- 描述
- 幂等性
- 事务
- 代码示例(事务)
生产者
发送数据原理
说明
- 拦截器允许有多个,可以组成拦截器链
- 生产者发送的消息会被分配到不同的分区(Partition)。每个分区在内存中都有一个对应的缓冲区(RecordAccumulator),用于暂存即将发送的消息。sender线程从中读取数据
- sender线程两个重要参数(大小默认16KB,当数据量达到16KB时读取,读取时间默认0ms,当达到读取时间时,自动读取数据,不管大小有没有达到),这两个参数均可以调整
- NetworkClient负责将生产者的请求(如发送消息、获取元数据等)发送到相应的broker,并存储这些请求的响应;NetworkClient还负责处理网络连接的建立、维护和关闭。
- 当生产者发送消息到broker时,可以选择不同的应答级别(acks参数):
acks=0:生产者不等待broker的应答,直接认为消息发送成功。这种方式性能最高,但可靠性最低。
acks=1:生产者等待leader broker的应答,只要leader broker确认收到消息,就认为消息发送成功。这种方式性能较高,但可靠性略低。
acks=all(或acks=-1):leader和follower都落地回应,才认为消息发送成功。这种方式性能最低,但可靠性最高。
Broker在收到消息后,会根据配置的应答机制向生产者发送应答(或错误)信息。 - 请注意关于broker的落地是指数据存储到磁盘或持久化
- 数据发送成功后(接收到应答)删除缓冲区内对应的数据
参数说明
参数 | 默认值 | 作用描述 |
---|---|---|
bootstrap.servers | node2:9092[,node3:9092][,node4:9092] | 生产者连接集群所需的broker地址清单,一个或多个(逗号隔开) |
key.serializer | (无) | 指定发送消息的key的序列化类型,必须写全类名 |
value.serializer | (无) | 指定发送消息的value的序列化类型,必须写全类名 |
buffer.memory | 32M | RecordAccumulator缓冲区总大小 |
batch.size | 16K | 缓冲区一批数据最大值,适当增加可提高吞吐量,但可能增加延迟 |
linger.ms | 0ms (表示没有延迟) | 如果数据未达到batch.size,sender等待linger.time后发送数据 |
acks | -1 | 应答机制:0-不需要应答,1-Leader应答,-1(all)-所有节点应答 |
max.in.flight.requests.per.connection | 5 | 允许最多没有返回ack的次数,开启幂等性时建议1-5之间 |
enable.idempotence | true | 是否开启幂等性,默认开启 |
retries | 2147483647 (int最大值) | 消息发送错误时的重试次数 |
retry.backoff.ms | 100ms | 两次重试之间的时间间隔 |
compression.type | none | 生产者发送的所有数据的压缩方式,默认不压缩 |
代码示例(同步发送数据)
package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class SyncCustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop = new Properties();//TODO 2.为配置文件对象设置参数//TODO 2.1 配置bootstrap_server(生产者连接集群所需的broker地址清单)prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");//TODO 2.2 配置key和value的序列化类// 设置序列化器:指定key和value的序列化类为StringSerializer,用于将字符串类型的key和value转换为字节数组,以便发送到Kafka。prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducer<String,String> producer =new KafkaProducer<String, String>(prop);//TODO 4.发送消息// producer.send(...).get():同步发送消息,send()方法返回一个Future对象,调用get()方法等待发送完成并获取结果。for(int i = 0;i<5;i++){//同步发送消息producer.send(new ProducerRecord<>("topicA","sync_msg"+i)).get();}//TODO 5.关闭生产者producer.close();}
}
代码说明
类/对象 | 描述 | 用途 |
---|---|---|
Properties | Java标准库中的类,用于维护键值对列表。 Properties类提供了一种方便的方式来读取和写入属性文件(通常是.properties文件) | 在本代码中用于存储Kafka生产者的配置参数。 |
KafkaProducer<K, V> | Kafka客户端库中的类,用于向Kafka主题发送消息。泛型参数K 和V 分别表示消息键和值的类型。 | 创建生产者实例,发送消息到Kafka主题。 |
ProducerConfig | Kafka客户端库中的类,包含生产者配置的常量。 | 提供配置参数的常量值,如broker地址、序列化器等。 |
ProducerRecord<K, V> | Kafka客户端库中的类,表示要发送到Kafka主题的消息记录。泛型参数K 和V 分别表示消息键和值的类型。 | 创建消息记录对象,包含主题、键和值。 |
StringSerializer | Kafka客户端库中的类,实现了Serializer<String> 接口,用于将字符串类型的键或值序列化为字节数组。 | 作为键和值的序列化器,将字符串转换为字节数组进行传输。 |
效果
代码示例(异步)
package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class UnSyncCustomProducer {public static void main(String[] args) {//实例化PropertiesProperties prop = new Properties();//集群节点prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");//key和valueprop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//创建kafka生产者对象,并写入响应参数KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);//发送数据for (int i = 0; i < 5; i++) {//异步发送数据,不调用get方法producer.send(new ProducerRecord<>("topicA", "unsync_msg" + i));}producer.close();}
}
异步和同步的区别
同步发送
定义与流程
定义: 同步发送是指生产者在发送一条消息后,会立即等待Kafka服务器的响应。只有在服务器返回成功响应后,生产者才会继续发送下一条消息。
流程:
生产者调用send()方法发送消息。
send()方法返回一个Future对象。
生产者调用Future对象的get()方法,该方法会阻塞当前线程,直到Kafka服务器返回响应或抛出异常。
生产者收到响应后,根据结果(成功或失败)进行后续操作。
特点
高可靠性:同步发送确保每条消息都被Kafka集群接收并持久化。生产者等待Kafka确认消息已经成功写入指定分区且复制到满足副本因子的节点上,从而提高了消息的可靠性。
异常处理:如果发送过程中发生异常,生产者可以立即感知并处理,避免了消息的丢失。
性能较低:由于同步发送需要阻塞等待响应,因此会增加消息的延迟,降低系统的吞吐量。特别是在高并发场景下,可能会导致线程资源的大量占用和性能瓶颈。
易调试:便于发现和处理异常,有利于开发和测试阶段的调试工作。
异步发送
定义与流程
定义: 异步发送是指生产者在发送一条消息后,不会立即等待Kafka服务器的响应,而是继续发送下一条消息。发送方通过传递一个回调函数给send()方法,该回调函数将在消息发送结果(成功或失败)可用时被异步调用。
流程:
生产者调用send()方法发送消息,并传递一个回调函数。
Kafka客户端将消息放入内部缓冲区,并立即返回。
Sender线程负责将缓冲区中的消息批量发送到Kafka集群。
当消息发送成功或失败时,Kafka客户端调用之前传递的回调函数,通知生产者消息发送的结果。
特点
高性能:异步发送方式下,生产者无需等待每个消息的确认即可继续发送下一条消息,从而提高了消息的发送效率,适用于高吞吐量场景。
灵活性:通过回调函数,生产者可以对消息发送的结果进行异步处理,如记录日志、重试发送等。
可靠性相对较低:由于生产者不会立即得知消息是否成功写入Kafka,因此消息的可靠性需要额外关注。如果生产者在发送消息后立即崩溃,可能会导致部分消息丢失。
调试复杂:由于消息发送和结果是异步的,因此调试时可能需要更多的日志记录和监控手段来确保消息的可靠性和完整性。
异步回调
描述
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。消息发送失败后会自动重试,不需要再回调函数中手动重试。
代码示例
package com.wunaiieq;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class UnSyncCallBackCustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop = new Properties();//TODO 2.为配置文件对象设置参数// 2.1 配置bootstrap_serversprop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");// 2.2 配置key和value的序列化类prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);//TODO 4.发送消息for(int i = 0;i<5;i++){//异步发送消息 不调用get()方法producer.send(new ProducerRecord<>("topicA", "unsync_msg" + i),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,笑死发送成功,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}//TODO 5.关闭生产者producer.close();}
}
拦截器
描述
拦截器(Interceptor)是kafka0.10.0.0版本中引入的新功能,主要用于实现clients端的定制化控制逻辑。它可以使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时允许指定多个Interceptor按序作用于同一条消息从而形成一个拦截器链(Interceptor Chain)。
自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
拦截器内部方法
-
onSend
方法- 作用:在消息发送之前进行拦截,允许对消息进行修改或处理。
- 参数:接收一个
ProducerRecord
对象。 - 返回值:返回一个
ProducerRecord
对象,可能是修改后的记录。 - 应用场景:添加消息头、修改消息内容、过滤消息等。
-
onAcknowledgement
方法- 作用:在消息发送成功或失败后进行回调。
- 参数:
RecordMetadata
:包含消息的元数据。Exception
:发送过程中可能抛出的异常,成功时为null
。
- 返回值:无。
- 应用场景:记录发送结果、统计发送成功率、处理发送失败等。
-
close
方法- 作用:在拦截器不再使用时进行资源清理。
- 参数:无。
- 返回值:无。
- 应用场景:关闭打开的文件、释放内存、断开网络连接等。
拦截器Interceptor可能运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外,若指定了多个Interceptor,则producer将按照指定顺序调用它们,同时把每个Interceptor中捕获的异常记录写入到错误日志中而不是向上传递。这在使用过程中要特别留意。
代码示例
实现一个简单的双interceptor组成的拦截链。
第一个Interceptor会在消息发送前将时间戳信息加到消息value的前面
第二个Interceptor在消息发送后更新成功发送消息数或失败发送消息数。
第一个拦截器示例
package com.wunaiieq;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class TimeStampInterceptor implements ProducerInterceptor<String,String> {/**初始化拦截器,并接收Kafka生产者的配置参数。* */@Overridepublic void configure(Map<String, ?> configs) {}/**发送之前被调用,对消息进行处理。* */@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return new ProducerRecord<String, String>(//原始消息记录的主题、分区、时间戳和键。record.topic(),record.partition(),record.timestamp(), record.key(),//将当前系统时间戳(System.currentTimeMillis())和原始消息值拼接成新的消息值,中间用逗号分隔。System.currentTimeMillis()+","+record.value());}/**消息发送成功或失败后被调用。* */@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}/**在拦截器不再使用时进行资源清理。* */@Overridepublic void close() {}
}
第二个拦截器示例
package com.wunaiieq;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CounterIntercepter implements ProducerInterceptor<String,String> {private int errorCounter = 0;private int successCounter = 0;/**onSend方法,该方法在消息发送之前被调用,用于对消息进行处理。* 由于这是第二个拦截器,因此这里接受的是前一个拦截器的输出* */@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}/**在消息发送成功或失败后被调用。* 统计消息发送成功或失败的数量* */@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(exception==null){successCounter++;}else{errorCounter++;}}/**拦截器关闭时会进行的额外操作* 打印成功或失败的消息数量* */@Overridepublic void close() {System.out.println("successful send:"+successCounter);System.out.println("failed send:"+errorCounter);}@Overridepublic void configure(Map<String, ?> configs) {}
}
拦截器调用
package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class SyncCustomProducerInterceptor {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 构造拦截器链List<String> interceptors = new ArrayList<>();interceptors.add("com.wunaiieq.TimeStampInterceptor");interceptors.add("com.wunaiieq.CounterIntercepter");//配置拦截器链(将拦截器链加入到配置文件中)prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);KafkaProducer<String, String> producer =new KafkaProducer<String, String>(prop);for (int i = 5; i < 10; i++) {//同步发送消息producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();}//一定要关闭生产者,这样才会调用interceptor的close方法producer.close();}
}
效果
消息序列化
描述
消息序列化是将对象转换为字节流的过程。在Kafka中,生产者需要将消息对象序列化为字节流,以便通过网络发送给Kafka集群;而消费者则需要将从Kafka集群接收到的字节流反序列化为对象,以便进行后续处理。
代码示例(自定义序列化)
pom.xml
增加依赖
<dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><version>1.9.13</version></dependency>
UserVo.java
值对象
package com.wunaiieq;public class UserVo {private String name;private int age;private String address;public UserVo(String name, int age, String address) {this.name = name;this.age = age;this.address = address;}@Overridepublic String toString() {return "UserVo{" +"name='" + name + '\'' +", age=" + age +", address='" + address + '\'' +'}';}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}
}
UserSerializer.java
重写Serializer接口实现序列化操作
package com.wunaiieq;import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;public class UserSerializer implements Serializer<UserVo> {private ObjectMapper objectMapper;@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {objectMapper = new ObjectMapper();//Serializer.super.configure(configs, isKey);}/*** @param topic 消息将要发送到的主题名* @param data 需要序列化的UserVo对象。* */@Overridepublic byte[] serialize(String topic, UserVo data) {//存储序列化后的字节数组byte[] ret = null;try {//data写成JSON字符串再写成UTF_8的字节数组ret = objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);} catch (IOException e) {throw new SerializationException("Error when serializing UserVo to byte[],exception is " + e.getMessage());}return ret;}@Overridepublic void close() {objectMapper = null;//Serializer.super.close();}
}
UserSerProducer.java
调用自定义序列化机制
package com.wunaiieq;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class UserSerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");// TODO 不用修改key的序列化机制,后续没用到prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 修改value的序列化机制prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());KafkaProducer<String,UserVo> producer = new KafkaProducer<String, UserVo>(prop);UserVo userVo = new UserVo("wunaiieq",18,"北京");producer.send(// TODO 关于消息记录的构造中,可以指定 1.主题、值 2.主题、键、值new ProducerRecord<String,UserVo>("topicA", userVo),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(50);producer.close();}
}
效果
前面的不用管,只是没清空而已
分区
描述
分区位于拦截器链后面
生产者分区的优势:
-
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
-
提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
-
分区后,更方便于做副本备份,提高了数据安全性。
分区策略
以下为提供的默认分区策略,自行选择即可
- 轮询策略(Round-Robin Strategy)
原理:按照顺序将消息发送到不同的分区,每个消息被发送到其对应分区,循环轮询每个分区,确保消息在所有分区之间均匀分布。
特点:适用于生产者不需要根据消息内容或键选择特定分区的场景,能够实现负载均衡,最大限度地利用集群资源。
默认情况:这是Kafka Java生产者API默认提供的分区策略。如果没有指定分区策略,则会默认使用轮询策略。 - 按键分配策略(Key-Based Partitioning)
原理:消息的键被用作决定消息分区的依据。生产者将消息的键发送给Kafka,Kafka根据键的哈希值将消息路由到相应的分区。
特点:适用于键值对的数据结构,通过将具有相同键的消息发送到同一分区,可以提高数据局部性和处理效率。同时,能够保证具有相同键的消息顺序性。 - 范围分区策略(Range Partitioning)
原理:根据消息键的范围将消息分配到不同的分区。每个分区包含一个键值范围内的消息。
特点:适用于有序数据的处理,如时间戳或递增的ID。通过将具有相似时间戳或递增ID的消息分配到同一分区,可以提高处理效率并保证数据的顺序性。 - 自定义分区策略(Custom Partitioning)
原理:用户可以根据特定的业务逻辑或规则来决定消息的分区。通过实现自定义的分区器类,根据应用程序的需求来定义分区的逻辑。
特点:提供了更高的灵活性,可以根据地理位置、用户ID或其他业务规则来决定消息的分区。
实现方式:实现org.apache.kafka.clients.producer.Partitioner接口,并重写partition、close和configure方法。其中,partition方法是核心,用于根据给定的键、值和分区信息来计算分区号。 - 粘性分区策略(Sticky Partitioning)
原理:尽可能将消息分配到与之前消息相同的分区,以减少跨分区的数据移动和复制。通过维护一个分区和消费者的映射关系来实现。
特点:在消费者组或分区数量发生变化时,能够尽可能减少对现有分区分配的影响,减少负载均衡的开销,提高处理效率。
代码示例
写入默认分区(0号分区)
消息只会发送到指定的分区内部
package com.wunaiieq.partition;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class ProducerToPartition {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop = new Properties();//TODO 2.为配置文件对象设置参数// 2.1 配置bootstrap_serversprop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");// 2.2 配置key和value的序列化类prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);//TODO 4.发送消息for(int i = 0;i<5;i++){//指定数据发送到0号分区,key为nullproducer.send(new ProducerRecord<>("topicA",0,null, "unsync_msg" + i),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}//TODO 5.关闭生产者producer.close();}
}
自定义分区机制
部分消息可能需要额外的处理内容,比如审计等等,这类消息的key会携带关键字符串“wunaiieq”,现在让其发送到topicA主题的最后一个分区上,以便于后续处理,其他的消息则随机发送(不包括最后一个分区)
WunaiieqPartitioner.java
分区写入策略
package com.wunaiieq.partition;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;
import java.util.Random;public class WunaiieqPartitioner implements Partitioner {private Random random;@Overridepublic void configure(Map<String, ?> configs) {//该方法实现必要资源的初始化工作random = new Random();}/** 计算信息对应的分区* @param topic 主题* @param key 消息的key* @param keyBytes 消息的key序列化后的字节数组* @param value 消息的value* @param valueBytes 消息value序列化后的字节数组* @param cluster 集群元数据 可以获取分区信息* @return 息对应的分区号*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//将key转换为字符串String keyInfo = (String)key;//获取主题的分区对象列表List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);//获取主题下的分区总数量int partCount = partitionInfoList.size();if (partCount <= 1) {System.out.println("1 partition");return 0; // 只有一个分区时,直接返回0}//最后一个分区号int wunaiieqPartition = partCount-1;//如果 key 为空、key 为空字符串或 key 不包含 "wunaiieq",则随机选择一个除最后一个分区外的分区;否则,消息发送到最后一个分区。return keyInfo==null || keyInfo.isEmpty()||!keyInfo.contains("wunaiieq")? random.nextInt(partCount-1) : wunaiieqPartition ;}@Overridepublic void close() {//该方法实现必要资源的清理工作random = null;}
}
CustomPartitionerProducer.java
调用分区策略
package com.wunaiieq.partition;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomPartitionerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 在用于构造KafkaProducer的Properties对象中设置partitioner.class参数prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.wunaiieq.partition.WunaiieqPartitioner");KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);for(int i = 0;i<5;i++){// TODO 不指定分区号,key为"wunaiieq"测试运行一次,改为"kafka"后再测试一次。producer.send(new ProducerRecord<>("topicA","aa", "unsync_msg" + i),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}producer.close();}
}
消息丢失
判断消息丢失时,一般看应答机制
- acks=0:因为不需要等待leader数据持久化就完成应答,leader宕机后可能存在数据丢失(follower内部数据从leader中同步,leader没有---->>follower也没有)
- acks=1:此时leader持久化完成应答(但是follower可能没有完成数据同步,leader宕机,导致数据丢失)一般用于传输普通日志
- acks=all(或acks=-1):leader和follower都持久化后并回应,才认为消息发送成功。这种方式性能最低,但可靠性最高。传输重要数据。
特殊情况
在acks=-1或all的情况下,Leader接收到数据并持久化后,所有Follower开始同步Leader刚刚持久化的数据,但是有一个Follower因故障迟迟不能进行数据同步,该问题应该怎么解决?
Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。
该时间阈值由replica.lag.time.max.ms参数设定,默认30000ms。例如1超时,(leader:0, isr:0,2)。这样就不用等长期联系不上或者已经故障的节点。
消息绝对不丢失的条件
- ACK级别设置为-1
- 分区副本>=2
- ISR应答的最小副本数>=2 (最小副本数有min.insync.replicas设置,默认为1)
生产中配置响应级别代码块:
// 设置 acks
prop.put(ProducerConfig.ACKS_CONFIG, "all");
//重试次数 retries,默认是 int 最大值,2147483647
prop.put(ProducerConfig.RETRIES_CONFIG, 3);
数据去重
描述
数据重复的原因
- 当生产者发送消息到Kafka集群时,如果由于网络故障或Kafka Broker的临时问题导致消息发送失败,生产者通常会进行重试。如果重试时Kafka Broker已经成功处理了之前的消息但尚未发送确认(ACK),那么重试发送的消息就会导致数据重复。
- 网络延迟或不稳定可能导致消息发送失败,生产者会进行重试,从而增加消息重复的风险。
- 如果Kafka Broker在消息发送成功后崩溃,但在发送确认(ACK)之前崩溃,生产者可能会重试发送相同的消息,导致消息重复。
数据去重
-
至少一次(At Least Once):ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2(保证数据绝对不丢失,但不能保证数据不重复)
-
最多一次(At Most Once):ACK级别设置为0(保证数据绝对不重复,但不能保证数据不丢失)
-
精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
- 精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
- 重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复。
使用幂等性的使用
开启参数 enable.idempotence 默认为 true,false 关闭。
事务
- 寻找事务协调器:KafkaProducer 使用 trans.id 寻找事务协调器 (Transaction Coordinator)。
- 协调器通过 broker0 返回事务协调器的地址,包括事务信息的主题和分区领导者 (transaction_state-分区-Leader)。
- 请求 partId(开启幂等性): KafkaProducer向事务协调器请求 partId,并开启幂等性。
- 事务协调器接收到请求后,将请求持久化,并返回 partId给 KafkaProducer。
- 发送消息: KafkaProducer 使用返回的 partId 发送消息到指定主题的指定分区(topicA-Partition0 或 topicA-Partition1)。
6.发送 commit 请求: KafkaProducer发送 commit 请求到事务协调器,以提交事务。
7.事务协调器接收到 commit 请求后,将其持久化。- 事务成功:事务协调器确认事务成功,并返回成功信息给 KafkaProducer。
代码示例(事务)
package com.wunaiieq;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTransaction {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 设置事务idprop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_topicA_0");KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);//TODO 初始化事务producer.initTransactions();//TODO 开启事务producer.beginTransaction();//TODO 添加异常处理,成功提交事务,失败回滚事务try {//发送消息for (int i = 0; i < 5; i++) {//同步发送消息producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();}//TODO 提交事务producer.commitTransaction();}catch (Exception e){//TODO 放弃事务producer.abortTransaction();}producer.close();}
}
相关文章:

kafka生产者专题(原理+拦截器+序列化+分区+数据可靠+数据去重+事务)
目录 生产者发送数据原理参数说明代码示例(同步发送数据)代码示例(异步) 异步和同步的区别同步发送定义与流程特点 异步发送定义与流程特点 异步回调描述代码示例 拦截器描述代码示例 消息序列化描述代码示例(自定义序…...

【React+TypeScript+DeepSeek】穿越时空对话机
引言 在这个数字化的时代,历史学习常常给人一种距离感。教科书中的历史人物似乎永远停留在文字里,我们无法真正理解他们的思想和智慧。如何让这些伟大的历史人物"活"起来?如何让历史学习变得生动有趣?带着这些思考&…...

公共数据授权运营系统建设手册(附下载)
在全球范围内,许多国家和地区已经开始探索公共数据授权运营的路径和模式。通过建立公共数据平台,推动数据的开放共享,促进数据的创新应用,不仅能够提高政府决策的科学性和公共服务的效率,还能够激发市场活力࿰…...

基于HTML和CSS的旅游小程序
一、技术基础 HTML(HyperText Markup Language):超文本标记语言,用于定义网页的内容和结构。在旅游小程序中,HTML用于搭建页面的基本框架,包括标题、段落、图片、链接等元素,以及用于交互的表单…...

maven之插件调试
当使用maven进行项目管理的时候,可能会碰到一些疑难问题。网上资料很少,可能会想着直接调试定位问题。这里以maven-compiler-plugin为例: (1)准备maven-compiler-plugin源码 进入maven 官网-》Maven Plugins-》找到对…...

SQL Sever 数据库损坏,只有.mdf文件,如何恢复?
SQL Sever 数据库损坏,只有.mdf文件,如何恢复 在SQL Server 2008中,如果只有MDF文件而没有LDF文件,附加数据库的过程会稍微复杂一些。以下是几种可能的方法 一、使用紧急模式重建日志文件 1、新建一个同名的数据库。 2、停止SQ…...

【AWS SDK PHP】This operation requests `sigv4a` auth schemes 问题处理
使用AWS SDK碰到的错误,其实很简单,要装个扩展库 保持如下 Fatal error: Uncaught Aws\Auth\Exception\UnresolvedAuthSchemeException: This operation requests sigv4a auth schemes, but the client currently supports sigv4, none, bearer, sigv4-…...

primevue的<Menu>组件
1.使用场景 2.代码 1.给你的menu组件起个引用名 2.<Menu>组件需要一个MenuItem[] 3.你要知道MenuItem[ ]的特殊的数据格式,就像TreeNode[ ]一样,数据格式不对是不渲染的。。。。 常用的属性就这几种,js语言和java不一样,J…...

利用Deeplearning4j进行 图像识别
目录 图像识别简介 神经网络 感知器 前馈神经网络 自动编码器 受限玻尔兹曼机 深度卷积网络 理解图像内容以及图像含义方面,计算机遇到了很大困难。本章先介绍计算机理解图像教育方面 遇到的难题,接着重点讲解一个基于深度学习的解决方法。我们会…...

练习题:37
目录 Python题目 题目 题目分析 套接字概念剖析 通信原理分析 服务器 - 客户端连接建立过程: 基于套接字通信的底层机制: 代码实现 基于 TCP 的简单服务器 - 客户端通信示例 服务器端代码(tcp_server.py) 客户端代码&a…...

Unity热更文件比较工具类
打包出来的热更文件,如果每次都要全部上传到CDN文件服务器,不进耗费时间长,还浪费流量。 所以让AI写了个简单的文件比较工具类,然后修改了一下可用。记录一下。 路径可自行更改。校验算法这里使用的是MD5,如果使用SH…...

【hustoj注意事项】函数返回值问题
原文 https://lg.h-fmc.cn/index.php/BC/27.html 问题回顾 此题目选自HFMC_OJ:4312: 简单递归操作 hustoj测试 此问题错误的代码是 #include<bits/stdc.h> using namespace std; int a[10000];int n; int b[10000]{0}; int pailie(int deep) {int i; for(…...

实现一个通用的树形结构构建工具
文章目录 1. 前言2. 树结构3. 具体实现逻辑3.1 TreeNode3.2 TreeUtils3.3 例子 4. 小结 1. 前言 树结构的生成在项目中应该都比较常见,比如部门结构树的生成,目录结构树的生成,但是大家有没有想过,如果在一个项目中有多个树结构&…...

数势科技:解锁数据分析 Agent 的智能密码(14/30)
一、数势科技引领数据分析变革 在当今数字化浪潮中,数据已然成为企业的核心资产,而数据分析则是挖掘这一资产价值的关键钥匙。数势科技,作为数据智能领域的领军者,以其前沿的技术与创新的产品,为企业开启了高效数据分析…...

机器学习之过采样和下采样调整不均衡样本的逻辑回归模型
过采样和下采样调整不均衡样本的逻辑回归模型 目录 过采样和下采样调整不均衡样本的逻辑回归模型1 过采样1.1 样本不均衡1.2 概念1.3 图片理解1.4 SMOTE算法1.5 算法导入1.6 函数及格式1.7 样本类别可视化理解 2 下采样2.1 概念2.2 图片理解2.3 数据处理理解2.4 样本类别可视化…...

解决 ssh connect to host github.com port 22 Connection timed out
一、问题描述 本地 pull/push 推送代码到 github 项目报 22 端口连接超时,测试连接也是 22 端口连接超时 ssh 密钥没问题、也开了 Watt Toolkit 网络是通的,因此可以强制将端口切换为 443 二、解决方案 1、测试连接 ssh -T gitgithub.com意味着无法通…...

mybatis/mybatis-plus中mysql报错
文章目录 一、sql执行正常,mybatis报错二、sql执行正常,mybatis-plus报错直接改变字段利用mybatis-plus特性处理 总结 一、sql执行正常,mybatis报错 Caused by: net.sf.jsqlparser.parser.ParseException: Encountered unexpected token: "ur" <K_ISOLATION>a…...

在ros2 jazzy和gazebo harmonic下的建图导航(cartographer和navigation)实现(基本)
我的github分支!!! 你可以在这里找到相对应的源码。 DWDROME的MOGI分支 来源于!! MOGI-ROS/Week-3-4-Gazebo-basics 学习分支整理日志 分支概述 这是一个用于个人学习的新分支,目的是扩展基本模型并添加…...

《Rust权威指南》学习笔记(五)
高级特性 1.在Rust中,unsafe是一种允许绕过Rust的安全性保证的机制,用于执行一些Rust默认情况下不允许的操作。unsafe存在的原因是:unsafe 允许执行某些可能被 Rust 的安全性检查阻止的操作,从而可以进行性能优化,如手…...

GitHub的简单操作
引言 今天开始就要开始做项目了,上午是要把git搭好。搭的过程中遇到好多好多的问题。下面就说一下git的简单操作流程。我们是使用的GitHub,下面也就以这个为例了 一、GitHub账号的登录注册 https://github.com/ 通过这个网址可以来到GitHub首页 点击中间绿色的S…...

「Mac畅玩鸿蒙与硬件54」UI互动应用篇31 - 滑动解锁屏幕功能
本篇教程将实现滑动解锁屏幕功能,通过 Slider 组件实现滑动操作,学习事件监听、状态更新和交互逻辑的实现方法。 关键词 滑动解锁UI交互状态管理动态更新事件监听 一、功能说明 滑动解锁屏幕功能包含以下功能: 滑动解锁区域:用…...

SMMU软件指南之系统架构考虑
安全之安全(security)博客目录导读 目录 5.1 I/O 一致性 5.2 客户端设备 5.2.1 地址大小 5.2.2 缓存 5.3 PCIe 注意事项 5.3.1 点对点通信 5.3.2 No_snoop 5.3.3 ATS 5.4 StreamID 分配 5.5 MSI 本博客介绍与 SMMU 相关的一些系统架构注意事项。 5.1 I/O 一致性 如…...

使用高云小蜜蜂GW1N-2实现MIPI到LVDS(DVP)转换案例分享
作者:Hello,Panda 大家晚上好,熊猫君又来了。 今天要分享的是一个简单的MIPI到LVDS(DVP)接口转换的案例。目的就是要把低成本FPGA的应用潜力充分利用起来。 一、应用背景 这个案例的应用背景是:现在还在…...

「C++笔记」unordered_map:哈希化的无序映射函数(键值对)
unordered_map 是 C 中一个经过哈希函数(Hash)处理的映射(map)容器。 本文中的map和set是差不多的,unordered_map与unordered_set也是对应的。所以不再单独写一篇了。 这里的内容建议看完本文之后再回过头来看 二者虽然…...

Linux 安装jdk
1、官网下载jdk https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html2、以tar包为例,在window或者Linux解压都可以,这里直接在win解压了,上传到服务器 3、在/usr/local/ 创建jdk目录,将jdk上传到…...

asp.net core 发布到iis后,一直500.19,IIS设置没问题,安装了sdk,文件夹权限都有,还是报错
原因就是没有安装ASP.NET Core 9.0 Runtime (v9.0.0) - Windows Hosting Bundle,我是只安装了.net core的sdk,下面介绍下sdk和hosting bundle的关系 在 .NET Core 和 ASP.NET Core 的开发中,SDK(Software Development Kit&#x…...

【Go】运行自己的第一个Go程序
运行自己的第一个Go程序 一、Go语言的安装Go环境安装查看是否安装成功配置GOPROXY(代理) 二、Goland安装三、Goland破解四、新建项目 开一篇专栏记录学习Go的过程,一门新语言从hello world开始,这篇文章详细讲解Go语言环境搭建及hello world实现 一、Go语…...

qt qss文件的使用
qt样式的修改方式 一 通过ui界面的改变样式表来直接修改显示效果。 不推荐,其他人不好修改,不够直观,不易维护。 二 通过setStyleSheet接口修改。 一般,界面很少的时候可以使用。一旦界面多起来,代码部分就显得杂乱…...

【管道——二分+区间合并】
题目 思路 区间合并 1、按照左端点排序2、遍历窗口,若窗口非法,继续遍历;否则执行33、若是第一个窗口,设定合并结果初值,判断结果左端点是否造成“起点过大”,是,FALSE退出;否则执行…...

宽带、光猫、路由器、WiFi、光纤之间的关系
1、宽带(Broadband) 1.1 宽带的定义宽带指的是一种高速互联网接入技术,通常包括ADSL、光纤、4G/5G等不同类型的接入方式。宽带的关键特点是能够提供较高的数据传输速率,使得用户可以享受到稳定的上网体验。 1.2 宽带的作用宽带是…...