整合Spring Boot和Pulsar实现可扩展的消息处理
整合Spring Boot和Pulsar实现可扩展的消息处理
大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!
在现代分布式系统中,消息队列是实现异步通信和解耦的重要组件。Apache Pulsar作为一个分布式消息流平台,具备高吞吐、低延迟、多租户支持等优势,是很多高性能消息处理场景的理想选择。本文将介绍如何在Spring Boot项目中整合Pulsar,实现可扩展的消息处理功能。
什么是Apache Pulsar
Apache Pulsar是一个开源的分布式消息流平台,支持多租户、多主题和持久化。Pulsar的架构包括Brokers、Bookies(Apache BookKeeper的存储节点)和ZooKeeper协调服务,提供了高可用性和高性能的消息传递和存储服务。
在Spring Boot中集成Pulsar
为了在Spring Boot项目中使用Pulsar,我们需要以下几个步骤:
- 添加Maven依赖
- 配置Pulsar客户端
- 创建消息生产者
- 创建消息消费者
1. 添加Maven依赖
首先,我们需要在pom.xml
中添加Pulsar的依赖:
<dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.9.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
</dependencies>
2. 配置Pulsar客户端
接下来,我们需要创建一个配置类来初始化Pulsar客户端。创建一个名为PulsarConfig
的配置类:
package cn.juwatech.config;import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class PulsarConfig {@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {return PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();}
}
3. 创建消息生产者
我们需要一个消息生产者来发送消息到Pulsar。创建一个名为PulsarProducer
的生产者类:
package cn.juwatech.producer;import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class PulsarProducer {private final PulsarClient pulsarClient;private Producer<byte[]> producer;@Autowiredpublic PulsarProducer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;initProducer();}private void initProducer() {try {ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer();this.producer = producerBuilder.topic("my-topic").create();} catch (PulsarClientException e) {e.printStackTrace();}}public void sendMessage(String message) {try {producer.send(message.getBytes());} catch (PulsarClientException e) {e.printStackTrace();}}
}
4. 创建消息消费者
我们需要一个消息消费者来接收来自Pulsar的消息。创建一个名为PulsarConsumer
的消费者类:
package cn.juwatech.consumer;import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class PulsarConsumer {private final PulsarClient pulsarClient;private Consumer<byte[]> consumer;@Autowiredpublic PulsarConsumer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;}@PostConstructprivate void initConsumer() {try {this.consumer = pulsarClient.newConsumer().topic("my-topic").subscriptionName("my-subscription").subscribe();startConsumer();} catch (PulsarClientException e) {e.printStackTrace();}}private void startConsumer() {new Thread(() -> {while (true) {try {Message<byte[]> msg = consumer.receive();String message = new String(msg.getData());System.out.println("Received message: " + message);consumer.acknowledge(msg);} catch (PulsarClientException e) {e.printStackTrace();}}}).start();}
}
5. 测试Pulsar生产者和消费者
最后,我们编写一个简单的测试类来验证生产者和消费者的工作。创建一个名为PulsarTest
的测试类:
package cn.juwatech;import cn.juwatech.producer.PulsarProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PulsarApplication implements CommandLineRunner {@Autowiredprivate PulsarProducer pulsarProducer;public static void main(String[] args) {SpringApplication.run(PulsarApplication.class, args);}@Overridepublic void run(String... args) throws Exception {pulsarProducer.sendMessage("Hello, Pulsar!");}
}
运行上述代码后,您应该会在控制台上看到消费者接收到的消息。
总结
通过以上步骤,我们成功地在Spring Boot项目中整合了Pulsar,实现了可扩展的消息处理功能。Pulsar的高性能和可扩展性使其非常适合分布式系统中的消息传递和流处理。在实际项目中,可以根据需求进一步优化和扩展Pulsar的使用,例如配置不同的主题和分区、实现更复杂的消息处理逻辑等。
相关文章:
整合Spring Boot和Pulsar实现可扩展的消息处理
整合Spring Boot和Pulsar实现可扩展的消息处理 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在现代分布式系统中,消息队列是实现异步通信和解耦…...

如何给WPS、Word、PPT等办公三件套添加收费字体---方正仿宋GBK
1.先下载需要的字体。 下载字体的网站比较多,基本上都是免费的。随便在网上搜索一个就可以了,下面是下载的链接。 方正仿宋GBK字体免费下载和在线预览-字体天下 www.fonts.net.cn/font-31602268591.html 注意:切记不要商用,以免…...

