Kafka 之事务消息
前言:
在分布式消息系统中,事务消息也是一个热门课题,在项目的实际业务场景中,如果用到事务消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持事务消息,本篇我们将对 Kafka 的事务消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送消费
Kafka 之消息广播消费
Kafka 之消息并发消费
Kafka 之顺序消息
事务消息的使用场景
事务消息的使用场景众多,这里我简单列举几个如下:
- 金融交易处理:在金融领域,每笔交易都必须具备原子性,确保不发生不一致或重复的交易,事务性消息可用于金融交易的场景,来保证交易的完整性。
- 订单处理:用户订单处理必须是完整的,需要保订单的创建、支付和发货不会出现问题,事务性消息可以用于订单场景,来保证整个流程的完整性。
- 。。。。等等等场景。
什么是 Kafka 的事务消息?
Kafka 事务性消息是一种机制,用于确保消息的可靠性传递和处理,与非事务性消息相比,它们在数据处理中提供了额外的保证,一旦消息被写入Kafka 集群,它们将被认为是已经处理,无论发生了什么,Kafka 的事务不同于 RocketMQ,RocketMQ 是保障本地事务与 RocketMQ 消息发送的事务一致性,而 Kafka 的事务消息主要是保障一次发送多条消息的事务一致性,发送的消息要么同时成功要么同时失败。
Kafka 事务消息的特性?
- 原子性:Kafka 事务性消息要么完全成功,要么完全失败,保障了消息不会被部分处理。
- 可靠性:Kafka 事务性消息一旦写入Kafka,它们将被视为已经处理,即使发生了应用程序或系统故障。
- 幂等性:Kafka 生产者可以配置为幂等,确保相同的消息不会被重复发送,对于 Kafka 事务消息通常会配置为幂等。
- Exactly Once 语义:仅一次,Kafka 事务性消息支持仅一次,即消息要么完全到达一次,要么不到达。
Kafka的消息传输保障
Kafka 的消息传输保障分为 3个层级:
at most once:至多一次,消息可能丢失,但绝不会重复。
at least once:最少一次,消息绝不丢失,但可能重复传输。
exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次。
Kafka 事务消息的使用
Kafka 事务消息的配置
Kafka 的事务消息需要对生产者和消费者进行一些配置来启用 Kafka 对事务消息的支持。
生产者配置
- acks:这个参数我们前面有见到过,是有关生产者接收到确认之后才认为消息发送成功的设置,对于事务性消息,通常将其设置为acks=all(表示需要等到消息写入到所有 ISR 同步副本中,设置为 -1 也是一样的效果),以确保消息在事务完全提交后才被视为成功发送。
- transactional.id:事务id,这是用于标识生产者实例的唯一ID,在配置文件中设置 transactional.id 是启用事务性消息的核心步骤。
消费者配置
- isolation.level:Kafka 消费者隔离级别的设置,对于事务性消息,通常将其设置为isolation.level=read_committed,确保只读取已经提交的事务消息。
- auto.offset.reset:这是消费者启动时从哪里开始读取消息的设置,通常将其设置为auto.offset.reset=earliest(如果分区下有提交的 offset 从提交的 offset 开始消费,如果没有提交的 offset,从头开始消费),以确保不会错过任何已提交的消息。
具体配置如下:
#消息应答
spring.kafka.producer.acks = -1
#事务id配置
spring.kafka.producer.transaction-id-prefix=my-transaction-
#读取已提交的消息
spring.kafka.consumer.isolation-level=read_committed
Kafka 事务消息案例演示
Producer 代码案例
对于生产者我们两种类型可以选择,使用 KafkaTemplate 进行事务消息发送和使用原生 API 进行事务消息发送。
使用 KafkaTemplate 完成事务消息发送代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;/*** @ClassName: TransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者*/
@Slf4j
@Component
public class TransactionalProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//发送事务消息(编程式)public void sendTransactionalMessage(int number) {try {kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, String> kafkaOperations) {kafkaOperations.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaOperations.send("transactional-topic", "我是第二条事务消息");return true;}});} catch (Exception e) {e.printStackTrace();}}//发送事务消息(注解方式)@Transactional(rollbackFor = RuntimeException.class)public void sendTransactionalMessageByAnnotation(int number) {kafkaTemplate.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaTemplate.send("transactional-topic", "我是第二条事务消息");}}
上述消息生产者代码中我们使用了两种方式实现,分别是编程式事务和注解式事务消息,大家根据自己的喜好来选择,两种方式都是可以的。
Consumer 代码案例
事务消息的 Consumer 代码没有什么特殊之处,我们使用 Manual ACK 即可。
package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: TransactionalConsumer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息消费者*/
@Slf4j
@Component
public class TransactionalConsumer {@KafkaListener(id = "transactional-consumer",groupId = "transactional-consumer-groupId",topics = "transactional-topic",containerFactory = "myContainerFactory")public void listen(String message, Acknowledgment acknowledgment) {log.info("事务消息消费成功,消息内容:{}", message);//手动提交 ACKacknowledgment.acknowledge();}
}
事务消息结果验证
我们触发注解式事务消息发送,numer 传值 1,得到如下结果:
2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
结果符合预期
2024-11-05 19:52:27.444 INFO 20652 --- [nio-8086-exec-8] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-my-transaction-0, transactionalId=my-transaction-0] Aborting incomplete transaction
2024-11-05 19:52:27.451 ERROR 20652 --- [y-transaction-0] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='我是注解式事务第一条事务消息' to topic transactional-topic:org.apache.kafka.common.KafkaException: Failing batch since transaction was abortedat org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]2024-11-05 19:52:27.456 ERROR 20652 --- [nio-8086-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root causejava.lang.ArithmeticException: / by zeroat com.order.service.kafka.producer.TransactionalProducer.sendTransactionalMessageByAnnotation(TransactionalProducer.java:49) ~[classes/:na]at com.order.service.kafka.producer.TransactionalProducer$$FastClassBySpringCGLIB$$dbc0d44d.invoke(<generated>) ~[classes/:na]at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.6.jar:5.3.6]at com.order.service.kafka.producer.TransactionalProducer$$EnhancerBySpringCGLIB$$e4350198.sendTransactionalMessageByAnnotation(<generated>) ~[classes/:na]at com.order.service.controller.KafkaController.sendTransactionalMessageByAnnotation(KafkaController.java:96) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
从日志结果我们看到消费者没有消费到任何一条消息(这里我们是在第一条消息发送结束后模拟的异常),结果符合预期。
编程式事务消息这里就不在验证了,有兴趣的朋友可以自己去测试一下。
Producer 使用原生 API 发送事务消息
使用原生 API 发送事务消息,需要有一个 KafkaProducer 对象,我这里使用注入 Bean 的方式,具体如下:
@Bean
public KafkaProducer<String, String> kafkaProducer() {return new KafkaProducer<>(getMyKafkaProps());
}
有了 KafkaProducer 只有我们调用其 API 即可完成事务消息发送,KafkaProducer 有几个重要的 API 如下:
//初始化事务
public void initTransactions();//开启事务
public void beginTransaction() throws ProducerFencedException ;//在事务内提交已经消费的偏移量
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;//提交事务
public void commitTransaction() throws ProducerFencedException;//丢弃事务
public void abortTransaction() throws ProducerFencedException ;
使用原生 API 进行事务消息发送代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: NativeTransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者 原生API*/
@Slf4j
@Component
public class NativeTransactionalProducer {@Autowiredprivate KafkaProducer<String, String> kafkaProducer;//发送事务消息(原生API)public void sendTransactionalMessageByNativeApi(int number) {try {//初始化一个事务kafkaProducer.initTransactions();//开启事务kafkaProducer.beginTransaction();//发送消息kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第一条事务消息"));if (number == 0) {//模拟异常int a = 1 / 0;}kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第二条事务消息"));//提交事务消息kafkaProducer.commitTransaction();} catch (ProducerFencedException e) {//关闭资源kafkaProducer.close();} finally {//关闭资源kafkaProducer.close();}}}
原生 API 发送事务消息结果验证
我们触发原生 API 发送事务消息发送,numer 传值 1,得到如下结果:
2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
结果符合预期,异常的情况我这里就不演示了,有兴趣的朋友可以自己去验证。
事务消息注意事项
只要开启了事务消息功能,不管是 KafkaProducer 还是 KafkaTemplate 发送的所有消息,都必须处于 Kafka 事务之中,否则会抛出如下异常:
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a recordat org.springframework.util.Assert.state(Assert.java:76)at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:640)at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:552)at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)at com.order.service.kafka.producer.ManualKafkaProducer.sendManualMessage(ManualKafkaProducer.java:34)at com.order.service.controller.KafkaController.sendManualMessage(KafkaController.java:87)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)at javax.servlet.http.HttpServlet.service(HttpServlet.java:626)at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)at java.lang.Thread.run(Thread.java:745)
如果业务中,存在需要事务的情况,也存在不需要事务的情况,那么则需要分别定义两个 KafkaTemplate(Kafka Producer),按需使用。
总结:本篇分享了 Kafka 的事务消息的使用,可以感受到 Kafka 的事务消息和 RocketMQ 是完全不一样的,希望可以帮助到需要用到的伙伴们。
如有不正确的地方欢迎各位指出纠正。
相关文章:
Kafka 之事务消息
前言: 在分布式消息系统中,事务消息也是一个热门课题,在项目的实际业务场景中,如果用到事务消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持事务消息,本篇我们将对 …...

