消息中间件-RocketMQ入门 消息发送的三种方式
消息中间件-RocketMQ入门 消息发送的三种方式
- 消息中间件简介
- 应用场景
- 常用消息中间件
- RocketMQ核心概念
- 入门案例-生产者和消费者代码逻辑
- 消息发送的三种方式
- 同步发送
- 异步发送
- 一次性消息
消息中间件简介
应用场景
假设现在有订单微服务和积分微服务,正常请求流程之后是不是一个订单完成后给对应的用户加上积分,但如果积分微服务坏掉了,正常来说会回滚,但实际中情况中,积分完全可以晚一点加,没有什么影响
1.解决代码耦合的问题

解决问题的方法

这样订单微服务把参数发送给中间件之后就完成了它自己的任务,使微服务不用依赖其它微服务,就算中间件挂了也不需要担心,它虽然默认存储在内存里面,但也会在磁盘里面存一份
2.进行流量的削峰

3.数据分发


解决办法:

常用消息中间件
1.ActiveMQ是Apache出品,比较老的一个开源的消息中间件,以前在中小企业应用广泛.
2.Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统。它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决.
3.RabbitMQ是一个基于Erlang语言开发的消息中间件,
RabbitMQ最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
对数据的一致性,稳定性和可靠性要求比较高的场景
4.RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache软件基金会,并于2017年9月25日成为 Apache的顶级项目。作为经历过多次阿里巴巴双十一这种"超级工程"的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
淘宝内部的交易系统使用了淘宝自主研发的Noify消息中间件,使用MySQL作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kaka无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用Javai语言编写了RocketMQ,定位于非日志的可靠消息传输〈(日志场最也OK),目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binlog分发等场景。

RocketMQ核心概念
消息中间件里面集群了多个代理服务器,如何做到负载?
在创造RocketMQ的时候,它本身有一个轻量级的注册中心称为"NameServer命名服务",因为像Nacos和zookeeper这样复杂的注册中心,运行起来对性能肯定也会有一定的影响,倘若有一天该注册中心不开源不维护了,该中间件是不是也会因此遇到很大的麻烦

入门案例-生产者和消费者代码逻辑
第一步:创建两个两个项目,分别为生产者和消费者

创建生产者
第一步:导入依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency>
</dependencies>
第二步:创建生产类模拟生产
public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");//连接nameServerproducer.setNamesrvAddr("43.143.161.59:9876");//启动生产者producer.start();//设置消息发送的目的地String topic = "helloTopic";//发送消息for(int i=0;i<10;i++){Message msg = new Message(topic,("RocketMQ普通消息"+i).getBytes(Charset.defaultCharset()));SendResult result = producer.send(msg);System.out.println("发送状态"+result.getSendStatus());}System.out.println("消息发送完毕.");//关闭资源producer.shutdown();}
}
创建消费者
第一步:导入依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency>
</dependencies>
第二步:创建消费类模拟接收
public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
消息发送的三种方式
同步发送

应用程序给消息中间件发送消息的时候。需要等待消息中间件将消息存储完毕之后,才响应回去。业务代码才能往下执行
异步发送

应用程序给消息中间件发送消息的时候,消息中间件收到这个消息之后,直接给应用程序响应了.(此时消息并没有完全存储到磁盘),消息中间件继续存储消息。存储完成(成功或者失败),通过回调地址通知有应用程序。消息存储的结果
示例代码
public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");//连接nameServerproducer.setNamesrvAddr("43.143.161.59:9876");//启动生产者producer.start();//设置消息发送的目的地String topic = "helloTopic";//发送消息Message msg = new Message(topic,("RocketMQ异步消息").getBytes(Charset.defaultCharset()));System.out.println("消息发送前");//异步发送producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息存储状态:"+sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送出现异常");}});System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);//关闭资源producer.shutdown();}
运行结果

业务逻辑处理 ----> 执行send方法,不需要等待消息中间件存储消息,可以直接执行业务逻辑代码
与同步发送相比,异步发送时间更短一点,响应更快一点,为了使响应时间更快的可以选择异步发送,但同步发送也有它自己的意义,同步发送更加可靠
一次性消息
应用程序给消息中间件发送消息的时候,不需要知道消息是否在消息中间存储了,只管发就是了.

