当前位置: 首页 > news >正文

Seata服务端的启动过程 学习记录

1.ServerRunner

ServerRunner类实现了CommandLineRunner与DisposableBean接口,将会在Spring容器启动和关闭的时间,分别执行

run 和 destory 方法。

而seata服务端的启动过程,都藏在run方法中

image-20230611101314534

2.整体流程

io.seata.server.Server#start

 public static void start(String[] args) {// create loggerfinal Logger logger = LoggerFactory.getLogger(Server.class);//initialize the parameter parser//Note that the parameter parser should always be the first line to execute.//Because, here we need to parse the parameters needed for startup.//1.解析配置文件ParameterParser parameterParser = new ParameterParser(args);//2.初始化监控MetricsManager.get().init();System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());//3.创建netty用于服务端TC与客户端TM,RM通信NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);//4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务idUUIDGenerator.init(parameterParser.getServerNode());//log store mode : file, db, redis//5.设置全局事务与分支事务持久化的三种方式SessionHolder.init(parameterParser.getSessionStoreMode());LockerManagerFactory.init(parameterParser.getLockStoreMode());//6.创建并初始化事务协调者 并绑定到NettyRemotingServer到TransactionMessageHandler,监听事务消息并处理DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);coordinator.init();nettyRemotingServer.setHandler(coordinator);//7.注册销毁事件// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028ServerRunner.addDisposable(coordinator);//127.0.0.1 and 0.0.0.0 are not valid here.if (NetUtil.isValidIp(parameterParser.getHost(), false)) {XID.setIpAddress(parameterParser.getHost());} else {String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);if (StringUtils.isNotBlank(preferredNetworks)) {XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));} else {XID.setIpAddress(NetUtil.getLocalIp());}}//8.启动nettyRemotingServernettyRemotingServer.init();}

1.解析配置文件

主要从三处地方获取配置

  1. 启动命令中

    解析启动命令使用的是JCommander,主要有以下的命令

    1. help 打印帮助信息

    2. –host -h :设置注册中心ip

    3. “–port”, “-p” :设置注册中心端口

    4. “–storeMode”, “-m” :设置日志存储模式

    5. “–serverNode”, “-n” :设置服务节点

    6. “–seataEnv”, “-e” : seata环境设置

    7. “–sessionStoreMode”, “-ssm” : 设置会话存储模式,主要有三种文件,数据库,redis

    8. “–lockStoreMode”, “-lsm” : 设置锁信息的存储,主要有三种文件,数据库,redis

      image-20230611104649382

  2. 容器中

  3. 配置中心或者文件中

      private void init(String[] args) {try {//从命令行中获取配置信息getCommandParameters(args);//从环境(容器中)获取配置信息getEnvParameters();if (StringUtils.isNotBlank(seataEnv)) {System.setProperty(ENV_PROPERTY_KEY, seataEnv);}//从文件中获取配置信息if (StringUtils.isBlank(storeMode)) {storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);}if (StringUtils.isBlank(sessionStoreMode)) {sessionStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode);}if (StringUtils.isBlank(lockStoreMode)) {lockStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode);}} catch (ParameterException e) {printError(e);}}
    

2. 初始化监控信息

MetricsManager.get().init();

image-20230611103309800

默认是关闭的

3.创建netty用于服务端TC与客户端TM,RM通信

 ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());//3.创建netty用于服务端TC与客户端TM,RM通信NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

父抽象类 AbstractNettyRemotingServer 中设置 消息处理器

 public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);serverBootstrap = new NettyServerBootstrap(nettyServerConfig);serverBootstrap.setChannelHandlers(new ServerHandler());}

4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务id

      //4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务idUUIDGenerator.init(parameterParser.getServerNode());

5.初始化SessionManager与LockManager,管理会话与锁信息

SessionHolder.init(parameterParser.getSessionStoreMode());LockerManagerFactory.init(parameterParser.getLockStoreMode());

SessionManager有三个不同的实现,通过SPI机制加载

io.seata.server.storage.file.session.FileSessionManager
io.seata.server.storage.db.session.DataBaseSessionManager
io.seata.server.storage.redis.session.RedisSessionManager

resource下的META-INF文件夹中的service文件,根据要加载类的权限定类名找对应的文件,文件中存放着对应需要加载的类的权限定类型

image-20230611105552100

io.seata.server.session.SessionHolder#init