《重构》读书笔记【第1章 重构,第一个示例,第2章 重构原则】
文章目录 第1章 重构,第一个示例1.1 重构前1.2 重构后 第2章 重构原则2.1 何谓重构2.2 两顶帽子2.3 为何重构2.4 何时重构2.5 重构和开发过程 第1章 重构,第一个示例 我这里使用的IDE是IntelliJ IDEA 1.1 重构前 plays.js export const plays {&quo…...

学会整理电脑,基于小白用户(无关硬件升级)
如果你不想进行硬件升级,就要学会进行整理维护电脑 基于小白用户,每一个操作点我都会在后续整理出流程,软件推荐会选择占用小且实用的软件 主要从三个角度去讨论【如果有新的内容我会随时修改,也希望有补充告诉我,我…...

使用ioDraw,AI绘图只需几秒钟!
只需几秒钟,就能将文字或图片转化为精准的思维导图、流程图、折线图、柱状图、饼图等各种图表! 思维导图 思维导图工具使用入口 文字转思维导图 将文本大纲或想法转换成可视化的思维导图,以组织和结构化您的想法。 图片转思维导图 从现有…...

Websocket解析及用法(封装一个通用订阅发布主题的webSocket类)
1、什么是WebSocket? websocket的目标是通过一个长连接实现与服务器全双工,双向的通信。是一种在单个TCP连接上进行全双工通信的协议,使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 js中创建websocket…...
Foxit Reader(福昕阅读器)详细安装和使用教程
第一部分:Foxit Reader简介和基本信息 1.1 什么是Foxit Reader? Foxit Reader(福昕阅读器)是一款功能强大的PDF阅读和编辑软件,以其快速、轻巧和丰富的功能而闻名。它不仅支持常规的PDF阅读功能,还提供了…...

c++静态成员变量和静态成员函数
1)C入门级小知识,分享给将要学习或者正在学习C开发的同学。 2)内容属于原创,若转载,请说明出处。 3)提供相关问题有偿答疑和支持。 我们可以使用 static 关键字来把类成员定义为静态的。当我们声明类的成…...

视频共享融合赋能平台LntonCVS统一视频接入平台数字化升级医疗体系
医疗健康事关国计民生,然而,当前我国医疗水平的地区发展不平衡、医疗资源分布不均和医疗信息系统老化等问题,制约了整体服务能力和水平的提升。视频融合云平台作为推动数字医疗的关键工具,在医疗领域的广泛应用和普及,…...

Gin框架基础
1、一个简单的Gin示例 下载并安装Gin: go get -u github.com/gin-gonic/gin1.1 一个简单的例子 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 创建一个默认的路由引擎r : gin.Default()// 当客户端以GET方式访问 /hello…...

用GPT-4纠错GPT-4 OpenAI推出CriticGPT模型
根据OpenAI周四(6月27日)发布的新闻稿,该公司新推出了一个基于GPT-4的模型——CriticGPT,用于捕获ChatGPT代码输出中的错误。CriticGPT的作用相当于让人们用GPT-4来查找GPT-4的错误。该模型可以对ChatGPT响应结果做出批评评论&…...
SQL CASE WHEN语句的使用技巧
SQL CASE WHEN语句的使用技巧 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在SQL查询中,经常需要根据不同的条件进行分支处理,这时就…...

虹科技术丨跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力
来源:虹科技术丨跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力 原文链接:虹科技术 | 跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力 欢迎关注虹科,为您提供最新资讯! #PCAN #网关 #CA…...

【UE 网络】RPC远程过程调用 入门篇
目录 0 引言1 RPC基本概念1.1 定义1.2 分类 2 RPC的使用2.1 Client RPC2.2 Server RPC2.3 Multicast RPC 🙋♂️ 作者:海码007📜 专栏:UE虚幻引擎专栏💥 标题:【UE 网络】RPC远程过程调用 入门篇❣️ 寄语…...

安装maven与nexus
安装maven与nexus Maven官网下载地址:http://maven.apache.org cd /data/software/wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.8.1/binaries/apache-maven-3.8.8-bin.tar.gz# 解压 tar xf apache-maven-3.8.1-bin.tar.gz -C /opt/[rooth…...

如何用DCA1000持续采集雷达数据
摘要:本文介绍一下如何通过mmwave studio软件,搭配DCA1000数据采集卡,对AWR1843BOOST进行不间断的数据采集。本文要求读者已经掌握了有关基础知识。 本文开放获取,无需关注。 到SensorConfig页面下,一步步操作…...
怎么用JavaScript写爬虫
随着互联网技术的不断发展,爬虫(web crawler)已经成为当前最热门的爬取信息方式之一。通过爬虫技术,我们可以轻松地获取互联网上的数据,并用于数据分析、挖掘、建模等多个领域。而javascript语言则因其强大的前端开发工…...
Leetcode 3203. Find Minimum Diameter After Merging Two Trees
Leetcode 3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路2. 代码实现 题目链接:3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路 这一题的话算是一个拓扑树的题目?总之就是从树的叶子节点不断向上遍历ÿ…...
【抽代复习笔记】24-群(十八):循环群的两道例题
例1:证明: (1)三次交错群A3是循环群,它与(Z3,)同构,其中Z3 {[0],[1],[2]}; (2)G {1,i,-1,-i},G上的代数运算是数的乘法,则G是一个循环群&…...

Linux常见操作问题
1、登录刚创建的用户,无法操作。 注:etc/passwd文件是Linux操作系统中存储用户账户信息的文本文件,包含了系统中所有用户的基本信息,比如用户名、用户ID、用户组ID、用户家目录路径。 注:etc: 这个目录存放所有的系统…...

C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建
【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
CppCon 2015 学习:Time Programming Fundamentals
Civil Time 公历时间 特点: 共 6 个字段: Year(年)Month(月)Day(日)Hour(小时)Minute(分钟)Second(秒) 表示…...
手动给中文分词和 直接用神经网络RNN做有什么区别
手动分词和基于神经网络(如 RNN)的自动分词在原理、实现方式和效果上有显著差异,以下是核心对比: 1. 实现原理对比 对比维度手动分词(规则 / 词典驱动)神经网络 RNN 分词(数据驱动)…...