当前位置: 首页 > article >正文

Java + Spring Boot 操作 Kafka 完整学习指南

前置条件ZooKeeper 集群 Kafka 集群已启动3个ZK节点 3个Broker Broker 地址172.17.0.7:9092, 172.17.0.7:9093, 172.17.0.7:9094第一阶段原生 Java API 操作 Kafka目的理解底层原理Spring Boot 只是对这层的封装一、Maven 依赖dependencies !-- Kafka 客户端原生Java API -- dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version3.9.0/version /dependency !-- 日志可选 -- dependency groupIdorg.slf4j/groupId artifactIdslf4j-simple/artifactId version1.7.36/version /dependency /dependencies二、生产者Producer2.1 最简单的生产者import org.apache.kafka.clients.producer.*; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) throws Exception { // 1. 配置生产者 Properties props new Properties(); props.put(bootstrap.servers, 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 2. 创建生产者 KafkaProducerString, String producer new KafkaProducer(props); // 3. 发送消息 for (int i 0; i 10; i) { ProducerRecordString, String record new ProducerRecord(test-topic, key- i, hello kafka i); producer.send(record); System.out.println(发送消息: hello kafka i); } // 4. 关闭会自动flush producer.close(); } }2.2 三种发送方式// ① 发后不管Fire and Forget—— 性能最高可能丢消息 producer.send(record); // ② 同步发送 —— 等待结果性能最低最可靠 try { RecordMetadata metadata producer.send(record).get(); // .get() 阻塞等待 System.out.printf(发送成功 → Topic:%s Partition:%d Offset:%d%n, metadata.topic(), metadata.partition(), metadata.offset()); } catch (Exception e) { System.out.println(发送失败: e.getMessage()); } // ③ 异步发送推荐—— 性能高有回调处理结果 producer.send(record, (metadata, exception) - { if (exception null) { System.out.printf(发送成功 → Partition:%d Offset:%d%n, metadata.partition(), metadata.offset()); } else { System.out.println(发送失败: exception.getMessage()); } });2.3 生产者重要配置详解Properties props new Properties(); props.put(bootstrap.servers, 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // ★ acks消息可靠性核心配置 // 0 → 发完不管性能最高可能丢消息 // 1 → Leader写成功就返回Leader挂了可能丢 // all→ 所有ISR副本写成功才返回最可靠推荐生产环境 props.put(acks, all); // 重试次数遇到可重试错误时 props.put(retries, 3); // 批量发送消息积累到16KB一起发提升吞吐量 props.put(batch.size, 16384); // 等待时间即使不到16KB等1ms也发出去减少延迟 props.put(linger.ms, 1); // 缓冲区大小生产者本地缓存32MB满了send()会阻塞 props.put(buffer.memory, 33554432); // 幂等性开启后自动去重防止重试导致的消息重复 props.put(enable.idempotence, true);2.4 消息分区策略// 情况1指定分区 → 直接发到该分区 new ProducerRecord(test-topic, 0, key, value); // 发到分区0 // 情况2指定key → 对key做hash相同key一定进同一分区保证有序 new ProducerRecord(test-topic, user-001, 登录消息); new ProducerRecord(test-topic, user-001, 下单消息); // 上面两条消息一定在同一分区保证user-001的消息有序 // 情况3不指定key → 轮询分配到各分区负载均衡 new ProducerRecord(test-topic, hello);三、消费者Consumer3.1 最简单的消费者import org.apache.kafka.clients.consumer.*; import java.time.Duration; import java.util.*; public class SimpleConsumer { public static void main(String[] args) { // 1. 配置消费者 Properties props new Properties(); props.put(bootstrap.servers, 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094); props.put(group.id, my-consumer-group); // 消费者组ID重要 props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 2. 创建消费者 KafkaConsumerString, String consumer new KafkaConsumer(props); // 3. 订阅 Topic consumer.subscribe(Collections.singletonList(test-topic)); // 4. 持续拉取消息 while (true) { // poll()向Broker拉取消息最多等待1秒 ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { System.out.printf(收到消息 → Topic:%s Partition:%d Offset:%d Key:%s Value:%s%n, record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } }3.2 消费者组Consumer Group核心概念Topic: test-topic3个分区 消费者组A3个消费者 消费者组B1个消费者 Consumer1 → Partition0 Consumer1 → Partition0 Consumer2 → Partition1 → Partition1 Consumer3 → Partition2 → Partition2 结论 - 同一组内每条消息只被一个消费者消费负载均衡 - 不同组间每条消息都会被每个组各消费一次广播 - 消费者数 分区数多余的消费者空闲等待接管3.3 offset 提交方式// ① 自动提交默认—— 简单但可能重复消费或丢失 props.put(enable.auto.commit, true); props.put(auto.commit.interval.ms, 5000); // 每5秒自动提交一次 // ② 手动同步提交 —— 处理完消息再提交更可靠 props.put(enable.auto.commit, false); while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { // 处理消息... System.out.println(处理消息: record.value()); } consumer.commitSync(); // 处理完一批再提交阻塞等待 } // ③ 手动异步提交 —— 性能更好推荐 consumer.commitAsync((offsets, exception) - { if (exception ! null) { System.out.println(提交失败: exception.getMessage()); } });3.4 消费者重要配置详解// 从哪里开始消费当消费者组第一次启动没有记录offset时 // earliest → 从最早的消息开始--from-beginning // latest → 只消费启动后的新消息默认 props.put(auto.offset.reset, earliest); // 一次poll最多拉取多少条 props.put(max.poll.records, 500); // poll间隔超过这个时间broker认为消费者挂了触发Rebalance props.put(max.poll.interval.ms, 300000); // 5分钟 // 心跳间隔消费者定期向broker报活 props.put(heartbeat.interval.ms, 3000); // 超过这个时间没心跳认为消费者挂了 props.put(session.timeout.ms, 30000);四、Rebalance再均衡机制什么是Rebalance 消费者组内成员发生变化时Kafka重新分配分区给消费者的过程 触发时机 1. 新消费者加入组 2. 消费者离开组正常关闭或崩溃 3. Topic分区数变化 Rebalance过程STW 所有消费者停止消费 → GroupCoordinator重新分配 → 消费者恢复消费 ↑ 这段时间不能消费所以Rebalance要尽量避免频繁发生 避免不必要Rebalance的方法 - 合理设置 session.timeout.ms 和 heartbeat.interval.ms - 处理消息不要太慢避免超过 max.poll.interval.ms第二阶段Spring Boot 操作 Kafka一、项目搭建1.1 Maven 依赖parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version3.2.0/version /parent dependencies dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter/artifactId /dependency !-- Spring Kafka包含了kafka-clients -- dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency !-- 用于对象序列化 -- dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-databind/artifactId /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-test/artifactId scopetest/scope /dependency dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka-test/artifactId scopetest/scope /dependency /dependencies1.2 application.yml 核心配置spring: kafka: # Broker 地址列表 bootstrap-servers: 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094 # 生产者配置 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all # 最高可靠性 retries: 3 batch-size: 16384 linger-ms: 1 buffer-memory: 33554432 # 如果发送的是对象改用 JsonSerializer # value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消费者配置 consumer: group-id: my-spring-group # 消费者组ID auto-offset-reset: earliest # 第一次启动从最早消息开始 enable-auto-commit: false # 关闭自动提交让Spring管理 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 100 # 如果消费的是对象改用 JsonDeserializer # value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # properties: # spring.json.trusted.packages: * # 监听器配置 listener: ack-mode: manual_immediate # 手动提交offset concurrency: 3 # 3个消费者线程建议 分区数 type: batch # batch批量消费single逐条消费二、生产者2.1 发送字符串消息import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; Service public class KafkaProducerService { // Spring 自动注入无需手动创建 private final KafkaTemplateString, String kafkaTemplate; public KafkaProducerService(KafkaTemplateString, String kafkaTemplate) { this.kafkaTemplate kafkaTemplate; } // ① 最简单只发消息内容 public void sendMessage(String message) { kafkaTemplate.send(test-topic, message); } // ② 带 key相同key进同一分区保证有序 public void sendWithKey(String key, String message) { kafkaTemplate.send(test-topic, key, message); } // ③ 指定分区 public void sendToPartition(int partition, String key, String message) { kafkaTemplate.send(test-topic, partition, key, message); } // ④ 异步回调推荐 public void sendWithCallback(String message) { CompletableFutureSendResultString, String future kafkaTemplate.send(test-topic, message); future.whenComplete((result, ex) - { if (ex null) { System.out.printf(发送成功 → Partition:%d Offset:%d%n, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } else { System.out.println(发送失败: ex.getMessage()); } }); } // ⑤ 同步发送等待结果 public void sendSync(String message) throws Exception { SendResultString, String result kafkaTemplate.send(test-topic, message).get(); System.out.printf(同步发送成功 → Partition:%d Offset:%d%n, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } }2.2 发送对象JSON// 实体类 public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // getter/setter/构造方法省略 }# application.yml 中改为 JsonSerializer spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer// 注入对象类型的 KafkaTemplate Service public class OrderProducerService { private final KafkaTemplateString, OrderEvent kafkaTemplate; public OrderProducerService(KafkaTemplateString, OrderEvent kafkaTemplate) { this.kafkaTemplate kafkaTemplate; } public void sendOrder(OrderEvent order) { // 用 orderId 作为 key保证同一订单的消息有序 kafkaTemplate.send(order-topic, order.getOrderId(), order); System.out.println(发送订单事件: order.getOrderId()); } }三、消费者3.1 基础消费逐条import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; Component public class KafkaConsumerService { // 最简单的监听 KafkaListener(topics test-topic, groupId my-spring-group) public void listen(String message) { System.out.println(收到消息: message); } // 获取消息元数据partition、offset等 KafkaListener(topics test-topic, groupId my-spring-group) public void listenWithMeta(ConsumerRecordString, String record) { System.out.printf(收到消息 → Partition:%d Offset:%d Key:%s Value:%s%n, record.partition(), record.offset(), record.key(), record.value()); } // 手动提交 offset需要 yml 中配置 ack-mode: manual_immediate KafkaListener(topics test-topic, groupId my-spring-group) public void listenWithAck(ConsumerRecordString, String record, Acknowledgment ack) { try { System.out.println(处理消息: record.value()); // 处理成功后手动提交 ack.acknowledge(); } catch (Exception e) { // 处理失败不提交下次重新消费 System.out.println(处理失败不提交offset: e.getMessage()); } } }3.2 批量消费推荐性能更好# yml 中配置 spring: kafka: listener: type: batch # 开启批量模式KafkaListener(topics test-topic, groupId my-spring-group) public void batchListen(ListConsumerRecordString, String records, Acknowledgment ack) { System.out.println(批量收到 records.size() 条消息); for (ConsumerRecordString, String record : records) { System.out.printf(Partition:%d Offset:%d Value:%s%n, record.partition(), record.offset(), record.value()); // 处理每条消息... } // 整批处理完后提交一次比逐条提交性能好很多 ack.acknowledge(); }3.3 消费对象JSONspring: kafka: consumer: value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: com.yourpackage.dto # 信任的包名 spring.json.value.default.type: com.yourpackage.dto.OrderEventKafkaListener(topics order-topic, groupId order-group) public void handleOrder(ConsumerRecordString, OrderEvent record, Acknowledgment ack) { OrderEvent order record.value(); System.out.printf(收到订单 → ID:%s 用户:%s 金额:%.2f%n, order.getOrderId(), order.getUserId(), order.getAmount()); // 处理订单业务逻辑... ack.acknowledge(); }3.4 监听多个 Topic / 指定分区// 监听多个 Topic KafkaListener(topics {topic1, topic2}, groupId my-group) public void listenMultipleTopics(ConsumerRecordString, String record) { System.out.println(来自 record.topic() : record.value()); } // 指定消费某个分区从offset0开始 KafkaListener( groupId my-group, topicPartitions { TopicPartition(topic test-topic, partitionOffsets { PartitionOffset(partition 0, initialOffset 0), PartitionOffset(partition 1, initialOffset 0) }) } ) public void listenSpecificPartition(ConsumerRecordString, String record) { System.out.println(分区 record.partition() : record.value()); }四、高级特性4.1 消息头Header传递额外信息// 生产者发送时携带 Header import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; public void sendWithHeader(String message) { MessageString msg MessageBuilder .withPayload(message) .setHeader(source, order-service) .setHeader(version, v1) .setHeader(KafkaHeaders.TOPIC, test-topic) .build(); kafkaTemplate.send(msg); } // 消费者读取 Header KafkaListener(topics test-topic) public void listenWithHeader( ConsumerRecordString, String record, Header(value source, required false) String source) { System.out.println(来源: source 消息: record.value()); }4.2 错误处理与重试import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; Configuration public class KafkaConfig { Bean public DefaultErrorHandler errorHandler() { // 重试2次每次间隔1秒 FixedBackOff backOff new FixedBackOff(1000L, 2); DefaultErrorHandler handler new DefaultErrorHandler(backOff); // 某些异常不重试直接跳过 handler.addNotRetryableExceptions(IllegalArgumentException.class); return handler; } Bean public ConcurrentKafkaListenerContainerFactoryString, String kafkaListenerContainerFactory( ConsumerFactoryString, String consumerFactory, DefaultErrorHandler errorHandler) { ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(errorHandler); return factory; } }4.3 死信队列DLT// 重试多次后仍然失败的消息自动发到死信Topic原Topic名.DLT import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; Bean public DefaultErrorHandler errorHandler(KafkaTemplateObject, Object template) { // 失败消息发送到死信队列 DeadLetterPublishingRecoverer recoverer new DeadLetterPublishingRecoverer(template); // 重试3次后进死信队列 FixedBackOff backOff new FixedBackOff(1000L, 3); return new DefaultErrorHandler(recoverer, backOff); } // 消费死信队列 KafkaListener(topics test-topic.DLT, groupId dlt-group) public void handleDeadLetter(ConsumerRecordString, String record) { System.out.println(死信消息需人工处理: record.value()); // 记录日志、告警、人工干预... }4.4 事务保证消息发送原子性spring: kafka: producer: transaction-id-prefix: tx- # 开启事务Service public class TransactionalProducerService { private final KafkaTemplateString, String kafkaTemplate; // Transactional 注解方法内所有消息要么全发成功要么全回滚 Transactional public void sendTransactional() { kafkaTemplate.send(topic1, 消息1); kafkaTemplate.send(topic2, 消息2); // 如果这里抛异常上面两条消息都不会发出去 if (Math.random() 0.5) { throw new RuntimeException(模拟业务异常); } kafkaTemplate.send(topic3, 消息3); } }4.5 动态创建 Topicimport org.apache.kafka.clients.admin.NewTopic; import org.springframework.kafka.config.TopicBuilder; Configuration public class KafkaTopicConfig { // Spring Boot 启动时自动创建如果不存在 Bean public NewTopic orderTopic() { return TopicBuilder.name(order-topic) .partitions(3) .replicas(3) .build(); } Bean public NewTopic userTopic() { return TopicBuilder.name(user-topic) .partitions(3) .replicas(2) .build(); } }五、完整实战案例订单消息系统项目结构src/main/java/com/example/kafka/ ├── KafkaApplication.java ├── config/ │ ├── KafkaTopicConfig.java ← Topic 配置 │ └── KafkaConsumerConfig.java ← 消费者工厂配置 ├── dto/ │ └── OrderEvent.java ← 消息实体 ├── producer/ │ └── OrderProducer.java ← 生产者 └── consumer/ ├── OrderConsumer.java ← 正常消费者 └── OrderDltConsumer.java ← 死信队列消费者OrderEvent.javapackage com.example.kafka.dto; public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // CREATED / PAID / SHIPPED / COMPLETED public OrderEvent() {} public OrderEvent(String orderId, String userId, Double amount, String status) { this.orderId orderId; this.userId userId; this.amount amount; this.status status; } // getter setter public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId userId; } public Double getAmount() { return amount; } public void setAmount(Double amount) { this.amount amount; } public String getStatus() { return status; } public void setStatus(String status) { this.status status; } Override public String toString() { return String.format(OrderEvent{orderId%s, userId%s, amount%.2f, status%s}, orderId, userId, amount, status); } }KafkaTopicConfig.javapackage com.example.kafka.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.TopicBuilder; Configuration public class KafkaTopicConfig { Bean public NewTopic orderTopic() { return TopicBuilder.name(order-topic) .partitions(3) .replicas(3) .build(); } }OrderProducer.javapackage com.example.kafka.producer; import com.example.kafka.dto.OrderEvent; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; Service public class OrderProducer { private final KafkaTemplateString, OrderEvent kafkaTemplate; public OrderProducer(KafkaTemplateString, OrderEvent kafkaTemplate) { this.kafkaTemplate kafkaTemplate; } public void sendOrder(OrderEvent order) { kafkaTemplate.send(order-topic, order.getOrderId(), order) .whenComplete((result, ex) - { if (ex null) { System.out.printf([Producer] 订单发送成功 → %s Partition:%d%n, order.getOrderId(), result.getRecordMetadata().partition()); } else { System.out.println([Producer] 发送失败: ex.getMessage()); } }); } }OrderConsumer.javapackage com.example.kafka.consumer; import com.example.kafka.dto.OrderEvent; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; Component public class OrderConsumer { KafkaListener( topics order-topic, groupId order-service-group, containerFactory kafkaListenerContainerFactory ) public void handleOrder(ConsumerRecordString, OrderEvent record, Acknowledgment ack) { OrderEvent order record.value(); System.out.printf([Consumer] 收到订单 Partition:%d Offset:%d → %s%n, record.partition(), record.offset(), order); try { // 模拟业务处理 processOrder(order); ack.acknowledge(); // 处理成功提交offset } catch (Exception e) { // 不提交offset交给重试机制处理 System.out.println([Consumer] 处理失败: e.getMessage()); throw e; // 抛出让 ErrorHandler 处理重试 } } private void processOrder(OrderEvent order) { System.out.println([Consumer] 处理订单业务逻辑: order.getOrderId()); // 实际业务更新数据库、调用其他服务等 } }KafkaApplication.java启动类 测试package com.example.kafka; import com.example.kafka.dto.OrderEvent; import com.example.kafka.producer.OrderProducer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } // 启动后自动发送测试消息 Bean public CommandLineRunner runner(OrderProducer producer) { return args - { Thread.sleep(2000); // 等消费者就绪 for (int i 1; i 5; i) { OrderEvent order new OrderEvent( ORDER- i, USER- (i % 3 1), 100.0 * i, CREATED ); producer.sendOrder(order); Thread.sleep(500); } }; } }六、原生 API vs Spring Boot 对比功能原生 Java APISpring Boot配置手动写 Propertiesapplication.yml 声明式配置生产者KafkaProducer 手动管理KafkaTemplate自动注入消费者while(true)手动轮询KafkaListener注解驱动offset提交完全手动自动/手动可选ack-mode错误处理自己写try/catch重试逻辑DefaultErrorHandler开箱即用事务手动beginTransactionTransactional注解Topic创建调用AdminClientBean NewTopic自动创建多线程消费自己管理线程池concurrency参数一行搞定七、学习路线图第1天原生Java API ├── SimpleProducer 发送10条消息 ├── SimpleConsumer 消费并打印 └── 理解 Partition/Offset/ConsumerGroup 第2天Spring Boot 基础 ├── 搭项目配 yml ├── KafkaTemplate 发消息 └── KafkaListener 收消息 第3天Spring Boot 进阶 ├── 发送 JSON 对象 ├── 手动提交 offset └── 批量消费 第4天高级特性 ├── 错误处理 重试 ├── 死信队列 └── 事务消息 第5天实战演练 ├── 完成订单消息系统 ├── 模拟Broker宕机观察行为 └── 监控消费者Lag八、常见问题速查Q1: 消息重复消费怎么处理原因消费者处理完但offset提交前崩溃重启后重新消费 解决业务层做幂等处理 - 数据库唯一键约束 - Redis 记录已处理的 messageId - 数据库乐观锁版本号Q2: 消息顺序性怎么保证Kafka 只保证同一Partition内有序 做法 - 需要有序的消息使用相同的 key → 进入同一Partition - 如同一订单的所有事件用 orderId 作为 keyQ3: 如何监控消费进度Lag# 查看某个消费者组的消费进度 kafka-consumer-groups.sh \ --bootstrap-server 172.17.0.7:9092 \ --describe \ --group order-service-group # 输出 # TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG # order-topic 0 100 105 5 ← 落后5条Q4: 发送失败了怎么办生产者端 - acksall retries3 保证发送可靠性 - enable.idempotencetrue 防止重复发送 消费者端 - 手动提交offset处理失败不提交 - 配置重试 死信队列兜底最后更新2026年5月

相关文章:

Java + Spring Boot 操作 Kafka 完整学习指南

前置条件:ZooKeeper 集群 Kafka 集群已启动(3个ZK节点 3个Broker) Broker 地址:172.17.0.7:9092, 172.17.0.7:9093, 172.17.0.7:9094第一阶段:原生 Java API 操作 Kafka目的:理解底层原理,Spr…...

深入 QEMU 热迁移

深入 QEMU 热迁移:从状态机到数据平面的全链路剖析 “把一台正在运行的虚拟机从一台主机搬到另一台,还让里面的操作系统浑然不觉——这听起来像魔法,实则是精密的工程。” 引言 实时迁移是 QEMU 最核心的子系统之一。它允许将一个正在运行的…...

BetterJoy终极配置指南:让Switch手柄在电脑上完美运行

BetterJoy终极配置指南:让Switch手柄在电脑上完美运行 【免费下载链接】BetterJoy Allows the Nintendo Switch Pro Controller, Joycons and SNES controller to be used with CEMU, Citra, Dolphin, Yuzu and as generic XInput 项目地址: https://gitcode.com/…...

git--github

解决github无法访问的问题。...

vi与vim在openEuler中的差异及应用

openEuler两代系统命令差异与原理对比 1. 核心命令体系差异对比 对比维度传统Linux/早期openEuler (Vi模式)现代openEuler (Vim增强模式)核心编辑器vi (Visual Interface) 基础版vim (Vi IMproved) 增强版安装方式通常预装或通过yum install vi需手动安装yum install vim或dn…...

RAG 架构在网文创作中的应用:以茄子写作助手为例

当创作者遇上大模型作为一名既写代码又写小说的“斜杠青年”,我一直对 AI 在内容生成领域的应用保持着高度关注。传统的 LLM(大型语言模型)在长文本创作中存在两个致命弱点:上下文窗口限制导致的“失忆”问题,以及通用…...

手把手教你无损转换:把老电脑的Legacy启动盘改成UEFI+GPT(附DiskGenius操作截图)

老电脑焕新指南:从Legacy到UEFIGPT的无损迁移实战当你的老电脑开机速度越来越慢,或者被Windows 11的安装要求拒之门外时,很可能是因为它还在使用传统的Legacy启动方式和MBR分区表。本文将带你深入了解这两种启动方式的区别,并手把…...

智慧树自动刷课插件:3步安装指南,彻底解放学习时间

智慧树自动刷课插件:3步安装指南,彻底解放学习时间 【免费下载链接】zhihuishu 智慧树刷课插件,自动播放下一集、1.5倍速度、无声 项目地址: https://gitcode.com/gh_mirrors/zh/zhihuishu 还在为智慧树平台重复性的视频学习任务而烦恼…...

Token经济学正在重构芯片工程师的生存逻辑(万字长文深度拆解“token“这个计量单位的对于芯片工程师的意义)

英伟达CEO黄仁勋把AI产业分成五层:能源、芯片、基础设施、模型、应用。芯片在第二层,属于重资产制造业的核心环节。但问题来了,在 芯片(包括AI芯片)成本内卷时代,芯片工程师的技术,到底还能值多…...

以书香润心,借坚韧前行

一书一山海,一心一乾坤。身处车马喧嚣的世间,我们时常被生活的压力裹挟,被前路的未知困扰,在重复的日常里消磨热忱,在跌宕的波折中心生怯懦。而书籍,是治愈心灵、滋养成长的最好良方,于无声处给…...

从CentOS迁移到openEuler?手把手教你在vSphere ESXi 7.0上搭建测试环境

从CentOS迁移到openEuler:vSphere ESXi 7.0测试环境全指南当企业技术栈面临升级换代时,系统管理员往往需要在不影响生产环境的前提下进行充分验证。对于长期依赖CentOS/RHEL生态的用户而言,openEuler作为国产开源操作系统的代表,正…...

避坑指南:在openEuler 22.03上配置vsftpd虚拟用户,解决gdbm数据库和SELinux权限问题

openEuler 22.03虚拟用户FTP配置实战:从gdbm数据库到SELinux的完整解决方案当你在openEuler 22.03上尝试配置vsftpd虚拟用户时,是否遇到过这样的场景:按照CentOS教程一步步操作,却在PAM认证阶段卡壳,系统不断提示"…...

3.RAG

一、RAG初识: RAG(Retrieval-Augmented Generation,检索增强生成)是一种将 信息检索与文本生成 相结合的技术框架。它通过以下流程解决大模型(LLM)的“知识盲区”问题: 用户问题->从知识库检索相关文档->将文档作为上下文输入LLM->生成精准答…...

字节校招7000人转正率50%:大厂HR体系,正在“去经验化“

字节跳动刚刚用一组校招数据,扯下了大厂老兵最后一块遮羞布。 2026年春,ByteIntern规模狂飙至7000人,转正率史无前例地超过50%。 短短3到6个月,字节用远低于市场价的成本,批量生产出了3500个能够直接上岗的替代者。 同样的薪酬包,大厂宁愿招两个高潜应届生,也不愿意留…...

2026年学习Java还有前景吗?如何看待2026Java程序员就业难现状?

2026年Java的前景和就业情况,这是一个很现实的问题。我们直接来看核心。Java依然有前景,但“普通选手”的就业黄金期确实过去了,现在需要的是“高配选手”。所谓的“就业难”,本质不是Java不行了,而是行业对Java程序员…...

从 Session 到 JWT:Web 认证系统的发展与 JWT 原理详解

文章目录 前言一、Web 认证系统的发展史1.第一代认证方案:Cookie Session2.Session 方案的问题开始出现1. Session 存储压力大2. 分布式系统难处理3. 移动端时代到来 二、JWT 是什么三、JWT 的结构到底长什么样1.第一部分:Header(头部&#…...

匿名内部类的使用场景 java反射机制

一、匿名内部类的使用场景匿名内部类是一种没有显式类名、直接在创建对象时定义并实例化的内部类。它通常用于“一次性使用”的场景,让代码更简洁紧凑。主要使用场景包括:1. 事件监听器(GUI 编程)在 Swing、AWT 或 Android 开发中…...

小小屠龙原始火龙手游官网下载:小小屠龙原始火龙最新官方下载渠道

《小小屠龙原始火龙》又名《赤血火龙单职业》《龙城秘境移动版》,是由安徽游昕联合忆往游戏运营的正版 1.80 火龙复刻 MMORPG 手游。1:1 复刻比奇、盟重土城、祖玛寺庙、赤月峡谷、火龙神殿等经典场景,创新融合战法道三职业核心能力的单职业体系&#xf…...

IwaraDownloadTool:简单快速的Iwara视频下载神器

IwaraDownloadTool:简单快速的Iwara视频下载神器 【免费下载链接】IwaraDownloadTool Iwara 下载工具 | Iwara Downloader 项目地址: https://gitcode.com/gh_mirrors/iw/IwaraDownloadTool 你是否经常在Iwara平台发现精彩的视频内容,却苦于无法轻…...

Codex适配国产信创环境安装部署与技术适配全解析

随着国家信创产业持续落地推进,党政、金融、能源、工业等关键行业全面开启信息技术软硬件国产化替代工作。基于自主可控、安全可信的核心需求,传统国外架构软硬件体系逐步被国产操作系统、国产芯片硬件替代。Codex作为主流的智能代码辅助、自动化开发工具…...

FPGA在遥感机器学习中的优势与优化实践

1. FPGA在遥感机器学习中的核心优势解析 FPGA(现场可编程门阵列)在边缘计算场景中展现出独特的价值主张。与通用处理器不同,FPGA通过硬件级并行架构实现两个关键突破:首先是数据流驱动的计算模式,消除传统冯诺依曼架构…...

数据集上新:柬埔寨环境健康入户调查

本数据集基于柬埔寨马德望省约400户家庭的环境健康入户调查而成,包括基本社会经济信息、家庭成员结构、呼吸道健康信息、其他健康信息(包括部分测量信息)、营养信息、清洁炉灶和燃料使用、风险和时间偏好、调查员自观察信息等数百条子数据。如…...

卷积神经网络(CNN)与深度学习视觉应用综述

在深度学习领域,卷积神经网络(CNN)是实现计算机视觉任务的基石。通过对这些基础理论的学习,我们能够构建起从特征提取到复杂场景理解的知识体系。第一部分:卷积神经网络基础1. 全连接网络面临的挑战传统的全连接神经网…...

3分钟快速修复洛雪音乐播放问题:六音音源完整指南

3分钟快速修复洛雪音乐播放问题:六音音源完整指南 【免费下载链接】New_lxmusic_source 六音音源修复版 项目地址: https://gitcode.com/gh_mirrors/ne/New_lxmusic_source 你是否曾经在升级洛雪音乐后,发现心爱的歌单突然变成了灰色,…...

Go语言竞态检测:race条件

Go语言竞态检测:race条件 1. race检测 go test -race ./...2. 总结 -race检测器可以发现代码中的数据竞争。...

window11 恢复右键刷新

新建文本文档,粘贴下方代码,后缀改成 **.bat**,右键以管理员身份运行即可一键恢复传统右键菜单echo off reg add "HKCU\Software\Classes\CLSID\{86ca1aa0-34aa-4e8b-a509-50c905bae2a2}\InprocServer32" /f /ve taskkill /f /im e…...

鸿蒙PC:Qt适配OpenHarmony实战【番茄刻】:工作和休息两种倒计时如何写成一个 QML 状态机

前言 欢迎加入鸿蒙PC开发者社区,共同打造开发者工具生态:鸿蒙PC开发者社区 :https://harmonypc.csdn.net/ 项目开源地址:https://atomgit.com/lqjmac/qtfqk 我更愿意把这类 Demo 当成工程切片来看:功能要小&#xf…...

手把手教你:把Ubuntu 20.04完整系统塞进U盘,打造随身便携开发环境

手把手打造Ubuntu 20.04便携开发环境:从镜像制作到硬件兼容全指南 在咖啡厅调试代码时发现环境配置丢失?出差临时借用同事电脑却无法运行你的开发工具?这些困扰程序员多年的痛点,其实只需要一个装满完整Ubuntu系统的U盘就能彻底解…...

别再让Ubuntu22.04时间错乱了!用hwclock和timedatectl搞定硬件时钟时区的保姆级教程

彻底解决Ubuntu 22.04时间同步问题:硬件时钟与时区管理权威指南你是否曾在双系统切换后发现Ubuntu显示的时间比实际快了8小时?或者每次重启后系统时间都会"跳票"?这些看似小问题背后,隐藏着操作系统与硬件时钟&#xff…...

直接去偏机器学习:用Bregman散度统一因果推断与协变量平衡

1. 项目概述与核心动机在因果推断、政策评估乃至更广泛的计量经济学和机器学习应用中,我们常常关心一个“目标参数”,比如平均处理效应(ATE)——也就是某项干预或政策对结果的平均影响。传统上,一个非常自然的想法是&a…...