当前位置: 首页 > news >正文

Seata源码学习(五)- Seata服务端(TC)源码解读

Seata源码分析- Seata服务端(TC)源码解读

上节课我们已经分析到了SQL语句最终的执行器,但是再往下分析之前,我们需要先来分析一下TM客户端与TC端通讯以后,TC端的具体操作

服务端表解释

我们的Seata服务端在应用的时候需要准备三张表,那么这三张表分别代表的意思就是

  1. branch_table 分支事务表
  2. global_table 全局事务表
  3. lock_table 全局锁表

客户端请求服务端以后,我们就需要把对应的全局事务包括分支事务和全局锁全部存放到这里。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-54n1wCiF-1676380289869)(image-20220311185545970-1647008601419.png)]

TC服务端启动入口

那么我们任何的Java工程启动都需要主函数main,所以我们就从这里入手,首先在seata源码工程中搜索这个入口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PemtLXu0-1676380289870)(image-20220311175232256.png)]

这里我们看Server.java这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务

// 协调协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

全局事务开始方法分析

然后进入到其中我们可以看到很多的全局事务处理的方法

// 处理全局事务开始
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {// 响应客户端XIDresponse.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}
}// 处理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setGlobalStatus(core.commit(request.getXid()));
}// 处理全局回滚
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setGlobalStatus(core.rollback(request.getXid()));
}
.....

在这其中我们首先关注doGlobalBegin方法中的core.begin()方法,来看一下具体操作

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 创建全局事务SessionGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());// 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager// 观察者设计模式session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 全局事务开启session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();
}

在向下我们要关注一下全局Session管理器DataBaseSessionManager,进入到getRootSessionManager()方法中

/**
* Gets root session manager.
* 获取一个全局Session管理器
* @return the root session manager
*/
public static SessionManager getRootSessionManager() {if (ROOT_SESSION_MANAGER == null) {throw new ShouldNeverHappenException("SessionManager is NOT init!");}return ROOT_SESSION_MANAGER;
}

这个管理器如何生成的那,我们可以看一下init初始化方法

public static void init(String mode) {if (StringUtils.isBlank(mode)) {mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);}// 判断Seata模式,当前为DBStoreMode storeMode = StoreMode.get(mode);if (StoreMode.DB.equals(storeMode)) {// 通过SPI机制读取SessionManager接口实现类,读取的是META-INF.service目录,在通过反射机制创建对象DataBaseSessionManagerROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
....}

读取的文件

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f9FDfGGf-1676380289870)(image-20220311182615967.png)]

再回到begin方法中,我们就知道DataBaseSessionManager是如何创建的,包括下面这一步就是创建DataBaseSessionManager

// 观察者设计模式,创建DataBaseSessionManagersession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

但是此时有一个问题,就是我们的init方法在哪里调用的拿,其实我们回到Server中,我们发现在构建默认协调者之前就调用了init方法,说明在执行处理全局事务开始之前,就已经创建好了这个SessionManager了

SessionHolder.init(parameterParser.getStoreMode());// 默认协调者
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);

好了此时分析清楚如何得到这个SessionManager以后,我们在回过头来看代码session.begin()位置

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 创建全局事务SessionGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());// 为Session中添加回调监听 SessionHolder.getRootSessionManager()去获取一个全局Session管理器DataBaseSessionManager// 观察者设计模式,创建DataBaseSessionManagersession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 全局事务开始session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();
}

session.begin()

@Override
public void begin() throws TransactionException {// 声明全局事务开始this.status = GlobalStatus.Begin;// 开始时间this.beginTime = System.currentTimeMillis();// 激活全局事务this.active = true;// 将SessionManager放入到集合中,调用onBegin方法for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onBegin(this);}
}

这里我们来看一下 onBegin方法,调用的是父级的方法,在这其中我们要关注addGlobalSession方法,但是要注意,这里我们用的是db模式所以调用的是db模式的DateBaseSessionManager

@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {addGlobalSession(globalSession);
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8PPVVtOi-1676380289871)(image-20220311184728236.png)]

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {// 写入sessionboolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}
}

然后我们来看写入这里

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {// 第一次进入一定是写入if (LogOperation.GLOBAL_ADD.equals(logOperation)) {return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}
}

因为我们第一次调用一定是写入,所以此时我们应该查看insertGlobalTransactionDO,此方法的作用就是写入全局事务表中global_table

