高性能队列 Disruptor 在 IM 系统中的实战
高性能队列 Disruptor 在 IM 系统中的实战
前三期我们介绍了Disruptor的典型使用场景和相关高性能原理,本期我介绍一下Disruptor在IM系统用的应用实战,IM系统即社交聊天系统,对实时性的要求非常高,非常符合Disruptor的使用场景。
本篇文章将结合实际代码,介绍如何在 IM 系统中使用 Disruptor 进行高效的消息转发。
1. Disruptor 在 IM 系统中的作用
在 IM 系统中,用户 A 发送消息给 B、C、D 时,需要根据 B、C、D 所在的服务器节点进行分组,并将消息转发到对应的节点上。为了确保高吞吐量和低延迟,我们使用 Disruptor 作为高性能队列。
2. 代码实现
2.1 初始化 Disruptor
当某个节点 nodeId 还没有对应的 RingBuffer 时,我们需要创建一个新的 Disruptor,并将其存入 ringBufferMap 中。
private final Map<String, RingBuffer<ClusterPublishEvent>> ringBufferMap = new ConcurrentHashMap<>();
public ClusterQueueService(Server server) {
this.mServer = server;
}
public void publishMessage(String nodeId, String fromUser, String clientId, String topic, byte[] payload) {
if (!ringBufferMap.containsKey(nodeId)) {
long st = System.currentTimeMillis();
synchronized (ringBufferMap){
if(!ringBufferMap.containsKey(nodeId)) {
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
Disruptor<ClusterPublishEvent> disruptor = new Disruptor<>(
new ClusterPublishEventFactory(), 1024 * 1024, DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE, strategy);
disruptor.handleEventsWith(new ClusterPublishEventHandler(mServer, nodeId));
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
disruptor.start();
ringBufferMap.put(nodeId, disruptor.getRingBuffer());
}
}
log.info("publishMessage create RingBuffer cost:{}ms, ringBufferMap:{},size:{}", System.currentTimeMillis() - st, ringBufferMap, ringBufferMap.size());
}
RingBuffer<ClusterPublishEvent> ringBuffer = ringBufferMap.get(nodeId);
long sequence = ringBuffer.next();
// 当环形缓冲区未用完时, 返回的是空对象,否则,返回的是缓存的数据。
ClusterPublishEvent clusterEvent = ringBuffer.get(sequence);
clusterEvent.setFromUser(fromUser);
clusterEvent.setClientId(clientId);
// 此topic,是节点转发的topic: NM2R, NTF,DESTROYUSER, 只有这三种
clusterEvent.setTopic(topic);
clusterEvent.setPayload(payload);
clusterEvent.setTraceId(MDC.get(ImSvcConstants.TRACE_ID));
// 发布事件, 会触发ClusterPublishEventHandler.onEvent方法
ringBuffer.publish(sequence);
}
关键点解析:
-
采用 BlockingWaitStrategy作为等待策略,确保高效的 CPU 资源利用。 -
采用 DaemonThreadFactory.INSTANCE创建线程池,避免应用程序退出时线程未正常回收。 -
handleEventsWith设定事件处理器ClusterPublishEventHandler,用于消息处理。 -
setDefaultExceptionHandler避免异常影响消息处理流程。
2.2 按照节点转发消息
根据用户所在的服务节点,进行消息转发(发送消息事件到Disruptor)
public void publish2Receivers(Long messageId, Set<String> receivers, String exceptClientId, int pullType, String topic) {
//未绑定broker的用户默认由本中心处理
Map<String, String> allReceiverMap = new HashMap<>();
for (String receiver : receivers) {
allReceiverMap.put(receiver, localNodeId);
}
//从分布式缓存获取获取用户路由
Map<String, String> receiverMap = userRouteStore.getAll(receivers);
allReceiverMap.putAll(receiverMap);
Map<String, Set<String>> nodeMap = new HashMap<>();
//使用nodeId分组
allReceiverMap.forEach((receiver, nodeId) -> {
if (!nodeMap.containsKey(nodeId)) {
nodeMap.put(nodeId, new HashSet<>());
}
nodeMap.get(nodeId).add(receiver);
});
//获取可用节点
Cluster cluster = mServer.getHazelcastInstance().getCluster();
Set<Member> members = cluster.getMembers();
List<String> collect = members.stream().map(member -> member.getStringAttribute(HZ_Cluster_Node_ID)).collect(Collectors.toList());
log.info("hazelcast node list:{}",JSON.toJSONString(collect));
Map<String, Member> memberMap = members.stream().collect(Collectors.toMap(
member -> member.getStringAttribute(HZ_Cluster_Node_ID), member -> member, (k1, k2 )->k1));
//按照节点分发
nodeMap.forEach((nodeId, set) -> {
// 转发到其他节点发送
if (!nodeId.equals(localNodeId) && memberMap.containsKey(nodeId)) {
WFCMessage.NotifyMessage2Receivers notifyMessage2Receivers = WFCMessage.NotifyMessage2Receivers.newBuilder()
.setMessageId(messageId)
.addAllReceivers(set)
.setExceptClientId(exceptClientId==null?"":exceptClientId)
.setPullType(pullType)
.setTopic(topic)
.build();
clusterQueueService.publishMessage(nodeId,nodeId,null, IMTopic.NotifyMessage2ReceiversTopic, notifyMessage2Receivers.toByteArray());
}
// 当前节点处理发送
else {
WFCMessage.Message message = mServer.getStore().messagesStore().getMessage(messageId);
if (message != null) {
// Add By Youqibin 16:11 2022/3/15 接收通知前置处理
preHandle(message, set);
mServer.getImBusinessScheduler().execute(() ->messagesPublisher.publish2Receivers(message, set, exceptClientId, pullType, localNodeId));
// Add By Youqibin 16:11 2022/3/15 接收通知后置处理
postHandle(message, set);
}
}
});
}
关键点解析:
-
clusterQueueService.publishMessage, 使用Disruptor发送消息事件,高性能异步处理
2.3 事件处理器 onEvent
当 Disruptor 事件发布后,ClusterPublishEventHandler.onEvent 负责实际的消息转发逻辑。
public class ClusterPublishEventHandler implements EventHandler<ClusterPublishEvent> {
private final Server server;
private final String nodeId;
public ClusterPublishEventHandler(Server server, String nodeId) {
this.server = server;
this.nodeId = nodeId;
}
@Override
public void onEvent(ClusterPublishEvent event, long sequence, boolean endOfBatch) {
log.info("Processing event: {} on node: {}", event, nodeId);
server.forwardMessage(nodeId, event.getFromUser(), event.getClientId(), event.getTopic(), event.getPayload());
}
}
关键点解析:
-
onEvent方法接收到ClusterPublishEvent后,调用server.forwardMessage进行消息转发。 -
endOfBatch用于标识当前事件是否为批处理中的最后一个事件。 -
log.info记录消息处理的关键日志,便于后续排查。
3. 总结
本文介绍了 Disruptor 在 IM 系统中的应用,核心逻辑包括:
-
初始化 Disruptor:为每个 nodeId创建独立的 RingBuffer。 -
按照节点转发消息:将用户消息存入对应节点的 RingBuffer。 -
消息处理: onEvent方法从 RingBuffer 读取消息,并执行转发。
通过 Disruptor,可以大幅降低锁竞争,提高 IM 系统的吞吐量,使其能够在高并发环境下稳定运行。
4. 最后
欢迎关注加瓦点灯,每天推送干货知识,一起进步!
本文由 mdnice 多平台发布
相关文章:
高性能队列 Disruptor 在 IM 系统中的实战
高性能队列 Disruptor 在 IM 系统中的实战 前三期我们介绍了Disruptor的典型使用场景和相关高性能原理,本期我介绍一下Disruptor在IM系统用的应用实战,IM系统即社交聊天系统,对实时性的要求非常高,非常符合Disruptor的使用场景。 …...
原生HTML集合
一、表格 1、固定表格 <div class"tablebox"><div class"table-container"><table id"myTable" border"0" cellspacing"0" cellpadding"0"><thead><tr></tr></thead>…...
ES6 简单练习笔记--变量申明
一、ES5 变量定义 1.在全局作用域中 this 其实就是window对象 <script>console.log(window this) </script>输出结果: true 2.在全局作用域中用var定义一个变量其实就相当于在window上定义了一个属性 例如: var name "孙悟空" 其实就相当于执行了 win…...
2025.1.21——六、BUU XSS COURSE 1
题目来源:buuctf BUU XSS COURSE 1 一、打开靶机,整理信息 有吐槽和登陆两个尝试点,题目名称提示是XSS漏洞 XSS(Cross-Site Scripting)漏洞 1.定义:跨站脚本攻击,是一种常见的 Web 安全漏洞。攻…...
Linux - 五种常见I/O模型
I/O操作 (输入/输出操作, Input/Output) 是指计算机与外部设备就行数据交互的过程. 什么是外部设备: 如键盘, 鼠标, 硬盘, 网卡等. 五种常见的 I/O 模型: 阻塞 I/O非阻塞 I/O信号驱动 I/OI/O 多路复用异步 I/O 阻塞 I/O 阻塞 I/O 的特点: 当用户发起 I/O 请求后, 进程/线程就…...
【负载均衡式在线OJ】加载题目信息(文件版)
目录 如何读取文件 -- 常见流程 代码 如何读取文件 -- 常见流程 在C中使用 std::ifstream来打开文件流是一个常见的操作,用于创建一个输入文件流,并尝试打开名为 question_list的文件。if (!in.is_open()):检查文件是否成功打开。如果文件未…...
“上门按摩” 小程序开发项目:基于 SOP 的全流程管理
在竞争激烈的生活服务市场,“上门按摩” 服务需求日益增长。为满足这一需求,我们启动了 O2O 模式的 “上门按摩” 小程序开发项目,该项目涵盖客户端、系统管理端、技师端。以下将通过各类 SOP 对项目进行全面管理,确保项目顺利推进。 一、项目启动 SOP:5W2H 分析法 What(…...
WPF1-从最简单的xaml开始
1. 最简单的WPF应用 1.1. App.config1.2. App.xaml 和 App.xaml.cs1.3. MainWindow.xaml 和 MainWindow.xaml.cs 2. 正式开始分析 2.1. 声明即定义2.2. 命名空间 2.2.1. xaml的Property和Attribute2.2.2. xaml中命名空间2.2.3. partial关键字 学习WPF,肯定要先学…...
2025牛客寒假算法营2
A题 知识点:模拟 打卡。检查给定的七个整数是否仅包含 1,2,3,5,6 即可。为了便于书写,我们可以反过来,检查这七个整数是否不为 4 和 7。 时间 O(1);空间 O(1)。 #include <bits/stdc.h> using namespace std;signed main()…...
编译Android平台使用的FFmpeg库
目录 前言 一、编译环境 二、搭建环境 1.安装MSYS2 2.更新系统包 2.1 打开MSYS2 MinGW 64-bit终端(mingw64.exe) 2.2 更新所有软件包到最新版本 2.3 安装必要的工具和库。 3. 克隆FFmpeg源码 4. 配置编译选项 5. 执行编译 总结 前言 记录学习…...
【C++高并发服务器WebServer】-2:exec函数簇、进程控制
本文目录 一、exec函数簇介绍二、exec函数簇 一、exec函数簇介绍 exec 函数族的作用是根据指定的文件名找到可执行文件,并用它来取代调用进程的内容,换句话说,就是在调用进程内部执行一个可执行文件。 exec函数族的函数执行成功后不会返回&…...
力扣707题(2)——设计链表
#题目 #3,5和6的代码 今天看剩下几个题的代码,1,2,4的代码已经在上篇博客写过了想看的小伙伴移步到: 力扣707题——设计链表-CSDN博客 //第3题头插法 void addAtHead(int val){ //记录头结点ListNode nhead; //新节点的创建,并让它指向原本头结点的后…...
K8S中ingress详解
Ingress介绍 Kubernetes 集群中,服务(Service)是一种抽象,它定义了一种访问 Pod 的方式,无论这些 Pod 如何变化,服务都保持不变。服务可以被映射到一个静态的 IP 地址(ClusterIP)、一…...
SpringBoot打包为JAR包或WAR 包,这两种打包方式在运行时端口将如何采用?又有什么不同?这篇文章将给你解惑
你们好,我是金金金。 前提 SpringBoot打包方式:不是jar就是war包 场景 写这篇文章也是我遇到一个很不理解的点,所以就去研究了一下 场景如下: 这是后端生产配置文件给项目设置的端口,然后我前端写的url是:我就很纳闷,前端写了域名以及后端服务上下文路径,咋没写端口呢,…...
zabbix6.0安装及常用监控配置
文章目录 部署zabbix-serverzabbix监控节点部署解决zabbix中文乱码创建主机组创建模版配置主机与模版关联 监控boot分区监控网卡流量出网卡流量监控进入和出的总流量监控内存监控服务器端口用户自定应监控key值 (监控mysql查询数量)zabbix触发器监控cpu监控入网卡流量 邮件告警…...
SQL-leetcode—1179. 重新格式化部门表
1179. 重新格式化部门表 表 Department: ---------------------- | Column Name | Type | ---------------------- | id | int | | revenue | int | | month | varchar | ---------------------- 在 SQL 中,(id, month) 是表的联合主键。 这个表格有关…...
JavaWeb 学习笔记 XML 和 Json 篇 | 020
今日推荐语 愿你遇见好天气,愿你的征途铺满了星星——圣埃克苏佩里 日期 学习内容 打卡编号2025年01月23日JavaWeb笔记 XML 和 Json 篇020 前言 哈喽,我是菜鸟阿康。 以下是我的学习笔记,既做打卡也做分享,希望对你也有所帮助…...
在Raspbian上,如何获取树莓派的CPU当前频率
本文不用汇编实现,因为我是要用在 Go 里的,Go 并不支持内联汇编,要用汇编比较麻烦。而且项目并不是很在意性能,所以直接用命令获取内核准备好的。 在Raspbian上,CPU 信息存放在/sys/devices/system/cpu/中,…...
网络打印机的搜索与连接(一)
介绍 网络打印机就是可以通过网络连接上的打印机,这类打印机分2种:自身具有互联网接入功能可以分配IP的打印机我们称为网络打印机、另外一种就是被某台电脑连接上去后通过共享的方式共享到网络里面的我们称为共享打印机。现在还有一种可以通过互联网连接…...
LangChain + llamaFactory + Qwen2-7b-VL 构建本地RAG问答系统
单纯仅靠LLM会产生误导性的 “幻觉”,训练数据会过时,处理特定知识时效率不高,缺乏专业领域的深度洞察,同时在推理能力上也有所欠缺。 正是在这样的背景下,检索增强生成技术(Retrieval-Augmented Generati…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