小菜家教平台(四):基于SpringBoot+Vue打造一站式学习管理系统
前言 昨天配置完了过滤器,权限检验,基本的SpringSecurity功能已经配置的差不多了,今天继续开发,明天可能会暂停一天整理一下需求,然后就进行CRUD了。 今日进度 补充SpringSecurity异常处理和全局异常处理器 详细操作…...

解决 Vue3、Vite 和 TypeScript 开发环境下跨域的问题,实现前后端数据传递
引言 本文介绍如何在开发环境下解决 Vite 前端(端口 3000)和后端(端口 80)之间的跨域问题: 在开发环境中,前端使用的 Vite 端口与后端端口不一致,会产生跨域错误提示: Access to X…...
量化交易系统开发-实时行情自动化交易-3.3.数据采集流程
19年创业做过一年的量化交易但没有成功,作为交易系统的开发人员积累了一些经验,最近想重新研究交易系统,一边整理一边写出来一些思考供大家参考,也希望跟做量化的朋友有更多的交流和合作。 接下来说说数据采集流程,后…...

探索PyAV:Python中的多媒体处理利器
文章目录 探索PyAV:Python中的多媒体处理利器第一部分:背景介绍第二部分:PyAV是什么?第三部分:如何安装PyAV?第四部分:简单的库函数使用方法1. 打开文件2. 查看流3. 遍历帧4. 编码帧5. 关闭输出…...

