当前位置: 首页 > news >正文

高性能队列 Disruptor 在 IM 系统中的实战

高性能队列 Disruptor 在 IM 系统中的实战

前三期我们介绍了Disruptor的典型使用场景和相关高性能原理,本期我介绍一下Disruptor在IM系统用的应用实战,IM系统即社交聊天系统,对实时性的要求非常高,非常符合Disruptor的使用场景。

本篇文章将结合实际代码,介绍如何在 IM 系统中使用 Disruptor 进行高效的消息转发。

1. Disruptor 在 IM 系统中的作用

在 IM 系统中,用户 A 发送消息给 B、C、D 时,需要根据 B、C、D 所在的服务器节点进行分组,并将消息转发到对应的节点上。为了确保高吞吐量和低延迟,我们使用 Disruptor 作为高性能队列。

2. 代码实现

2.1 初始化 Disruptor

当某个节点 nodeId 还没有对应的 RingBuffer 时,我们需要创建一个新的 Disruptor,并将其存入 ringBufferMap 中。

   private final Map<String, RingBuffer<ClusterPublishEvent>> ringBufferMap = new ConcurrentHashMap<>();

    public ClusterQueueService(Server server) {
        this.mServer = server;
    }

    public void publishMessage(String nodeId, String fromUser, String clientId, String topic, byte[] payload) {
        if (!ringBufferMap.containsKey(nodeId)) {
            long st = System.currentTimeMillis();
            synchronized (ringBufferMap){
                if(!ringBufferMap.containsKey(nodeId)) {
                    BlockingWaitStrategy strategy = new BlockingWaitStrategy();
                    Disruptor<ClusterPublishEvent> disruptor = new Disruptor<>(
                        new ClusterPublishEventFactory(), 1024 * 1024, DaemonThreadFactory.INSTANCE,
                        ProducerType.SINGLE, strategy);
                    disruptor.handleEventsWith(new ClusterPublishEventHandler(mServer, nodeId));
                    disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
                    disruptor.start();
                    ringBufferMap.put(nodeId, disruptor.getRingBuffer());
                }
            }
            log.info("publishMessage create RingBuffer cost:{}ms, ringBufferMap:{},size:{}", System.currentTimeMillis() - st, ringBufferMap, ringBufferMap.size());
        }
        RingBuffer<ClusterPublishEvent> ringBuffer = ringBufferMap.get(nodeId);
        long sequence = ringBuffer.next();
        // 当环形缓冲区未用完时, 返回的是空对象,否则,返回的是缓存的数据。
        ClusterPublishEvent clusterEvent = ringBuffer.get(sequence);
        clusterEvent.setFromUser(fromUser);
        clusterEvent.setClientId(clientId);
        // 此topic,是节点转发的topic: NM2R, NTF,DESTROYUSER, 只有这三种
        clusterEvent.setTopic(topic);
        clusterEvent.setPayload(payload);
        clusterEvent.setTraceId(MDC.get(ImSvcConstants.TRACE_ID));
        // 发布事件, 会触发ClusterPublishEventHandler.onEvent方法
        ringBuffer.publish(sequence);
    }

关键点解析:

  • 采用 BlockingWaitStrategy 作为等待策略,确保高效的 CPU 资源利用。
  • 采用 DaemonThreadFactory.INSTANCE 创建线程池,避免应用程序退出时线程未正常回收。
  • handleEventsWith 设定事件处理器 ClusterPublishEventHandler,用于消息处理。
  • setDefaultExceptionHandler 避免异常影响消息处理流程。

2.2 按照节点转发消息

