当前位置: 首页 > news >正文

整合Spring Boot和Pulsar实现可扩展的消息处理

整合Spring Boot和Pulsar实现可扩展的消息处理

大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!

在现代分布式系统中,消息队列是实现异步通信和解耦的重要组件。Apache Pulsar作为一个分布式消息流平台,具备高吞吐、低延迟、多租户支持等优势,是很多高性能消息处理场景的理想选择。本文将介绍如何在Spring Boot项目中整合Pulsar,实现可扩展的消息处理功能。

什么是Apache Pulsar

Apache Pulsar是一个开源的分布式消息流平台,支持多租户、多主题和持久化。Pulsar的架构包括Brokers、Bookies(Apache BookKeeper的存储节点)和ZooKeeper协调服务,提供了高可用性和高性能的消息传递和存储服务。

在Spring Boot中集成Pulsar

为了在Spring Boot项目中使用Pulsar,我们需要以下几个步骤:

  1. 添加Maven依赖
  2. 配置Pulsar客户端
  3. 创建消息生产者
  4. 创建消息消费者

1. 添加Maven依赖

首先,我们需要在pom.xml中添加Pulsar的依赖:

<dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.9.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
</dependencies>

2. 配置Pulsar客户端

接下来,我们需要创建一个配置类来初始化Pulsar客户端。创建一个名为PulsarConfig的配置类:

package cn.juwatech.config;import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class PulsarConfig {@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {return PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();}
}

3. 创建消息生产者

我们需要一个消息生产者来发送消息到Pulsar。创建一个名为PulsarProducer的生产者类:

package cn.juwatech.producer;import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class PulsarProducer {private final PulsarClient pulsarClient;private Producer<byte[]> producer;@Autowiredpublic PulsarProducer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;initProducer();}private void initProducer() {try {ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer();this.producer = producerBuilder.topic("my-topic").create();} catch (PulsarClientException e) {e.printStackTrace();}}public void sendMessage(String message) {try {producer.send(message.getBytes());} catch (PulsarClientException e) {e.printStackTrace();}}
}

4. 创建消息消费者

我们需要一个消息消费者来接收来自Pulsar的消息。创建一个名为PulsarConsumer的消费者类:

package cn.juwatech.consumer;import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class PulsarConsumer {private final PulsarClient pulsarClient;private Consumer<byte[]> consumer;@Autowiredpublic PulsarConsumer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;}@PostConstructprivate void initConsumer() {try {this.consumer = pulsarClient.newConsumer().topic("my-topic").subscriptionName("my-subscription").subscribe();startConsumer();} catch (PulsarClientException e) {e.printStackTrace();}}private void startConsumer() {new Thread(() -> {while (true) {try {Message<byte[]> msg = consumer.receive();String message = new String(msg.getData());System.out.println("Received message: " + message);consumer.acknowledge(msg);} catch (PulsarClientException e) {e.printStackTrace();}}}).start();}
}

5. 测试Pulsar生产者和消费者

最后,我们编写一个简单的测试类来验证生产者和消费者的工作。创建一个名为PulsarTest的测试类:

package cn.juwatech;import cn.juwatech.producer.PulsarProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PulsarApplication implements CommandLineRunner {@Autowiredprivate PulsarProducer pulsarProducer;public static void main(String[] args) {SpringApplication.run(PulsarApplication.class, args);}@Overridepublic void run(String... args) throws Exception {pulsarProducer.sendMessage("Hello, Pulsar!");}
}

运行上述代码后,您应该会在控制台上看到消费者接收到的消息。

总结

通过以上步骤,我们成功地在Spring Boot项目中整合了Pulsar,实现了可扩展的消息处理功能。Pulsar的高性能和可扩展性使其非常适合分布式系统中的消息传递和流处理。在实际项目中,可以根据需求进一步优化和扩展Pulsar的使用,例如配置不同的主题和分区、实现更复杂的消息处理逻辑等。

相关文章:

整合Spring Boot和Pulsar实现可扩展的消息处理

整合Spring Boot和Pulsar实现可扩展的消息处理 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在现代分布式系统中&#xff0c;消息队列是实现异步通信和解耦…...

如何给WPS、Word、PPT等办公三件套添加收费字体---方正仿宋GBK

1.先下载需要的字体。 下载字体的网站比较多&#xff0c;基本上都是免费的。随便在网上搜索一个就可以了&#xff0c;下面是下载的链接。 方正仿宋GBK字体免费下载和在线预览-字体天下 ​www.fonts.net.cn/font-31602268591.html 注意&#xff1a;切记不要商用&#xff0c;以免…...

