当前位置: 首页 > news >正文

Seata服务端的启动过程 学习记录

1.ServerRunner

ServerRunner类实现了CommandLineRunner与DisposableBean接口,将会在Spring容器启动和关闭的时间,分别执行

run 和 destory 方法。

而seata服务端的启动过程,都藏在run方法中

image-20230611101314534

2.整体流程

io.seata.server.Server#start

 public static void start(String[] args) {// create loggerfinal Logger logger = LoggerFactory.getLogger(Server.class);//initialize the parameter parser//Note that the parameter parser should always be the first line to execute.//Because, here we need to parse the parameters needed for startup.//1.解析配置文件ParameterParser parameterParser = new ParameterParser(args);//2.初始化监控MetricsManager.get().init();System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());//3.创建netty用于服务端TC与客户端TM,RM通信NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);//4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务idUUIDGenerator.init(parameterParser.getServerNode());//log store mode : file, db, redis//5.设置全局事务与分支事务持久化的三种方式SessionHolder.init(parameterParser.getSessionStoreMode());LockerManagerFactory.init(parameterParser.getLockStoreMode());//6.创建并初始化事务协调者 并绑定到NettyRemotingServer到TransactionMessageHandler,监听事务消息并处理DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);coordinator.init();nettyRemotingServer.setHandler(coordinator);//7.注册销毁事件// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028ServerRunner.addDisposable(coordinator);//127.0.0.1 and 0.0.0.0 are not valid here.if (NetUtil.isValidIp(parameterParser.getHost(), false)) {XID.setIpAddress(parameterParser.getHost());} else {String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);if (StringUtils.isNotBlank(preferredNetworks)) {XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));} else {XID.setIpAddress(NetUtil.getLocalIp());}}//8.启动nettyRemotingServernettyRemotingServer.init();}

1.解析配置文件

主要从三处地方获取配置

  1. 启动命令中

    解析启动命令使用的是JCommander,主要有以下的命令

    1. help 打印帮助信息

    2. –host -h :设置注册中心ip

    3. “–port”, “-p” :设置注册中心端口

    4. “–storeMode”, “-m” :设置日志存储模式

    5. “–serverNode”, “-n” :设置服务节点

    6. “–seataEnv”, “-e” : seata环境设置

    7. “–sessionStoreMode”, “-ssm” : 设置会话存储模式,主要有三种文件,数据库,redis

    8. “–lockStoreMode”, “-lsm” : 设置锁信息的存储,主要有三种文件,数据库,redis

      image-20230611104649382

  2. 容器中

  3. 配置中心或者文件中

      private void init(String[] args) {try {//从命令行中获取配置信息getCommandParameters(args);//从环境(容器中)获取配置信息getEnvParameters();if (StringUtils.isNotBlank(seataEnv)) {System.setProperty(ENV_PROPERTY_KEY, seataEnv);}//从文件中获取配置信息if (StringUtils.isBlank(storeMode)) {storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);}if (StringUtils.isBlank(sessionStoreMode)) {sessionStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode);}if (StringUtils.isBlank(lockStoreMode)) {lockStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode);}} catch (ParameterException e) {printError(e);}}
    

2. 初始化监控信息

MetricsManager.get().init();

image-20230611103309800

默认是关闭的

3.创建netty用于服务端TC与客户端TM,RM通信

 ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());//3.创建netty用于服务端TC与客户端TM,RM通信NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

父抽象类 AbstractNettyRemotingServer 中设置 消息处理器

 public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);serverBootstrap = new NettyServerBootstrap(nettyServerConfig);serverBootstrap.setChannelHandlers(new ServerHandler());}

4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务id

      //4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务idUUIDGenerator.init(parameterParser.getServerNode());

5.初始化SessionManager与LockManager,管理会话与锁信息

SessionHolder.init(parameterParser.getSessionStoreMode());LockerManagerFactory.init(parameterParser.getLockStoreMode());

SessionManager有三个不同的实现,通过SPI机制加载