public static void init(String mode) {if (StringUtils.isBlank(mode)) {mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));}StoreMode storeMode = StoreMode.get(mode);if (StoreMode.DB.equals(storeMode)) {ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());} else if (StoreMode.FILE.equals(storeMode)) {String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,DEFAULT_SESSION_STORE_FILE_DIR);if (StringUtils.isBlank(sessionStorePath)) {throw new StoreException("the {store.file.dir} is empty.");}ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());} else if (StoreMode.REDIS.equals(storeMode)) {ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());} else {// unknown storethrow new IllegalArgumentException("unknown store mode:" + mode);}reload(storeMode);}
EnhancedServiceLoader.load(SessionManager.class, xxxName);

通过SPI机制获取所有需要装配的类后,通过参数二最终选择某一个类

@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager
@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable {
@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager

除了META-INF/services/ 路径,还会找 META-INF/seata/路径下

io.seata.common.loader.EnhancedServiceLoader.InnerEnhancedServiceLoader#findAllExtensionDefinition

image-20230611134407708

LockManager同样也有三个实现类,也是通过SPI机制加载

io.seata.server.storage.db.lock.DataBaseLockManager
io.seata.server.storage.file.lock.FileLockManager
io.seata.server.storage.redis.lock.RedisLockManager

6.创建并初始化事务协调者 DefaultCoordinator 并监听事务消息并处理

创建DefaultCoordinator实列

实列 DefaultCoordinator最重要的就是给两个属性赋值

  1. remotingServer

  2. core

remotingServer是用来TC与TM,RM通信的,而core则是真正的事务协调者(TC)

core的coreMap属性将缓存4个实现类,对应seata的四种模式,XA,AT,TCC,SAGA,实现类同样也是通过SPI机制加载,并在完成加载后缓存到coreMap

key为枚举类BranchType到值,value为SPI加载到对应类

io.seata.server.coordinator.DefaultCore#DefaultCore

image-20230611140027239

image-20230611140005213

image-20230611140232206

DefaultCoordinator 初始化

初始化即启动5个周期线程

			//周期重试回滚事务retryRollbacking.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期重试提交事务retryCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期异步提交事务asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期超时检测timeoutCheck.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期清理回滚日志undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);

每个线程池的任务逻辑基本都是一样的,只不过针对不同的事务状态

 protected void handleRetryRollbacking() {//条件对象,只处理对应状态的事务会话SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);sessionCondition.setLazyLoadBranch(true);//查找所有的全局会话 如果当前事务会话持久方式是DB,则从表中条件查找Collection<GlobalSession> rollbackingSessions =SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);//如果为空,表示当前没有事务,则直接返回if (CollectionUtils.isEmpty(rollbackingSessions)) {return;}long now = System.currentTimeMillis();//遍历所有的事务会话SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {try {//如果是正在回滚中的或者已经死亡的事务会话,跳过// prevent repeated rollbackif (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)&& !rollbackingSession.isDeadSession()) {// The function of this 'return' is 'continue'.return;}//如果超时if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {//有锁则释放全局锁if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {rollbackingSession.clean();}//删除全局事务会话// Prevent thread safety issuesSessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());SessionHelper.endRollbackFailed(rollbackingSession, true);// rollback retry timeout eventMetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);//The function of this 'return' is 'continue'.return;}rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//如果没有超时,执行回滚core.doGlobalRollback(rollbackingSession, true);} catch (TransactionException ex) {LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());}});}

nettyRemotingServer 通信服务类设置handler

将创建的TC绑定到nettyRemotingServer

7.注册销毁事件

        // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028ServerRunner.addDisposable(coordinator);

将会在Sping容器关闭的时候调用coordinator的destory方法

    @Overridepublic void destroy() {// 1. first shutdown timed taskretryRollbacking.shutdown();retryCommitting.shutdown();asyncCommitting.shutdown();timeoutCheck.shutdown();undoLogDelete.shutdown();branchRemoveExecutor.shutdown();try {retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);} catch (InterruptedException ignore) {}// 2. second close netty flowif (remotingServer instanceof NettyRemotingServer) {((NettyRemotingServer) remotingServer).destroy();}// 3. third destroy SessionHolderSessionHolder.destroy();instance = null;}

8.启动NettyRemotingServer,处理TM与RM消息

    @Overridepublic void init() {// registry processorregisterProcessor();if (initialized.compareAndSet(false, true)) {super.init();}}

registerProcessor 方法主要是注册一系列消息处理器

 private void registerProcessor() {// 1. registry on request message processorServerOnRequestProcessor onRequestProcessor =new ServerOnRequestProcessor(this, getHandler());ShutdownHook.getInstance().addDisposable(onRequestProcessor);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);// 2. registry on response message processorServerOnResponseProcessor onResponseProcessor =new ServerOnResponseProcessor(getHandler(), getFutures());super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);// 3. registry rm message processorRegRmProcessor regRmProcessor = new RegRmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);// 4. registry tm message processorRegTmProcessor regTmProcessor = new RegTmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);// 5. registry heartbeat message processorServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);}

调用 super.registerProcessor 方法 将 消息处理器与线程池封装成一个Pair,然后在放到map中,map的key为消息类型,value为Pair。

ServerOnRequestProcessor 的 transactionMessageHandler 成员变量为第6步创建的TC

image-20230611143404021

super.init 方法则是启动netty

   @Overridepublic void init() {//启动超时检测super.init();//启动nettyserverBootstrap.start();}
 @Overridepublic void start() {this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker).channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),nettyServerConfig.getWriteBufferHighWaterMark())).localAddress(new InetSocketAddress(getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)).addLast(new ProtocolV1Decoder()).addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});try {this.serverBootstrap.bind(getListenPort()).sync();XID.setPort(getListenPort());LOGGER.info("Server started, service listen port: {}", getListenPort());RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));initialized.set(true);} catch (SocketException se) {throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);} catch (Exception exx) {throw new RuntimeException("Server start failed", exx);}}