@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);ps.setString(1, globalTransactionDO.getXid());ps.setLong(2, globalTransactionDO.getTransactionId());ps.setInt(3, globalTransactionDO.getStatus());ps.setString(4, globalTransactionDO.getApplicationId());ps.setString(5, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,transactionNameColumnSize) : transactionName;ps.setString(6, transactionName);ps.setInt(7, globalTransactionDO.getTimeout());ps.setLong(8, globalTransactionDO.getBeginTime());ps.setString(9, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}
}

我们可以查看GlobalTransactionDO实体类的属性,和global_table 的字段进行比对,就能看出其中道理。

相关文章:

Seata源码学习(五)- Seata服务端(TC)源码解读

Seata源码分析- Seata服务端(TC)源码解读 上节课我们已经分析到了SQL语句最终的执行器,但是再往下分析之前,我们需要先来分析一下TM客户端与TC端通讯以后,TC端的具体操作 服务端表解释 我们的Seata服务端在应用的时…...

低版本jQuery导致XSS Nuclei FUZZ POC

目录 1.前言 2. Nuclei FUZZ jQuery XSS POC 3.漏洞验证 4.修复建议 1.前言 我记得以前用那些漏扫工具时时常会报一个低版本jQuery的安全问题,当时还不会验证。直到有一天,它托梦给我。我悟了。低版本jQuery导致XSS POC文件文末获取。...

【Linux】进程的描述组织与进程状态

文章目录🎪 进程的描述组织🚀1.什么是进程🚀2.进程的形成🚀3.进程标识符 *⭐3.1 PS命令查看PID⭐3.2 /proc目录查看进程属性🚀4.父子进程⭐4.1 系统调用获取PID⭐4.2 fork创建子进程⭐4.3 fork双返回值问题⭐4.4 写时拷…...

8.2.1.1 WHERE 子句优化

本节讨论可用于处理 WHERE 子句的优化。示例使用 SELECT 语句,但相同的优化适用于 DELETE 和 UPDATE 语句中的 WHERE 子句。 注意 因为 MySQL 优化器的工作正在进行,所以这里并没有记录 MySQL 执行的所有优化。 您可能会尝试重写查询以使算术运算更快&am…...

拆个微波炉,分析一下电路

微波炉是用2450MHz的超高频电磁波来加热食品,它能无损穿越塑料,陶瓷,不能穿越金属,碰到金属会反射,但穿过含水食物,食物内的分子会高速摩擦,产生热量,使食物变熟。在厨房电器中&…...

DM8:DMDSC共享存储集群搭建-共享存储绑定

DM8:DMDSC共享存储集群搭建-共享存储绑定环境介绍:1 发现共享磁盘2 对共享存储进行分区格式化2.1 格式化成功但不可用2.2 解决问题修改错误的分区格式3 配置/etc/rc.d/rc.local3.1 编辑文件(两个节点配置相同)3.2 使rc.local生效4 重启操作系…...

Spark OOM问题常见解决方式

文章目录Spark OOM问题常见解决方式1.map过程产生大量对象导致内存溢出2.数据不平衡导致内存溢出3.coalesce调用导致内存溢出4.shuffle后内存溢出5. standalone模式下资源分配不均匀导致内存溢出6.在RDD中,共用对象能够减少OOM的情况优化1.使用mapPartitions代替大部…...

【Calcite源码学习】ImmutableBitSet介绍

Calcite中实现了一个ImmutableBitSet类,用于保存bit集合。在很多优化规则和物化视图相关的类中都使用了ImmutableBitSet来保存group by字段或者聚合函数参数字段对应的index,例如: //MaterializedViewAggregateRule#compensateViewPartial()…...

RabbitMQ相关概念介绍

这篇文章主要介绍RabbitMQ中几个重要的概念,对于初学者来说,概念性的东西可能比较难以理解,但是对于理解和使用RabbitMQ却必不可少,初学阶段,现在脑海里留有印象,随着后续更加深入的学习,就会很…...

在jenkins容器内部使用docker

在jenkins容器内部使用docker 1.使用本地的docker 进入/var/run,找到docker.sock [rootnpy run]# ls auditd.pid containerd cryptsetup dmeventd-client docker.pid initramfs lvm netreport sepermit sudo tmpfiles.d user chro…...

分布式事务解决方案

数据不会无缘无故丢失,也不会莫名其妙增加 一、概述 1、曾几何时,知了在一家小公司做项目的时候,都是一个服务打天下,所以涉及到数据一致性的问题,都是直接用本地事务处理。 2、随着时间的推移,用户量增…...

2022黑马Redis跟学笔记.实战篇(三)

