Dubbo源码解析-Dubbo的线程模型(九)
一、Dubbo线程模型
首先明确一个基本概念:IO 线程和业务线程的区别
IO 线程:配置在netty 连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据
打交道的事件。
业务线程:用于处理具体业务逻辑的线程,可以理解为自己在provider 上写的代码所执行的线
程环境。
Dubbo 默认采用的是长链接的方式,即默认情况下一个consumer 和一个provider 之间只会建立
一条链接,这种情况下IO 线程的工作就是编码和解码数据,监听具体的数据请求,直接通过Channel发布数据等等;
有两个参数⽤来配置服务消费者和服务提供者直接的socket连接个数:
1. shareconnections:表示可共享的socket连接个数
2. connections:表示不共享的socket连接个数
服务A的shareconnections或者connections为2时,服务A的消费者会向服务A的提供者建⽴两个socket连接:
业务线程就是处理IO 线程处理之后的数据,业务线程并不知道任何跟网络相关的内容,只是纯
粹的处理业务逻辑,在业务处理逻辑的时候往往存在复杂的逻辑,所以业务线程池的配置往往都要
比IO 线程池的配置大很多。
IO 线程部分,Netty 服务提供方NettyServer 又使用了两级线程池,master 主要用来接受客户
端的链接请求,并把接受的请求分发给worker 来处理。整个过程如下图:
IO 线程与业务线程的交互如下:
IO 线程的派发策略:
默认是all:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。
direct:worker 线程接收到事件后,由worker 执行到底。
message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO 线程上执行
execution:只有请求消息派发到线程池,不含响应(客户端线程池),响应和其它连接断开事件,心跳等消息,直接在IO线程上执行
connection:在IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
业务线程池设置:
fixed:固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
coresize:200
maxsize:200
队列:SynchronousQueue
回绝策略:AbortPolicyWithReport - 打印线程信息jstack,之后抛出异常
cached:缓存线程池,空闲一分钟自动删除,需要时重建。
limited:可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
配置示例:
<dubbo:protocol name="dubbo"dispatcher="all"threadpool="fixed"threads="100"/>
在整个消费者调用过程中,各个线程池都比较重要,其中比较有特色的就是AllChannelHandler,它完成了IO线程转向用户线程的任务转移,比较关键。
二、派发策略(All)源码解析
消费者启动的时候会执行DubboProtocol#initClient建议与服务端端的socket连接
private ExchangeClient initClient(URL url) {ExchangeClient client;try {// Replace InstanceAddressURL with ServiceConfigURL.url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getParameters());// connection should be lazyif (url.getParameter(LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient(url, requestHandler);} else {client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}return client;
}
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect最终返回的是NettyClient,点进这个对象的构造方法
public Client connect(URL url, ChannelHandler handler) throws RemotingException {return new NettyClient(url, handler);
}
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));
}
NettyClient#wrapChannelHandler中再次利用SPI机制获取线程派发策略,dubbo默认的策略为allDispatcher策略。
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
public ChannelHandler dispatch(ChannelHandler handler, URL url) {return new AllChannelHandler(handler, url);
}
除了默认的AllDispatcher,还有DirectDispatcher,MessageOnlyDispatcher等。
当消费端接收到远程服务端的响应之后,按照Netty的处理流程,消息会在channel绑定的handler上传递,netty底层会调用handler#received。可以看到connected,disconnected,received,caught等方法都是在新的线程池ExecutorService中执行,executor.execute方法会将任务ChannelEventRunnable提交到ExecutorService中。
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}@Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}@Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request && t instanceof RejectedExecutionException){sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}}
}
由于AllChannelHandler方法是在前面handler的基础上包装了一层,所以ChannelEventRunnable中会将消息传递给AllHandlel的下一个handler,从这里也清晰的看到了AllChannelHandler完成了IO线程转向用户线程的任务转移。
public void run() {if (state == ChannelState.RECEIVED) {try {handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}
三、AllDispatcher策略异常超时问题
Dubbo有一个经典问题,就是当配置了消息派发策略为AllDispatcher时,当服务端线程池满了之后,当消费端再次发送请求,就会一直傻傻等待超时导致没有任何服务端响应。那么问题就出现在AllChannelHandler,前面已经说了AllDispatcher策略就是所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。
问题出现原因:
那么当服务端线程池打满之后,此时又再次来了一个请求,此时依然会提交给线程池执行,那么了解线程池原理的就清楚线程池任务满了之后会执行拒绝策略抛出RejectedExecutionException异常,此时就会进入到received的catch方法中去,然后就又再次抛出ExecutionException异常。
public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}
那么抛出的异常就又会被netty捕获,进而继续执行nettyHandler的caught方法,可以看到这里又再次将任务丢到了线程池中。但是此时线程池依然是满的,业务线程池所有线程都堵住了,所以也不能将异常消息返回给客户端,然后客户端消费者只能傻傻等到超时。
public void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}
}
解决办法可以设置dispatcher为message,只有请求和响应交给业务线程池处理,其他的在IO线程处理,配置如下:
<dubbo:protocol name="dubbo" dispatcher="message" />
后面dubbo也修复了这个问题,再received方法的catch中新加了一部分逻辑,注释的大致意思也就是说:修复当线程池满了之后异常信息无法被发送给消费端的问题(当线程池满了,拒绝执行任务,会引起消费端等待超时),所以代码中判断了下当抛出异常为RejectedExecutionException时,就不把异常抛出交给AllChannelHandler#caught方法中的线程池执行,而是直接用IO线程在通过channel将消息及时反馈给消费者,消费者也就会收到服务端的“threadpool is exhausted ,detail msg”等响应消息。
public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time outif(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}
相关文章:

Dubbo源码解析-Dubbo的线程模型(九)
一、Dubbo线程模型 首先明确一个基本概念:IO 线程和业务线程的区别 IO 线程:配置在netty 连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据 打交道的事件。 业务线程:用于处理具体业务逻辑的线程,可以…...

【Canvas与标志】圆角三角形生化危险警示标志
【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>圆角三角形生化危险警示标志 Draft1</title><style type&qu…...

解决Dcat Admin laravel框架登录报错问题,(blocked:mixed-content)
前言 在使用 Dcat Admin 后台登录时,发生 error 报错:(blocked:mixed-content) xhr VM484:1,浏览器拦截 其实这是浏览器在 HTTPS 页面中尝试加载 HTTP 资源,导致浏览器阻止了这些不安全的请求。 解决 在 .env 文件中添加或修改 AD…...

(三)Sping Boot学习——升级jdk1.8-jdk18
1.修改系统环境变量。 2.idea中修改配置。 3.项目setting中设置修改 4.更新后还要重新下载依赖mvn clean install ,并且记住reload 项目。同时查看java -version查看一下jdk版本。...

语言模型中的多模态链式推理
神经网络的公式推导 简介摘要引言多模态思维链推理的挑战多模态CoT框架多模态CoT模型架构细节编码模块融合模块解码模块 实验结果运行代码补充细节安装包下载Flan-T5数据集准备rougenltkall-MiniLM-L6-v2运行 简介 本文主要对2023一篇论文《Multimodal Chain-of-Thought Reason…...

SCons:下一代构建工具,如何用 Python 驱动高效构建?
在现代软件开发中,构建工具是开发流程中不可或缺的一环。无论是小型项目还是跨平台的复杂工程,选择一个高效、灵活的工具都能显著提高开发效率和代码质量。SCons,一个以 Python 为基础的构建工具,通过自动化依赖管理、灵活的扩展性…...

springboot 整合 rabbitMQ (延迟队列)
前言: 延迟队列是一个内部有序的数据结构,其主要功能体现在其延时特性上。这种队列存储的元素都设定了特定的处理时间,意味着它们需要在规定的时间点或者延迟之后才能被取出并进行相应的处理。简而言之,延时队列被设计用于存放那…...

