当前位置: 首页 > news >正文

Seata源码学习(五)- Seata服务端(TC)源码解读

Seata源码分析- Seata服务端(TC)源码解读

上节课我们已经分析到了SQL语句最终的执行器,但是再往下分析之前,我们需要先来分析一下TM客户端与TC端通讯以后,TC端的具体操作

服务端表解释

我们的Seata服务端在应用的时候需要准备三张表,那么这三张表分别代表的意思就是

  1. branch_table 分支事务表
  2. global_table 全局事务表
  3. lock_table 全局锁表

客户端请求服务端以后,我们就需要把对应的全局事务包括分支事务和全局锁全部存放到这里。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-54n1wCiF-1676380289869)(image-20220311185545970-1647008601419.png)]

TC服务端启动入口

那么我们任何的Java工程启动都需要主函数main,所以我们就从这里入手,首先在seata源码工程中搜索这个入口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PemtLXu0-1676380289870)(image-20220311175232256.png)]

这里我们看Server.java这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务

// 协调协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

全局事务开始方法分析

然后进入到其中我们可以看到很多的全局事务处理的方法

// 处理全局事务开始
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {// 响应客户端XIDresponse.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}
}// 处理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setGlobalStatus(core.commit(request.getXid()));
}// 处理全局回滚
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setGlobalStatus(core.rollback(request.getXid()));
}
.....

在这其中我们首先关注doGlobalBegin方法中的core.begin()方法,来看一下具体操作

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 创建全局事务SessionGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());// 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager// 观察者设计模式session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 全局事务开启session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();
}

在向下我们要关注一下全局Session管理器DataBaseSessionManager,进入到getRootSessionManager()方法中

/**
* Gets root session manager.
* 获取一个全局Session管理器
* @return the root session manager
*/
public static SessionManager getRootSessionManager() {if (ROOT_SESSION_MANAGER == null) {throw new ShouldNeverHappenException("SessionManager is NOT init!");}return ROOT_SESSION_MANAGER;
}

这个管理器如何生成的那,我们可以看一下init初始化方法

public static void init(String mode) {if (StringUtils.isBlank(mode)) {mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);}// 判断Seata模式,当前为DBStoreMode storeMode = StoreMode.get(mode);if (StoreMode.DB.equals(storeMode)) {// 通过SPI机制读取SessionManager接口实现类,读取的是META-INF.service目录,在通过反射机制创建对象DataBaseSessionManagerROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
....}

读取的文件

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f9FDfGGf-1676380289870)(image-20220311182615967.png)]

再回到begin方法中,我们就知道DataBaseSessionManager是如何创建的,包括下面这一步就是创建DataBaseSessionManager

// 观察者设计模式,创建DataBaseSessionManagersession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

但是此时有一个问题,就是我们的init方法在哪里调用的拿,其实我们回到Server中,我们发现在构建默认协调者之前就调用了init方法,说明在执行处理全局事务开始之前,就已经创建好了这个SessionManager了

SessionHolder.init(parameterParser.getStoreMode());// 默认协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

好了此时分析清楚如何得到这个SessionManager以后,我们在回过头来看代码session.begin()位置

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 创建全局事务SessionGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());// 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager// 观察者设计模式,创建DataBaseSessionManagersession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 全局事务开始session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();
}

session.begin()

@Override
public void begin() throws TransactionException {// 声明全局事务开始this.status = GlobalStatus.Begin;// 开始时间this.beginTime = System.currentTimeMillis();// 激活全局事务this.active = true;// 将SessionManager放入到集合中,调用onBegin方法for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onBegin(this);}
}

这里我们来看一下 onBegin方法,调用的是父级的方法,在这其中我们要关注addGlobalSession方法,但是要注意,这里我们用的是db模式所以调用的是db模式的DateBaseSessionManager

@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {addGlobalSession(globalSession);
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8PPVVtOi-1676380289871)(image-20220311184728236.png)]

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {// 写入sessionboolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}
}

然后我们来看写入这里

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {// 第一次进入一定是写入if (LogOperation.GLOBAL_ADD.equals(logOperation)) {return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}
}