public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");//连接nameServerproducer.setNamesrvAddr("43.143.161.59:9876");//启动生产者producer.start();//设置消息发送的目的地String topic = "helloTopic";//发送消息Message msg = new Message(topic,("RocketMQ一次性消息").getBytes(Charset.defaultCharset()));System.out.println("消息发送前");producer.sendOneway(msg);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);//关闭资源producer.shutdown();}
}
运行结果

相关文章:
消息中间件-RocketMQ入门 消息发送的三种方式
消息中间件-RocketMQ入门 消息发送的三种方式消息中间件简介应用场景常用消息中间件RocketMQ核心概念入门案例-生产者和消费者代码逻辑消息发送的三种方式同步发送异步发送一次性消息消息中间件简介 应用场景 假设现在有订单微服务和积分微服务,正常请求流程之后是不是一个订…...
【FLASH存储器系列十九】固态硬盘掉电后如何恢复掉电前状态?
掉电分两种,一种是正常掉电,另一种是异常掉电。不管是哪种原因导致的掉电,我们都希望,重新上电后,SSD都需要能从掉电中恢复过来,继续正常工作。正常掉电恢复,这个好理解,主机通知SSD…...
Java知识点细节简易汇总——(7)面向对象编程(高级部分)
一、类变量、静态变量static static访问方式: public class VisitStatic {public static void main(String[] args) {//方法一://类名.类变量名//说明:类变量是随着类的加载而创建,所以即使没有创建对象实例也可以访问System.out.println(A.…...
阻塞式队列-生产者消费者模型
1.阻塞队列是什么 阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则. 阻塞队列能是一种线程安全的数据结构, 并且具有以下特性: 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队…...
引导滤波code
文章目录1. 原理概述2. 实验环节2.1 验证与opencv 库函数的结果一致2.2 与 双边滤波比较2.3 引导滤波应用,fathering2.3 引导滤波应用,图像增强2.4 灰度图引导,和各自通道引导的效果差异2.5 不同参数设置影响3. 参考引导滤波1. 原理概述 引导…...
Leetcode.2353 设计食物评分系统
题目链接 Leetcode.2353 设计食物评分系统 Rating : 1782 题目描述 设计一个支持下述操作的食物评分系统: 修改 系统中列出的某种食物的评分。 返回系统中某一类烹饪方式下评分最高的食物。 实现 FoodRatings类: FoodRatings(String[] foo…...
C语言学习_DAY_2_变量的定义_输入与输出
高质量博主,点个关注不迷路🌸🌸🌸! 目录 I. 变量的定义 II. 变量的赋值 III. 输出 IV. 输入 I. 变量的定义 首先,我们新建一个.c文件在Dev C中,并把之前定义好的程序框架放进去。 此时我…...
mac 安装navicat
由于各种原因发布不了链接,这里记录下,保存在了阿里云里...
RocketMQ快速入门
2.1 消息生产和消费介绍使用RocketMQ可以发送普通消息、顺序消息、事务消息,顺序消息能实现有序消费,事务消息可以解决分布式事务实现数据最终一致。RocketMQ有2种常见的消费模式,分别是DefaultMQPushConsumer和DefaultMQPullConsumer模式,这…...
【虚拟仿真】Unity3D实现从浏览器拉起本地exe程序并传参数
推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。 一、前言 最近有项目需求,从浏览器调起来本地的exe程序&…...
Intel中断体系(1)中断与异常处理
文章目录概述中断与异常中断可屏蔽中断与不可屏蔽中断(NMI)异常异常分类中断与异常向量中断描述符表中断描述符中断与异常处理中断与异常处理过程堆栈切换错误码64位模式下的中断异常处理64位中断描述符64位处理器下的堆栈切换相关参考概述 中断是现代计…...
财报解读:四季度营收超预期,优步却越来越“不务正业”了
“公司第四季度的业绩表现将是强劲的”。 公布2022年第三季度财报时,优步的高管给出了这样的预告,给资本市场打了一针“强心剂”。然而有人对此表示质疑,后疫情时代,带着新模式、新车型的全新网约车公司层出不穷,车企…...
C语言-程序环境和预处理(14.2)
目录 预处理详解 1.预定义符号 2. #define 2.1 #define定义标识符 2.2 #define 定义宏 2.3 #define 替换规则 注意事项: 2.4 #和## 2.5 带副作用的宏参数 2.6 宏和函数对比 3. #undef 4. 条件编译 4.1 单分支条件编译 4.2 多分支条件编译 4.3 判断是…...
VHDL语言基础-时序逻辑电路-计数器
目录 计数器的设计: 计数器的作用: 计数器的实现: 1、用“”函数描述: 用T触发器级联构成的串行进位的二进制加法计数器的仿真波形: 计数器的仿真: 计数器的设计: 计数是一种最简单基本的…...
MySQL数据库07——高级条件查询
前面一章介绍了基础的一个条件的查询,如果多条件,涉及到逻辑运算,and or 之类的。就是高级一点的条件查询。本章来介绍复杂的条件搜索表达式。 AND运算符 AND运算符只有当两边操作数均为True时,最后结果才为True。人们使用AND描述…...
《Terraform 101 从入门到实践》 第四章 States状态管理
《Terraform 101 从入门到实践》这本小册在南瓜慢说官方网站和GitHub两个地方同步更新,书中的示例代码也是放在GitHub上,方便大家参考查看。 军书十二卷,卷卷有爷名。 为什么需要状态管理 Terraform的主要作用是管理云平台上的资源ÿ…...
数据结构之二叉树
🎈一.二叉树相关概念 1.树 树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合,树结构通常用来存储逻辑关系为 "一对多" 的数据。例如: 关于树的几个重要概念&…...
上海亚商投顾:三大指数集体调整 消费板块逆市活跃
上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。市场情绪三大指数今日集体调整,沪指全天弱势震荡,创业板指盘中跌超1%。旅游、食品、乳业等大消费板块…...
【2023unity游戏制作-mango的冒险】-开始画面API制作
👨💻个人主页:元宇宙-秩沅 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 本文由 秩沅 原创 收录于专栏:游戏制作 ⭐mango的冒险-开始画面制作⭐ 文章目录⭐mango的冒险-开始画面制作⭐👨&…...
【微服务】Nacos配置管理
🚩本文已收录至专栏:微服务探索之旅 👍希望您能有所收获 Nacos除了可以做配置管理,同样可以当作注册中心来使用。 了解注册中心用法点击跳转👉【微服务】Nacos注册中心 一.引入 当微服务部署的实例越来越多࿰…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案
在大数据时代,海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构,在处理大规模数据抓取任务时展现出强大的能力。然而,随着业务规模的不断扩大和数据抓取需求的日益复杂,传统…...
echarts使用graphic强行给图增加一个边框(边框根据自己的图形大小设置)- 适用于无法使用dom的样式
pdf-lib https://blog.csdn.net/Shi_haoliu/article/details/148157624?spm1001.2014.3001.5501 为了完成在pdf中导出echarts图,如果边框加在dom上面,pdf-lib导出svg的时候并不会导出边框,所以只能在echarts图上面加边框 grid的边框是在图里…...
简单介绍C++中 string与wstring
在C中,string和wstring是两种用于处理不同字符编码的字符串类型,分别基于char和wchar_t字符类型。以下是它们的详细说明和对比: 1. 基础定义 string 类型:std::string 字符类型:char(通常为8位)…...
解决MybatisPlus使用Druid1.2.11连接池查询PG数据库报Merge sql error的一种办法
目录 前言 一、问题重现 1、环境说明 2、重现步骤 3、错误信息 二、关于LATERAL 1、Lateral作用场景 2、在四至场景中使用 三、问题解决之道 1、源码追踪 2、关闭sql合并 3、改写处理SQL 四、总结 前言 在博客:【写在创作纪念日】基于SpringBoot和PostG…...
python学习day39
图像数据与显存 知识点回顾 1.图像数据的格式:灰度和彩色数据 2.模型的定义 3.显存占用的4种地方 a.模型参数梯度参数 b.优化器参数 c.数据批量所占显存 d.神经元输出中间状态 4.batchisize和训练的关系 import torch import torchvision import torch.nn as nn imp…...
NLP常用工具包
✨做一次按NLP项目常见工具的使用拆解 1. tokenizer from torchtext.data.utils import get_tokenizertokenizer get_tokenizer(basic_english) text_sample "Were going on an adventure! The weather is really nice today." tokens tokenizer(text_sample) p…...