《重构》读书笔记【第1章 重构,第一个示例,第2章 重构原则】

文章目录 第1章 重构&#xff0c;第一个示例1.1 重构前1.2 重构后 第2章 重构原则2.1 何谓重构2.2 两顶帽子2.3 为何重构2.4 何时重构2.5 重构和开发过程 第1章 重构&#xff0c;第一个示例 我这里使用的IDE是IntelliJ IDEA 1.1 重构前 plays.js export const plays {&quo…...

学会整理电脑,基于小白用户(无关硬件升级)

如果你不想进行硬件升级&#xff0c;就要学会进行整理维护电脑 基于小白用户&#xff0c;每一个操作点我都会在后续整理出流程&#xff0c;软件推荐会选择占用小且实用的软件 主要从三个角度去讨论【如果有新的内容我会随时修改&#xff0c;也希望有补充告诉我&#xff0c;我…...

使用ioDraw,AI绘图只需几秒钟!

只需几秒钟&#xff0c;就能将文字或图片转化为精准的思维导图、流程图、折线图、柱状图、饼图等各种图表&#xff01; 思维导图 思维导图工具使用入口 文字转思维导图 将文本大纲或想法转换成可视化的思维导图&#xff0c;以组织和结构化您的想法。 图片转思维导图 从现有…...

Websocket解析及用法(封装一个通用订阅发布主题的webSocket类)

1、什么是WebSocket? websocket的目标是通过一个长连接实现与服务器全双工&#xff0c;双向的通信。是一种在单个TCP连接上进行全双工通信的协议&#xff0c;使得客户端和服务器之间的数据交换变得更加简单&#xff0c;允许服务端主动向客户端推送数据。在 js中创建websocket…...

Foxit Reader(福昕阅读器)详细安装和使用教程

第一部分&#xff1a;Foxit Reader简介和基本信息 1.1 什么是Foxit Reader&#xff1f; Foxit Reader&#xff08;福昕阅读器&#xff09;是一款功能强大的PDF阅读和编辑软件&#xff0c;以其快速、轻巧和丰富的功能而闻名。它不仅支持常规的PDF阅读功能&#xff0c;还提供了…...

c++静态成员变量和静态成员函数

1&#xff09;C入门级小知识&#xff0c;分享给将要学习或者正在学习C开发的同学。 2&#xff09;内容属于原创&#xff0c;若转载&#xff0c;请说明出处。 3&#xff09;提供相关问题有偿答疑和支持。 我们可以使用 static 关键字来把类成员定义为静态的。当我们声明类的成…...

视频共享融合赋能平台LntonCVS统一视频接入平台数字化升级医疗体系

医疗健康事关国计民生&#xff0c;然而&#xff0c;当前我国医疗水平的地区发展不平衡、医疗资源分布不均和医疗信息系统老化等问题&#xff0c;制约了整体服务能力和水平的提升。视频融合云平台作为推动数字医疗的关键工具&#xff0c;在医疗领域的广泛应用和普及&#xff0c;…...

Gin框架基础

1、一个简单的Gin示例 下载并安装Gin: go get -u github.com/gin-gonic/gin1.1 一个简单的例子 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 创建一个默认的路由引擎r : gin.Default()// 当客户端以GET方式访问 /hello…...

用GPT-4纠错GPT-4 OpenAI推出CriticGPT模型

根据OpenAI周四&#xff08;6月27日&#xff09;发布的新闻稿&#xff0c;该公司新推出了一个基于GPT-4的模型——CriticGPT&#xff0c;用于捕获ChatGPT代码输出中的错误。CriticGPT的作用相当于让人们用GPT-4来查找GPT-4的错误。该模型可以对ChatGPT响应结果做出批评评论&…...

SQL CASE WHEN语句的使用技巧

SQL CASE WHEN语句的使用技巧 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在SQL查询中&#xff0c;经常需要根据不同的条件进行分支处理&#xff0c;这时就…...

虹科技术丨跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力

来源&#xff1a;虹科技术丨跨越距离障碍&#xff1a;PCAN系列网关在远程CAN网络通信的应用潜力 原文链接&#xff1a;虹科技术 | 跨越距离障碍&#xff1a;PCAN系列网关在远程CAN网络通信的应用潜力 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; #PCAN #网关 #CA…...