ES 基本使用与二次封装
概述 基本了解 Elasticsearch 是一个开源的分布式搜索和分析引擎,基于 Apache Lucene 构建。它提供了对海量数据的快速全文搜索、结构化搜索和分析功能,是目前流行的大数据处理工具之一。主要特点即高效搜索、分布式存储、拓展性强 核心功能 全文搜索:…...

分割一切2.0,SAM2详解
🏡作者主页:点击! 🤖编程探索专栏:点击! ⏰️创作时间:2024年11月24日20点03分 神秘男子影, 秘而不宣藏。 泣意深不见, 男子自持重, 子夜独自沉。 论文链接 点击开启你的论文编程之旅…...

Spring AI Fluent API:与AI模型通信的流畅体验
引言 随着人工智能(AI)技术的飞速发展,越来越多的应用场景开始融入AI技术以提升用户体验和系统效率。在Java开发中,与AI模型通信成为了一个重要而常见的需求。为了满足这一需求,Spring AI引入了ChatClient,…...

基于python的长津湖评论数据分析与可视化,使用是svm情感分析建模
引言 研究背景及意义 上世纪初开始,中国电影就以自己独有的姿态登上了世界电影史的舞台。中国电影作为国家文化和思想观念的反映与延伸,能够增强文化自信,在文化输出方面有着极其重要的作用1[1]。 改革开放以来,随着生产力的提高…...

Lucene(2):Springboot整合全文检索引擎TermInSetQuery应用实例附源码
前言 本章代码已分享至Gitee: https://gitee.com/lengcz/springbootlucene01 接上文。Lucene(1):Springboot整合全文检索引擎Lucene常规入门附源码 如何在指定范围内查询。从lucene 7 开始,filter 被弃用,导致无法进行调节过滤。 TermInSetQuery 指定…...

shell完结
声明! 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团队无关&a…...

【2024最新】基于Springboot+Vue的智慧食堂系统Lw+PPT
作者:计算机搬砖家 开发技术:SpringBoot、php、Python、小程序、SSM、Vue、MySQL、JSP、ElementUI等,“文末源码”。 专栏推荐:SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:Java精选实战项…...

NVR小程序接入平台EasyNVR多品牌NVR管理工具:高效管理分散视频资源的解决方案
在当今数字化、智能化的时代背景下,视频监控已成为各行各业不可或缺的一部分,从公共安全到企业运维,再到智慧城市建设,视频资源的管理与应用正面临着前所未有的挑战。如何高效整合、管理这些遍布各地的分散视频资源,成…...

排序算法(三)--插入排序
文章目录 一、插入排序的基本原理二、插入排序的C语言实现三、代码解析 插入排序 C语言实例 一、插入排序的基本原理 插入排序的基本思想是将数组中的元素逐一取出,然后将其插入到已经排好序的部分中的适当位置,直到整个数组排序完成。具体步骤如下&…...

YOLOv11融合[ECCV 2018]RCAN中的RCAB模块及相关改进思路
YOLOv11v10v8使用教程: YOLOv11入门到入土使用教程 YOLOv11改进汇总贴:YOLOv11及自研模型更新汇总 《Image Super-Resolution Using Very Deep Residual Channel Attention Networks》 一、 模块介绍 论文链接:https://arxiv.org/abs/1807…...

排序(Java数据结构)
1. 排序的概念及引用 1.1 排序的概念 排序:所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。(所有的排序都是默认从小到大排序) 稳定性:假定在待排序的记录序列中ÿ…...

【Java 解释器模式】实现高扩展性的医学专家诊断规则引擎
🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,…...

【超详细】卷积神经网络CNN基本架构以及工作原理详解
《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…...

