当前位置: 首页 > news >正文

RocketMQ 源码分析——Producer

文章目录

  • 消息发送代码实现
  • 消息发送者启动流程
    • 检查配置
    • 获得MQ客户端实例
    • 启动实例
    • 定时任务
  • Producer 消息发送流程
    • 选择队列
      • 默认选择队列策略
      • 故障延迟机制策略*
      • 两种策略的选择
  • 技术亮点:ThreadLocal

消息发送代码实现

下面是一个生产者发送消息的demo(同步发送)

image.png

主要做了几件事:

  • 初始化一个生产者(DefaultMQProducer)对象
  • 设置 NameServer 的地址
  • 启动生产者
  • 发送消息

消息发送者启动流程

image.png

DefaultMQProducerImpl类start()

image.png

检查配置

DefaultMQProducerImpl

image.png

获得MQ客户端实例

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表。DefaultMQProducerImpl类start()

image.png

一个clientId只会创建一个MQClientInstance

image.png

clientId生成规则:IP@instanceName@unitName

image.png

RocketMQ中消息发送者、消息消费者都属于”客户端“。每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

不同的生产者、消费端如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

image.png

启动实例

MQClientInstance类start()

image.png

定时任务

MQClientInstance类startScheduledTask()

image.png

Producer 消息发送流程

我们从一个生产者案例的代码进入代码可知:DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

image.png

image.png

从核心方法可知消息发送就是4个步骤:验证消息、查找路由、选择队列、消息发送。

image.png

image.png

选择队列

默认选择队列策略

采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。

故障延迟机制策略*

采用此策略后,每次向Broker成功或者异常的发送,RocketMQ都会计算出该Borker的可用时间(发送结束时间-发送开始时间,失败的按照30S计算),并且保存,方便下次发送时做筛选。

image.png

除了记录Broker的发送消息时长之外,还要计算一个Broker的不可用时长。这里采用一个经验值:

如果发送时长在550ms之内,不可用时长为0。

达到550ms,不可用时长为30S

达到1000ms,不可用时长为60S

达到2000ms,不可用时长为120S

达到3000ms,不可用时长为180S

达到15000ms,不可用时长为600S

image.png

image.png

有了以上的Broker规避信息后发送消息就非常简单了。

在开启故障延迟机制策略步骤如下:

  1. 根据消息队列表时做轮训
  2. 选好一个队列
  3. 判断该队列所在Broker是否可用
  4. 如果是可用则返回该队列,队列选择逻辑结束
  5. 如果不可用,则接着步骤2继续
  6. 如果都不可用,则随机选一个

代码如下:

image.png

两种策略的选择

从这种策略上可以很明显看到,默认队列选择是轮训策略,而故障延迟选择队列则是优先考虑消息的发送时长短的队列。那么如何选择呢?

首先RocketMQ默认的发送失败有重试策略,默认是2,也就是如果向不同的Broker发送三次都失败了那么这条消息的发送就失败了,作为RocketMQ肯定是尽力要确保消息发送成功。所以给出以下建议。

如果是网络比较好的环境,推荐默认策略,毕竟网络问题导致的发送失败几率比较小。

如果是网络不太好的环境,推荐故障延迟机制,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。

当然以上成立的条件是一个Topic创建在2个Broker以上的的基础上。

技术亮点:ThreadLocal

image.png

image.png

image.png

相关文章:

RocketMQ 源码分析——Producer