在第3步时,创建并设置了双向通道消息处理器 io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler

 public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);serverBootstrap = new NettyServerBootstrap(nettyServerConfig);serverBootstrap.setChannelHandlers(new ServerHandler());}

image-20230611143857729

ChannelDuplexHandler是Netty框架中的一个双向通道处理器,用于处理网络通信中的读写事件。它继承自ChannelInboundHandler和ChannelOutboundHandler,可以同时处理入站和出站事件。

当通道中有消息时,将调用io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler#channelRead方法

处理消息

    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息的类型,获取Pair,找到消息处理器final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {if (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {//处理消息pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());if (allowDumpStack) {String name = ManagementFactory.getRuntimeMXBean().getName();String pid = name.split("@")[0];long idx = System.currentTimeMillis();try {String jstackFile = idx + ".log";LOGGER.info("jstack command will dump to " + jstackFile);Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));} catch (IOException exx) {LOGGER.error(exx.getMessage());}allowDumpStack = false;}}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}

假设此次消息是TM开启全局事务的单条消息,则将交由ServerOnRequestProcessor处理,ServerOnRequestProcessor处理的消息类型如下

 * RM:* 1) {@link MergedWarpMessage}* 2) {@link BranchRegisterRequest}* 3) {@link BranchReportRequest}* 4) {@link GlobalLockQueryRequest}* TM:* 1) {@link MergedWarpMessage}* 2) {@link GlobalBeginRequest}* 3) {@link GlobalCommitRequest}* 4) {@link GlobalReportRequest}* 5) {@link GlobalRollbackRequest}* 6) {@link GlobalStatusRequest}
 @Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {try {if (LOGGER.isInfoEnabled()) {LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());}ctx.disconnect();ctx.close();} catch (Exception exx) {LOGGER.error(exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));}}}
 private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message = rpcMessage.getBody();RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());if (LOGGER.isDebugEnabled()) {LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());} else {try {BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"+ rpcContext.getTransactionServiceGroup());} catch (InterruptedException e) {LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);}}if (!(message instanceof AbstractMessage)) {return;}//处理多条消息// the batch send request messageif (message instanceof MergedWarpMessage) {if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;for (int i = 0; i < msgs.size(); i++) {AbstractMessage msg = msgs.get(i);int msgId = msgIds.get(i);if (PARALLEL_REQUEST_HANDLE) {CompletableFuture.runAsync(() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));} else {handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);}}} else {List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();List<CompletableFuture<Void>> completableFutures = null;for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {if (PARALLEL_REQUEST_HANDLE) {if (completableFutures == null) {completableFutures = new ArrayList<>();}int finalI = i;completableFutures.add(CompletableFuture.runAsync(() -> {results.add(finalI, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(finalI), rpcContext));}));} else {results.add(i,handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));}}if (CollectionUtils.isNotEmpty(completableFutures)) {try {CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();} catch (InterruptedException | ExecutionException e) {LOGGER.error("handle request error: {}", e.getMessage(), e);}}MergeResultMessage resultMessage = new MergeResultMessage();resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);}} else {//处理单条消息// the single send request messagefinal AbstractMessage msg = (AbstractMessage) message;AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}

最终将调用io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

    @Overrideprotected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//获取全局事务id放入response中response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}

core.begin方法

    @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());session.begin();// transaction start eventMetricsPublisher.postSessionDoingEvent(session, false);return session.getXid();}
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {this.transactionId = UUIDGenerator.generateUUID();this.status = GlobalStatus.Begin;this.lazyLoadBranch = lazyLoadBranch;if (!lazyLoadBranch) {this.branchSessions = new ArrayList<>();}this.applicationId = applicationId;this.transactionServiceGroup = transactionServiceGroup;this.transactionName = transactionName;this.timeout = timeout;this.xid = XID.generateXID(transactionId);}

首先通过雪花算法生成事务id,然后调用 XID.generateXID(transactionId)

最终格式为 ip:端口号:事务id

public static String generateXID(long tranId) {return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();}

【大部分内容来源于https://saint.blog.csdn.net/article/details/126457129。推荐各位学习seata的朋友看一看】

相关文章:

Seata服务端的启动过程 学习记录

1.ServerRunner ServerRunner类实现了CommandLineRunner与DisposableBean接口&#xff0c;将会在Spring容器启动和关闭的时间&#xff0c;分别执行 run 和 destory 方法。 而seata服务端的启动过程&#xff0c;都藏在run方法中 2.整体流程 io.seata.server.Server#start pu…...

Log4J

引言 为什么要用日志? --> 方便调试代码 什么时候用?什么时候不用? ​ 出错调试代码时候用 生产环境下就不需要,就需要删除 怎么用? --> 输出语句 一、Log4J 1.1 介绍 ​ log4j是Apache的一个开放源代码的项目&#xff0c;通过使用log4j&#xff0c;我们可以控…...

【零基础学机器学习 5】机器学习中的分类:什么是分类以及分类模型

&#x1f468;‍&#x1f4bb; 作者简介&#xff1a;程序员半夏 , 一名全栈程序员&#xff0c;擅长使用各种编程语言和框架&#xff0c;如JavaScript、React、Node.js、Java、Python、Django、MySQL等.专注于大前端与后端的硬核干货分享,同时是一个随缘更新的UP主. 你可以在各个…...

目标检测算法:Faster-RCNN论文解读

目标检测算法&#xff1a;Faster-RCNN论文解读 前言 ​ 其实网上已经有很多很好的解读各种论文的文章了&#xff0c;但是我决定自己也写一写&#xff0c;当然&#xff0c;我的主要目的就是帮助自己梳理、深入理解论文&#xff0c;因为写文章&#xff0c;你必须把你所写的东西表…...

基于Python的接口自动化-Requests模块

目录 引言 一、模块说明 二、Requests模块快速入门 1 发送简单的请求 2 发送带参数的请求 3 定制header头和cookie 4 响应内容 5 发送post请求 6 超时和代理 三、Requests实际应用 引言 在使用Python进行接口自动化测试时&#xff0c;实现接口请求…...

Vue框架中监测数组变化的方法

在 Vue 中&#xff0c;如果直接对数组进行操作&#xff0c;比如使用下标直接修改元素&#xff0c;数组长度不变时&#xff0c; Vue 是无法监测到这种变化的&#xff0c;导致无法触发视图更新。针对该问题&#xff0c;总结如下解决方法&#xff1a; 一、使用 Vue.js 提供的方法…...

PHP isset()函数使用详解,PHP判断变量是否存在

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 isset 一、判断变量是否存在二、判断变量是否为NUL…...

2021~2022 学年第二学期《信息安全》考试试题(A 卷)

北京信息科技大学 2021~2022 学年第二学期《信息安全》考试试题&#xff08;A 卷&#xff09; 课程所在学院&#xff1a;计算机学院 适用专业班级&#xff1a;计科1901-06&#xff0c;重修 考试形式&#xff1a;(闭卷) 一、选择题&#xff08;本题满分10分,共含10道小题,每小题…...

通俗讲解元学习(Meta-Learning)

元学习通俗的来说&#xff0c;就是去学习如何学习&#xff08;Learning to learn&#xff09;,掌握学习的方法&#xff0c;有时候掌握学习的方法比刻苦学习更重要&#xff01; 下面我们进行详细讲解 1. 从传统机器学习到元学习 传统的机器学中&#xff0c;我们选择一个算法&…...

生成全球定位系统、伽利略和北斗二号的Matlab代码及实际数据捕获文件,为测试功能提供完整信号与频谱

使用Matlab生成和分析GNSS信号&#xff08;第一部分&#xff09; 全球导航卫星系统(Global Navigation Satellite System, GNSS)是一个提供全球覆盖的&#xff0c;定位、导航、时间传递服务的系统。由全球定位系统(GPS)&#xff0c;俄罗斯的格洛纳斯(GLONASS)&#xff0c;欧洲…...

Android 14 版本变更总览

Android 14 版本 Android 14 总览Android 14 功能和变更列表行为变更&#xff1a;所有应用行为变更&#xff1a;以 Android 14 或更高版本为目标平台的应用功能和 API 概览 Android 14 总览 https://developer.android.google.cn/about/versions/14?hlzh-cn 文章基于官方资料…...

内网安全:Cobalt Strike 工具 渗透多层内网主机.(正向 || 反向)

内网安全&#xff1a;Cobalt Strike 工具 渗透多层内网主机. Cobalt Strike 是一款以 metasploit 为基础的 GUI 的框架式渗透工具&#xff0c;又被业界人称为 CS。拥有多种协议主机上线方式&#xff0c;集成了端口转发&#xff0c;服务扫描&#xff0c;自动化溢出&#xff0c;…...

ChatGPT 五个写论文的神技巧,让你的老师对你刮目相看!

导读&#xff1a;ChatGPT这款AI工具在推出两个月内就累积了超过1亿用户。我们向您展示如何使用ChatGPT进行写作辅助&#xff0c;以及其他一些有用的写作技巧。 本文字数&#xff1a;2000&#xff0c;阅读时长大约&#xff1a;12分钟 ChatGPT这款AI工具在推出两个月内就累积了超…...

模型服务文档自动生成,要素追溯关联、结构规范易读|ModelWhale 版本更新

整装待发的初夏&#xff0c;ModelWhale 持续聚焦 AI for Science&#xff0c;针对大模型等前沿带来了新一轮的版本更新&#xff0c;期待为你提供更好的使用体验。 本次更新中&#xff0c;ModelWhale 主要进行了以下功能迭代&#xff1a; • 新增 模型服务文档自动生成&#xf…...

《微服务实战》 第三十一章 ShardingSphere - ShardingSphere-JDBC

前言 Apache ShardingSphere 是一款分布式的数据库生态系统&#xff0c; 可以将任意数据库转换为分布式数据库&#xff0c;并通过数据分片、弹性伸缩、加密等能力对原有数据库进行增强。 Apache ShardingSphere 设计哲学为 Database Plus&#xff0c;旨在构建异构数据库上层的…...

【论文阅读】Twin neural network regression is a semi- supervised regression algorithm

论文下载 GitHub bib: ARTICLE{,title {Twin neural network regression is a semi- supervised regression algorithm},author {Sebastian J Wetzel and Roger G Melko and Isaac Tamblyn},journal {Machine Learning: Science and Technology},year {2022},volum…...

java之反射机制和注解(更新中......)

Reflect在文档中的位置&#xff1a; 文档链接&#xff1a;https://docs.oracle.com/javase/8/docs/api/index.html 用于获取类或对象的反射信息。 常用的反射机制重要的类&#xff1a; java.lang.Class&#xff1a;整个字节码&#xff0c;代表一个类型。包含了以下三块内容&a…...

【Unity入门】25.入门结课Demo--神鸟大战怪兽

【Unity入门】入门结课Demo--神鸟大战怪兽 大家好&#xff0c;我是Lampard~~ 欢迎来到Unity入门系列博客&#xff0c;所学知识来自B站阿发老师~感谢 (一) 前言 经过了两个月的学习&#xff0c;我们也顺利的完成了入门课程&#xff0c;最后就用一个Demo作为我们的结课句号吧&am…...

HTTP协议基本格式

HTTP即HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09;&#xff0c;HTTP基于TCP/IP协议传输数据。 目录 Chrome抓包Fiddler代理抓包HTTP协议格式HTTP请求首行URL方法Get方法Post方法Get与Post的区别 请求报头中的属性Cookie和SessionCookie与Session的区别…...

在 ubuntu 22.04 上配置界面服务器 xrdp

文章目录 图形界面解决方案VNCXRDP XRDP 实例安装和配置使用 XRDP 使用原理谁更快 : X11转发 > XRDP > VNC 图形界面解决方案 1. VNC 2. XRDP 3. X11 ssh : // https://blog.csdn.net/u011011827/article/details/131065690VNC 外部开放端口 用的 是 5901-5910 桌面用…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

关于easyexcel动态下拉选问题处理

前些日子突然碰到一个问题&#xff0c;说是客户的导入文件模版想支持部分导入内容的下拉选&#xff0c;于是我就找了easyexcel官网寻找解决方案&#xff0c;并没有找到合适的方案&#xff0c;没办法只能自己动手并分享出来&#xff0c;针对Java生成Excel下拉菜单时因选项过多导…...