Kafka 入门与实战
一、Kafka 基础
1.1 创建topic
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
1.2 查看消费者偏移量位置
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test
1.3 消息的生产与发送
#生产者
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test#消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
二、Kafka 集群部署
在 Kafka 集群中,每个节点是一个 broker,broker 中可以创建多个 Topic,每个Topic都可以被分成多个Partition,每个 Partition 都是一个有序的、不可变的消息序列。Partition 是Kafka集群中消息存储的最小单元,也是Kafka集群中消息分发和负载均衡的最小单位,Partition 通过副本机制 Replication 来保证数据的高可用性和容错性,每个 Partition 都有一个 leader 的副本和 多个 follower 副本,leader 副本负责接收和处理消息,follower 副本负责复制leader 副本的数据。
2.1 修改server.properties
#设置 broker 不唯一
broker.id=1
#若是在一台机器上,需要更改端口号,避免冲突
listeners=PLAINTEXT://:9091
#日志目录,选择性更改
log.dirs=D:/kafka-cluster/data/kafka-broker-1
2.2 创建启动脚本文件
#zookeeper 启动脚本
@echo off
cd /d %~dp0
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
pause#每个Kafka服务器创建启动脚本
@echo off
cd /d %~dp0
.\bin\windows\kafka-server-start.bat .\config\server.properties
pause
2.3 创建批处理启动脚本
@echo off
cd /d %~dp0
cd ./kafka-broker-1
start zookeeper.bat
ping 127.0.0.1 -n 10 >nul
start kafka.bat
cd ../kafka-broker-2
start kafka.bat
cd ../kafka-broker-3
start kafka.bat
pause
2.4 创建批处理清除脚本
再次重新启动会启动失败,需要清除文件夹
@echo off
cd /d %~dp0
rd /s /q D:\kcluster\data
pause
Windows 下使用反斜杠,若使用正斜杠,可能会报错。
三、zookeeper 服务启动
3.1 kafka 启动,zookeeper节点变化
controller:集群模式下会有多个 broker 节点,而这些broker节点会选举出一个管理者-controller。
zookeeper 中数据存储的方式类似文件:
当有kafka连接 zk 时,就会创建节点,存储相关信息,此时的 controller 是临时节点,当我们的 kafka 服务器关闭时,该节点就会被删除(默认创建临时节点):
3.2 集群模式
当我们以集群的方式去启动时,controller 的选举策略是 “先入为主”,即哪个 broker 先注册至 zk 中,和 zk 建立连接并创建 controller 节点,它就会被选举为 controller。此时其他 broker 节点在尝试创建 controller 节点时就会失败,但仍会监听 controller 节点,如果 controller 节点挂掉了,此时其他 broker 节点就会去竞争创建 controller 节点,先创建的就会被选举为 controller 节点,而其他的 broker 节点会创建失败,继续去监听新的 controller。
3.3 controller 与 broker 通信
监听 /brokers/ids 节点是为了监听是否有新的 broker 节点,并管理这些 broker 节点。
第二个broker 启动流程
四、主题创建
主题创建时需要指定 name,partition 和 replication,partition 是数据负载均衡的最小单位,partition 数一般小于等于 broker 节点数,replication 是partition的副本节点,replication 一般会和partition 存储到不同的 broker 节点。
五、生产数据
5.1 生产流程
先将信息封装成 ProducerRecord ,然后再经过拦截器得到一个经过拦截处理后的ProducerRecord,之后再通过序列化器针对 key 和 value 进行序列化,通过分区器计算数据发送至哪个partition,即发送到对应的 broker 节点,然后加入到缓冲区,直到缓存刷新或者缓存区满后通过 Sender 发送线程将数据发送至 broker。
分区器会通过 MetadataCache 来获取 topic 的相关信息,并针对 partition 来计算对应的分区。(如果设置分区编号,则会直接使用,不会对编号进行校验,如果没有对应的分区,数据则无法正确存储到对应的 topic 中)
5.2 数据的异步发送和同步发送
数据默认是异步发送的, 将数据发送至数据缓冲区中,之后会由 sender 线程来发送。同步发送:
@RequestMapping("/send")public String send(String msg) throws ExecutionException, InterruptedException {CompletableFuture future=kafkaTemplate.send(TOPIC,msg);future.get();return "success";}
5.3 ACKS 应答处理机制
- 0:sender 线程发送消息后,直接返回 ack,此时数据只是放到了网络当中,不会考虑数据是否真正存到 Kafka 中。
- 1:当数据保存到磁盘后,即保存到对应的分区后会直接返回 ack。
- -1(all):当多个副本 replication 都同步消息完成后,才会返回 ack,该级别是最安全的。
5.4 数据重复及乱序的原因
要了解原因首先知道重传机制: 当数据没有正常发送,没有接收到对应的 ack 时,此时就会重新发送,直到发送成功或者超过最大重试次数。
当我们的leader 节点保存数据到磁盘后,在返回ack的时候,由于网络问题,导致连接超时或者ack 丢失,就会导致我们的数据重新发送,此时就会导致数据重复。
由于数据没有正常发送,此时数据就会重新回到缓冲区,sender 线程再重新拉取并发送到对应的 topic 中,而在重新发送成功之前,此时其他消息已经保存到了 topic 中,这时就导致数据乱序。
5.5 幂等性操作
幂等性可以解决上述数据乱序和重复的问题,但是幂等性开启后有以下几点要求:
- acks= -1
- 需要开启重试机制
- 在图请求缓冲区不能大于5
如何解决的?
在broker维护了一个保存生产者生产数据的分区状态 ProducerState ,之里面会维护最近五条消息,在新发送消息后,会去验证消息是否相同,若重复则不会继续添加,若没有重复,则会判断顺序号是否是连续的,如果顺序号不是连续的,则会将数据重新返回发送缓冲区,再重新发送,这也是要求为什么要求开启重试机制。
值得注意的是: 幂等性只能保证一个分区的数据不重复和顺序连续,无法保证多个分区的连续。由于我们的幂等性由生产者id + 数据顺序号来决定的,当我们的 broker 重启,生产者 id就会改变,此时相同的数据由于不同的生产者id 仍然会造成上述问题,也就是说无法实现跨会话幂等。
5.6 事务操作及流程
为了解决跨会话幂等性,可以通过事务来解决。
当我们开启事务后,可以保证broker 节点多次重启,保证生产者 id 不变,这就解决了我们上述的幂等性出现的问题。但是事务仍然只能保证单个分区的幂等性,即开启事务可以保证跨会话的幂等性,但无法保证跨分区的幂等性。引文在 bachMetadata 中保存的只有一个分区的最近五条消息,无法跨会话进行判断数据的重复和顺序。
流程:producer 首先会发送请求事务管理器的所在分区节点,Broker 根据事务 id 的 hash 值并对事务管理器状态分区个数(50)取余,返回对应的事务管理器所在分区。producer 初始化生产者 id,并将数据的分区信息发送给事务管理器。之后 prducer 开始生产数据,并将数据发送对应分区的 leader 节点,当数据保存后,对应的 broker 节点会将数据保存成功后的数据分区信息发送给事务协调器,并向生产者返回 acks 应答响应。producer 接收到应答响应后向事务协调器发送关闭事务,事务协调器接收到请求后,首先会将 __trancsaction_state 中的事务状态修改为 PrepareCommit(准备提交),然后再将事务当前的状态返回给 broker 节点,最后事务协调器会将 __transaction_state 中保存的事务状态改为 CompleteCommit。
六、代码片段
6.1 消费数据偏移量
@RequestMapping("/send")public String send(String msg) throws ExecutionException, InterruptedException {CompletableFuture future=kafkaTemplate.send(TOPIC,msg);future.get();return "success";}//监听所订阅的主题@KafkaListener(topics = {Producer.TOPIC})public void onMessage(String data,Acknowledgment ack){System.out.println("receive: " + data);ack.acknowledge();}
kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group test --topic test --reset-offsets --to-earliest --execute
手动更改偏移量可使消费过的数据从头消费
spring:kafka:consumer:producer:#如果之前用相同的消费者组消费过该主题,并且 offset已经记录在 kafka 中,那么从kafka中读取的offset就是该offset,kafka 只会在找不到偏移量时会使用这个配置,如果想要从头消费,可以使用心得消费者group-id,或者手动提交偏移量.auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: false
- earliest:自动将偏移量重置为最早的偏移量
- latest:自动将偏移量重置为最新的偏移量
- none:如果没有为消费者找到以前的偏移量,则向消费者抛异常
6.2 send() 常用方法
@RequestMapping("/send2")public String sendMessage(){Message<String> message= MessageBuilder.withPayload("hello kafka").setHeader(KafkaHeaders.TOPIC,"test").build();kafkaTemplate.send(TOPIC,"hello kafka");return "success";}@RequestMapping("/send3")public String sendRecord(){//Headers 中可以存放一些信息(信息是key-valur 键值对),当消费者接收到消息后,可以拿到这个 headers 里面存放的信息.Headers headers=new RecordHeaders();headers.add("phone","1234567".getBytes(StandardCharsets.UTF_8));headers.add("name","zhangsna".getBytes(StandardCharsets.UTF_8));ProducerRecord record=new ProducerRecord<>("test",0,System.currentTimeMillis(),"k1","hello kafka",headers);kafkaTemplate.send(record);return "success";}//消费者获取 header 信息@KafkaListener(topics = {Producer.TOPIC})public void onMessage(String data, Acknowledgment ack,@Header(value = KafkaHeaders.RECEIVED_TOPIC, required = false) String topic,@Header(value = "name", required = false)String name,@Header(value = "phone", required = false)String phone){User user= JsonUtils.fromJson(data, User.class);System.out.println("receive: " + data.toString()+" topic: "+topic+" name: "+name+" phone: "+phone);ack.acknowledge();}//同步提交@RequestMapping("/send4")public String sendSync(){for(int i=0;i<10;i++) {System.out.println("发送消息: "+i);CompletableFuture future=kafkaTemplate.send("test",i+"");try {SendResult<String,String> result= (SendResult<String, String>) future.get();if(result.getRecordMetadata()!=null){System.out.println("消息发送成功: "+result.getRecordMetadata().toString());}System.out.println(result.getRecordMetadata());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}System.out.println("成功发送消息: "+i);}return "success";}//异步提交@RequestMapping("/send5")public String sendAsync(){for(int i=0;i<10;i++){System.out.println("发送消息: "+i);CompletableFuture<SendResult<String,String> >future=kafkaTemplate.send("test",i+"");future.thenAccept((sendResult)->{if(sendResult.getRecordMetadata()!=null){System.out.println("消息发送成功: "+sendResult.getRecordMetadata().toString());}System.out.println(sendResult.getProducerRecord());}).exceptionally((t)->{t.printStackTrace();//做失败处理return null;});}return "success";}
6.3 发送引用类型信息
将应用类型转换成 json 对象
package org.aliang.kafkademo.utils;import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class JsonUtils {public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();public static String toJson(Object object) {try {return OBJECT_MAPPER.writeValueAsString(object);} catch (Exception e) {e.printStackTrace();}return null;}public static <T> T fromJson(String json, Class<T> clazz) {try {return OBJECT_MAPPER.readValue(json, clazz);} catch (Exception e) {e.printStackTrace();}return null;}}
生产者消费者代码
@Autowiredprivate KafkaTemplate<String,User> kafkaTemplate2;@RequestMapping("/send6")public void sendObject(User user){String msg= JsonUtils.toJson(user);kafkaTemplate.send("test",msg);}@KafkaListener(topics = {Producer.TOPIC})public void onMessage(String data,Acknowledgment ack){User user= JsonUtils.fromJson(data, User.class);System.out.println("receive: " + data.toString());ack.acknowledge();}
6.4 继承 springboot 创建 topic
@Configuration
public class CreateTopic {@Beanpublic NewTopic newTopic() {return new NewTopic("test", 1, (short) 1);}@Beanpublic NewTopic updateTopic() {return new NewTopic("test", 2, (short) 1);}
}
更新 分区时只能增加分区数量,无法减少数量。
副本分区的数量不能大于 broker 节点个数。
6.5 发送消息配置分区策略
@Value("${kafka.cluster.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;public Map<String, Object> producerConfig(){Map<String,Object> props =new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);return props;}public ProducerFactory<String,Object> producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfig());}@Beanpublic KafkaTemplate<String,Object> kafkaTemplate(){return new KafkaTemplate<>(producerFactory());}
但是当我们运行后,发现并不是每个分区都有数据,不符合我们的轮询算法,这是因为在发送消息的过程中,会调用两次我们的 partition 方法,最终就导致不符合我们的预期。
6.6 配置自定义分区策略
public class CustomPartitioner implements Partitioner {private AtomicInteger nextPartition = new AtomicInteger(0);@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(key==null){int next=nextPartition.getAndIncrement();if(next>=numPartitions){nextPartition.set(next,0);}
// System.out.println("分区值"+next);return next;}else {//kafka 默认的分区策略return Utils.toPositive(Utils.murmur2(keyBytes))%numPartitions;}}}public Map<String, Object> producerConfig(){Map<String,Object> props =new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);//配置自定义分区策略props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);return props;}
6.7 自定义消息拦截器
public class CustomerInterceptor implements ProducerInterceptor{/*** 发送消息是会调用该方法,可以在拦截器中做一些处理,记录日志操作* @param producerRecord* @return*/@Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {System.out.println("拦截器拦截到消息:"+producerRecord.value());return producerRecord;}/*** @param recordMetadata 服务器返回的元数据信息* @param e*/@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(recordMetadata!=null){System.out.println("消息发送成功,偏移量: "+recordMetadata.offset());}else{System.out.println("消息发送失败");}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}public Map<String, Object> producerConfig(){Map<String,Object> props =new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomerInterceptor.class.getName());return props;}
6.8 消费者同时接受消息体和消息头
@KafkaListener(topics = {Producer.TOPIC})public void onMessage(Acknowledgment ack,ConsumerRecord<String,String> consumerRecord ){System.out.println("接收到consumerRecord消息: "+consumerRecord.toString());ack.acknowledge();}
6.9 指定 topic-partition-offset 消费
@KafkaListener(topicPartitions = @TopicPartition(//监听 test 主题topic = "test",//消费 0,1,2 分区的所有消息partitions = {"0","1","2"},partitionOffsets = {//第三、四个分区的偏移量从 2 开始消费@PartitionOffset(partition = "3",initialOffset = "2"),@PartitionOffset(partition = "4",initialOffset = "2")}))public void onMessage(ConsumerRecord record, Acknowledgment ack){System.out.println("receive: " + record.value()+"partition: "+record.partition()+" offset: "+record.offset());
// ack.acknowledge();}
若配置了监听的分区,但该主题下还有其他分区没配置,例如没有配置 5 分区,则不会消费 partition5 分区的消息。
6.10 批量消费信息
//批量消费数据@KafkaListener(topics = "test")public void onMessage(List<ConsumerRecord<String,String>> recordList, Acknowledgment ack){System.out.println("receive: " + recordList.size());ack.acknowledge();}
#设置批量消费
spring.kafka.type=batch
#批量消费每次消费多少条消息
spring.kafka.consummer.max-poll-records
6.11 集成消费拦截器
1. 实现kafka 的 ConsumerInterceptor 拦截器接口
@Configuration
public class CustomConsumerInterceptor implements ConsumerInterceptor<String,String> {//在监听器消费消息之前执行@Overridepublic ConsumerRecords<String,String> onConsume(ConsumerRecords consumerRecords) {System.out.println("拦截器拦截到消息:"+consumerRecords);return consumerRecords;}//在消息提交偏移量之前执行@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> var) {System.out.println("拦截器执行 onCommit,提交offset");}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
2. 在 kafka 消费者的 ConsumerFactory 配置中注册拦截器
@Configuration
public class KafkaConfig {@Value("${kafka.cluster.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public Map<String, Object> consumerConfig(){Map<String,Object> props =new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");return props;}@Beanpublic ConsumerFactory<String,Object> consumerFactory(){return new DefaultKafkaConsumerFactory<>(consumerConfig());}@Beanpublic KafkaListenerContainerFactory<?> customListenerContainerFactory(ConsumerFactory<String,Object> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory=new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}
3. 消费者代码
@KafkaListener(topics = "test",containerFactory = "customListenerContainerFactory", groupId = "test")public void onMessage(String data, Acknowledgment ack){System.out.println("receive: " + data);ack.acknowledge();}
总结: 通过配置新的监听器工厂,并配置监听器工厂中的消费者厂,消费者中配置自定义拦截器
上述代码虽然实现了自定义消息拦截器,但在运行过程中,发现我们的消费者和监听器的配置都没有生效,这是怎么回事呢?
public static void main(String[] args) {// SpringApplication.run(KafkaDemoApplication.class, args);ApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);Map<String, ConsumerFactory> beansOfType = context.getBeansOfType(ConsumerFactory.class);beansOfType.forEach((k,v)->{System.out.println(k+"-->"+v);});System.out.println("--------------------------------------------------------------");Map<String , KafkaListenerContainerFactory> beansOfType1 = context.getBeansOfType(KafkaListenerContainerFactory.class);beansOfType1.forEach((k,v)->{System.out.println(k+"-->"+v);});}
我们进行调试,从打印信息中可以看到,kafka 监听器有两个bean工厂,有一个是我们自定义的的,一个是默认的,默认的 listener工厂中的消费者配置都是我们的配置文件中的,而我们自定义的 listener 工厂中的消费者配置是我们在创建监听器时,传入的 consumerFactory 中的 ConsumerConfig 中的配置信息,由于我们在监听器制定了自定义 listener 工厂,因此我们配置文件中的配置才会失效,如果想要配置生效,就需要把想要配置的选项重新在 ConsumerConfig 中配置。
6.12 消息的转发
@KafkaListener(topics = "testA", groupId = "test")
// 要转发的 topic@SendTo(value = "testB")public String onMessageA(String data, Acknowledgment ack){System.out.println("TestA receive消息: " + data);ack.acknowledge();return data;}@KafkaListener(topics = "testB", groupId = "test2")public void onMessageB(String data, Acknowledgment ack){System.out.println("TestB receive转发后的消息: " + data);ack.acknowledge();}
注: 在使用@SendTo 注解后,同时配置了新的分区策略和拦截器后,不知道为何原因,因为重新注入了新的 KafkaTemplate,在项目启动后,会找不到对应的 bean,而不使用@SendTo 注解却可以正常加载。就算定义加载顺序(注入的bean的名字也为更改),仍然找不到对应的 bean。原因位置(埋点)
6.12 配置消费者分区策略
- RangeAssignor(默认策略):按范围进行分配,如果由8个分区,3个消费者,C1 消费0、1、2;C2消费 3、4、5;C3 消费 6、7
- RoundRobinAssignor:轮询,如果由8个分区,3个消费者,C1 消费0、3、6;C2消费 1、4、7;C3 消费 2、5
- StickAssignor:尽量保持现有分区不变,当有新的消费者加入或离开后,只更改少量消费者所消费分区,大部分保持不变,仍然保持现有消费分区
- CooperativeStickAssignor:优化后的粘性分区策略
代码:
配置消费策略
@Configuration
public class KafkaConfig {@Value("${kafka.cluster.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public Map<String, Object> consumerConfig(){Map<String,Object> props =new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
// props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.GROUP_ID_CONFIG,"test");return props;}@Beanpublic ConsumerFactory<String,Object> consumerFactory(){return new DefaultKafkaConsumerFactory<>(consumerConfig());}@Beanpublic KafkaListenerContainerFactory<?> customListenerContainerFactory(ConsumerFactory<String,Object> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory=new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}
配置消费者
//批量消费数据@KafkaListener(topics = "test",groupId = "testC",containerFactory = "customListenerContainerFactory",concurrency = "3")public void onMessage(ConsumerRecord record, Acknowledgment ack){System.out.println("receive: " + record.partition() +"分区"+record.value()+"消费者: "+Thread.currentThread().getId());ack.acknowledge();}
七、_consmuer_offsets
在每次消费一个消息并且提交后,会保存当前消费的最近的一个 offset;
在 zookeeper 中,有一个 _consumer_offsets主题,消费者提交的 offset 信息会写入到该topic 中,_consumer_offsets 保存了每个 consumer group 某一时刻提交的 offset 信息,_consumer_offsets 默认有 50 个分区,集群模式 zk 会给每个 broker 节点分配分区
consumer_group 保存到哪个分区中的计算公式:
Math.abs("groupid".hashCode())%groupMetadataTopicPartitionCount
八、ISR、HW、LEO
8.1 ISR:
在同步中的副本(In-Sync-Replicas),包含了 Leader 副本和所有与 Leader 副本保持同步的 Follower 副本
写请求首先由 Leader 副本处理,之后 Follwer 副本会从 Leader 上拉取写入的消息,这个过程会有一定延迟,导致 Follower 副本中保存的消息略少于 Leader 副本,但是只要没有超出阈值都可以容忍,但是一旦一个 Follower 副本出现异常,那这是 Leader 就会把他踢出去,Kafka 通过 ISR 集合来维护一个 “ 可用且消息量与 Leader 差不多的副本集合,他是整个副本的一个子集”
在 kafka 中,一个副本要成为 ISR 副本,要满足以下条件:
- Leader 副本本身就是一个 ISR 副本
- Follower 副本的最后一条消息的 offset 与 leader 副本的最后一条消息的 offset 之间的差值不能超过指定的阈值,超过阈值则该 Follower 副本将会从 ISR 列表中删除
- replica.lag.time.max.ms:默认是 30 秒;如果该 follower 在此时间间隔内一直没有追上过 Leader 副本就会被 ISR 集合剔除
- replica.lag.max.messages:落后了多少条消息时,该 Follower 副本就会被剔除 ISR 列表,该配置参数现在在新版本已经过时了。
8.2 LEO
日志末端偏移量(Log End Offset),该消息日志中下一条消息的偏移量
8.3 HW
HW(High Watermark),即高水位,它表示一个偏移量 offset 信息,表示消息的复制进度,也就是消息已经成功复制到哪个位置了。即在 HW 之前的所有消息都已经成功写入副本中,并且可以在所有的副本中找到,因此,消费者可以安全的消费这些消息。而对于消费者而言,它只能拉取 HW之前的消息,对于这之后未同步完成的消息,是不可见的。
相关文章:

Kafka 入门与实战
一、Kafka 基础 1.1 创建topic kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create 1.2 查看消费者偏移量位置 kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test 1.3 消息的生产与发送 #生产者 kafka-cons…...
数学知识学习1
1、数论 1质数判定 i<n/i优化O(sqrt(n)) bool is_prime(int n){if(n<2)return false;for(int i2;i<n/i;i){if(n%i0)return false;} true; } 分解质因数 i<n/i优化O(sqrt(n)) // 定义一个函数 divide,接收一个整数 n 作为参数,用于分解质…...
【AI日记】25.02.08
【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】【AI应用】 探索 AI 应用探索周二有个面试,明后天打算好好准备一下,我打算主要研究下 AI 如何在该行业赋能和应用,以及该行业未来的发展前景和公司痛点&#…...

Lecture8 | LPV VXGI SSAO SSDO
Review: Lecture 7 | Lecture 8 LPV (Light Propagation Volumes) Light Propagation Volumes(LPV)-孤岛惊魂CryEngine引进的技术 LPV做GI快|好 大体步骤: Step1.Generation of Radiance Point Set Scene Representation 生成辐射点集的场景表示:辐射…...
Java中实现定时锁屏的功能(可以指定时间执行)
Java中实现定时锁屏的功能(可以指定时间执行) 要在Java中实现定时锁屏的功能,可以使用java.util.Timer或java.util.concurrent.ScheduledExecutorService来调度任务,并通过调用操作系统的命令来执行锁屏。下面我将给出一个基本的…...

Java集合List详解(带脑图)
允许重复元素,有序。常见的实现类有 ArrayList、LinkedList、Vector。 ArrayList ArrayList 是在 Java 编程中常用的集合类之一,它提供了便捷的数组操作,并在动态性、灵活性和性能方面取得了平衡。如果需要频繁在中间插入和删除元素…...

[实验日志] VS Code 连接服务器上的 Python 解释器进行远程调试
目录 0. 前言 1. 环境 2. 准备工作 2.1 安装VS Code 2.2 安装插件 2.3 配置远程服务器 2.4 修改设置 2.5 打开远程调试窗口 3. 调试代码 3.1 输密码 3.2 打开服务器文件夹 3.3 配置Python环境 3.4 调试Python代码 补充:使用调试控制台,查看…...

(14)gdb 笔记(7):以日志记录的方式来调试多进程多线程程序,linux 命令 tail -f 实时跟踪日志
(44)以日志记录的方式来调试多进程多线程程序 : 这是老师的日志文件,可以用来模仿的模板: (45)实时追踪日志的 tail -f 命令: (46) 多种调试方法结合起来用 …...

Sentinel的安装和做限流的使用
一、安装 Release v1.8.3 alibaba/Sentinel GitHubA powerful flow control component enabling reliability, resilience and monitoring for microservices. (面向云原生微服务的高可用流控防护组件) - Release v1.8.3 alibaba/Sentinelhttps://github.com/alibaba/Senti…...

四柱预测学
图表 后天八卦 十二地支不仅代表了时间,还代表了方位。具体来说: 子:代表正北方丑寅:合起来代表东北方卯:代表正东方辰巳:合起来代表东南方午:代表正南方未申:合起来代表西南方酉:代表正西方戌亥:合起来代表西北方四季-五行-六神…...

【个人开发】macbook m1 Lora微调qwen大模型
本项目参考网上各类教程整理而成,为个人学习记录。 项目github源码地址:Lora微调大模型 项目中微调模型为:qwen/Qwen1.5-4B-Chat。 去年新发布的Qwen/Qwen2.5-3B-Instruct同样也适用。 微调步骤 step0: 环境准备 conda create --name fin…...

sqli-labs靶场实录(二): Advanced Injections
sqli-labs靶场实录: Advanced Injections Less21Less22Less23探测注入点 Less24Less25联合注入使用符号替代 Less25aLess26逻辑符号绕过and/or过滤双写and/or绕过 Less26aLess27Less27aLess28Less28aLess29Less30Less31Less32(宽字节注入)Less33Less34Le…...

Linux系统 环境变量
环境变量 写在前面概念查看环境变量main函数的参数argc & argvenv bash环境变量 写在前面 对于环境变量,本篇主要介绍基本概念及三四个环境变量 —— PATH、HOME、PWD。其中 PATH 作为 “ 敲门砖 ”,我们会更详细讲解;理解环境变量的全局…...

机器学习-线性回归(最大似然估计)
机器学习任务可以分为两类: 一类是样本的特征向量 𝒙 和标签 𝑦 之间存在未知的函数关系𝑦 h(𝒙),另一类是条件概率𝑝(𝑦|𝒙)服从某个未知分布。最小二乘法是属于第一类,…...

【信息系统项目管理师-案例真题】2017上半年案例分析答案和详解
更多内容请见: 备考信息系统项目管理师-专栏介绍和目录 文章目录 试题一【问题1】8 分【问题2】4 分【问题3】8 分【问题4】5 分试题二【问题1】10 分【问题2】8 分【问题3】6 分【问题4】5 分试题三【问题1】5 分【问题2】7 分【问题3】6 分【问题4】3 分试题一 阅读下列说明…...
CSP晋级组比赛生成文件夹与文件通用代码Python
快速生成文件夹与文件的脚本 import sys import osmyfiles sys.argv[1::] for f in myfiles:os.mkdir(f)os.system(f"touch {f}/{f}.in")os.system(f"touch {f}/{f}.out")os.system(f"touch {f}/{f}.cpp")with open("template.cpp",…...
正则表达式进阶(二)——零宽断言详解:\b \B \K \z \A
在正则表达式中,零宽断言是一种非常强大的工具,能够在不消费字符的情况下对匹配位置进行约束。除了环视(lookahead 和 lookbehind)以外,还有一些常用的零宽断言,它们用于处理边界、字符串的开头和结尾等特殊…...
Android 中实现 PDF 预览三种方式
目录 1. 使用第三方库 PdfRenderer(适用于 Android 5.0 及以上) 步骤:2. 使用第三方库 MuPDF步骤:3. 使用第三方库 PdfiumAndroid步骤: 1. 使用第三方库 PdfRenderer(适用于 Android 5.0 及以上)…...
尚硅谷课程【笔记】——大数据之Zookeeper【二】
课程视频:【尚硅谷Zookeeper教程】 四、Zookeeper实战 4.1分布式安装部署 1. 集群规划 在Hadoop102、Hadoop103和Hadoop104三个节点上部署Zookeeper 2. 解压安装 1)解压Zookeeper.tar.gz到指定目录 tar -zxvf zookeeper-3.7.2.tar.gz -C /opt/mod…...

CodeGPT + IDEA + DeepSeek,在IDEA中引入DeepSeek实现AI智能开发
CodeGPT IDEA DeepSeek,在IDEA中引入DeepSeek 版本说明 建议和我使用相同版本,实测2022版IDEA无法获取到CodeGPT最新版插件。(在IDEA自带插件市场中搜不到,可以去官网搜索最新版本) ToolsVersionIntelliJ IDEA202…...
从零实现富文本编辑器#5-编辑器选区模型的状态结构表达
先前我们总结了浏览器选区模型的交互策略,并且实现了基本的选区操作,还调研了自绘选区的实现。那么相对的,我们还需要设计编辑器的选区表达,也可以称为模型选区。编辑器中应用变更时的操作范围,就是以模型选区为基准来…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路
进入2025年以来,尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断,但全球市场热度依然高涨,入局者持续增加。 以国内市场为例,天眼查专业版数据显示,截至5月底,我国现存在业、存续状态的机器人相关企…...
鸿蒙(HarmonyOS5)实现跳一跳小游戏
下面我将介绍如何使用鸿蒙的ArkUI框架,实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...

JDK 17 序列化是怎么回事
如何序列化?其实很简单,就是根据每个类型,用工厂类调用。逐个完成。 没什么漂亮的代码,只有有效、稳定的代码。 代码中调用toJson toJson 代码 mapper.writeValueAsString ObjectMapper DefaultSerializerProvider 一堆实…...
机器学习的数学基础:线性模型
线性模型 线性模型的基本形式为: f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法,得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...

海云安高敏捷信创白盒SCAP入选《中国网络安全细分领域产品名录》
近日,嘶吼安全产业研究院发布《中国网络安全细分领域产品名录》,海云安高敏捷信创白盒(SCAP)成功入选软件供应链安全领域产品名录。 在数字化转型加速的今天,网络安全已成为企业生存与发展的核心基石,为了解…...
初级程序员入门指南
初级程序员入门指南 在数字化浪潮中,编程已然成为极具价值的技能。对于渴望踏入程序员行列的新手而言,明晰入门路径与必备知识是开启征程的关键。本文将为初级程序员提供全面的入门指引。 一、明确学习方向 (一)编程语言抉择 编…...

SQLSERVER-DB操作记录
在SQL Server中,将查询结果放入一张新表可以通过几种方法实现。 方法1:使用SELECT INTO语句 SELECT INTO 语句可以直接将查询结果作为一个新表创建出来。这个新表的结构(包括列名和数据类型)将与查询结果匹配。 SELECT * INTO 新…...