Kafka 之事务消息
前言:
在分布式消息系统中,事务消息也是一个热门课题,在项目的实际业务场景中,如果用到事务消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持事务消息,本篇我们将对 Kafka 的事务消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送消费
Kafka 之消息广播消费
Kafka 之消息并发消费
Kafka 之顺序消息
事务消息的使用场景
事务消息的使用场景众多,这里我简单列举几个如下:
- 金融交易处理:在金融领域,每笔交易都必须具备原子性,确保不发生不一致或重复的交易,事务性消息可用于金融交易的场景,来保证交易的完整性。
- 订单处理:用户订单处理必须是完整的,需要保订单的创建、支付和发货不会出现问题,事务性消息可以用于订单场景,来保证整个流程的完整性。
- 。。。。等等等场景。
什么是 Kafka 的事务消息?
Kafka 事务性消息是一种机制,用于确保消息的可靠性传递和处理,与非事务性消息相比,它们在数据处理中提供了额外的保证,一旦消息被写入Kafka 集群,它们将被认为是已经处理,无论发生了什么,Kafka 的事务不同于 RocketMQ,RocketMQ 是保障本地事务与 RocketMQ 消息发送的事务一致性,而 Kafka 的事务消息主要是保障一次发送多条消息的事务一致性,发送的消息要么同时成功要么同时失败。
Kafka 事务消息的特性?
- 原子性:Kafka 事务性消息要么完全成功,要么完全失败,保障了消息不会被部分处理。
- 可靠性:Kafka 事务性消息一旦写入Kafka,它们将被视为已经处理,即使发生了应用程序或系统故障。
- 幂等性:Kafka 生产者可以配置为幂等,确保相同的消息不会被重复发送,对于 Kafka 事务消息通常会配置为幂等。
- Exactly Once 语义:仅一次,Kafka 事务性消息支持仅一次,即消息要么完全到达一次,要么不到达。
Kafka的消息传输保障
Kafka 的消息传输保障分为 3个层级:
at most once:至多一次,消息可能丢失,但绝不会重复。
at least once:最少一次,消息绝不丢失,但可能重复传输。
exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次。
Kafka 事务消息的使用
Kafka 事务消息的配置
Kafka 的事务消息需要对生产者和消费者进行一些配置来启用 Kafka 对事务消息的支持。
生产者配置
- acks:这个参数我们前面有见到过,是有关生产者接收到确认之后才认为消息发送成功的设置,对于事务性消息,通常将其设置为acks=all(表示需要等到消息写入到所有 ISR 同步副本中,设置为 -1 也是一样的效果),以确保消息在事务完全提交后才被视为成功发送。
- transactional.id:事务id,这是用于标识生产者实例的唯一ID,在配置文件中设置 transactional.id 是启用事务性消息的核心步骤。
消费者配置
- isolation.level:Kafka 消费者隔离级别的设置,对于事务性消息,通常将其设置为isolation.level=read_committed,确保只读取已经提交的事务消息。
- auto.offset.reset:这是消费者启动时从哪里开始读取消息的设置,通常将其设置为auto.offset.reset=earliest(如果分区下有提交的 offset 从提交的 offset 开始消费,如果没有提交的 offset,从头开始消费),以确保不会错过任何已提交的消息。
具体配置如下:
#消息应答
spring.kafka.producer.acks = -1
#事务id配置
spring.kafka.producer.transaction-id-prefix=my-transaction-
#读取已提交的消息
spring.kafka.consumer.isolation-level=read_committed
Kafka 事务消息案例演示
Producer 代码案例
对于生产者我们两种类型可以选择,使用 KafkaTemplate 进行事务消息发送和使用原生 API 进行事务消息发送。
使用 KafkaTemplate 完成事务消息发送代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;/*** @ClassName: TransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者*/
@Slf4j
@Component
public class TransactionalProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//发送事务消息(编程式)public void sendTransactionalMessage(int number) {try {kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, String> kafkaOperations) {kafkaOperations.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaOperations.send("transactional-topic", "我是第二条事务消息");return true;}});} catch (Exception e) {e.printStackTrace();}}//发送事务消息(注解方式)@Transactional(rollbackFor = RuntimeException.class)public void sendTransactionalMessageByAnnotation(int number) {kafkaTemplate.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaTemplate.send("transactional-topic", "我是第二条事务消息");}}
上述消息生产者代码中我们使用了两种方式实现,分别是编程式事务和注解式事务消息,大家根据自己的喜好来选择,两种方式都是可以的。
Consumer 代码案例
事务消息的 Consumer 代码没有什么特殊之处,我们使用 Manual ACK 即可。
package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: TransactionalConsumer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息消费者*/
@Slf4j
@Component
public class TransactionalConsumer {@KafkaListener(id = "transactional-consumer",groupId = "transactional-consumer-groupId",topics = "transactional-topic",containerFactory = "myContainerFactory")public void listen(String message, Acknowledgment acknowledgment) {log.info("事务消息消费成功,消息内容:{}", message);//手动提交 ACKacknowledgment.acknowledge();}
}
事务消息结果验证
我们触发注解式事务消息发送,numer 传值 1,得到如下结果:
2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
结果符合预期
2024-11-05 19:52:27.444 INFO 20652 --- [nio-8086-exec-8] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-my-transaction-0, transactionalId=my-transaction-0] Aborting incomplete transaction
2024-11-05 19:52:27.451 ERROR 20652 --- [y-transaction-0] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='我是注解式事务第一条事务消息' to topic transactional-topic:org.apache.kafka.common.KafkaException: Failing batch since transaction was abortedat org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]2024-11-05 19:52:27.456 ERROR 20652 --- [nio-8086-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root causejava.lang.ArithmeticException: / by zeroat com.order.service.kafka.producer.TransactionalProducer.sendTransactionalMessageByAnnotation(TransactionalProducer.java:49) ~[classes/:na]at com.order.service.kafka.producer.TransactionalProducer$$FastClassBySpringCGLIB$$dbc0d44d.invoke(<generated>) ~[classes/:na]at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.6.jar:5.3.6]at com.order.service.kafka.producer.TransactionalProducer$$EnhancerBySpringCGLIB$$e4350198.sendTransactionalMessageByAnnotation(<generated>) ~[classes/:na]at com.order.service.controller.KafkaController.sendTransactionalMessageByAnnotation(KafkaController.java:96) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
从日志结果我们看到消费者没有消费到任何一条消息(这里我们是在第一条消息发送结束后模拟的异常),结果符合预期。
编程式事务消息这里就不在验证了,有兴趣的朋友可以自己去测试一下。
Producer 使用原生 API 发送事务消息
使用原生 API 发送事务消息,需要有一个 KafkaProducer 对象,我这里使用注入 Bean 的方式,具体如下:
@Bean
public KafkaProducer<String, String> kafkaProducer() {return new KafkaProducer<>(getMyKafkaProps());
}
有了 KafkaProducer 只有我们调用其 API 即可完成事务消息发送,KafkaProducer 有几个重要的 API 如下:
//初始化事务
public void initTransactions();//开启事务
public void beginTransaction() throws ProducerFencedException ;//在事务内提交已经消费的偏移量
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;//提交事务
public void commitTransaction() throws ProducerFencedException;//丢弃事务
public void abortTransaction() throws ProducerFencedException ;
使用原生 API 进行事务消息发送代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: NativeTransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者 原生API*/
@Slf4j
@Component
public class NativeTransactionalProducer {@Autowiredprivate KafkaProducer<String, String> kafkaProducer;//发送事务消息(原生API)public void sendTransactionalMessageByNativeApi(int number) {try {//初始化一个事务kafkaProducer.initTransactions();//开启事务kafkaProducer.beginTransaction();//发送消息kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第一条事务消息"));if (number == 0) {//模拟异常int a = 1 / 0;}kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第二条事务消息"));//提交事务消息kafkaProducer.commitTransaction();} catch (ProducerFencedException e) {//关闭资源kafkaProducer.close();} finally {//关闭资源kafkaProducer.close();}}}
原生 API 发送事务消息结果验证
我们触发原生 API 发送事务消息发送,numer 传值 1,得到如下结果:
2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
结果符合预期,异常的情况我这里就不演示了,有兴趣的朋友可以自己去验证。
事务消息注意事项
只要开启了事务消息功能,不管是 KafkaProducer 还是 KafkaTemplate 发送的所有消息,都必须处于 Kafka 事务之中,否则会抛出如下异常:
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a recordat org.springframework.util.Assert.state(Assert.java:76)at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:640)at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:552)at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)at com.order.service.kafka.producer.ManualKafkaProducer.sendManualMessage(ManualKafkaProducer.java:34)at com.order.service.controller.KafkaController.sendManualMessage(KafkaController.java:87)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)at javax.servlet.http.HttpServlet.service(HttpServlet.java:626)at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)at java.lang.Thread.run(Thread.java:745)
如果业务中,存在需要事务的情况,也存在不需要事务的情况,那么则需要分别定义两个 KafkaTemplate(Kafka Producer),按需使用。
总结:本篇分享了 Kafka 的事务消息的使用,可以感受到 Kafka 的事务消息和 RocketMQ 是完全不一样的,希望可以帮助到需要用到的伙伴们。
如有不正确的地方欢迎各位指出纠正。
相关文章:
Kafka 之事务消息
前言: 在分布式消息系统中,事务消息也是一个热门课题,在项目的实际业务场景中,如果用到事务消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持事务消息,本篇我们将对 …...
小菜家教平台(四):基于SpringBoot+Vue打造一站式学习管理系统
前言 昨天配置完了过滤器,权限检验,基本的SpringSecurity功能已经配置的差不多了,今天继续开发,明天可能会暂停一天整理一下需求,然后就进行CRUD了。 今日进度 补充SpringSecurity异常处理和全局异常处理器 详细操作…...
解决 Vue3、Vite 和 TypeScript 开发环境下跨域的问题,实现前后端数据传递
引言 本文介绍如何在开发环境下解决 Vite 前端(端口 3000)和后端(端口 80)之间的跨域问题: 在开发环境中,前端使用的 Vite 端口与后端端口不一致,会产生跨域错误提示: Access to X…...
量化交易系统开发-实时行情自动化交易-3.3.数据采集流程
19年创业做过一年的量化交易但没有成功,作为交易系统的开发人员积累了一些经验,最近想重新研究交易系统,一边整理一边写出来一些思考供大家参考,也希望跟做量化的朋友有更多的交流和合作。 接下来说说数据采集流程,后…...
探索PyAV:Python中的多媒体处理利器
文章目录 探索PyAV:Python中的多媒体处理利器第一部分:背景介绍第二部分:PyAV是什么?第三部分:如何安装PyAV?第四部分:简单的库函数使用方法1. 打开文件2. 查看流3. 遍历帧4. 编码帧5. 关闭输出…...
SpringBoot源码解析(三):启动开始阶段
SpringBoot源码系列文章 SpringBoot源码解析(一):SpringApplication构造方法 SpringBoot源码解析(二):引导上下文DefaultBootstrapContext SpringBoot源码解析(三):启动开始阶段 目录 前言一、入口二、SpringApplicationRunListener1、作用…...
C# const与readonly关键字的区别
在C#中,readonly关键字用于定义在对象创建后不能更改的字段。它可以与常量(const)有些相似,但也有显著不同。以下是readonly关键字的一些关键点: 定义与用法: readonly字段可以在类的构造函数中初始化,而const字段必须…...
【数据分享】1901-2023年我国省市县镇四级的逐年降水数据(免费获取/Shp/Excel格式)
之前我们分享过1901-2023年1km分辨率逐月降水栅格数据和Shp和Excel格式的省市县四级逐月降水数据,原始的逐月降水栅格数据来源于彭守璋学者在国家青藏高原科学数据中心平台上分享的数据!基于逐月数据我们采用求年累计值的方法得到逐年降水栅格数据&#…...
hhdb数据库介绍(9-4)
访问安全 权限体系 计算节点有两类用户,一类是计算节点数据库用户,用于操作数据,执行SELECT,UPDATE,DELETE,INSERT等SQL语句。另一类是关系集群数据库可视化管理平台用户,用于管理配置信息。此…...
苍穹外卖的分层所用到的技术以及工具+jwt令牌流程图(jwt验证)
分层用到的技术以及工具: jwt令牌流程图:...
Python——数列1/2,2/3,3/4,···,n/(n+1)···的一般项为Xn=n/(n+1),当n—>∞时,判断数列{Xn}是否收敛
没注释的源代码 from sympy import * n symbols(n) s n/(n1) print(数列的极限为:,limit(s,n,oo))...
css:还是语法
emmet的使用 emmet是一个插件,Emmet 是 Zen Coding 的升级版,由 Zen Coding 的原作者进行开发,可以快速的编写 HTML、CSS 以及实现其他的功能。很多文本编辑器都支持,我们只是学会使用它: 生成html结构 <!-- emme…...
关于 el-table 的合计行问题
目录 一.自定义合计行 二.合计行不展示,只有缩放/变大窗口或者F12弹出后台时才展示 三.合计行出现了表格滚动条下方 四.合计行整体样式的修改 五.合计行单元格样式修改 1.css 2.jsx方式 六.合计行单元格合并 一.自定义合计行 通过 show-summary 属性开启合计…...
解决SVN更新,提交错误乱码
执行清理操作,没有菜单的情况 1.点击TortoiseSVN-设置-如图勾选 注意:下图没有点击上下文菜单勾选清理 选择对应文件目录,执行【清理】操作 2.如果还是乱码,如上操作勾选解除文件锁定, 执行【破除锁定】后再次执行【…...
《Python网络安全项目实战》项目4 编写网络扫描程序
《Python网络安全项目实战》项目4 编写网络扫描程序 项目4 编写网络扫描程序任务4.1 扫描内网有效IP地址任务描述任务分析任务实施任务拓展 任务4.2 编写端口扫描工具任务描述任务分析任务实施相关知识任务评价任务拓展项目评价 项目4 编写网络扫描程序 许多扫描工具是由Pytho…...
Python金融大数据分析概述
💂 个人网站:【 摸鱼游戏】【神级代码资源网站】【海拥导航】💅 想寻找共同学习交流,摸鱼划水的小伙伴,请点击【全栈技术交流群】 金融大数据分析在金融科技领域越来越重要,它涉及从海量数据中提取洞察,为金…...
黑马产品经理
1、合格的产品经理 什么是产品? 什么是产品经理? 想清楚产品怎么做的人。 合格的产品经理 2、产品经理的分类 为什么会有不同的分类? 按服务对象划分 按产品平台划分 公司所属行业不同(不限于以下) 工作内容划分 …...
机器学习——损失函数、代价函数、KL散度
🌺历史文章列表🌺 机器学习——损失函数、代价函数、KL散度机器学习——特征工程、正则化、强化学习机器学习——常见算法汇总机器学习——感知机、MLP、SVM机器学习——KNN机器学习——贝叶斯机器学习——决策树机器学习——随机森林、Bagging、Boostin…...
首次超越扩散模型和非自回归Transformer模型!字节开源RAR:自回归生成最新SOTA!
文章链接:https://arxiv.org/pdf/2411.00776 项目链接:https://yucornetto.github.io/projects/rar.html 代码&模型链接:https://github.com/bytedance/1d-tokenizer 亮点直击 RAR(随机排列自回归训练策略)&#x…...
C语言最简单的扫雷实现(解析加原码)
头文件 #define ROW 9 #define COL 9 #define ROWS ROW2 #define COLS COL2 #include <stdio.h> #include <stdlib.h> #include <time.h> #define numlei 10do while可以循环玩 两个板子,内板子放0,外板子放* set函数初始化两个板子 …...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权
摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...
使用VMware克隆功能快速搭建集群
自己搭建的虚拟机,后续不管是学习java还是大数据,都需要集群,java需要分布式的微服务,大数据Hadoop的计算集群,如果从头开始搭建虚拟机会比较费时费力,这里分享一下如何使用克隆功能快速搭建一个集群 先把…...
篇章一 论坛系统——前置知识
目录 1.软件开发 1.1 软件的生命周期 1.2 面向对象 1.3 CS、BS架构 1.CS架构编辑 2.BS架构 1.4 软件需求 1.需求分类 2.需求获取 1.5 需求分析 1. 工作内容 1.6 面向对象分析 1.OOA的任务 2.统一建模语言UML 3. 用例模型 3.1 用例图的元素 3.2 建立用例模型 …...
比较数据迁移后MySQL数据库和PostgreSQL数据仓库中的表
设计一个MySQL数据库和PostgreSQL数据库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较两…...