SpringBoot源码解析(三):启动开始阶段
SpringBoot源码系列文章 SpringBoot源码解析(一):SpringApplication构造方法 SpringBoot源码解析(二):引导上下文DefaultBootstrapContext SpringBoot源码解析(三):启动开始阶段 目录 前言一、入口二、SpringApplicationRunListener1、作用…...
C# const与readonly关键字的区别
在C#中,readonly关键字用于定义在对象创建后不能更改的字段。它可以与常量(const)有些相似,但也有显著不同。以下是readonly关键字的一些关键点: 定义与用法: readonly字段可以在类的构造函数中初始化,而const字段必须…...

【数据分享】1901-2023年我国省市县镇四级的逐年降水数据(免费获取/Shp/Excel格式)
之前我们分享过1901-2023年1km分辨率逐月降水栅格数据和Shp和Excel格式的省市县四级逐月降水数据,原始的逐月降水栅格数据来源于彭守璋学者在国家青藏高原科学数据中心平台上分享的数据!基于逐月数据我们采用求年累计值的方法得到逐年降水栅格数据&#…...
hhdb数据库介绍(9-4)
访问安全 权限体系 计算节点有两类用户,一类是计算节点数据库用户,用于操作数据,执行SELECT,UPDATE,DELETE,INSERT等SQL语句。另一类是关系集群数据库可视化管理平台用户,用于管理配置信息。此…...

