当前位置: 首页 > news >正文

Dubbo源码解析-Dubbo的线程模型(九)

一、Dubbo线程模型

首先明确一个基本概念:IO 线程和业务线程的区别
IO 线程:配置在netty 连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据
打交道的事件。
业务线程:用于处理具体业务逻辑的线程,可以理解为自己在provider 上写的代码所执行的线
程环境。

Dubbo 默认采用的是长链接的方式,即默认情况下一个consumer 和一个provider 之间只会建立
一条链接,这种情况下IO 线程的工作就是编码和解码数据,监听具体的数据请求,直接通过Channel发布数据等等;

有两个参数⽤来配置服务消费者和服务提供者直接的socket连接个数:

1. shareconnections:表示可共享的socket连接个数
2. connections:表示不共享的socket连接个数

服务A的shareconnections或者connections为2时,服务A的消费者会向服务A的提供者建⽴两个socket连接:

业务线程就是处理IO 线程处理之后的数据,业务线程并不知道任何跟网络相关的内容,只是纯
粹的处理业务逻辑,在业务处理逻辑的时候往往存在复杂的逻辑,所以业务线程池的配置往往都要
比IO 线程池的配置大很多。
IO 线程部分,Netty 服务提供方NettyServer 又使用了两级线程池,master 主要用来接受客户
端的链接请求,并把接受的请求分发给worker 来处理。整个过程如下图:

IO 线程与业务线程的交互如下:

IO 线程的派发策略:

默认是all:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。
direct:worker 线程接收到事件后,由worker 执行到底。
message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO 线程上执行
execution:只有请求消息派发到线程池,不含响应(客户端线程池),响应和其它连接断开事件,心跳等消息,直接在IO线程上执行
connection:在IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

业务线程池设置:
fixed:固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
coresize:200
maxsize:200
队列:SynchronousQueue
回绝策略:AbortPolicyWithReport - 打印线程信息jstack,之后抛出异常
cached:缓存线程池,空闲一分钟自动删除,需要时重建。
limited:可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。

配置示例:

<dubbo:protocol name="dubbo"dispatcher="all"threadpool="fixed"threads="100"/>

在整个消费者调用过程中,各个线程池都比较重要,其中比较有特色的就是AllChannelHandler,它完成了IO线程转向用户线程的任务转移,比较关键。 

二、派发策略(All)源码解析

消费者启动的时候会执行DubboProtocol#initClient建议与服务端端的socket连接

private ExchangeClient initClient(URL url) {ExchangeClient client;try {// Replace InstanceAddressURL with ServiceConfigURL.url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(),  url.getParameters());// connection should be lazyif (url.getParameter(LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient(url, requestHandler);} else {client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}return client;
}
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect最终返回的是NettyClient,点进这个对象的构造方法

public Client connect(URL url, ChannelHandler handler) throws RemotingException {return new NettyClient(url, handler);
}
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));
}

NettyClient#wrapChannelHandler中再次利用SPI机制获取线程派发策略,dubbo默认的策略为allDispatcher策略。

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
public ChannelHandler dispatch(ChannelHandler handler, URL url) {return new AllChannelHandler(handler, url);
}

 除了默认的AllDispatcher,还有DirectDispatcher,MessageOnlyDispatcher等。

 当消费端接收到远程服务端的响应之后,按照Netty的处理流程,消息会在channel绑定的handler上传递,netty底层会调用handler#received。可以看到connected,disconnected,received,caught等方法都是在新的线程池ExecutorService中执行,executor.execute方法会将任务ChannelEventRunnable提交到ExecutorService中。

public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}@Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}@Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request && t instanceof RejectedExecutionException){sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}}
}

由于AllChannelHandler方法是在前面handler的基础上包装了一层,所以ChannelEventRunnable中会将消息传递给AllHandlel的下一个handler,从这里也清晰的看到了AllChannelHandler完成了IO线程转向用户线程的任务转移。