2022黑马Redis跟学笔记.实战篇 三4.2.商家查询的缓存功能4.3.1.认识缓存4.3.1.1.什么是缓存4.3.1.2.缓存的作用1.为什么要使用缓存2.如何使用缓存3. 添加商户缓存4. 缓存模型和思路4.3.1.3.缓存的成本4.3.2.添加redis缓存4.3.3.缓存更新策略4.3.3.1.三种策略(1).内存淘汰:Redis…...

hadoop环境新手安装教程

1、资源准备: (1)jdk安装包:我的是1.8.0_202 (2)hadoop安装包:我的是hadoop-3.3.1 注意这里不要下载成下面这个安装包了,我就一开始下载错了 错误示例: 2、主机网络相…...

数据结构与算法基础-学习-11-线性表之链栈的初始化、判断非空、压栈、获取栈长度、弹栈、获取栈顶元素

一、个人理解链栈相较于顺序栈不存在上溢(数据满)的情况,除非内存不足,但存储密度会低于顺序栈,因为会多存一个指针域,其他逻辑和顺序表一致。总结如下:头指针指向栈顶。链栈没有头节点直接就是…...

Hive内置函数

文章目录Hive内置函数字符串函数时间类型函数数学函数集合函数条件函数类型转换函数数据脱敏函数其他函数用户自定义函数Hive内置函数 查询内置函数用法: DESCRIBE FUNCTION EXTENDED 函数名;字符串函数 字符串连接函数:concat带分隔符字符串连接函数…...

Git如何快速入门

什么是Git?我们开发的项目,也需要一个合适的版本控制系统来协助我们更好地管理版本迭代,而Git正是因此而诞生的(有关Git的历史,这里就不多做阐述了,感兴趣的小伙伴可以自行了解,是一位顶级大佬在…...

netcore构建webservice以及调用的完整流程

目录构建前置准备编写服务挂载服务处理SoapHeader调用添加服务调用服务补充内容构建 前置准备 框架版本要求:netcore3.1以上 引入nuget包 SoapCore 编写服务 1.编写服务接口 示例 using System.ServiceModel;namespace Services;[ServiceContract(Namespace &…...

Mysql事务基础(解析)

并发事务带来的问题A和B是并发事务脏写(A被B覆盖)两个事务。B事务覆盖了A事务。解决:应该事务并行脏读(B读到了A的执行中间结果)A修改了东西。B看到了他的中间状态。解决:读写冲突。加锁,改完再…...

2023 年首轮土地销售活动来了 与 The Sandbox 一起体验「体素狂热」!

2 月 14 日晚上 11 点,开始你的体素冒险。 The Sandbox 很高兴推出 2023 年的第一次土地销售活动。欢迎来到「体素狂热 (Voxel Madness)」! 简要概括 土地销售抽奖活动将于北京时间 2 月 14 日星期二晚上 11 点开始 「体素狂热」 土地销售活动将于 2 月…...

vue AntD中栅格布局的四种大小xs,sm,md,lg

cssBootstrap栅格布局的四种大小xs,sm,md,lg前端为了页面在不同大小的设备上也能够正常显示,通常会使用栅格布局的方式来实现。使用bootStrap的网格系统时,常见到一下格式的类名col-*-*visible-*-*hidden_*_* 中间可为xs,xsm,md,lg等表示大小的单词的缩写…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...

【JavaEE】-- HTTP

1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

智能AI电话机器人系统的识别能力现状与发展水平

一、引言 随着人工智能技术的飞速发展,AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术,在客户服务、营销推广、信息查询等领域发挥着越来越重要…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

pycharm 设置环境出错

pycharm 设置环境出错 pycharm 新建项目,设置虚拟环境,出错 pycharm 出错 Cannot open Local Failed to start [powershell.exe, -NoExit, -ExecutionPolicy, Bypass, -File, C:\Program Files\JetBrains\PyCharm 2024.1.3\plugins\terminal\shell-int…...

​​企业大模型服务合规指南:深度解析备案与登记制度​​

伴随AI技术的爆炸式发展,尤其是大模型(LLM)在各行各业的深度应用和整合,企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者,还是积极拥抱AI转型的传统企业,在面向公众…...

sshd代码修改banner

sshd服务连接之后会收到字符串: SSH-2.0-OpenSSH_9.5 容易被hacker识别此服务为sshd服务。 是否可以通过修改此banner达到让人无法识别此服务的目的呢? 不能。因为这是写的SSH的协议中的。 也就是协议规定了banner必须这么写。 SSH- 开头&#xff0c…...