苍穹外卖的分层所用到的技术以及工具+jwt令牌流程图(jwt验证)
分层用到的技术以及工具: jwt令牌流程图:...
Python——数列1/2,2/3,3/4,···,n/(n+1)···的一般项为Xn=n/(n+1),当n—>∞时,判断数列{Xn}是否收敛
没注释的源代码 from sympy import * n symbols(n) s n/(n1) print(数列的极限为:,limit(s,n,oo))...

css:还是语法
emmet的使用 emmet是一个插件,Emmet 是 Zen Coding 的升级版,由 Zen Coding 的原作者进行开发,可以快速的编写 HTML、CSS 以及实现其他的功能。很多文本编辑器都支持,我们只是学会使用它: 生成html结构 <!-- emme…...

关于 el-table 的合计行问题
目录 一.自定义合计行 二.合计行不展示,只有缩放/变大窗口或者F12弹出后台时才展示 三.合计行出现了表格滚动条下方 四.合计行整体样式的修改 五.合计行单元格样式修改 1.css 2.jsx方式 六.合计行单元格合并 一.自定义合计行 通过 show-summary 属性开启合计…...

解决SVN更新,提交错误乱码
执行清理操作,没有菜单的情况 1.点击TortoiseSVN-设置-如图勾选 注意:下图没有点击上下文菜单勾选清理 选择对应文件目录,执行【清理】操作 2.如果还是乱码,如上操作勾选解除文件锁定, 执行【破除锁定】后再次执行【…...

《Python网络安全项目实战》项目4 编写网络扫描程序
《Python网络安全项目实战》项目4 编写网络扫描程序 项目4 编写网络扫描程序任务4.1 扫描内网有效IP地址任务描述任务分析任务实施任务拓展 任务4.2 编写端口扫描工具任务描述任务分析任务实施相关知识任务评价任务拓展项目评价 项目4 编写网络扫描程序 许多扫描工具是由Pytho…...

Python金融大数据分析概述
💂 个人网站:【 摸鱼游戏】【神级代码资源网站】【海拥导航】💅 想寻找共同学习交流,摸鱼划水的小伙伴,请点击【全栈技术交流群】 金融大数据分析在金融科技领域越来越重要,它涉及从海量数据中提取洞察,为金…...

黑马产品经理
1、合格的产品经理 什么是产品? 什么是产品经理? 想清楚产品怎么做的人。 合格的产品经理 2、产品经理的分类 为什么会有不同的分类? 按服务对象划分 按产品平台划分 公司所属行业不同(不限于以下) 工作内容划分 …...

机器学习——损失函数、代价函数、KL散度
🌺历史文章列表🌺 机器学习——损失函数、代价函数、KL散度机器学习——特征工程、正则化、强化学习机器学习——常见算法汇总机器学习——感知机、MLP、SVM机器学习——KNN机器学习——贝叶斯机器学习——决策树机器学习——随机森林、Bagging、Boostin…...

首次超越扩散模型和非自回归Transformer模型!字节开源RAR:自回归生成最新SOTA!
文章链接:https://arxiv.org/pdf/2411.00776 项目链接:https://yucornetto.github.io/projects/rar.html 代码&模型链接:https://github.com/bytedance/1d-tokenizer 亮点直击 RAR(随机排列自回归训练策略)&#x…...
C语言最简单的扫雷实现(解析加原码)
头文件 #define ROW 9 #define COL 9 #define ROWS ROW2 #define COLS COL2 #include <stdio.h> #include <stdlib.h> #include <time.h> #define numlei 10do while可以循环玩 两个板子,内板子放0,外板子放* set函数初始化两个板子 …...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf
FTP 客服管理系统 实现kefu123登录,不允许匿名访问,kefu只能访问/data/kefu目录,不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...

恶补电源:1.电桥
一、元器件的选择 搜索并选择电桥,再multisim中选择FWB,就有各种型号的电桥: 电桥是用来干嘛的呢? 它是一个由四个二极管搭成的“桥梁”形状的电路,用来把交流电(AC)变成直流电(DC)。…...

Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...