Html前后端Ajax交互数据前端JavaScript脚本后台C#ashx服务
本示例使用设备:https://item.taobao.com/item.htm?spma21dvs.23580594.0.0.52de2c1bU8Fdbo&ftt&id615391857885 前端以GET模式向后台请求数据 function MyGetAjax() {var xhr new XMLHttpRequest();xhr.open(GET, http://192.168.1.211/HttpReader.ash…...

问:Spring Boot应用监控组件工具,梳理一下?
在日常运维与开发过程中,Spring Boot 应用的监控是确保系统稳定性和性能的关键环节。本文将探讨 Spring Boot 常用的监控组件及工具的原理、适用场景,并针对不同场景下的运维监控方案进行介绍。 1. Spring Boot Actuator 原理: Spring Boo…...

利用Hooka开源的多种功能shellcode加载器实现快速免杀火绒,静态360+360杀毒,微步查杀1,vt查杀7(教程)
免责声明: 本文旨在提供有关特定漏洞的深入信息,帮助用户充分了解潜在的安全风险。发布此信息的目的在于提升网络安全意识和推动技术进步,未经授权访问系统、网络或应用程序,可能会导致法律责任或严重后果。因此,作者不对读者基于…...

2025-2026财年美国CISA国际战略规划(下)
文章目录 前言四、加强综合网络防御(一)与合作伙伴共同实施网络防御,降低集体风险推动措施有效性衡量 (二)大规模推动标准和安全,以提高网络安全推动措施有效性衡量 (三)提高主要合作…...

iframe通过url方式来获传递的参数
iframe通过url方式来获传递的参数 一、src"http://xxxx/#/policyOverview?codeaaaa"二、 src"/static/iframePhone/html/main.html?codeaaaa" 一、src“http://xxxx/#/policyOverview?codeaaaa” <iframedata-v-47a50536""src"http:/…...

蓝桥杯不知道叫什么题目
小蓝有一个整数,初始值为1,他可以花费一些代价对这个整数进行变换。 小蓝可以花贵1的代价将教数增加1。 小蓝可以花费3的代价将整数增加一个值,这个值是整数的数位中最大的那个(1到9) .小蓝可以花费10的代价将整数变为原来的2倍, 例如,如果整…...

最多可收集的水果数目
三个小朋友收集水果问题:最大水果收集路径 问题描述 有一个游戏,游戏由 n x n 个房间网格状排布组成。给定一个大小为 n x n 的二维整数数组 fruits,其中 fruits[i][j] 表示房间 (i, j) 中的水果数目。 游戏开始时,三个小朋友分…...

戴尔 AI Factory 上的 Agentic RAG 搭载 NVIDIA 和 Elasticsearch 向量数据库
作者:来自 Elastic Hemant Malik, Dell Team 我们很高兴与戴尔合作撰写白皮书《戴尔 AI Factory with NVIDIA 上的 Agentic RAG》。白皮书是一份供开发人员参考的设计文档,概述了实施 Agentic 检索增强生成 (retrieval augmented generation - RAG) 应用…...

HarmonyOS4+NEXT星河版入门与项目实战(16)------ 状态管理 @State(页面数据刷新与渲染)
文章目录 1、@State装饰器2、视图渲染演示1、无嵌套的对象属性值变化时可以触发页面渲染2、嵌套对象的嵌套属性值变化时不能够触发页面刷新渲染3、数组中对象的属性值变化时不能触发页面刷新渲染3、总结1、@State装饰器 2、视图渲染演示 常规的 string、number 这里就不演示了…...

Origin教程003:数据导入(2)-从文件导入和导入矩阵数据
文章目录 3.3 从文件导入3.3.1 导入txt文件3.3.2 导入excel文件3.3.3 合并工作表3.4 导入矩阵数据3.3 从文件导入 所需数据 https://download.csdn.net/download/WwLK123/900267473.3.1 导入txt文件 选择【数据->从文件导入->导入向导】: 选择文件之后,点击完成即可…...