根据用户所在的服务节点,进行消息转发(发送消息事件到Disruptor)

    public void publish2Receivers(Long messageId, Set<String> receivers, String exceptClientId, int pullType, String topic) {
        //未绑定broker的用户默认由本中心处理
        Map<String, String> allReceiverMap = new HashMap<>();
        for (String receiver : receivers) {
            allReceiverMap.put(receiver, localNodeId);
        }
        //从分布式缓存获取获取用户路由
        Map<String, String> receiverMap = userRouteStore.getAll(receivers);
        allReceiverMap.putAll(receiverMap);
        Map<String, Set<String>> nodeMap = new HashMap<>();
        //使用nodeId分组
        allReceiverMap.forEach((receiver, nodeId) -> {
            if (!nodeMap.containsKey(nodeId)) {
                nodeMap.put(nodeId, new HashSet<>());
            }
            nodeMap.get(nodeId).add(receiver);
        });
        //获取可用节点
        Cluster cluster = mServer.getHazelcastInstance().getCluster();
        Set<Member> members = cluster.getMembers();
        List<String> collect = members.stream().map(member -> member.getStringAttribute(HZ_Cluster_Node_ID)).collect(Collectors.toList());
        log.info("hazelcast node list:{}",JSON.toJSONString(collect));
        Map<String, Member> memberMap = members.stream().collect(Collectors.toMap(
            member -> member.getStringAttribute(HZ_Cluster_Node_ID), member -> member, (k1, k2 )->k1));
        //按照节点分发
        nodeMap.forEach((nodeId, set) -> {
            // 转发到其他节点发送
            if (!nodeId.equals(localNodeId) && memberMap.containsKey(nodeId)) {
                WFCMessage.NotifyMessage2Receivers notifyMessage2Receivers = WFCMessage.NotifyMessage2Receivers.newBuilder()
                    .setMessageId(messageId)
                    .addAllReceivers(set)
                    .setExceptClientId(exceptClientId==null?"":exceptClientId)
                    .setPullType(pullType)
                    .setTopic(topic)
                    .build();
                clusterQueueService.publishMessage(nodeId,nodeId,null, IMTopic.NotifyMessage2ReceiversTopic, notifyMessage2Receivers.toByteArray());
            }
            // 当前节点处理发送
            else {
                WFCMessage.Message message = mServer.getStore().messagesStore().getMessage(messageId);

                if (message != null) {
                    // Add By Youqibin 16:11 2022/3/15 接收通知前置处理
                    preHandle(message, set);
                    mServer.getImBusinessScheduler().execute(() ->messagesPublisher.publish2Receivers(message, set, exceptClientId, pullType, localNodeId));
                    // Add By Youqibin 16:11 2022/3/15 接收通知后置处理
                    postHandle(message, set);
                }
            }
        });
    }

关键点解析:

  • clusterQueueService.publishMessage, 使用Disruptor发送消息事件,高性能异步处理

2.3 事件处理器 onEvent

当 Disruptor 事件发布后,ClusterPublishEventHandler.onEvent 负责实际的消息转发逻辑。

public class ClusterPublishEventHandler implements EventHandler<ClusterPublishEvent{
    private final Server server;
    private final String nodeId;

    public ClusterPublishEventHandler(Server server, String nodeId) {
        this.server = server;
        this.nodeId = nodeId;
    }

    @Override
    public void onEvent(ClusterPublishEvent event, long sequence, boolean endOfBatch) {
        log.info("Processing event: {} on node: {}", event, nodeId);
        server.forwardMessage(nodeId, event.getFromUser(), event.getClientId(), event.getTopic(), event.getPayload());
    }
}

关键点解析:

  • onEvent 方法接收到 ClusterPublishEvent 后,调用 server.forwardMessage 进行消息转发。
  • endOfBatch 用于标识当前事件是否为批处理中的最后一个事件。
  • log.info 记录消息处理的关键日志,便于后续排查。

3. 总结

本文介绍了 Disruptor 在 IM 系统中的应用,核心逻辑包括:

  1. 初始化 Disruptor:为每个 nodeId 创建独立的 RingBuffer。
  2. 按照节点转发消息:将用户消息存入对应节点的 RingBuffer。
  3. 消息处理onEvent 方法从 RingBuffer 读取消息,并执行转发。

通过 Disruptor,可以大幅降低锁竞争,提高 IM 系统的吞吐量,使其能够在高并发环境下稳定运行。

4. 最后

欢迎关注加瓦点灯,每天推送干货知识,一起进步!

本文由 mdnice 多平台发布

相关文章:

高性能队列 Disruptor 在 IM 系统中的实战