io.seata.server.storage.file.session.FileSessionManager
io.seata.server.storage.db.session.DataBaseSessionManager
io.seata.server.storage.redis.session.RedisSessionManager

resource下的META-INF文件夹中的service文件,根据要加载类的权限定类名找对应的文件,文件中存放着对应需要加载的类的权限定类型

image-20230611105552100

io.seata.server.session.SessionHolder#init

public static void init(String mode) {if (StringUtils.isBlank(mode)) {mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));}StoreMode storeMode = StoreMode.get(mode);if (StoreMode.DB.equals(storeMode)) {ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());} else if (StoreMode.FILE.equals(storeMode)) {String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,DEFAULT_SESSION_STORE_FILE_DIR);if (StringUtils.isBlank(sessionStorePath)) {throw new StoreException("the {store.file.dir} is empty.");}ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());} else if (StoreMode.REDIS.equals(storeMode)) {ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());} else {// unknown storethrow new IllegalArgumentException("unknown store mode:" + mode);}reload(storeMode);}
EnhancedServiceLoader.load(SessionManager.class, xxxName);

通过SPI机制获取所有需要装配的类后,通过参数二最终选择某一个类

@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager
@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable {
@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager

除了META-INF/services/ 路径,还会找 META-INF/seata/路径下

io.seata.common.loader.EnhancedServiceLoader.InnerEnhancedServiceLoader#findAllExtensionDefinition

image-20230611134407708

LockManager同样也有三个实现类,也是通过SPI机制加载

io.seata.server.storage.db.lock.DataBaseLockManager
io.seata.server.storage.file.lock.FileLockManager
io.seata.server.storage.redis.lock.RedisLockManager

6.创建并初始化事务协调者 DefaultCoordinator 并监听事务消息并处理

创建DefaultCoordinator实列

实列 DefaultCoordinator最重要的就是给两个属性赋值

  1. remotingServer

  2. core

remotingServer是用来TC与TM,RM通信的,而core则是真正的事务协调者(TC)

core的coreMap属性将缓存4个实现类,对应seata的四种模式,XA,AT,TCC,SAGA,实现类同样也是通过SPI机制加载,并在完成加载后缓存到coreMap

key为枚举类BranchType到值,value为SPI加载到对应类

io.seata.server.coordinator.DefaultCore#DefaultCore

image-20230611140027239

image-20230611140005213

image-20230611140232206

DefaultCoordinator 初始化

初始化即启动5个周期线程

			//周期重试回滚事务retryRollbacking.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期重试提交事务retryCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期异步提交事务asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期超时检测timeoutCheck.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期清理回滚日志undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);

每个线程池的任务逻辑基本都是一样的,只不过针对不同的事务状态

 protected void handleRetryRollbacking() {//条件对象,只处理对应状态的事务会话SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);sessionCondition.setLazyLoadBranch(true);//查找所有的全局会话 如果当前事务会话持久方式是DB,则从表中条件查找Collection<GlobalSession> rollbackingSessions =SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);//如果为空,表示当前没有事务,则直接返回if (CollectionUtils.isEmpty(rollbackingSessions)) {return;}long now = System.currentTimeMillis();//遍历所有的事务会话SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {try {//如果是正在回滚中的或者已经死亡的事务会话,跳过// prevent repeated rollbackif (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)&& !rollbackingSession.isDeadSession()) {// The function of this 'return' is 'continue'.return;}//如果超时if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {//有锁则释放全局锁if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {rollbackingSession.clean();}//删除全局事务会话// Prevent thread safety issuesSessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());SessionHelper.endRollbackFailed(rollbackingSession, true);// rollback retry timeout eventMetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);//The function of this 'return' is 'continue'.return;}rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//如果没有超时,执行回滚core.doGlobalRollback(rollbackingSession, true);} catch (TransactionException ex) {LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());}});}

nettyRemotingServer 通信服务类设置handler

将创建的TC绑定到nettyRemotingServer

7.注册销毁事件

        // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028ServerRunner.addDisposable(coordinator);

将会在Sping容器关闭的时候调用coordinator的destory方法

    @Overridepublic void destroy() {// 1. first shutdown timed taskretryRollbacking.shutdown();retryCommitting.shutdown();asyncCommitting.shutdown();timeoutCheck.shutdown();undoLogDelete.shutdown();branchRemoveExecutor.shutdown();try {retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);} catch (InterruptedException ignore) {}// 2. second close netty flowif (remotingServer instanceof NettyRemotingServer) {((NettyRemotingServer) remotingServer).destroy();}// 3. third destroy SessionHolderSessionHolder.destroy();instance = null;}

8.启动NettyRemotingServer,处理TM与RM消息

    @Overridepublic void init() {// registry processorregisterProcessor();if (initialized.compareAndSet(false, true)) {super.init();}}

registerProcessor 方法主要是注册一系列消息处理器

 private void registerProcessor() {// 1. registry on request message processorServerOnRequestProcessor onRequestProcessor =new ServerOnRequestProcessor(this, getHandler());ShutdownHook.getInstance().addDisposable(onRequestProcessor);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);// 2. registry on response message processorServerOnResponseProcessor onResponseProcessor =new ServerOnResponseProcessor(getHandler(), getFutures());super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);// 3. registry rm message processorRegRmProcessor regRmProcessor = new RegRmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);// 4. registry tm message processorRegTmProcessor regTmProcessor = new RegTmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);// 5. registry heartbeat message processorServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);}

调用 super.registerProcessor 方法 将 消息处理器与线程池封装成一个Pair,然后在放到map中,map的key为消息类型,value为Pair。

ServerOnRequestProcessor 的 transactionMessageHandler 成员变量为第6步创建的TC

image-20230611143404021

super.init 方法则是启动netty

   @Overridepublic void init() {//启动超时检测super.init();//启动nettyserverBootstrap.start();}
 @Overridepublic void start() {this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker).channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),nettyServerConfig.getWriteBufferHighWaterMark())).localAddress(new InetSocketAddress(getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)).addLast(new ProtocolV1Decoder()).addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});try {this.serverBootstrap.bind(getListenPort()).sync();XID.setPort(getListenPort());LOGGER.info("Server started, service listen port: {}", getListenPort());RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));initialized.set(true);} catch (SocketException se) {throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);} catch (Exception exx) {throw new RuntimeException("Server start failed", exx);}}

