当前位置: 首页 > news >正文

尚硅谷rabbitmq 2024 流式队列2024指定偏移量 第55节答疑

rabbitmq的stream:
4、对比
autoTrackingstrategy方式:始终监听Stream中的新消息(狗狗看家,忠于职守)指定偏移量方式:针对指定偏移量的消息消费之后就停止(狗狗叼飞盘,回来就完)
这两种分别怎么写?java


在 RabbitMQ 中,Stream 是一种新的消息传递机制,允许高吞吐量和低延迟的消息传递。你提到的两种消费方式分别是自动跟踪策略(Auto Tracking Strategy)和指定偏移量(Offset)的消费方式。下面我们将分别演示这两种方式的 Java 实现。

### 1. 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会一直监听 Stream 中的新消息,并不断处理这些消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 2. 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message


============
为什么没写.autoTrackingStrategy()?

你提到的 `.autoTrackingStrategy()` 是 RabbitMQ Stream API 中的一个重要方法,它用于指定消费者的自动跟踪策略。让我们详细解释一下如何在代码中使用这个方法。

### 自动跟踪策略(Auto Tracking Strategy)

自动跟踪策略允许消费者始终监听 Stream 中的新消息,并自动跟踪消息的偏移量。这种方式下,消费者会持续消费新消息,而不会停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {


============
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() /
-----这里指定了 .autoTrackingStrategy() /就不用指定.offset(OffsetSpecification.first())了吧?


确实,如果你使用 `.autoTrackingStrategy()`,那么你不需要再指定 `.offset(OffsetSpecification.first())`,因为自动跟踪策略会自动从最新的消息开始消费并持续监听新的消息。让我们分别看看两种方式的完整代码示例。

### 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会自动从最新的消息开始消费,并持续监听新的消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
           


============
那里之前为什么要写:            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() /

你说得对,如果你使用 `.autoTrackingStrategy()`,就不需要再指定 `.offset(OffsetSpecification.first())`。这是一个错误的示例。使用 `.autoTrackingStrategy()` 时,消费者会自动从最新的消息开始消费,并持续监听新的消息。

让我们修正并简化这两种方式的示例代码:

### 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会自动从最新的消息开始消费,并持续监听新的消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAs

相关文章:

尚硅谷rabbitmq 2024 流式队列2024指定偏移量 第55节答疑

rabbitmq的stream: 4、对比 autoTrackingstrategy方式:始终监听Stream中的新消息(狗狗看家,忠于职守)指定偏移量方式:针对指定偏移量的消息消费之后就停止(狗狗叼飞盘,回来就完) 这两种分别怎么写?java 在 RabbitMQ 中&#xff0c…...

NSSCTF-WEB-pklovecloud

目录 前言 正文 思路 尝试 结尾 前言 许久未见,甚是想念. 今天来解一道有意思的序列化题 正文 思路 <?php include flag.php; class pkshow {function echo_name(){return "Pk very safe^.^";} }class acp {protected $cinder;public $neutron;public $…...

深入Postman- 自动化篇

前言 在前两篇博文《Postman使用 - 基础篇》《玩转Postman:进阶篇》中,我们介绍了 Postman 作为一款专业接口测试工具在接口测试中的主要用法以及它强大的变量、脚本功能,给测试工作人员完成接口的手工测试带来了极大的便利。其实在自动化测试上,Postman 也能进行良好的支…...

react-JSX

JSX理念 jsx在编译的时候会被babel编译为react.createELement方法 在使用jsx的文件中&#xff0c;需要引入react。import React from "react" jsx会被编译为React.createElement,所有jsx的运行结果都是react element React Component 在react中&#xff0c;常使用…...

深度对比:IPguard与Ping32在企业网络管理中的应用

随着网络安全形势日益严峻&#xff0c;企业在选择网络管理工具时需慎之又慎。IPguard与Ping32是目前市场上两款颇具代表性的产品&#xff0c;它们在功能、性能以及应用场景上各有优势。本文将对这两款产品进行深度对比&#xff0c;以帮助企业找到最合适的解决方案。 IPguard以其…...

AI测试之 TestGPT

如今最火热的技术莫非OpenAI的ChatGPT莫属&#xff0c;AI技术也在很多方面得到广泛应用。今天我们要介绍的TestGPT就是一个软件测试领域中当红的应用。 TestGPT是什么&#xff1f; TestGPT是一家总部位于以色列特拉维夫的初创公司 CodiumAI Ltd.&#xff0c;发布的一款用于测…...

JavaEE-进程与线程

1.进程 1.1什么是进程 每个应⽤程序运⾏于现代操作系统之上时&#xff0c;操作系统会提供⼀种抽象&#xff0c;好像系统上只有这个程序在运 ⾏&#xff0c;所有的硬件资源都被这个程序在使⽤。这种假象是通过抽象了⼀个进程的概念来完成的&#xff0c;进程可 以说是计算机科学…...

JAVA软开-面试经典问题(6)-equals与hashcode方法

1.equals方法 1.Object类中的equals方法比较的是两个对象的地址&#xff08;底层原理是 比较的&#xff0c;即比较的是对象的地址&#xff09; return (this obj);2.基本数据类型的包装类和String类都重写了equals方法。 基本数据类型&#xff1a;比较的是数值的是否相等 …...

计算机网络(以Linux讲解)

计算机网络 网络协议初识协议分层OSI七层模型TCP/IP五层模型--初识 网络中的地址管理IP地址MAC地址 网络传输基本流程网络编程套接字预备知识网络字节序socket编程UDP socketTCP socket地址转换函数Jsoncpp 进程间关系与守护进程进程组会话控制终端作业控制守护进程 网络命令TC…...

计算机网络基本架构知识点

1. 网络体系结构模型&#xff1a; - OSI 七层模型&#xff1a; - 物理层&#xff1a;是网络通信的基础层&#xff0c;负责在物理介质上传输比特流。该层定义了物理连接的标准&#xff0c;如电缆的类型、接口的形状、插头的规格等&#xff0c;以及信号的传输方式&#xff0c;包括…...

GES DISC 的 ATMOS L2 潜在温度网格上的痕量气体,固定场格式 V3 (ATMOSL2TF)

ATMOS L2 Trace Gases on Potential Temperature Grid, Fixed Field Format V3 (ATMOSL2TF) at GES DISC 简介 GES DISC 的 ATMOS L2 潜在温度网格上的痕量气体&#xff0c;固定场格式 V3 (ATMOSL2TF) 这是版本3的气溶胶痕量分子光谱&#xff08;ATMOS&#xff09;第二级产品…...

MLCC贴片电容不同材质区别:【及电容工作原理】

贴片电容的材质常规有&#xff1a;NPO&#xff08;COG&#xff09;&#xff0c;X7R&#xff0c;X5R&#xff0c;Y5V 等&#xff0c;主要区别是它们的填充介质不同。在相同的体积下由于填充介质不同所组成的电容器的容量就不同&#xff0c;随之带来的电容器的介质损耗、容量稳定…...

Word粘贴时出现“文件未找到:MathPage.WLL”的解决方案

解决方案 一、首先确定自己电脑的位数&#xff08;这里默认大家的电脑都是64位&#xff09;二、右击MathType桌面图标&#xff0c;点击“打开文件所在位置”&#xff0c;然后分别找到MathPage.WLL三、把这个文件复制到该目录下&#xff1a;C:\Program Files\Microsoft Office\r…...

前端开发笔记--html 黑马程序员1

文章目录 前端开发工具--VsCode前端开发基础语法VsCode优秀插件Chinese --中文插件Auto Rename Tag --自动重命名插件open in browserOpen in Default BrowserOpen in Other Browser Live Server -- 实时预览 前端开发工具–VsCode 轻量级与快速启动 快速加载&#xff1a;VSCo…...

ARM/Linux嵌入式面经(四四):华星光电

文章目录 1、自我介绍2、介绍一下你最得意的一个项目3、这个项目里面都用到了什么模块,什么型号,有什么作用4、移植操作系统的过程中,流程是什么,需要注意什么移植操作系统的流程需要注意的事项面试官可能的追问及回答5、你用的传感器挺多的,怎么保证传感器传输的稳定性,…...

帮助,有奖提问

<?php $u $_GET[“user”]; //变量获取 $v $_GET[“variable”]; //$v看flag&#xff0c;绕过正则 $flag‘flag{}; if(isset($u)&&(file_get_contents($u,‘r’)“im admin”)){//猜测data://协议 //检查u指向 echo “hello admin!<br>”; if(preg_…...

Java编辑工具IDEA

哪个编程工具让你的工作效率翻倍&#xff1f; 在日益繁忙的工作环境中&#xff0c;选择合适的编程工具已成为提升开发者工作效率的关键。不同的工具能够帮助我们简化代码编写、自动化任务、提升调试速度&#xff0c;甚至让团队协作更加顺畅。那么&#xff0c;哪款编程工具让你…...

闲谈Promise

预备知识 回调函数&#xff1a;当一个函数作为参数传入另一个函数中&#xff0c;并且它不会立刻执行&#xff0c;当满足一定条件之后&#xff0c;才会执行&#xff0c;这种函数称为回调函数。比如&#xff1a;定时器。异步任务&#xff1a;与之对应的概念是同步任务&#xff0…...

【C++堆(优先队列)】1882. 使用服务器处理任务|1979

本文涉及知识点 C堆(优先队列) LeetCode1882. 使用服务器处理任务 给你两个 下标从 0 开始 的整数数组 servers 和 tasks &#xff0c;长度分别为 n​​​​​​ 和 m​​​​​​ 。servers[i] 是第 i​​​​​​​​​​ 台服务器的 权重 &#xff0c;而 tasks[j] 是处理…...

VBA高级应用30例应用3Excel中的ListObject对象:选择表的一部分

《VBA高级应用30例》&#xff08;版权10178985&#xff09;&#xff0c;是我推出的第十套教程&#xff0c;教程是专门针对高级学员在学习VBA过程中提高路途上的案例展开&#xff0c;这套教程案例与理论结合&#xff0c;紧贴“实战”&#xff0c;并做“战术总结”&#xff0c;以…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

C++:多态机制详解

目录 一. 多态的概念 1.静态多态&#xff08;编译时多态&#xff09; 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1&#xff09;.协变 2&#xff09;.析构函数的重写 5.override 和 final关键字 1&#…...

Tauri2学习笔记

教程地址&#xff1a;https://www.bilibili.com/video/BV1Ca411N7mF?spm_id_from333.788.player.switch&vd_source707ec8983cc32e6e065d5496a7f79ee6 官方指引&#xff1a;https://tauri.app/zh-cn/start/ 目前Tauri2的教程视频不多&#xff0c;我按照Tauri1的教程来学习&…...

JavaScript 标签加载

目录 JavaScript 标签加载script 标签的 async 和 defer 属性&#xff0c;分别代表什么&#xff0c;有什么区别1. 普通 script 标签2. async 属性3. defer 属性4. type"module"5. 各种加载方式的对比6. 使用建议 JavaScript 标签加载 script 标签的 async 和 defer …...

使用python进行图像处理—图像滤波(5)

图像滤波是图像处理中最基本和最重要的操作之一。它的目的是在空间域上修改图像的像素值&#xff0c;以达到平滑&#xff08;去噪&#xff09;、锐化、边缘检测等效果。滤波通常通过卷积操作实现。 5.1卷积(Convolution)原理 卷积是滤波的核心。它是一种数学运算&#xff0c;…...

Android Framework预装traceroute执行文件到system/bin下

文章目录 Android SDK中寻找traceroute代码内置traceroute到SDK中traceroute参数说明-I 参数&#xff08;使用 ICMP Echo 请求&#xff09;-T 参数&#xff08;使用 TCP SYN 包&#xff09; 相关文章 Android SDK中寻找traceroute代码 设备使用的是Android 11&#xff0c;在/s…...

iOS 项目怎么构建稳定性保障机制?一次系统性防错经验分享(含 KeyMob 工具应用)

崩溃、内存飙升、后台任务未释放、页面卡顿、日志丢失——稳定性问题&#xff0c;不一定会立刻崩&#xff0c;但一旦积累&#xff0c;就是“上线后救不回来的代价”。 稳定性保障不是某个工具的功能&#xff0c;而是一套贯穿开发、测试、上线全流程的“观测分析防范”机制。 …...