高性能队列 Disruptor 在 IM 系统中的实战 前三期我们介绍了Disruptor的典型使用场景和相关高性能原理&#xff0c;本期我介绍一下Disruptor在IM系统用的应用实战&#xff0c;IM系统即社交聊天系统&#xff0c;对实时性的要求非常高&#xff0c;非常符合Disruptor的使用场景。 …...

原生HTML集合

一、表格 1、固定表格 <div class"tablebox"><div class"table-container"><table id"myTable" border"0" cellspacing"0" cellpadding"0"><thead><tr></tr></thead>…...

ES6 简单练习笔记--变量申明

一、ES5 变量定义 1.在全局作用域中 this 其实就是window对象 <script>console.log(window this) </script>输出结果: true 2.在全局作用域中用var定义一个变量其实就相当于在window上定义了一个属性 例如: var name "孙悟空" 其实就相当于执行了 win…...

2025.1.21——六、BUU XSS COURSE 1

题目来源&#xff1a;buuctf BUU XSS COURSE 1 一、打开靶机&#xff0c;整理信息 有吐槽和登陆两个尝试点&#xff0c;题目名称提示是XSS漏洞 XSS&#xff08;Cross-Site Scripting&#xff09;漏洞 1.定义&#xff1a;跨站脚本攻击&#xff0c;是一种常见的 Web 安全漏洞。攻…...

Linux - 五种常见I/O模型

I/O操作 (输入/输出操作, Input/Output) 是指计算机与外部设备就行数据交互的过程. 什么是外部设备: 如键盘, 鼠标, 硬盘, 网卡等. 五种常见的 I/O 模型: 阻塞 I/O非阻塞 I/O信号驱动 I/OI/O 多路复用异步 I/O 阻塞 I/O 阻塞 I/O 的特点: 当用户发起 I/O 请求后, 进程/线程就…...

【负载均衡式在线OJ】加载题目信息(文件版)

目录 如何读取文件 -- 常见流程 代码 如何读取文件 -- 常见流程 在C中使用 std::ifstream来打开文件流是一个常见的操作&#xff0c;用于创建一个输入文件流&#xff0c;并尝试打开名为 question_list的文件。if (!in.is_open())&#xff1a;检查文件是否成功打开。如果文件未…...

“上门按摩” 小程序开发项目:基于 SOP 的全流程管理