public void run() {if (state == ChannelState.RECEIVED) {try {handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}

三、AllDispatcher策略异常超时问题

Dubbo有一个经典问题,就是当配置了消息派发策略为AllDispatcher时,当服务端线程池满了之后,当消费端再次发送请求,就会一直傻傻等待超时导致没有任何服务端响应。那么问题就出现在AllChannelHandler,前面已经说了AllDispatcher策略就是所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。

问题出现原因:

那么当服务端线程池打满之后,此时又再次来了一个请求,此时依然会提交给线程池执行,那么了解线程池原理的就清楚线程池任务满了之后会执行拒绝策略抛出RejectedExecutionException异常,此时就会进入到received的catch方法中去,然后就又再次抛出ExecutionException异常。

            
public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

那么抛出的异常就又会被netty捕获,进而继续执行nettyHandler的caught方法,可以看到这里又再次将任务丢到了线程池中。但是此时线程池依然是满的,业务线程池所有线程都堵住了,所以也不能将异常消息返回给客户端,然后客户端消费者只能傻傻等到超时。

public void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}
}

解决办法可以设置dispatcher为message,只有请求和响应交给业务线程池处理,其他的在IO线程处理,配置如下:

<dubbo:protocol name="dubbo" dispatcher="message" />

后面dubbo也修复了这个问题,再received方法的catch中新加了一部分逻辑,注释的大致意思也就是说:修复当线程池满了之后异常信息无法被发送给消费端的问题(当线程池满了,拒绝执行任务,会引起消费端等待超时),所以代码中判断了下当抛出异常为RejectedExecutionException时,就不把异常抛出交给AllChannelHandler#caught方法中的线程池执行,而是直接用IO线程在通过channel将消息及时反馈给消费者,消费者也就会收到服务端的“threadpool is exhausted ,detail msg”等响应消息。