因为我们第一次调用一定是写入,所以此时我们应该查看insertGlobalTransactionDO,此方法的作用就是写入全局事务表中global_table

@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);ps.setString(1, globalTransactionDO.getXid());ps.setLong(2, globalTransactionDO.getTransactionId());ps.setInt(3, globalTransactionDO.getStatus());ps.setString(4, globalTransactionDO.getApplicationId());ps.setString(5, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,transactionNameColumnSize) : transactionName;ps.setString(6, transactionName);ps.setInt(7, globalTransactionDO.getTimeout());ps.setLong(8, globalTransactionDO.getBeginTime());ps.setString(9, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}
}

我们可以查看GlobalTransactionDO实体类的属性,和global_table 的字段进行比对,就能看出其中道理。

相关文章:

Seata源码学习(五)- Seata服务端(TC)源码解读

Seata源码分析- Seata服务端(TC)源码解读 上节课我们已经分析到了SQL语句最终的执行器,但是再往下分析之前,我们需要先来分析一下TM客户端与TC端通讯以后,TC端的具体操作 服务端表解释 我们的Seata服务端在应用的时…...

低版本jQuery导致XSS Nuclei FUZZ POC

目录 1.前言 2. Nuclei FUZZ jQuery XSS POC 3.漏洞验证 4.修复建议 1.前言 我记得以前用那些漏扫工具时时常会报一个低版本jQuery的安全问题,当时还不会验证。直到有一天,它托梦给我。我悟了。低版本jQuery导致XSS POC文件文末获取。...

【Linux】进程的描述组织与进程状态

文章目录🎪 进程的描述组织🚀1.什么是进程🚀2.进程的形成🚀3.进程标识符 *⭐3.1 PS命令查看PID⭐3.2 /proc目录查看进程属性🚀4.父子进程⭐4.1 系统调用获取PID⭐4.2 fork创建子进程⭐4.3 fork双返回值问题⭐4.4 写时拷…...

8.2.1.1 WHERE 子句优化

本节讨论可用于处理 WHERE 子句的优化。示例使用 SELECT 语句,但相同的优化适用于 DELETE 和 UPDATE 语句中的 WHERE 子句。 注意 因为 MySQL 优化器的工作正在进行,所以这里并没有记录 MySQL 执行的所有优化。 您可能会尝试重写查询以使算术运算更快&am…...

拆个微波炉,分析一下电路

微波炉是用2450MHz的超高频电磁波来加热食品,它能无损穿越塑料,陶瓷,不能穿越金属,碰到金属会反射,但穿过含水食物,食物内的分子会高速摩擦,产生热量,使食物变熟。在厨房电器中&…...

DM8:DMDSC共享存储集群搭建-共享存储绑定

DM8:DMDSC共享存储集群搭建-共享存储绑定环境介绍:1 发现共享磁盘2 对共享存储进行分区格式化2.1 格式化成功但不可用2.2 解决问题修改错误的分区格式3 配置/etc/rc.d/rc.local3.1 编辑文件(两个节点配置相同)3.2 使rc.local生效4 重启操作系…...

Spark OOM问题常见解决方式

文章目录Spark OOM问题常见解决方式1.map过程产生大量对象导致内存溢出2.数据不平衡导致内存溢出3.coalesce调用导致内存溢出4.shuffle后内存溢出5. standalone模式下资源分配不均匀导致内存溢出6.在RDD中,共用对象能够减少OOM的情况优化1.使用mapPartitions代替大部…...

【Calcite源码学习】ImmutableBitSet介绍

Calcite中实现了一个ImmutableBitSet类,用于保存bit集合。在很多优化规则和物化视图相关的类中都使用了ImmutableBitSet来保存group by字段或者聚合函数参数字段对应的index,例如: //MaterializedViewAggregateRule#compensateViewPartial()…...

RabbitMQ相关概念介绍

这篇文章主要介绍RabbitMQ中几个重要的概念,对于初学者来说,概念性的东西可能比较难以理解,但是对于理解和使用RabbitMQ却必不可少,初学阶段,现在脑海里留有印象,随着后续更加深入的学习,就会很…...

