当前位置: 首页 > news >正文

SpringCloud集成Seata saga模式案例

文章目录

  • 一、前言
  • 二、Seata saga模式介绍
    • 1、示例状态图
    • 2、“状态机”介绍
      • 1)“状态机”属性
      • 2)“状态”属性
      • 3)更多状态相关内容
  • 三、SpringCloud 集成 seata saga
    • 1、saga模式状态机相关信息
      • 1)状态机配置相关的三个表
      • 2)状态图
    • 2、项目代码
      • 0)pom.xml
      • 1)线程池配置 -- MyThreadFactory
      • 2)seata saga相关配置 -- SagaConfiguration
      • 3)库存服务 -- InventoryService
        • InventoryServiceImpl
      • 4)账户余额服务 -- BalanceService
        • BalanceServiceImpl
      • 5)启动类 -- SagaTradeApplication
      • 6) 状态图对应的JSON文件 -- reduce_inventory_and_balance.json
        • 状态图流程解析
      • 7)application.yml
      • 8)file.conf
      • 9)开启状态机入口 -- TradeController
    • 3、测试 / 验证
      • 1)启动seata-server服务;
      • 2)启动seata-client(saga-trade)
      • 3)事务提交
      • 4)事务回滚
  • 三、总结

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容已出:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
  14. 分布式事务Seata源码解析九:分支事务如何注册到全局事务
  15. 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程
  16. 分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交
  17. 分布式事务Seata源码解析12:全局事务执行流程之全局事务回滚
  18. Spring Cloud整合Seata实现TCC分布式事务模式案例
  19. 分布式事务Seata源码解析13:TCC事务模式实现原理
  20. 分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案(seata1.5.1如何解决?)
  21. Seata XA模式概述+案例
  22. saga模式、Seata saga模式详介

至此,Seata常用的AT模式、TCC模式 和 XA模式已完结,SAGA模式也已做了基本介绍,本文接着聊Spring Cloud 如何集成Seata saga模式实现全局事务、分支事务

二、Seata saga模式介绍

官方文档地址:https://seata.io/zh-cn/docs/user/saga.html

Seata提供的Saga模式目前只能通过状态机引擎来实现,整体机制为:

  1. 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件;
    • 换言之,需要开发者手工的进行Saga业务流程绘制,并将其转换为JSON配置文件;
  2. 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点;
    • 注意: 异常发生时是否进行补偿也可由用户自定义决定,可以选择不配置;
  3. 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚;
    • 在程序启动时,会根据saga状态图加载业务处理流程(包括:服务补偿处理);
  4. 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能;

1、示例状态图

在这里插入图片描述

2、“状态机”介绍

seata saga的状态语言在一定程度上参考了AWS Step Functions

1)“状态机”属性

  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个"状态"
  • States: 状态列表,是一个map结构,key是"状态"的名称,在状态机内必须唯一
  • IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新
  • IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新

2)“状态”属性

  1. Type: “状态” 的类型,比如有:
    • ServiceTask: 执行调用服务任务
    • Choice: 单条件选择路由
    • CompensationTrigger: 触发补偿流程
    • Succeed: 状态机正常结束
    • Fail: 状态机异常结束
    • SubStateMachine: 调用子状态机
    • CompensateSubMachine: 用于补偿一个子状态机
  2. ServiceName: 服务名称,通常是服务的beanId(也就是Spring容器中的beanName)
    • 无论是SpringCloud,还是Dubbo、HSF…,最重要的就是配置这个beanId。
  3. ServiceMethod: 服务方法名称(也就是:Spring Bean中的某个方法名)
  4. CompensateState: 该"状态"的补偿"状态"
  5. Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行
  6. Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用 SpringEL, 如果是常量直接写值即可
  7. Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  8. Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  9. Catch: 捕获到异常后的路由
  10. Next: 服务执行完成后下一个执行的"状态"
  11. Choices: Choice类型的"状态"里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个"状态"
  12. ErrorCode: Fail类型"状态"的错误码
  13. Message: Fail类型"状态"的错误信息

3)更多状态相关内容

更多详细的状态语言使用示例见github:
https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine

三、SpringCloud 集成 seata saga

官方提供的saga案例地址:https://github.com/seata/seata-samples/tree/master/saga

在这里插入图片描述

然而并没有提供SpringCloud与saga模式集成的案例;以下介绍SpringCloud与saga模式集成的案例。