在竞争激烈的生活服务市场,“上门按摩” 服务需求日益增长。为满足这一需求,我们启动了 O2O 模式的 “上门按摩” 小程序开发项目,该项目涵盖客户端、系统管理端、技师端。以下将通过各类 SOP 对项目进行全面管理,确保项目顺利推进。 一、项目启动 SOP:5W2H 分析法 What(…...

WPF1-从最简单的xaml开始

1. 最简单的WPF应用 1.1. App.config1.2. App.xaml 和 App.xaml.cs1.3. MainWindow.xaml 和 MainWindow.xaml.cs 2. 正式开始分析 2.1. 声明即定义2.2. 命名空间 2.2.1. xaml的Property和Attribute2.2.2. xaml中命名空间2.2.3. partial关键字 学习WPF&#xff0c;肯定要先学…...

2025牛客寒假算法营2

A题 知识点&#xff1a;模拟 打卡。检查给定的七个整数是否仅包含 1,2,3,5,6 即可。为了便于书写&#xff0c;我们可以反过来&#xff0c;检查这七个整数是否不为 4 和 7。 时间 O(1)&#xff1b;空间 O(1)。 #include <bits/stdc.h> using namespace std;signed main()…...

编译Android平台使用的FFmpeg库

目录 前言 一、编译环境 二、搭建环境 1.安装MSYS2 2.更新系统包 2.1 打开MSYS2 MinGW 64-bit终端&#xff08;mingw64.exe&#xff09; 2.2 更新所有软件包到最新版本 2.3 安装必要的工具和库。 3. 克隆FFmpeg源码 4. 配置编译选项 5. 执行编译 总结 前言 记录学习…...

【C++高并发服务器WebServer】-2:exec函数簇、进程控制

本文目录 一、exec函数簇介绍二、exec函数簇 一、exec函数簇介绍 exec 函数族的作用是根据指定的文件名找到可执行文件&#xff0c;并用它来取代调用进程的内容&#xff0c;换句话说&#xff0c;就是在调用进程内部执行一个可执行文件。 exec函数族的函数执行成功后不会返回&…...

力扣707题(2)——设计链表

#题目 #3,5和6的代码 今天看剩下几个题的代码&#xff0c;1,2,4的代码已经在上篇博客写过了想看的小伙伴移步到&#xff1a; 力扣707题——设计链表-CSDN博客 //第3题头插法 void addAtHead(int val){ //记录头结点ListNode nhead; //新节点的创建,并让它指向原本头结点的后…...

K8S中ingress详解

Ingress介绍 Kubernetes 集群中&#xff0c;服务&#xff08;Service&#xff09;是一种抽象&#xff0c;它定义了一种访问 Pod 的方式&#xff0c;无论这些 Pod 如何变化&#xff0c;服务都保持不变。服务可以被映射到一个静态的 IP 地址&#xff08;ClusterIP&#xff09;、一…...

SpringBoot打包为JAR包或WAR 包,这两种打包方式在运行时端口将如何采用?又有什么不同?这篇文章将给你解惑

你们好,我是金金金。 前提 SpringBoot打包方式:不是jar就是war包 场景 写这篇文章也是我遇到一个很不理解的点,所以就去研究了一下 场景如下: 这是后端生产配置文件给项目设置的端口,然后我前端写的url是:我就很纳闷,前端写了域名以及后端服务上下文路径,咋没写端口呢,…...

zabbix6.0安装及常用监控配置

文章目录 部署zabbix-serverzabbix监控节点部署解决zabbix中文乱码创建主机组创建模版配置主机与模版关联 监控boot分区监控网卡流量出网卡流量监控进入和出的总流量监控内存监控服务器端口用户自定应监控key值 (监控mysql查询数量)zabbix触发器监控cpu监控入网卡流量 邮件告警…...

SQL-leetcode—1179. 重新格式化部门表

1179. 重新格式化部门表 表 Department&#xff1a; ---------------------- | Column Name | Type | ---------------------- | id | int | | revenue | int | | month | varchar | ---------------------- 在 SQL 中&#xff0c;(id, month) 是表的联合主键。 这个表格有关…...

JavaWeb 学习笔记 XML 和 Json 篇 | 020

今日推荐语 愿你遇见好天气,愿你的征途铺满了星星——圣埃克苏佩里 日期 学习内容 打卡编号2025年01月23日JavaWeb笔记 XML 和 Json 篇020 前言 哈喽&#xff0c;我是菜鸟阿康。 以下是我的学习笔记&#xff0c;既做打卡也做分享&#xff0c;希望对你也有所帮助…...

在Raspbian上,如何获取树莓派的CPU当前频率

本文不用汇编实现&#xff0c;因为我是要用在 Go 里的&#xff0c;Go 并不支持内联汇编&#xff0c;要用汇编比较麻烦。而且项目并不是很在意性能&#xff0c;所以直接用命令获取内核准备好的。 在Raspbian上&#xff0c;CPU 信息存放在/sys/devices/system/cpu/中&#xff0c…...

网络打印机的搜索与连接(一)

介绍 网络打印机就是可以通过网络连接上的打印机&#xff0c;这类打印机分2种&#xff1a;自身具有互联网接入功能可以分配IP的打印机我们称为网络打印机、另外一种就是被某台电脑连接上去后通过共享的方式共享到网络里面的我们称为共享打印机。现在还有一种可以通过互联网连接…...

LangChain + llamaFactory + Qwen2-7b-VL 构建本地RAG问答系统

单纯仅靠LLM会产生误导性的 “幻觉”&#xff0c;训练数据会过时&#xff0c;处理特定知识时效率不高&#xff0c;缺乏专业领域的深度洞察&#xff0c;同时在推理能力上也有所欠缺。 正是在这样的背景下&#xff0c;检索增强生成技术&#xff08;Retrieval-Augmented Generati…...

javaweb共享汽车调度管理系统 新能源共享汽车租赁管理系统的设计与实现

目录同行可拿货,招校园代理 ,本人源头供货商功能模块分析技术实现要点新能源特色功能项目技术支持源码获取详细视频演示 &#xff1a;文章底部获取博主联系方式&#xff01;同行可合作同行可拿货,招校园代理 ,本人源头供货商 功能模块分析 用户管理模块 用户注册、登录、个人…...

ai辅助开发新思路:让快马ai为你生成一个notepad++智能编程助手插件原型

今天想和大家分享一个有趣的AI辅助开发思路——如何用InsCode(快马)平台快速构建一个Notepad智能编程助手插件的原型。这个项目特别适合想体验AI与本地编辑器联动的开发者&#xff0c;整个过程不需要复杂的配置&#xff0c;直接在网页上就能完成原型验证。 插件功能设计 这个插…...

D3KeyHelper深度评测:5大实战技巧提升暗黑破坏神3操作效率

D3KeyHelper深度评测&#xff1a;5大实战技巧提升暗黑破坏神3操作效率 【免费下载链接】D3keyHelper D3KeyHelper是一个有图形界面&#xff0c;可自定义配置的暗黑3鼠标宏工具。 项目地址: https://gitcode.com/gh_mirrors/d3/D3keyHelper 还在为暗黑破坏神3中复杂的技能…...

忍者像素绘卷惊艳作品:使用‘火之意志’隐式提示词触发的系列像素艺术

忍者像素绘卷惊艳作品&#xff1a;使用火之意志隐式提示词触发的系列像素艺术 1. 像素艺术新纪元&#xff1a;忍者绘卷的视觉革命 忍者像素绘卷是一款基于Z-Image-Turbo深度优化的图像生成工作站&#xff0c;它将传统忍者文化与16-Bit复古游戏美学完美融合&#xff0c;创造出…...

基于工件高度检测的机电传动与控制:factory建模博图v16plc程序的设计任务

机电传动与控制&#xff0c;基于工件高度检测的分拣(A)控制系统设计任务 内容&#xff1a;factory 建模博图 v16plc 程序&#xff08;v16 版本以上均可使用&#xff09;传送带上的金属工件哐当哐当地滑过&#xff0c;突然被机械臂稳稳抓取——这看似简单的动作背后藏着精密的高…...

告别键盘连击困扰:KeyboardChatterBlocker的智能防抖解决方案

告别键盘连击困扰&#xff1a;KeyboardChatterBlocker的智能防抖解决方案 【免费下载链接】KeyboardChatterBlocker A handy quick tool for blocking mechanical keyboard chatter. 项目地址: https://gitcode.com/gh_mirrors/ke/KeyboardChatterBlocker 你是否曾在重要…...

图卷积网络终极指南:如何在PyTorch中实现GCN模型

图卷积网络终极指南&#xff1a;如何在PyTorch中实现GCN模型 【免费下载链接】pygcn Graph Convolutional Networks in PyTorch 项目地址: https://gitcode.com/gh_mirrors/py/pygcn 图卷积网络&#xff08;Graph Convolutional Networks&#xff0c;简称GCN&#xff09…...

LAV Filters专业配置进阶指南:深度解析开源解码器架构与性能优化

LAV Filters专业配置进阶指南&#xff1a;深度解析开源解码器架构与性能优化 【免费下载链接】LAVFilters LAV Filters - Open-Source DirectShow Media Splitter and Decoders 项目地址: https://gitcode.com/gh_mirrors/la/LAVFilters LAV Filters是一套基于FFmpeg的高…...

NCM格式转换技术解析:从加密限制到音频自由的技术实现

NCM格式转换技术解析&#xff1a;从加密限制到音频自由的技术实现 【免费下载链接】ncmdump 项目地址: https://gitcode.com/gh_mirrors/ncmd/ncmdump 一、问题场景&#xff1a;数字音乐的格式枷锁与用户困境 1.1 音乐人的设备兼容困境 独立音乐人小林最近遇到了一个…...

SpringBoot+Vue IT交流和分享平台平台完整项目源码+SQL脚本+接口文档【Java Web毕设】

系统架构设计### 摘要 随着信息技术的快速发展&#xff0c;互联网已成为人们获取和分享知识的重要渠道。尤其是在IT领域&#xff0c;技术人员和爱好者需要一个高效、便捷的交流平台来分享经验、讨论技术问题并获取最新行业动态。传统的论坛和社交媒体平台虽然功能丰富&#xff…...