文章目录 消息发送代码实现消息发送者启动流程检查配置获得MQ客户端实例启动实例定时任务 Producer 消息发送流程选择队列默认选择队列策略故障延迟机制策略*两种策略的选择 技术亮点:ThreadLocal 消息发送代码实现 下面是一个生产者发送消息的demo(同步发送&#…...

ISTQB术语表

此术语表为国际软件测试认证委员会(ISTQB)发布的标准术语表。此表历经数次修改、完善,集纳了计算机行业界、商业界及政府相关机构的见解及意见,在国际化的层面上达到了罕有的统一性及一致性。参与编制此表的国际团体包括澳大利亚、…...

小米笔试题——01背包问题变种

这段代码的主要思路是使用动态规划来构建一个二维数组 dp,其中 dp[i][j] 表示前 i 个产品是否可以组合出金额 j。通过遍历产品列表和可能的目标金额,不断更新 dp 数组中的值,最终返回 dp[N][M] 来判断是否可以组合出目标金额 M。如果 dp[N][M…...

SkyWalking内置MQE语法

此文档出自SkyWalking官方git https://github.com/apache/skywalking docs/en/api/metrics-query-expression.md Metrics Query Expression(MQE) Syntax MQE is a string that consists of one or more expressions. Each expression could be a combination of one or more …...

Springboot2 Pandas Pyecharts 量子科技专利课程设计大作业

数据集介绍 1.背景 根据《中国科学:信息科学》期刊上的一篇文章,量子通信包括多种协议与应用类型: 基于量子隐形传态与量子存储中继等技术,可实现量子态信息传输,进而构建量子信息网络,已成为当前科研热点&…...

RabbitMQ里的几个重要概念

RabbitMQ中的一些角色: publisher:生产者consumer:消费者exchange个:交换机,负责消息路由,接受生产者发送的消息,把消息发送到一个或多个队列里queue:队列,存储消息virt…...

23. 图论 - 图的由来和构成

文章目录 图的由来图的构成Hi, 你好。我是茶桁。 从第一节课上到现在,我基本上把和人工智能相关的一些数学知识都教给大家了,终于来到我们人工智能数学的最后一个部分了,让我们从今天开始进入「图论」。 图论其实是一个比较有趣的领域,因为微积分其实更多的是对应连续型的…...

拼多多API接口解析,实现根据ID取商品详情

拼多多是一个流行的电商平台,它提供了API接口供开发者使用。要根据ID获取商品详情,您需要使用拼多多API接口并进行相应的请求。 以下是使用拼多多API接口根据ID获取商品详情的示例代码(使用Python编写): import requ…...

【JavaScript】解构

解构(Destructuring)是 JavaScript 中一种强大的语法特性,它允许你从数组或对象中提取值并赋值给变量,使代码更加简洁和易读。JavaScript 中有两种主要的解构语法:数组解构和对象解构。 数组解构 数组解构用于从数组…...

现代卷积网络实战系列2:训练函数、PyTorch构建LeNet网络

4、训练函数 4.1 调用训练函数 train(epochs, net, train_loader, device, optimizer, test_loader, true_value)因为每一个epoch训练结束后,我们需要测试一下这个网络的性能,所有会在训练函数中频繁调用测试函数,所有测试函数中所有需要的…...

rust特性

特性,也叫特质,英文是trait。 trait是一种特殊的类型,用于抽象某些方法。trait类似于其他编程语言中的接口,但又有所不同。 trait定义了一组方法,其他类型可以各自实现这个trait的方法,从而形成多态。 一、…...

TouchGFX之画布控件

TouchGFX的画布控件,在使用相对较小的存储空间的同时保持高性能,可提供平滑、抗锯齿效果良好的几何图形绘制。 TouchGFX 设计器中可用的画布控件: LineCircleShapeLine Progress圆形进度条 存储空间分配和使用​ 为了生成反锯齿效果良好的…...

STM32F103RCT6学习笔记2:串口通信

今日开始快速掌握这款STM32F103RCT6芯片的环境与编程开发,有关基础知识的部分不会多唠,直接实践与运用!文章贴出代码测试工程与测试效果图: 目录 串口通信实验计划: 串口通信配置代码: 测试效果图&#…...

Opencv-图像噪声(均值滤波、高斯滤波、中值滤波)

图像的噪声 图像的平滑 均值滤波 均值滤波代码实现 import cv2 as cv import numpy as np import matplotlib.pyplot as plt from pylab import mplmpl.rcParams[font.sans-serif] [SimHei]img cv.imread("dog.png")#均值滤波cv.blur(img, (5, 5))将对图像img进行…...

MasterAlign相机参数设置-增益调节

相机参数设置-曝光时间调节操作说明 相机参数的设置对于获取清晰、准确的图像至关重要。曝光时间是其中一个关键参数,它直接影响图像的亮度和清晰度。以下是关于曝光时间调节的详细操作步骤,以帮助您轻松进行设置。 步骤一:登录系统 首先&…...

9月22日,每日信息差

今天是2023年09月22日,以下是为您准备的14条信息差 第一、亚马逊将于2024年初在Prime Video中加入广告。Prime Video内容中的广告将于2024年初在美国、英国、德国和加拿大推出,随后晚些时候在法国、意大利、西班牙、墨西哥和澳大利亚推出 第二、中国移…...

Java版本企业工程项目管理系统源码+spring cloud 系统管理+java 系统设置+二次开发

工程项目各模块及其功能点清单 一、系统管理 1、数据字典:实现对数据字典标签的增删改查操作 2、编码管理:实现对系统编码的增删改查操作 3、用户管理:管理和查看用户角色 4、菜单管理:实现对系统菜单的增删改查操…...

Android studio中如何下载sdk

打开 file -> settings 这个页面, 在要下载的 SDK 前面勾上, 然后点 apply 在 platforms 中就可以看到下载好的 SDK: Android SDK目录结构详细介绍可以参考这篇文章: 51CTO博客- Android SDK目录结构...

STM32单片机中国象棋TFT触摸屏小游戏

实践制作DIY- GC0167-中国象棋 一、功能说明: 基于STM32单片机设计-中国象棋 二、功能介绍: 硬件组成:STM32F103RCT6最小系统2.8寸TFT电阻触摸屏24C02存储器1个按键(悔棋) 游戏规则: 1.有悔棋键&…...

【PHP图片托管】CFimagehost搭建私人图床 - 无需数据库支持

文章目录 1.前言2. CFImagehost网站搭建2.1 CFImagehost下载和安装2.2 CFImagehost网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道(云端设置)3.3.Cpolar稳定隧道(本地设置) 4.公网访问测…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

day52 ResNet18 CBAM

在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

嵌入式学习笔记DAY33(网络编程——TCP)

一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...