1、saga模式状态机相关信息

1)状态机配置相关的三个表

首先,我们需要 在使用状态机开启saga分支事务的 服务对应的数据库连接中创建三个表(以MYSQL为例):

CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(`id`               VARCHAR(32)  NOT NULL COMMENT 'id',`name`             VARCHAR(128) NOT NULL COMMENT 'name',`tenant_id`        VARCHAR(32)  NOT NULL COMMENT 'tenant id',`app_name`         VARCHAR(32)  NOT NULL COMMENT 'application name',`type`             VARCHAR(20)  COMMENT 'state language type',`comment_`         VARCHAR(255) COMMENT 'comment',`ver`              VARCHAR(16)  NOT NULL COMMENT 'version',`gmt_create`       DATETIME(3)  NOT NULL COMMENT 'create time',`status`           VARCHAR(2)   NOT NULL COMMENT 'status(AC:active|IN:inactive)',`content`          TEXT COMMENT 'content',`recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',PRIMARY KEY (`id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(`id`                  VARCHAR(128)            NOT NULL COMMENT 'id',`machine_id`          VARCHAR(32)             NOT NULL COMMENT 'state machine definition id',`tenant_id`           VARCHAR(32)             NOT NULL COMMENT 'tenant id',`parent_id`           VARCHAR(128) COMMENT 'parent id',`gmt_started`         DATETIME(3)             NOT NULL COMMENT 'start time',`business_key`        VARCHAR(48) COMMENT 'business key',`start_params`        TEXT COMMENT 'start parameters',`gmt_end`             DATETIME(3) COMMENT 'end time',`excep`               BLOB COMMENT 'exception',`end_params`          TEXT COMMENT 'end parameters',`status`              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',`compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',`is_running`          TINYINT(1) COMMENT 'is running(0 no|1 yes)',`gmt_updated`         DATETIME(3) NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE IF NOT EXISTS `seata_state_inst`
(`id`                       VARCHAR(48)  NOT NULL COMMENT 'id',`machine_inst_id`          VARCHAR(128) NOT NULL COMMENT 'state machine instance id',`name`                     VARCHAR(128) NOT NULL COMMENT 'state name',`type`                     VARCHAR(20)  COMMENT 'state type',`service_name`             VARCHAR(128) COMMENT 'service name',`service_method`           VARCHAR(128) COMMENT 'method name',`service_type`             VARCHAR(16) COMMENT 'service type',`business_key`             VARCHAR(48) COMMENT 'business key',`state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',`state_id_retried_for`     VARCHAR(50) COMMENT 'state retried for',`gmt_started`              DATETIME(3)  NOT NULL COMMENT 'start time',`is_for_update`            TINYINT(1) COMMENT 'is service for update',`input_params`             TEXT COMMENT 'input parameters',`output_params`            TEXT COMMENT 'output parameters',`status`                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',`excep`                    BLOB COMMENT 'exception',`gmt_updated`              DATETIME(3) COMMENT 'update time',`gmt_end`                  DATETIME(3) COMMENT 'end time',PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;

数据库表的出处,见seata官方地址:https://github.com/seata/seata/blob/1.5.2/script/client/saga/db/mysql.sql

在这里插入图片描述

2)状态图

状态机设计器演示(在线画图工具)地址:http://seata.io/saga_designer/index.html

在这里插入图片描述

2、项目代码

整体代码结构:

在这里插入图片描述

此处案例和saga官方提供的一样,仅示范saga模式的使用,不涉及RPC、业务表操作,若读者想丰富案例,可在笔者的todo标注处自行添加。

0)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><modelVersion>4.0.0</modelVersion><version>0.0.1-SNAPSHOT</version><groupId>com.saint</groupId><artifactId>saga-trade</artifactId><properties><java.version>1.8</java.version><druid.version>1.2.8</druid.version><mysql.version>8.0.22</mysql.version><!--seata1.5.2 版本源码验证--><spring-boot.version>2.3.12.RELEASE</spring-boot.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version><spring-cloud-alibaba.version>2.2.9.RELEASE</spring-cloud-alibaba.version><druid.version>1.2.8</druid.version><mysql.version>8.0.22</mysql.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.10</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><!--整合spring cloud--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><!--整合spring cloud alibaba--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>

1)线程池配置 – MyThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定义线程工厂*/
public class MyThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber;private ThreadGroup group;private String namePrefix;public MyThreadFactory(String namePrefix) {this.threadNumber = new AtomicInteger(1);SecurityManager s = System.getSecurityManager();this.group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();this.namePrefix = namePrefix + "_THREAD_";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(this.group, r, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);return t;}
}

2)seata saga相关配置 – SagaConfiguration

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
import io.seata.saga.rm.StateMachineEngineHolder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @author Saint*/
@Configuration
public class SagaConfiguration {@Bean@ConfigurationProperties(prefix = "spring.datasource")public DataSource dataSource() {return new DruidDataSource();}@Beanpublic ThreadPoolExecutor sagaThreadPool() {ThreadPoolExecutor executor = new ThreadPoolExecutor(1,20,30, TimeUnit.SECONDS,new LinkedBlockingQueue<>(2000),new MyThreadFactory("SAGA_ASYNC_EXE_"),new ThreadPoolExecutor.AbortPolicy());return executor;}@Beanpublic DbStateMachineConfig dbStateMachineConfig() {DbStateMachineConfig config = new DbStateMachineConfig();config.setDataSource(dataSource());config.setResources(new String[]{"statelang/*.json"});config.setEnableAsync(true);config.setApplicationId("saga-trade");config.setTxServiceGroup("saint-trade-tx-group");config.setThreadPoolExecutor(sagaThreadPool());return config;}@Beanpublic ProcessCtrlStateMachineEngine stateMachineEngine() {ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine();engine.setStateMachineConfig(dbStateMachineConfig());return engine;}@Beanpublic StateMachineEngineHolder stateMachineEngineHolder() {StateMachineEngineHolder holder = new StateMachineEngineHolder();holder.setStateMachineEngine(stateMachineEngine());return holder;}
}

3)库存服务 – InventoryService

InventoryService提供了两个方法:一个reduce()、一个reduce()对应的补偿方法compensateReduce()

package com.saint.saga.trade.service;/*** Inventory Actions*/
public interface InventoryService {/*** reduce** @param businessKey 业务上的唯一标识* @param count* @return*/boolean reduce(String businessKey, int count);/*** increase** @param businessKey 业务上的唯一标识* @return*/boolean compensateReduce(String businessKey);
}

InventoryServiceImpl

package com.saint.saga.trade.service.impl;import com.saint.saga.trade.service.InventoryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;/*** 库存** @author Saint*/
@Service(value = "inventoryService")
public class InventoryServiceImpl implements InventoryService {private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class);@Overridepublic boolean reduce(String businessKey, int count) {LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);// todo rpc / httpreturn true;}@Overridepublic boolean compensateReduce(String businessKey) {LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey);// todo rpc / httpreturn true;}
}

4)账户余额服务 – BalanceService

BalanceService提供了两个方法:一个reduce()、一个reduce()对应的补偿方法compensateReduce()

package com.saint.saga.trade.service;import java.math.BigDecimal;
import java.util.Map;/*** Balance Actions*/
public interface BalanceService {/*** reduce** @param businessKey 业务上的唯一标识* @param amount* @param params* @return*/boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);/*** compensateReduce** @param businessKey 业务上的唯一标识* @param params* @return*/boolean compensateReduce(String businessKey, Map<String, Object> params);}

BalanceServiceImpl

package com.saint.saga.trade.service.impl;import com.saint.saga.trade.service.BalanceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;import java.math.BigDecimal;
import java.util.Map;/*** 账户余额** @author Saint*/
@Service(value = "balanceService")
public class BalanceServiceImpl implements BalanceService {private static final Logger LOGGER = LoggerFactory.getLogger(BalanceServiceImpl.class);@Overridepublic boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {if (params != null) {Object throwException = params.get("throwException");if (throwException != null && "true".equals(throwException.toString())) {throw new RuntimeException("reduce balance failed");}}LOGGER.info("reduce balance succeed, amount: " + amount + ", bizCode:" + businessKey);// todo rpc / httpreturn true;}@Overridepublic boolean compensateReduce(String businessKey, Map<String, Object> params) {if (params != null) {Object throwException = params.get("throwException");if (throwException != null && "true".equals(throwException.toString())) {throw new RuntimeException("compensate reduce balance failed");}}LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey);// todo rpc / httpreturn true;}
}

5)启动类 – SagaTradeApplication

@SpringBootApplication
public class SagaTradeApplication {public static void main(String[] args) {ConfigurableApplicationContext run = SpringApplication.run(SagaTradeApplication.class, args);InventoryService bean = run.getBean(InventoryService.class);BalanceService bean1 = run.getBean(BalanceService.class);}
}

6) 状态图对应的JSON文件 – reduce_inventory_and_balance.json

{"Name": "reduceInventoryAndBalance","Comment": "reduce inventory then reduce balance in a transaction","StartState": "ReduceInventory","Version": "0.0.1","States": {"ReduceInventory": {"Type": "ServiceTask","ServiceName": "inventoryService","ServiceMethod": "reduce","CompensateState": "CompensateReduceInventory","Next": "ChoiceState","Input": ["$.[businessKey]","$.[count]"],"Output": {"reduceInventoryResult": "$.#root"},"Status": {"#root == true": "SU","#root == false": "FA","$Exception{java.lang.Throwable}": "UN"}},"ChoiceState": {"Type": "Choice","Choices": [{"Expression": "[reduceInventoryResult] == true","Next": "ReduceBalance"}],"Default": "Fail"},"ReduceBalance": {"Type": "ServiceTask","ServiceName": "balanceService","ServiceMethod": "reduce","CompensateState": "CompensateReduceBalance","Input": ["$.[businessKey]","$.[amount]",{"throwException": "$.[mockReduceBalanceFail]"}],"Output": {"compensateReduceBalanceResult": "$.#root"},"Status": {"#root == true": "SU","#root == false": "FA","$Exception{java.lang.Throwable}": "UN"},"Catch": [{"Exceptions": ["java.lang.Throwable"],"Next": "CompensationTrigger"}],"Next": "Succeed"},"CompensateReduceInventory": {"Type": "ServiceTask","ServiceName": "inventoryService","ServiceMethod": "compensateReduce","Input": ["$.[businessKey]"]},"CompensateReduceBalance": {"Type": "ServiceTask","ServiceName": "balanceService","ServiceMethod": "compensateReduce","Input": ["$.[businessKey]"]},"CompensationTrigger": {"Type": "CompensationTrigger","Next": "Fail"},"Succeed": {"Type": "Succeed"},"Fail": {"Type": "Fail","ErrorCode": "PURCHASE_FAILED","Message": "purchase failed"}}
}

状态图流程解析

在这里插入图片描述

7)application.yml

server:port: 9099
spring:application:name: saga-tradedatasource:url: jdbc:mysql://127.0.0.1:3306/seata_saga?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=falseusername: rootpassword: 123456driver-class-name: com.mysql.cj.jdbc.Driverjpa:show-sql: trueseata:tx-service-group: saint-trade-tx-group

8)file.conf

transport {# tcp udt unix-domain-sockettype = "TCP"#NIO NATIVEserver = "NIO"#enable heartbeatheartbeat = true# the client batch send request enableenableClientBatchSendRequest = true#thread factory for nettythreadFactory {bossThreadPrefix = "NettyBoss"workerThreadPrefix = "NettyServerNIOWorker"serverExecutorThread-prefix = "NettyServerBizHandler"shareBossWorker = falseclientSelectorThreadPrefix = "NettyClientSelector"clientSelectorThreadSize = 1clientWorkerThreadPrefix = "NettyClientWorkerThread"# netty boss thread size,will not be used for UDTbossThreadSize = 1#auto default pin or 8workerThreadSize = "default"}shutdown {# when destroy server, wait secondswait = 3}serialization = "seata"compressor = "none"
}
service {#transaction service group mappingvgroupMapping.saint-trade-tx-group = "seata-server-sh"#only support when registry.type=file, please don't set multiple addressesseata-server-sh.grouplist = "127.0.0.1:8091"#degrade, current not supportenableDegrade = false#disable seatadisableGlobalTransaction = false
}client {rm {asyncCommitBufferLimit = 10000lock {retryInterval = 10retryTimes = 30retryPolicyBranchRollbackOnConflict = true}reportRetryCount = 5tableMetaCheckEnable = falsereportSuccessEnable = false}tm {commitRetryCount = 5rollbackRetryCount = 5}undo {dataValidation = truelogSerialization = "jackson"logTable = "undo_log"}log {exceptionRate = 100}
}

9)开启状态机入口 – TradeController

状态机支持两种执行方式:同步执行、异步执行;

在这里插入图片描述

  • 同步执行API:StateMachineEngine#startWithBusinessKey();
  • 异步执行API:StateMachineEngine#startWithBusinessKeyAsync(…, AsyncCallback)
    • 其中的AsyncCallback为异步执行结束之后的回调函数
package com.saint.saga.trade.controller;import io.seata.saga.engine.AsyncCallback;
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.proctrl.ProcessContext;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;/*** @author Saint*/
@RestController
@RequestMapping("saga")
@Slf4j
public class TradeController {@Autowiredprivate StateMachineEngine stateMachineEngine;/*** POST请求 http://localhost:9099/saga/commit?amount=50&count=2*/@RequestMapping(value = "/commit", method = RequestMethod.POST)public String commit(Integer amount, Integer count) {String businessKey = String.valueOf(System.currentTimeMillis());Map<String, Object> startParams = generateStartParams(amount, count, false);// 1、sync testStateMachineInstance instance = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null,businessKey, startParams);// 2、async test
//        StateMachineInstance instance = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams,
//                CALL_BACK);
//        waitingForFinish(instance);// PS: instance is not nullif (!ExecutionStatus.SU.equals(instance.getStatus())) {log.error("saga transaction execute failed. XID: {}", instance.getId());return "rollback";}log.info("saga transaction commit succeed. XID: {}", instance.getId());return "succeed";}/*** POST请求 http://localhost:9099/saga/rollback?amount=50&count=2*/@RequestMapping(value = "/rollback", method = RequestMethod.POST)public String rollback(Integer amount, Integer count) {String businessKey = String.valueOf(System.currentTimeMillis());// unique difference is hereMap<String, Object> startParams = generateStartParams(amount, count, true);// 1、sync testStateMachineInstance instance = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null,businessKey, startParams);// 2、async test
//        StateMachineInstance instance = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams,
//                CALL_BACK);
//        waitingForFinish(instance);// PS: instance is not nullif (!ExecutionStatus.SU.equals(instance.getStatus())) {log.error("saga transaction execute failed. XID: {}", instance.getId());return "rollback";}log.info("saga transaction commit succeed. XID: {}", instance.getId());return "succeed";}/*** parameters to be used in the state machine(状态机需要用到的参数在这里组装)*/private Map<String, Object> generateStartParams(Integer amount, Integer count, Boolean mockFail) {String businessKey = String.valueOf(System.currentTimeMillis());Map<String, Object> startParams = new HashMap<>(8);startParams.put("businessKey", businessKey);startParams.put("count", 10);startParams.put("amount", new BigDecimal(String.valueOf(amount)));if (mockFail)startParams.put("mockReduceBalanceFail", true);return startParams;}private static volatile Object lock = new Object();private static AsyncCallback CALL_BACK = new AsyncCallback() {@Overridepublic void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) {synchronized (lock) {lock.notifyAll();}}@Overridepublic void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) {synchronized (lock) {lock.notifyAll();}}};private static void waitingForFinish(StateMachineInstance inst) {synchronized (lock) {if (!ExecutionStatus.RU.equals(inst.getStatus()))return;try {lock.wait();} catch (InterruptedException e) {log.error("occur exception, ", e);}}}
}

3、测试 / 验证

1)启动seata-server服务;

参考博文:超细的Spring Cloud 整合Seata实现分布式事务(排坑版)进行seata-server的配置和启动;

在这里插入图片描述

注意:本文使用的seata版本是1.5.2,切勿使用成参考博文中的1.3.0。

在这里插入图片描述

seata server1.5.2启动成功后控制台输出:

在这里插入图片描述

2)启动seata-client(saga-trade)

在这里插入图片描述

3)事务提交

执行 POST类型请求:http://localhost:9099/saga/commit?amount=50&count=2

在这里插入图片描述

saga-trade控制台输出:

在这里插入图片描述

seata-server日志:

在这里插入图片描述

4)事务回滚

执行 POST类型请求:http://localhost:9099/saga/commit?amount=50&count=2

在这里插入图片描述

saga-trade控制台输出:

在这里插入图片描述

seata-server日志:

在这里插入图片描述

三、总结

seata的saga模式适用于长流程 或 长事务场景。saga模式复杂的地方在于引入状态机,需要自己根据业务定义状态机的流程,然后把定义好的流程用json文件导入到工程中。

此外,saga模式需要开发者自定义回滚事件,并要考虑空补偿、悬挂、幂等三种问题,即:允许空补偿、做防悬挂控制、做幂等控制。读者可以参考TCC模式中的解决方案(分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案(seata1.5.1如何解决?))实现;

相关文章:

SpringCloud集成Seata saga模式案例

文章目录一、前言二、Seata saga模式介绍1、示例状态图2、“状态机”介绍1&#xff09;“状态机”属性2&#xff09;“状态”属性3&#xff09;更多状态相关内容三、SpringCloud 集成 seata saga1、saga模式状态机相关信息1&#xff09;状态机配置相关的三个表2&#xff09;状态…...

逍遥自在学C语言 | 位运算符的高级用法

前言 在上一篇文章中&#xff0c;我们介绍了&运算符的基础用法&#xff0c;本篇文章&#xff0c;我们将介绍& 运算符的一些高级用法。 一、人物简介 第一位闪亮登场&#xff0c;有请今后会一直教我们C语言的老师 —— 自在。 第二位上场的是和我们一起学习的小白程序…...

Java实现输入行数打印取缔字符,打印金字塔三角形的两个代码程序

目录 前言 一、实现输入行数&#xff0c;打印取缔字符 1.1运行流程&#xff08;思想&#xff09; 1.2代码段 1.3运行截图 二、打印金字塔三角形 1.1运行流程&#xff08;思想&#xff09; 1.2代码段 1.3运行截图​​​​​​​ 前言 1.因多重原因&#xff0c;本博文有…...

express项目的创建

前言 前端开发者若要进行后端开发&#xff0c;大多都会选择node.js&#xff0c;在node生态下是有大量框架的&#xff0c;其中最受新手喜爱的便是老牌的express.js&#xff0c;接下来我们就从零创建一个express项目。 安装node 在这里&#xff1a;https://nodejs.org/dist/v16…...

RK3399平台开发系列讲解(基础篇)Linux 传统间隔定时器

🚀返回专栏总目录 文章目录 一、设置间隔定时器 setitimer()二、查询定时器状态 getitimer()三、更简单的定时接口 alarm()四、传统定时器的应用4.1、为阻塞操作设置超时4.2、性能剖析五、传统定时器的局限性沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将详细…...

Kafka 3.4.0 kraft 集群搭建

文章目录简介基础环境服务器三台安装下载安装初始化集群启动集群验证创建Topic查看Topic详情简介 Apache 软件基金会发布了包含许多新特性和改进的 Kafka 3.3.1。这是第一个标志着可以在生产环境中使用 KRaft&#xff08;Kafka Raft&#xff09;共识协议的版本。在几年的开发过…...

微信小程序 iphone14 css mask 使用图片实现遮照 疑似 no-repeat 失效

1. 将图片转为 换成svg类型 2. css设置属性时书写顺序&#xff08;如果顺序不对会导致展示问题 T T 奇妙的bug&#xff09; .water-inner {-webkit-mask-image: url("./water-black.svg");mask-image: url("./water-black.svg");-webkit-mask-size: cont…...

密码学实践-04

密码强度 你要揭榜的任务非常简单&#xff0c;内容如下。 用户输入口令后&#xff0c;请进行强度检测&#xff1a; 等级三种&#xff1a;强&#xff0c;中&#xff0c;弱 1、口令长度小于等于8位&#xff0c;并且纯小写英文或大写英文&#xff0c;弱 2、口令长度小于等于8位&am…...

SpringBoot整合swagger实现接口管理并设置加密访问

pom.xml pom.xml文件加入swagger <dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>com.github.xiaoymin&l…...

C语言单例模式-实现高性能日志管理器

C语言单例模式-实现高性能日志管理器 代码中&#xff0c;使用了单例模式来创建日志管理器对象&#xff0c;保证了整个程序中只有一个日志管理器对象。 日志管理器中包含了日志文件指针、日志级别、互斥锁等成员&#xff0c;通过这些成员来实现日志的写入和级别控制。 在主函数…...

Flutter - flutter项目添加 Web 支持

demo 地址: https://github.com/iotjin/jh_flutter_demo 代码不定时更新&#xff0c;请前往github查看最新代码 参考&#xff1a; 官方&#xff1a;构建 Flutter Web 应用 Flutter Desktop Support flutter项目添加 Web 支持 在项目的根目录下运行&#xff1a;flutter create …...

关键词数据分析-搜索词和关键词分析工具

要搜索热门关键词获取&#xff0c;可以采用以下几种方法&#xff1a; 使用百度指数&#xff1a;百度指数是一个实用的工具&#xff0c;可用于查看关键词的热度趋势、搜索量等数据。在百度指数中&#xff0c;您可以输入您要搜索的关键词&#xff0c;并查看近期的相关数据。这可以…...

SpringCloud微服务技术栈之网关服务Gateway

文章目录SpringCloud微服务技术栈之网关服务Gateway前言网关服务Gateway的基本概念Gateway的体系结构Gateway的主要功能网关服务Gateway的架构设计架构设计方案示例代码网关服务Gateway的实践操作1. 创建工程2. 配置路由规则3. 实现过滤器4. 集成服务注册中心5. 启动网关服务器…...

什么原因导致了儿童自闭症?跟父母养育有关吗?

导致儿童自闭症的原因是什么&#xff1f;这和父母的抚养有关吗&#xff1f;学习教育孩子的方法&#xff0c;让孩子快乐健康地成长&#xff0c;是家庭和孩子生活中的一件重要事情。不良的环境和错误的教育会导致儿童自闭症&#xff0c;这是真的吗&#xff1f;自闭症&#xff0c;…...

抽象轻松web

不断学习&#xff0c;不断进步&#xff0c;才能不被替代 只有你的不可被替代性才是价值所在 千变万化的叶子 根只有一个 ----2023年4月7日 弹性盒布局的作用其实是定位 我们设置弹性盒子的时候目的是为了让元素放在页面中的某个位置&#xff0c;从而达到布局的效果 定位的本质…...

如何获取系统下目录的文件系统类型

最近看到一个问题&#xff0c;如何获取当前系统的文件类型&#xff1f; 这个时候就要介绍下/proc/mounts文件&#xff1a;这个文件以/etc/mtab文件的格式给出当前系统所安装的文件系统信息。同时也能反映出任何手工安装从而在/etc/mtab文件中没有包含的文件系统。 我们可以通…...

【Linux】GCC编译器的使用

目录 前言&#xff1a; 一、GCC编译过程 1.预处理&#xff1a; 2.编译 3.汇编 4.链接 二、制作、使用动态库和静态库 1.静态库 2.动态库 三、好用的选项 1.gcc -E main.c 2.gcc -E -dM main.c > 1.txt 3.gcc -Wp,-MD,abc.dep -c -o main.o main.c 4.echo main(){}| …...

浅谈一下socks5协议原理详解与应用场景分析

SOCKS5协议是一种网络传输协议&#xff0c;主要用于代理服务器和客户端之间的通信。它能够通过认证授权等多种方式&#xff0c;提供安全可靠的代理服务&#xff0c;适用于各种应用场景。 SOCKS5协议原理&#xff1a; 1.连接建立&#xff1a;客户端向代理服务器发送连接请求&…...

java面试准备17

事务的四大特性 &#xff08;1&#xff09;原子性&#xff1a;事务执行的最小单位&#xff0c;不可被分割&#xff0c;事务的原子性保证事务中的一连串动作要么都执行&#xff0c;要么都不执行。 &#xff08;2&#xff09;一致性&#xff1a;执行事务前后的数据保持一致&…...

ffmpeg的滤镜

FFmpeg 是一款开源的跨平台音视频处理工具&#xff0c;它提供了众多功能强大的滤镜用于视频/音频的加工处理。其中&#xff0c;滤镜(Filter)是 FFmpeg 中一个十分重要且常用的组件&#xff0c;它们可以实现对视频和音频的各种操作和变化&#xff0c;如转码、编解码、剪裁、裁剪…...

Python爬虫实战:研究feedparser库相关技术

1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

【网络安全产品大调研系列】2. 体验漏洞扫描

前言 2023 年漏洞扫描服务市场规模预计为 3.06&#xff08;十亿美元&#xff09;。漏洞扫描服务市场行业预计将从 2024 年的 3.48&#xff08;十亿美元&#xff09;增长到 2032 年的 9.54&#xff08;十亿美元&#xff09;。预测期内漏洞扫描服务市场 CAGR&#xff08;增长率&…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

Golang dig框架与GraphQL的完美结合

将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用&#xff0c;可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器&#xff0c;能够帮助开发者更好地管理复杂的依赖关系&#xff0c;而 GraphQL 则是一种用于 API 的查询语言&#xff0c;能够提…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...