当前位置: 首页 > news >正文

Seata源码学习(五)- Seata服务端(TC)源码解读

Seata源码分析- Seata服务端(TC)源码解读

上节课我们已经分析到了SQL语句最终的执行器,但是再往下分析之前,我们需要先来分析一下TM客户端与TC端通讯以后,TC端的具体操作

服务端表解释

我们的Seata服务端在应用的时候需要准备三张表,那么这三张表分别代表的意思就是

  1. branch_table 分支事务表
  2. global_table 全局事务表
  3. lock_table 全局锁表

客户端请求服务端以后,我们就需要把对应的全局事务包括分支事务和全局锁全部存放到这里。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-54n1wCiF-1676380289869)(image-20220311185545970-1647008601419.png)]

TC服务端启动入口

那么我们任何的Java工程启动都需要主函数main,所以我们就从这里入手,首先在seata源码工程中搜索这个入口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PemtLXu0-1676380289870)(image-20220311175232256.png)]

这里我们看Server.java这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务

// 协调协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

全局事务开始方法分析

然后进入到其中我们可以看到很多的全局事务处理的方法

// 处理全局事务开始
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {// 响应客户端XIDresponse.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}
}// 处理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setGlobalStatus(core.commit(request.getXid()));
}// 处理全局回滚
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setGlobalStatus(core.rollback(request.getXid()));
}
.....

在这其中我们首先关注doGlobalBegin方法中的core.begin()方法,来看一下具体操作

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 创建全局事务SessionGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());// 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager// 观察者设计模式session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 全局事务开启session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();
}

在向下我们要关注一下全局Session管理器DataBaseSessionManager,进入到getRootSessionManager()方法中

/**
* Gets root session manager.
* 获取一个全局Session管理器
* @return the root session manager
*/
public static SessionManager getRootSessionManager() {if (ROOT_SESSION_MANAGER == null) {throw new ShouldNeverHappenException("SessionManager is NOT init!");}return ROOT_SESSION_MANAGER;
}

这个管理器如何生成的那,我们可以看一下init初始化方法

public static void init(String mode) {if (StringUtils.isBlank(mode)) {mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);}// 判断Seata模式,当前为DBStoreMode storeMode = StoreMode.get(mode);if (StoreMode.DB.equals(storeMode)) {// 通过SPI机制读取SessionManager接口实现类,读取的是META-INF.service目录,在通过反射机制创建对象DataBaseSessionManagerROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
....}

读取的文件

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f9FDfGGf-1676380289870)(image-20220311182615967.png)]

再回到begin方法中,我们就知道DataBaseSessionManager是如何创建的,包括下面这一步就是创建DataBaseSessionManager

// 观察者设计模式,创建DataBaseSessionManagersession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

但是此时有一个问题,就是我们的init方法在哪里调用的拿,其实我们回到Server中,我们发现在构建默认协调者之前就调用了init方法,说明在执行处理全局事务开始之前,就已经创建好了这个SessionManager了

SessionHolder.init(parameterParser.getStoreMode());// 默认协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

好了此时分析清楚如何得到这个SessionManager以后,我们在回过头来看代码session.begin()位置

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 创建全局事务SessionGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());// 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager// 观察者设计模式,创建DataBaseSessionManagersession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 全局事务开始session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();
}

session.begin()

@Override
public void begin() throws TransactionException {// 声明全局事务开始this.status = GlobalStatus.Begin;// 开始时间this.beginTime = System.currentTimeMillis();// 激活全局事务this.active = true;// 将SessionManager放入到集合中,调用onBegin方法for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onBegin(this);}
}

这里我们来看一下 onBegin方法,调用的是父级的方法,在这其中我们要关注addGlobalSession方法,但是要注意,这里我们用的是db模式所以调用的是db模式的DateBaseSessionManager

@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {addGlobalSession(globalSession);
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8PPVVtOi-1676380289871)(image-20220311184728236.png)]

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {// 写入sessionboolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}
}

然后我们来看写入这里

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {// 第一次进入一定是写入if (LogOperation.GLOBAL_ADD.equals(logOperation)) {return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}
}