在jenkins容器内部使用docker

在jenkins容器内部使用docker 1.使用本地的docker 进入/var/run,找到docker.sock [rootnpy run]# ls auditd.pid containerd cryptsetup dmeventd-client docker.pid initramfs lvm netreport sepermit sudo tmpfiles.d user chro…...

分布式事务解决方案

数据不会无缘无故丢失,也不会莫名其妙增加 一、概述 1、曾几何时,知了在一家小公司做项目的时候,都是一个服务打天下,所以涉及到数据一致性的问题,都是直接用本地事务处理。 2、随着时间的推移,用户量增…...

2022黑马Redis跟学笔记.实战篇(三)

2022黑马Redis跟学笔记.实战篇 三4.2.商家查询的缓存功能4.3.1.认识缓存4.3.1.1.什么是缓存4.3.1.2.缓存的作用1.为什么要使用缓存2.如何使用缓存3. 添加商户缓存4. 缓存模型和思路4.3.1.3.缓存的成本4.3.2.添加redis缓存4.3.3.缓存更新策略4.3.3.1.三种策略(1).内存淘汰:Redis…...

hadoop环境新手安装教程

1、资源准备: (1)jdk安装包:我的是1.8.0_202 (2)hadoop安装包:我的是hadoop-3.3.1 注意这里不要下载成下面这个安装包了,我就一开始下载错了 错误示例: 2、主机网络相…...

数据结构与算法基础-学习-11-线性表之链栈的初始化、判断非空、压栈、获取栈长度、弹栈、获取栈顶元素

一、个人理解链栈相较于顺序栈不存在上溢(数据满)的情况,除非内存不足,但存储密度会低于顺序栈,因为会多存一个指针域,其他逻辑和顺序表一致。总结如下:头指针指向栈顶。链栈没有头节点直接就是…...

Hive内置函数

文章目录Hive内置函数字符串函数时间类型函数数学函数集合函数条件函数类型转换函数数据脱敏函数其他函数用户自定义函数Hive内置函数 查询内置函数用法: DESCRIBE FUNCTION EXTENDED 函数名;字符串函数 字符串连接函数:concat带分隔符字符串连接函数…...

Git如何快速入门

什么是Git?我们开发的项目,也需要一个合适的版本控制系统来协助我们更好地管理版本迭代,而Git正是因此而诞生的(有关Git的历史,这里就不多做阐述了,感兴趣的小伙伴可以自行了解,是一位顶级大佬在…...

netcore构建webservice以及调用的完整流程

目录构建前置准备编写服务挂载服务处理SoapHeader调用添加服务调用服务补充内容构建 前置准备 框架版本要求:netcore3.1以上 引入nuget包 SoapCore 编写服务 1.编写服务接口 示例 using System.ServiceModel;namespace Services;[ServiceContract(Namespace &…...

Mysql事务基础(解析)

并发事务带来的问题A和B是并发事务脏写(A被B覆盖)两个事务。B事务覆盖了A事务。解决:应该事务并行脏读(B读到了A的执行中间结果)A修改了东西。B看到了他的中间状态。解决:读写冲突。加锁,改完再…...

2023 年首轮土地销售活动来了 与 The Sandbox 一起体验「体素狂热」!

2 月 14 日晚上 11 点,开始你的体素冒险。 The Sandbox 很高兴推出 2023 年的第一次土地销售活动。欢迎来到「体素狂热 (Voxel Madness)」! 简要概括 土地销售抽奖活动将于北京时间 2 月 14 日星期二晚上 11 点开始 「体素狂热」 土地销售活动将于 2 月…...

vue AntD中栅格布局的四种大小xs,sm,md,lg

cssBootstrap栅格布局的四种大小xs,sm,md,lg前端为了页面在不同大小的设备上也能够正常显示,通常会使用栅格布局的方式来实现。使用bootStrap的网格系统时,常见到一下格式的类名col-*-*visible-*-*hidden_*_* 中间可为xs,xsm,md,lg等表示大小的单词的缩写…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...

从WWDC看苹果产品发展的规律

WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...

基于Java+VUE+MariaDB实现(Web)仿小米商城

仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...