【UE 网络】RPC远程过程调用 入门篇

目录 0 引言1 RPC基本概念1.1 定义1.2 分类 2 RPC的使用2.1 Client RPC2.2 Server RPC2.3 Multicast RPC &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;UE虚幻引擎专栏&#x1f4a5; 标题&#xff1a;【UE 网络】RPC远程过程调用 入门篇❣️ 寄语…...

安装maven与nexus

安装maven与nexus Maven官网下载地址&#xff1a;http://maven.apache.org cd /data/software/wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.8.1/binaries/apache-maven-3.8.8-bin.tar.gz# 解压 tar xf apache-maven-3.8.1-bin.tar.gz -C /opt/[rooth…...

如何用DCA1000持续采集雷达数据

摘要&#xff1a;本文介绍一下如何通过mmwave studio软件&#xff0c;搭配DCA1000数据采集卡&#xff0c;对AWR1843BOOST进行不间断的数据采集。本文要求读者已经掌握了有关基础知识。 本文开放获取&#xff0c;无需关注。 到SensorConfig页面下&#xff0c;一步步操作&#xf…...

怎么用JavaScript写爬虫

随着互联网技术的不断发展&#xff0c;爬虫&#xff08;web crawler&#xff09;已经成为当前最热门的爬取信息方式之一。通过爬虫技术&#xff0c;我们可以轻松地获取互联网上的数据&#xff0c;并用于数据分析、挖掘、建模等多个领域。而javascript语言则因其强大的前端开发工…...

Leetcode 3203. Find Minimum Diameter After Merging Two Trees

Leetcode 3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路2. 代码实现 题目链接&#xff1a;3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路 这一题的话算是一个拓扑树的题目&#xff1f;总之就是从树的叶子节点不断向上遍历&#xff…...

【抽代复习笔记】24-群(十八):循环群的两道例题

例1&#xff1a;证明&#xff1a; &#xff08;1&#xff09;三次交错群A3是循环群&#xff0c;它与(Z3,)同构&#xff0c;其中Z3 {[0],[1],[2]}&#xff1b; &#xff08;2&#xff09;G {1,i,-1,-i}&#xff0c;G上的代数运算是数的乘法&#xff0c;则G是一个循环群&…...

Linux常见操作问题

1、登录刚创建的用户&#xff0c;无法操作。 注&#xff1a;etc/passwd文件是Linux操作系统中存储用户账户信息的文本文件&#xff0c;包含了系统中所有用户的基本信息&#xff0c;比如用户名、用户ID、用户组ID、用户家目录路径。 注&#xff1a;etc: 这个目录存放所有的系统…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具

作者&#xff1a;来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗&#xff1f;了解下一期 Elasticsearch Engineer 培训的时间吧&#xff01; Elasticsearch 拥有众多新功能&#xff0c;助你为自己…...

GitHub 趋势日报 (2025年06月08日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

省略号和可变参数模板

本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

Qemu arm操作系统开发环境

使用qemu虚拟arm硬件比较合适。 步骤如下&#xff1a; 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载&#xff0c;下载地址&#xff1a;https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库&#xff0c;提供了高效、安全的文本格式化功能&#xff0c;是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...

aardio 自动识别验证码输入

技术尝试 上周在发学习日志时有网友提议“在网页上识别验证码”&#xff0c;于是尝试整合图像识别与网页自动化技术&#xff0c;完成了这套模拟登录流程。核心思路是&#xff1a;截图验证码→OCR识别→自动填充表单→提交并验证结果。 代码在这里 import soImage; import we…...

游戏开发中常见的战斗数值英文缩写对照表

游戏开发中常见的战斗数值英文缩写对照表 基础属性&#xff08;Basic Attributes&#xff09; 缩写英文全称中文释义常见使用场景HPHit Points / Health Points生命值角色生存状态MPMana Points / Magic Points魔法值技能释放资源SPStamina Points体力值动作消耗资源APAction…...

NineData数据库DevOps功能全面支持百度智能云向量数据库 VectorDB,助力企业 AI 应用高效落地

NineData 的数据库 DevOps 解决方案已完成对百度智能云向量数据库 VectorDB 的全链路适配&#xff0c;成为国内首批提供 VectorDB 原生操作能力的服务商。此次合作聚焦 AI 开发核心场景&#xff0c;通过标准化 SQL 工作台与细粒度权限管控两大能力&#xff0c;助力企业安全高效…...