RabbitMQ深度探索:简单实现 MQ
基于多线程队列实现 MQ :
-
实现类:
public class ThreadMQ {private static LinkedBlockingDeque<JSONObject> broker = new LinkedBlockingDeque<JSONObject>();public static void main(String[] args) {//创建生产者线程Thread producer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {Thread.sleep(1000);JSONObject data = new JSONObject();data.put("phone","11111111");broker.offer(data);}catch (Exception e){}}}},"生产者");producer.start();Thread consumer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {JSONObject data = broker.poll();if(data != null){System.out.println(Thread.currentThread().getName() + data.toJSONString());}}catch (Exception e){}}}},"消费者");consumer.start();} }
基于 netty 实现 MQ:
-
执行过程:
- 消费者 netty 客户端与 nettyServer 端 MQ 服务器保持长连接,MQ 服务器端保存消费者连接
- 生产者 netty 客户端发送请求给 nettyServer 端 MQ 服务器,MQ 服务器端再将消息内容发送给消费者
-
执行流程:
- 导入 Maven 依赖:
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version> </dependency> <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.0.23.Final</version> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version> </dependency> <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.11</version> </dependency> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version> </dependency> - 服务端:
package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.commons.lang3.StringUtils;import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.concurrent.LinkedBlockingDeque;/*** @ClassName BoyatopMQServer2021* @Author* @Version V1.0**/ public class BoyatopNettyMQServer {public void bind(int port) throws Exception {/*** Netty 抽象出两组线程池BossGroup和WorkerGroup* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。*/EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossGroup, workerGroup)// 设定NioServerSocketChannel 为服务器端.channel(NioServerSocketChannel.class)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。.option(ChannelOption.SO_BACKLOG, 100)// 服务器端监听数据回调Handler.childHandler(new BoyatopNettyMQServer.ChildChannelHandler());//绑定端口, 同步等待成功;ChannelFuture future = bootstrap.bind(port).sync();System.out.println("当前服务器端启动成功...");//等待服务端监听端口关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//优雅关闭 线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 设置异步回调监听ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());}}public static void main(String[] args) throws Exception {int port = 9008;new BoyatopNettyMQServer().bind(port);}private static final String type_consumer = "consumer";private static final String type_producer = "producer";private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();// 生产者投递消息的:topicNamepublic class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {/*** 服务器接收客户端请求** @param ctx* @param data* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object data)throws Exception {//ByteBuf buf=(ByteBuf)data;//byte[] req = new byte[buf.readableBytes()];//buf.readBytes(req);//String body = new String(req, "UTF-8");//System.out.println("body:"+body);JSONObject clientMsg = getData(data);String type = clientMsg.getString("type");switch (type) {case type_producer:producer(clientMsg);break;case type_consumer:consumer(ctx);break;}}private void consumer(ChannelHandlerContext ctx) {// 保存消费者连接ctxs.add(ctx);// 主动拉取mq服务器端缓存中没有被消费的消息String data = msgs.poll();if (StringUtils.isEmpty(data)) {return;}// 将该消息发送给消费者byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}private void producer(JSONObject clientMsg) {// 缓存生产者投递 消息String msg = clientMsg.getString("msg");msgs.offer(msg); //保证消息不丢失还可以缓存硬盘//需要将该消息推送消费者ctxs.forEach((ctx) -> {// 将该消息发送给消费者String data = msgs.poll();if (data == null) {return;}byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);});}private JSONObject getData(Object data) throws UnsupportedEncodingException {ByteBuf buf = (ByteBuf) data;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");return JSONObject.parseObject(body);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();}} } - 生产端:
package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/ public class BoyatopNettyMQProducer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器 // ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "producer");JSONObject msg = new JSONObject();msg.put("userId", "123456");msg.put("age", "23");data.put("msg", msg);// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}} } - 客户端:
package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/ public class NettyMQConsumer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器 // ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;NettyMQConsumer client = new NettyMQConsumer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "consumer");// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}} }
- 导入 Maven 依赖:
-
持久化机制:
- 如果 MQ 接收到生产者投递信息,如果消费者不存在的情况下,消息是否会丢失?
- 答:不会丢失,消息确认机制必须要消费者消费成功之后,在通知给 MQ 服务器端,删除该消息
-
MQ 服务器将该消息推送给消费者:
- 消费者已经和 MQ 服务器保持长连接
- 消费者在第一次启动的时候会主动拉取信息
-
MQ 如何实现高并发思想:
- MQ 消费者根据自身能力情况,拉取 MQ 服务器端消费消息
- 默认的情况下取出一条消息
-
缺点:
- 存在延迟问题
-
需要考虑 MQ 消费者提高速率的问题:
- 如何提高消费者速率:消费者实现集群、消费者批量获取消息即可
相关文章:
RabbitMQ深度探索:简单实现 MQ
基于多线程队列实现 MQ : 实现类: public class ThreadMQ {private static LinkedBlockingDeque<JSONObject> broker new LinkedBlockingDeque<JSONObject>();public static void main(String[] args) {//创建生产者线程Thread producer n…...
React+AI 技术栈(2025 版)
文章目录 核心:React TypeScript元框架:Next.js样式设计:Tailwind CSSshadcn/ui客户端状态管理:Zustand服务器状态管理:TanStack Query动画效果:Motion测试工具表格处理:TanStack Table表单处理…...
计算机从何而来?计算技术将向何处发展?
计算机的前生:机械计算工具的演进 算盘是计算机的起点,它其实是一台“机械式半自动化运算器”。打算盘的“口诀”其实就是它的编程语言,算盘珠就是它的存储器。 第二阶段是可以做四则运算的加法器、乘法器。1642年,法国数学家帕斯…...
Docker使用指南(二)——容器相关操作详解(实战案例教学,创建/使用/停止/删除)
目录 1.容器操作相关命令编辑 案例一: 案例二: 容器常用命令总结: 1.查看容器状态: 2.删除容器: 3.进入容器: 二、Docker基本操作——容器篇 1.容器操作相关命令 下面我们用两个案例来具体实操一…...
从通讯工具到 AI 助理,AI手机如何发展?
随着AI进军各行各业,全面AI化时代已经到来。手机,作为现代人类的“数字器官”之一,更是首当其冲地融入了这一变革浪潮之中。 2024年年初,OPPO联合IDC发布了《AI手机白皮书》,公布OPPO已迈向AI手机这一全新阶段。到如今…...
小程序-基础加强
前言 这一节把基础加强讲完 1. 导入需要用到的小程序项目 2. 初步安装和使用vant组件库 这里还可以扫描二维码 其中步骤四没什么用 右键选择最后一个 在开始之前,我们的项目根目录得有package.json 没有的话,我们就初始化一个 但是我们没有npm这个…...
【CSS】谈谈你对BFC的理解
理解 CSS 中的 BFC(块格式化上下文) 在 CSS 中,BFC(Block Formatting Context) 是一个非常重要的概念,它决定了元素如何对其子元素进行定位,以及与其他元素的关系。理解 BFC 对于解决常见的布局…...
kubernetes-部署性能监控平台
在当今快速发展的云计算时代,Kubernetes 已成为容器编排的事实标准。随着越来越多的应用迁移到 Kubernetes 平台上,如何有效地监控集群的健康状态、资源使用情况以及应用性能变得尤为重要。一个完善的监控系统可以帮助我们及时发现问题、优化资源配置&am…...
【Uniapp-Vue3】iconfont图标库的使用
先在iconfont图标库中将需要的图标加入购物车 点击右侧购物车的图标 点击添加至项目,可以选中项目进行加入,也可以点击文件加号创建一个新的项目并添加 加入以后会来到如下界面,点击下载至本地 双击打开下载的.zip文件 将.css和.ttf文件进…...
Linux find 命令 | grep 命令 | 查找 / 列出文件或目录路径 | 示例
注:本文为 “Linux find 命令 | grep 命令使用” 相关文章合辑。 未整理去重。 如何在 Linux 中查找文件 作者: Lewis Cowles 译者: LCTT geekpi | 2018-04-28 07:09 使用简单的命令在 Linux 下基于类型、内容等快速查找文件。 如果你是 W…...
Day 28 卡玛笔记
这是基于代码随想录的每日打卡 77. 组合 给定两个整数 n 和 k,返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1: 输入:n 4, k 2 输出: [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4], ]示例 2…...
1.PPT:天河二号介绍【12】
目录 NO1 NO2.3.4.5 NO6.7.8.9 NO1 PPT:新建一个空白演示文档→保存到考生文件夹下:天河二号超级计算机.pptx幻灯片必须选择一种设计主题:设计→主题(随便选中一种)幻灯片的版式:开始→版式&#x…...
AI大模型开发原理篇-4:神经概率语言模型NPLM
神经概率语言模型(NPLM)概述 神经概率语言模型(Neural Probabilistic Language Model, NPLM) 是一种基于神经网络的语言建模方法,它将传统的语言模型和神经网络结合在一起,能够更好地捕捉语言中的复杂规律…...
物联网领域的MQTT协议,优势和应用场景
MQTT(Message Queuing Telemetry Transport)作为轻量级发布/订阅协议,凭借其低带宽消耗、低功耗与高扩展性,已成为物联网通信的事实标准。其核心优势包括:基于TCP/IP的异步通信机制、支持QoS(服务质量&…...
电控---中断
中断 1.处理器系统在执行代码的时候,会从存储器依次取出指令和数据,这种能力需要在处理器里保存一个存储器地址,就是所谓的程序计数器(Program Counter,PC),也叫程序指针 2.当外部中断(Extern …...
动态规划DP 背包问题 多重背包问题(朴素版+二进制优化+单调队列)
概览检索 动态规划DP 概览(点击链接跳转) 动态规划DP 背包问题 概览(点击链接跳转) 多重背包问题1 原题链接 AcWiing 4. 多重背包问题1 题目描述 有 N种物品和一个容量是 V的背包。 第 i 种物品最多有 si件,每件体…...
调试与错误修复:Cursor 如何成为你的编程助手
引言 调试是软件开发过程中最耗时且最具挑战性的环节之一。据统计,开发者平均将 50% 以上的编码时间 用于定位和修复错误。传统调试工具(如断点调试器、日志分析)虽能解决问题,但往往需要开发者手动追溯代码执行流程,…...
PHP 常用函数2025.02
PHP implode() 函数 语法 implode(separator,array) 参数描述separator可选。规定数组元素之间放置的内容。默认是 ""(空字符串)。array必需。要组合为字符串的数组。 技术细节 返回值:返回一个由数组元素组合成的字符串。PHP 版…...
浏览器查询所有的存储信息,以及清除的语法
要在浏览器的控制台中查看所有的存储(例如 localStorage、sessionStorage 和 cookies),你可以使用浏览器开发者工具的 "Application" 标签页。以下是操作步骤: 1. 打开开发者工具 在 Chrome 或 Edge 浏览器中…...
Paimon写入性能
写入性能 Paimon的写入性能与检查点密切相关,因此需要更大的写入吞吐量: 增加检查点间隔,或者仅使用批处理模式。增加写入缓冲区大小。启用写缓冲区溢出。如果您使用固定存储桶模式,请重新调整存储桶数量。 1 并行度 建议sink…...
Golang 并发机制-5:详解syn包同步原语
并发性是现代软件开发的一个基本方面,Go(也称为Golang)为并发编程提供了一组健壮的工具。Go语言中用于管理并发性的重要包之一是“sync”包。在本文中,我们将概述“sync”包,并深入研究其最重要的同步原语之一…...
排序算法与查找算法
1.十大经典排序算法 我们希望数据以一种有序的形式组织起来,无序的数据我们要尽量将其变得有序 一般说来有10种比较经典的排序算法 简单记忆为Miss D----D小姐 时间复杂度 :红色<绿色<蓝色 空间复杂度:圆越大越占空间 稳定性&…...
如何构建ObjC语言编译环境?构建无比简洁的clang编译ObjC环境?Windows搭建Swift语言编译环境?
如何构建ObjC语言编译环境? 除了在线ObjC编译器,本地环境Windows/Mac/Linux均可以搭建ObjC编译环境。 Mac自然不用多说,ObjC是亲儿子。(WSL Ubuntu 22.04) Ubuntu可以安装gobjc/gnustep和gnustep-devel构建编译环境。 sudo apt-get install gobjc gnus…...
数据结构课程设计(三)构建决策树
3 决策树 3.1 需求规格说明 【问题描述】 ID3算法是一种贪心算法,用来构造决策树。ID3算法起源于概念学习系统(CLS),以信息熵的下降速度为选取测试属性的标准,即在每个节点选取还尚未被用来划分的具有最高信息增益的…...
深度剖析八大排序算法
欢迎并且感谢大家指出我的问题,由于本人水平有限,有些内容写的不是很全面,只是把比较实用的东西给写下来,如果有写的不对的地方,还希望各路大牛多多指教!谢谢大家!🥰 在计算机科学领…...
python-leetcode-二叉树的层序遍历
102. 二叉树的层序遍历 - 力扣(LeetCode) # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self.right right from coll…...
毕业设计:基于深度学习的高压线周边障碍物自动识别与监测系统
目录 前言 课题背景和意义 实现技术思路 一、算法理论基础 1.1 卷积神经网络 1.2 目标检测算法 1.3 注意力机制 二、 数据集 2.1 数据采集 2.2 数据标注 三、实验及结果分析 3.1 实验环境搭建 3.2 模型训练 3.2 结果分析 最后 前言 📅大四是整个大学…...
顺序表(ArrayList)
1、简介 顺序表是用一段物理地址连续 的存储单元依次存储数据元素的线性结构,一般情况下 采用数组存储 。在 数组 上完成数据的增删查改。( 顺序表的底层结构是一个数组 ) 2、顺序表的实现 下面是顺序表的一些基本成员和方法,能够…...
【Hadoop】Hadoop的HDFS
这里写目录标题 HDFS概述HDFS产出背景及定义HDFS产生背景HDFS定义 HDFS优缺点HDFS优点HDFS缺点 HDFS组成架构HDFS文件块大小 HDFS的Shell操作常用命令实操准备工作上传下载HDFS直接操作 HDFS的API操作客户端环境准备HDFS的API案例实操HDFS文件上传HDFS文件下载HDFS文件更名和移…...
C++ Primer 迭代器
欢迎阅读我的 【CPrimer】专栏 专栏简介:本专栏主要面向C初学者,解释C的一些基本概念和基础语言特性,涉及C标准库的用法,面向对象特性,泛型特性高级用法。通过使用标准库中定义的抽象设施,使你更加适应高级…...