因为我们第一次调用一定是写入,所以此时我们应该查看insertGlobalTransactionDO,此方法的作用就是写入全局事务表中global_table

@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);ps.setString(1, globalTransactionDO.getXid());ps.setLong(2, globalTransactionDO.getTransactionId());ps.setInt(3, globalTransactionDO.getStatus());ps.setString(4, globalTransactionDO.getApplicationId());ps.setString(5, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,transactionNameColumnSize) : transactionName;ps.setString(6, transactionName);ps.setInt(7, globalTransactionDO.getTimeout());ps.setLong(8, globalTransactionDO.getBeginTime());ps.setString(9, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}
}

我们可以查看GlobalTransactionDO实体类的属性,和global_table 的字段进行比对,就能看出其中道理。

相关文章:

Seata源码学习(五)- Seata服务端(TC)源码解读

Seata源码分析- Seata服务端(TC)源码解读 上节课我们已经分析到了SQL语句最终的执行器,但是再往下分析之前,我们需要先来分析一下TM客户端与TC端通讯以后,TC端的具体操作 服务端表解释 我们的Seata服务端在应用的时…...

低版本jQuery导致XSS Nuclei FUZZ POC

目录 1.前言 2. Nuclei FUZZ jQuery XSS POC 3.漏洞验证 4.修复建议 1.前言 我记得以前用那些漏扫工具时时常会报一个低版本jQuery的安全问题,当时还不会验证。直到有一天,它托梦给我。我悟了。低版本jQuery导致XSS POC文件文末获取。...

【Linux】进程的描述组织与进程状态

文章目录🎪 进程的描述组织🚀1.什么是进程🚀2.进程的形成🚀3.进程标识符 *⭐3.1 PS命令查看PID⭐3.2 /proc目录查看进程属性🚀4.父子进程⭐4.1 系统调用获取PID⭐4.2 fork创建子进程⭐4.3 fork双返回值问题⭐4.4 写时拷…...

8.2.1.1 WHERE 子句优化

本节讨论可用于处理 WHERE 子句的优化。示例使用 SELECT 语句,但相同的优化适用于 DELETE 和 UPDATE 语句中的 WHERE 子句。 注意 因为 MySQL 优化器的工作正在进行,所以这里并没有记录 MySQL 执行的所有优化。 您可能会尝试重写查询以使算术运算更快&am…...

拆个微波炉,分析一下电路

微波炉是用2450MHz的超高频电磁波来加热食品,它能无损穿越塑料,陶瓷,不能穿越金属,碰到金属会反射,但穿过含水食物,食物内的分子会高速摩擦,产生热量,使食物变熟。在厨房电器中&…...

DM8:DMDSC共享存储集群搭建-共享存储绑定

DM8:DMDSC共享存储集群搭建-共享存储绑定环境介绍:1 发现共享磁盘2 对共享存储进行分区格式化2.1 格式化成功但不可用2.2 解决问题修改错误的分区格式3 配置/etc/rc.d/rc.local3.1 编辑文件(两个节点配置相同)3.2 使rc.local生效4 重启操作系…...

Spark OOM问题常见解决方式

文章目录Spark OOM问题常见解决方式1.map过程产生大量对象导致内存溢出2.数据不平衡导致内存溢出3.coalesce调用导致内存溢出4.shuffle后内存溢出5. standalone模式下资源分配不均匀导致内存溢出6.在RDD中,共用对象能够减少OOM的情况优化1.使用mapPartitions代替大部…...

【Calcite源码学习】ImmutableBitSet介绍

Calcite中实现了一个ImmutableBitSet类,用于保存bit集合。在很多优化规则和物化视图相关的类中都使用了ImmutableBitSet来保存group by字段或者聚合函数参数字段对应的index,例如: //MaterializedViewAggregateRule#compensateViewPartial()…...

RabbitMQ相关概念介绍

这篇文章主要介绍RabbitMQ中几个重要的概念,对于初学者来说,概念性的东西可能比较难以理解,但是对于理解和使用RabbitMQ却必不可少,初学阶段,现在脑海里留有印象,随着后续更加深入的学习,就会很…...