public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time outif(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

相关文章:

Dubbo源码解析-Dubbo的线程模型(九)

一、Dubbo线程模型 首先明确一个基本概念&#xff1a;IO 线程和业务线程的区别 IO 线程&#xff1a;配置在netty 连接点的用于处理网络数据的线程&#xff0c;主要处理编解码等直接与网络数据 打交道的事件。 业务线程&#xff1a;用于处理具体业务逻辑的线程&#xff0c;可以…...

【Canvas与标志】圆角三角形生化危险警示标志

【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>圆角三角形生化危险警示标志 Draft1</title><style type&qu…...

解决Dcat Admin laravel框架登录报错问题,(blocked:mixed-content)

前言 在使用 Dcat Admin 后台登录时&#xff0c;发生 error 报错&#xff1a;(blocked:mixed-content) xhr VM484:1&#xff0c;浏览器拦截 其实这是浏览器在 HTTPS 页面中尝试加载 HTTP 资源&#xff0c;导致浏览器阻止了这些不安全的请求。 解决 在 .env 文件中添加或修改 AD…...

(三)Sping Boot学习——升级jdk1.8-jdk18

1.修改系统环境变量。 2.idea中修改配置。 3.项目setting中设置修改 4.更新后还要重新下载依赖mvn clean install &#xff0c;并且记住reload 项目。同时查看java -version查看一下jdk版本。...

语言模型中的多模态链式推理

神经网络的公式推导 简介摘要引言多模态思维链推理的挑战多模态CoT框架多模态CoT模型架构细节编码模块融合模块解码模块 实验结果运行代码补充细节安装包下载Flan-T5数据集准备rougenltkall-MiniLM-L6-v2运行 简介 本文主要对2023一篇论文《Multimodal Chain-of-Thought Reason…...

SCons:下一代构建工具,如何用 Python 驱动高效构建?

在现代软件开发中&#xff0c;构建工具是开发流程中不可或缺的一环。无论是小型项目还是跨平台的复杂工程&#xff0c;选择一个高效、灵活的工具都能显著提高开发效率和代码质量。SCons&#xff0c;一个以 Python 为基础的构建工具&#xff0c;通过自动化依赖管理、灵活的扩展性…...

springboot 整合 rabbitMQ (延迟队列)

前言&#xff1a; 延迟队列是一个内部有序的数据结构&#xff0c;其主要功能体现在其延时特性上。这种队列存储的元素都设定了特定的处理时间&#xff0c;意味着它们需要在规定的时间点或者延迟之后才能被取出并进行相应的处理。简而言之&#xff0c;延时队列被设计用于存放那…...

ES 基本使用与二次封装

概述 基本了解 Elasticsearch 是一个开源的分布式搜索和分析引擎&#xff0c;基于 Apache Lucene 构建。它提供了对海量数据的快速全文搜索、结构化搜索和分析功能&#xff0c;是目前流行的大数据处理工具之一。主要特点即高效搜索、分布式存储、拓展性强 核心功能 全文搜索:…...

分割一切2.0,SAM2详解

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;编程探索专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年11月24日20点03分 神秘男子影, 秘而不宣藏。 泣意深不见, 男子自持重, 子夜独自沉。 论文链接 点击开启你的论文编程之旅…...

Spring AI Fluent API:与AI模型通信的流畅体验

引言 随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;越来越多的应用场景开始融入AI技术以提升用户体验和系统效率。在Java开发中&#xff0c;与AI模型通信成为了一个重要而常见的需求。为了满足这一需求&#xff0c;Spring AI引入了ChatClient&#xff0c…...

基于python的长津湖评论数据分析与可视化,使用是svm情感分析建模

引言 研究背景及意义 上世纪初开始&#xff0c;中国电影就以自己独有的姿态登上了世界电影史的舞台。中国电影作为国家文化和思想观念的反映与延伸&#xff0c;能够增强文化自信&#xff0c;在文化输出方面有着极其重要的作用1[1]。 改革开放以来&#xff0c;随着生产力的提高…...

Lucene(2):Springboot整合全文检索引擎TermInSetQuery应用实例附源码

前言 本章代码已分享至Gitee: https://gitee.com/lengcz/springbootlucene01 接上文。Lucene(1):Springboot整合全文检索引擎Lucene常规入门附源码 如何在指定范围内查询。从lucene 7 开始&#xff0c;filter 被弃用&#xff0c;导致无法进行调节过滤。 TermInSetQuery 指定…...

shell完结

声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团队无关&a…...

【2024最新】基于Springboot+Vue的智慧食堂系统Lw+PPT

作者&#xff1a;计算机搬砖家 开发技术&#xff1a;SpringBoot、php、Python、小程序、SSM、Vue、MySQL、JSP、ElementUI等&#xff0c;“文末源码”。 专栏推荐&#xff1a;SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;Java精选实战项…...

NVR小程序接入平台EasyNVR多品牌NVR管理工具:高效管理分散视频资源的解决方案

在当今数字化、智能化的时代背景下&#xff0c;视频监控已成为各行各业不可或缺的一部分&#xff0c;从公共安全到企业运维&#xff0c;再到智慧城市建设&#xff0c;视频资源的管理与应用正面临着前所未有的挑战。如何高效整合、管理这些遍布各地的分散视频资源&#xff0c;成…...

排序算法(三)--插入排序

文章目录 一、插入排序的基本原理二、插入排序的C语言实现三、代码解析 插入排序 C语言实例 一、插入排序的基本原理 插入排序的基本思想是将数组中的元素逐一取出&#xff0c;然后将其插入到已经排好序的部分中的适当位置&#xff0c;直到整个数组排序完成。具体步骤如下&…...

YOLOv11融合[ECCV 2018]RCAN中的RCAB模块及相关改进思路

YOLOv11v10v8使用教程&#xff1a; YOLOv11入门到入土使用教程 YOLOv11改进汇总贴&#xff1a;YOLOv11及自研模型更新汇总 《Image Super-Resolution Using Very Deep Residual Channel Attention Networks》 一、 模块介绍 论文链接&#xff1a;https://arxiv.org/abs/1807…...

排序(Java数据结构)

1. 排序的概念及引用 1.1 排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。(所有的排序都是默认从小到大排序) 稳定性&#xff1a;假定在待排序的记录序列中&#xff…...

【Java 解释器模式】实现高扩展性的医学专家诊断规则引擎

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…...

【超详细】卷积神经网络CNN基本架构以及工作原理详解

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

Fabric V2.5 通用溯源系统——增加图片上传与下载功能

fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

力扣热题100 k个一组反转链表题解

题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...

【从零开始学习JVM | 第四篇】类加载器和双亲委派机制(高频面试题)

前言&#xff1a; 双亲委派机制对于面试这块来说非常重要&#xff0c;在实际开发中也是经常遇见需要打破双亲委派的需求&#xff0c;今天我们一起来探索一下什么是双亲委派机制&#xff0c;在此之前我们先介绍一下类的加载器。 目录 ​编辑 前言&#xff1a; 类加载器 1. …...

数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !

我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...