在第3步时,创建并设置了双向通道消息处理器 io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler

 public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);serverBootstrap = new NettyServerBootstrap(nettyServerConfig);serverBootstrap.setChannelHandlers(new ServerHandler());}

image-20230611143857729

ChannelDuplexHandler是Netty框架中的一个双向通道处理器,用于处理网络通信中的读写事件。它继承自ChannelInboundHandler和ChannelOutboundHandler,可以同时处理入站和出站事件。

当通道中有消息时,将调用io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler#channelRead方法

处理消息

    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息的类型,获取Pair,找到消息处理器final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {if (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {//处理消息pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());if (allowDumpStack) {String name = ManagementFactory.getRuntimeMXBean().getName();String pid = name.split("@")[0];long idx = System.currentTimeMillis();try {String jstackFile = idx + ".log";LOGGER.info("jstack command will dump to " + jstackFile);Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));} catch (IOException exx) {LOGGER.error(exx.getMessage());}allowDumpStack = false;}}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}

假设此次消息是TM开启全局事务的单条消息,则将交由ServerOnRequestProcessor处理,ServerOnRequestProcessor处理的消息类型如下

 * RM:* 1) {@link MergedWarpMessage}* 2) {@link BranchRegisterRequest}* 3) {@link BranchReportRequest}* 4) {@link GlobalLockQueryRequest}* TM:* 1) {@link MergedWarpMessage}* 2) {@link GlobalBeginRequest}* 3) {@link GlobalCommitRequest}* 4) {@link GlobalReportRequest}* 5) {@link GlobalRollbackRequest}* 6) {@link GlobalStatusRequest}
 @Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {try {if (LOGGER.isInfoEnabled()) {LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());}ctx.disconnect();ctx.close();} catch (Exception exx) {LOGGER.error(exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));}}}
 private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message = rpcMessage.getBody();RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());if (LOGGER.isDebugEnabled()) {LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());} else {try {BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"+ rpcContext.getTransactionServiceGroup());} catch (InterruptedException e) {LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);}}if (!(message instanceof AbstractMessage)) {return;}//处理多条消息// the batch send request messageif (message instanceof MergedWarpMessage) {if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;for (int i = 0; i < msgs.size(); i++) {AbstractMessage msg = msgs.get(i);int msgId = msgIds.get(i);if (PARALLEL_REQUEST_HANDLE) {CompletableFuture.runAsync(() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));} else {handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);}}} else {List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();List<CompletableFuture<Void>> completableFutures = null;for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {if (PARALLEL_REQUEST_HANDLE) {if (completableFutures == null) {completableFutures = new ArrayList<>();}int finalI = i;completableFutures.add(CompletableFuture.runAsync(() -> {results.add(finalI, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(finalI), rpcContext));}));} else {results.add(i,handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));}}if (CollectionUtils.isNotEmpty(completableFutures)) {try {CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();} catch (InterruptedException | ExecutionException e) {LOGGER.error("handle request error: {}", e.getMessage(), e);}}MergeResultMessage resultMessage = new MergeResultMessage();resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);}} else {//处理单条消息// the single send request messagefinal AbstractMessage msg = (AbstractMessage) message;AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}

最终将调用io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

    @Overrideprotected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//获取全局事务id放入response中response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}

core.begin方法

    @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());session.begin();// transaction start eventMetricsPublisher.postSessionDoingEvent(session, false);return session.getXid();}
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {this.transactionId = UUIDGenerator.generateUUID();this.status = GlobalStatus.Begin;this.lazyLoadBranch = lazyLoadBranch;if (!lazyLoadBranch) {this.branchSessions = new ArrayList<>();}this.applicationId = applicationId;this.transactionServiceGroup = transactionServiceGroup;this.transactionName = transactionName;this.timeout = timeout;this.xid = XID.generateXID(transactionId);}

首先通过雪花算法生成事务id,然后调用 XID.generateXID(transactionId)

最终格式为 ip:端口号:事务id

public static String generateXID(long tranId) {return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();}

【大部分内容来源于https://saint.blog.csdn.net/article/details/126457129。推荐各位学习seata的朋友看一看】

相关文章:

Seata服务端的启动过程 学习记录

1.ServerRunner ServerRunner类实现了CommandLineRunner与DisposableBean接口&#xff0c;将会在Spring容器启动和关闭的时间&#xff0c;分别执行 run 和 destory 方法。 而seata服务端的启动过程&#xff0c;都藏在run方法中 2.整体流程 io.seata.server.Server#start pu…...

Log4J

引言 为什么要用日志? --> 方便调试代码 什么时候用?什么时候不用? ​ 出错调试代码时候用 生产环境下就不需要,就需要删除 怎么用? --> 输出语句 一、Log4J 1.1 介绍 ​ log4j是Apache的一个开放源代码的项目&#xff0c;通过使用log4j&#xff0c;我们可以控…...

【零基础学机器学习 5】机器学习中的分类:什么是分类以及分类模型

&#x1f468;‍&#x1f4bb; 作者简介&#xff1a;程序员半夏 , 一名全栈程序员&#xff0c;擅长使用各种编程语言和框架&#xff0c;如JavaScript、React、Node.js、Java、Python、Django、MySQL等.专注于大前端与后端的硬核干货分享,同时是一个随缘更新的UP主. 你可以在各个…...

目标检测算法:Faster-RCNN论文解读

目标检测算法&#xff1a;Faster-RCNN论文解读 前言 ​ 其实网上已经有很多很好的解读各种论文的文章了&#xff0c;但是我决定自己也写一写&#xff0c;当然&#xff0c;我的主要目的就是帮助自己梳理、深入理解论文&#xff0c;因为写文章&#xff0c;你必须把你所写的东西表…...

基于Python的接口自动化-Requests模块

目录 引言 一、模块说明 二、Requests模块快速入门 1 发送简单的请求 2 发送带参数的请求 3 定制header头和cookie 4 响应内容 5 发送post请求 6 超时和代理 三、Requests实际应用 引言 在使用Python进行接口自动化测试时&#xff0c;实现接口请求…...

Vue框架中监测数组变化的方法

在 Vue 中&#xff0c;如果直接对数组进行操作&#xff0c;比如使用下标直接修改元素&#xff0c;数组长度不变时&#xff0c; Vue 是无法监测到这种变化的&#xff0c;导致无法触发视图更新。针对该问题&#xff0c;总结如下解决方法&#xff1a; 一、使用 Vue.js 提供的方法…...

PHP isset()函数使用详解,PHP判断变量是否存在

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 isset 一、判断变量是否存在二、判断变量是否为NUL…...

2021~2022 学年第二学期《信息安全》考试试题(A 卷)

北京信息科技大学 2021~2022 学年第二学期《信息安全》考试试题&#xff08;A 卷&#xff09; 课程所在学院&#xff1a;计算机学院 适用专业班级&#xff1a;计科1901-06&#xff0c;重修 考试形式&#xff1a;(闭卷) 一、选择题&#xff08;本题满分10分,共含10道小题,每小题…...

通俗讲解元学习(Meta-Learning)

元学习通俗的来说&#xff0c;就是去学习如何学习&#xff08;Learning to learn&#xff09;,掌握学习的方法&#xff0c;有时候掌握学习的方法比刻苦学习更重要&#xff01; 下面我们进行详细讲解 1. 从传统机器学习到元学习 传统的机器学中&#xff0c;我们选择一个算法&…...

生成全球定位系统、伽利略和北斗二号的Matlab代码及实际数据捕获文件,为测试功能提供完整信号与频谱

使用Matlab生成和分析GNSS信号&#xff08;第一部分&#xff09; 全球导航卫星系统(Global Navigation Satellite System, GNSS)是一个提供全球覆盖的&#xff0c;定位、导航、时间传递服务的系统。由全球定位系统(GPS)&#xff0c;俄罗斯的格洛纳斯(GLONASS)&#xff0c;欧洲…...

Android 14 版本变更总览

Android 14 版本 Android 14 总览Android 14 功能和变更列表行为变更&#xff1a;所有应用行为变更&#xff1a;以 Android 14 或更高版本为目标平台的应用功能和 API 概览 Android 14 总览 https://developer.android.google.cn/about/versions/14?hlzh-cn 文章基于官方资料…...

内网安全:Cobalt Strike 工具 渗透多层内网主机.(正向 || 反向)

内网安全&#xff1a;Cobalt Strike 工具 渗透多层内网主机. Cobalt Strike 是一款以 metasploit 为基础的 GUI 的框架式渗透工具&#xff0c;又被业界人称为 CS。拥有多种协议主机上线方式&#xff0c;集成了端口转发&#xff0c;服务扫描&#xff0c;自动化溢出&#xff0c;…...

ChatGPT 五个写论文的神技巧,让你的老师对你刮目相看!

导读&#xff1a;ChatGPT这款AI工具在推出两个月内就累积了超过1亿用户。我们向您展示如何使用ChatGPT进行写作辅助&#xff0c;以及其他一些有用的写作技巧。 本文字数&#xff1a;2000&#xff0c;阅读时长大约&#xff1a;12分钟 ChatGPT这款AI工具在推出两个月内就累积了超…...

模型服务文档自动生成,要素追溯关联、结构规范易读|ModelWhale 版本更新

整装待发的初夏&#xff0c;ModelWhale 持续聚焦 AI for Science&#xff0c;针对大模型等前沿带来了新一轮的版本更新&#xff0c;期待为你提供更好的使用体验。 本次更新中&#xff0c;ModelWhale 主要进行了以下功能迭代&#xff1a; • 新增 模型服务文档自动生成&#xf…...

《微服务实战》 第三十一章 ShardingSphere - ShardingSphere-JDBC

前言 Apache ShardingSphere 是一款分布式的数据库生态系统&#xff0c; 可以将任意数据库转换为分布式数据库&#xff0c;并通过数据分片、弹性伸缩、加密等能力对原有数据库进行增强。 Apache ShardingSphere 设计哲学为 Database Plus&#xff0c;旨在构建异构数据库上层的…...

【论文阅读】Twin neural network regression is a semi- supervised regression algorithm

论文下载 GitHub bib: ARTICLE{,title {Twin neural network regression is a semi- supervised regression algorithm},author {Sebastian J Wetzel and Roger G Melko and Isaac Tamblyn},journal {Machine Learning: Science and Technology},year {2022},volum…...

java之反射机制和注解(更新中......)

Reflect在文档中的位置&#xff1a; 文档链接&#xff1a;https://docs.oracle.com/javase/8/docs/api/index.html 用于获取类或对象的反射信息。 常用的反射机制重要的类&#xff1a; java.lang.Class&#xff1a;整个字节码&#xff0c;代表一个类型。包含了以下三块内容&a…...

【Unity入门】25.入门结课Demo--神鸟大战怪兽

【Unity入门】入门结课Demo--神鸟大战怪兽 大家好&#xff0c;我是Lampard~~ 欢迎来到Unity入门系列博客&#xff0c;所学知识来自B站阿发老师~感谢 (一) 前言 经过了两个月的学习&#xff0c;我们也顺利的完成了入门课程&#xff0c;最后就用一个Demo作为我们的结课句号吧&am…...

HTTP协议基本格式

HTTP即HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09;&#xff0c;HTTP基于TCP/IP协议传输数据。 目录 Chrome抓包Fiddler代理抓包HTTP协议格式HTTP请求首行URL方法Get方法Post方法Get与Post的区别 请求报头中的属性Cookie和SessionCookie与Session的区别…...

在 ubuntu 22.04 上配置界面服务器 xrdp

文章目录 图形界面解决方案VNCXRDP XRDP 实例安装和配置使用 XRDP 使用原理谁更快 : X11转发 > XRDP > VNC 图形界面解决方案 1. VNC 2. XRDP 3. X11 ssh : // https://blog.csdn.net/u011011827/article/details/131065690VNC 外部开放端口 用的 是 5901-5910 桌面用…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具

第2章 虚拟机性能监控&#xff0c;故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令&#xff1a;jps [options] [hostid] 功能&#xff1a;本地虚拟机进程显示进程ID&#xff08;与ps相同&#xff09;&#xff0c;可同时显示主类&#x…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

rnn判断string中第一次出现a的下标

# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

springboot整合VUE之在线教育管理系统简介

可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生&#xff0c;小白用户&#xff0c;想学习知识的 有点基础&#xff0c;想要通过项…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)

本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...

如何把工业通信协议转换成http websocket

1.现状 工业通信协议多数工作在边缘设备上&#xff0c;比如&#xff1a;PLC、IOT盒子等。上层业务系统需要根据不同的工业协议做对应开发&#xff0c;当设备上用的是modbus从站时&#xff0c;采集设备数据需要开发modbus主站&#xff1b;当设备上用的是西门子PN协议时&#xf…...

大数据驱动企业决策智能化的路径与实践

&#x1f4dd;个人主页&#x1f339;&#xff1a;慌ZHANG-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、引言&#xff1a;数据驱动的企业竞争力重构 在这个瞬息万变的商业时代&#xff0c;“快者胜”的竞争逻辑愈发明显。企业如何在复杂环…...