在jenkins容器内部使用docker

在jenkins容器内部使用docker 1.使用本地的docker 进入/var/run,找到docker.sock [rootnpy run]# ls auditd.pid containerd cryptsetup dmeventd-client docker.pid initramfs lvm netreport sepermit sudo tmpfiles.d user chro…...

分布式事务解决方案

数据不会无缘无故丢失,也不会莫名其妙增加 一、概述 1、曾几何时,知了在一家小公司做项目的时候,都是一个服务打天下,所以涉及到数据一致性的问题,都是直接用本地事务处理。 2、随着时间的推移,用户量增…...

2022黑马Redis跟学笔记.实战篇(三)

2022黑马Redis跟学笔记.实战篇 三4.2.商家查询的缓存功能4.3.1.认识缓存4.3.1.1.什么是缓存4.3.1.2.缓存的作用1.为什么要使用缓存2.如何使用缓存3. 添加商户缓存4. 缓存模型和思路4.3.1.3.缓存的成本4.3.2.添加redis缓存4.3.3.缓存更新策略4.3.3.1.三种策略(1).内存淘汰:Redis…...

hadoop环境新手安装教程

1、资源准备: (1)jdk安装包:我的是1.8.0_202 (2)hadoop安装包:我的是hadoop-3.3.1 注意这里不要下载成下面这个安装包了,我就一开始下载错了 错误示例: 2、主机网络相…...

数据结构与算法基础-学习-11-线性表之链栈的初始化、判断非空、压栈、获取栈长度、弹栈、获取栈顶元素

一、个人理解链栈相较于顺序栈不存在上溢(数据满)的情况,除非内存不足,但存储密度会低于顺序栈,因为会多存一个指针域,其他逻辑和顺序表一致。总结如下:头指针指向栈顶。链栈没有头节点直接就是…...

Hive内置函数

文章目录Hive内置函数字符串函数时间类型函数数学函数集合函数条件函数类型转换函数数据脱敏函数其他函数用户自定义函数Hive内置函数 查询内置函数用法: DESCRIBE FUNCTION EXTENDED 函数名;字符串函数 字符串连接函数:concat带分隔符字符串连接函数…...

Git如何快速入门

什么是Git?我们开发的项目,也需要一个合适的版本控制系统来协助我们更好地管理版本迭代,而Git正是因此而诞生的(有关Git的历史,这里就不多做阐述了,感兴趣的小伙伴可以自行了解,是一位顶级大佬在…...

netcore构建webservice以及调用的完整流程

目录构建前置准备编写服务挂载服务处理SoapHeader调用添加服务调用服务补充内容构建 前置准备 框架版本要求:netcore3.1以上 引入nuget包 SoapCore 编写服务 1.编写服务接口 示例 using System.ServiceModel;namespace Services;[ServiceContract(Namespace &…...

Mysql事务基础(解析)

并发事务带来的问题A和B是并发事务脏写(A被B覆盖)两个事务。B事务覆盖了A事务。解决:应该事务并行脏读(B读到了A的执行中间结果)A修改了东西。B看到了他的中间状态。解决:读写冲突。加锁,改完再…...

2023 年首轮土地销售活动来了 与 The Sandbox 一起体验「体素狂热」!

2 月 14 日晚上 11 点,开始你的体素冒险。 The Sandbox 很高兴推出 2023 年的第一次土地销售活动。欢迎来到「体素狂热 (Voxel Madness)」! 简要概括 土地销售抽奖活动将于北京时间 2 月 14 日星期二晚上 11 点开始 「体素狂热」 土地销售活动将于 2 月…...

vue AntD中栅格布局的四种大小xs,sm,md,lg

cssBootstrap栅格布局的四种大小xs,sm,md,lg前端为了页面在不同大小的设备上也能够正常显示,通常会使用栅格布局的方式来实现。使用bootStrap的网格系统时,常见到一下格式的类名col-*-*visible-*-*hidden_*_* 中间可为xs,xsm,md,lg等表示大小的单词的缩写…...

CTF show Web 红包题第六弹

提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 ​ 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...