整合Spring Boot和Pulsar实现可扩展的消息处理
整合Spring Boot和Pulsar实现可扩展的消息处理
大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!
在现代分布式系统中,消息队列是实现异步通信和解耦的重要组件。Apache Pulsar作为一个分布式消息流平台,具备高吞吐、低延迟、多租户支持等优势,是很多高性能消息处理场景的理想选择。本文将介绍如何在Spring Boot项目中整合Pulsar,实现可扩展的消息处理功能。
什么是Apache Pulsar
Apache Pulsar是一个开源的分布式消息流平台,支持多租户、多主题和持久化。Pulsar的架构包括Brokers、Bookies(Apache BookKeeper的存储节点)和ZooKeeper协调服务,提供了高可用性和高性能的消息传递和存储服务。
在Spring Boot中集成Pulsar
为了在Spring Boot项目中使用Pulsar,我们需要以下几个步骤:
- 添加Maven依赖
- 配置Pulsar客户端
- 创建消息生产者
- 创建消息消费者
1. 添加Maven依赖
首先,我们需要在pom.xml
中添加Pulsar的依赖:
<dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.9.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
</dependencies>
2. 配置Pulsar客户端
接下来,我们需要创建一个配置类来初始化Pulsar客户端。创建一个名为PulsarConfig
的配置类:
package cn.juwatech.config;import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class PulsarConfig {@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {return PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();}
}
3. 创建消息生产者
我们需要一个消息生产者来发送消息到Pulsar。创建一个名为PulsarProducer
的生产者类:
package cn.juwatech.producer;import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class PulsarProducer {private final PulsarClient pulsarClient;private Producer<byte[]> producer;@Autowiredpublic PulsarProducer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;initProducer();}private void initProducer() {try {ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer();this.producer = producerBuilder.topic("my-topic").create();} catch (PulsarClientException e) {e.printStackTrace();}}public void sendMessage(String message) {try {producer.send(message.getBytes());} catch (PulsarClientException e) {e.printStackTrace();}}
}
4. 创建消息消费者
我们需要一个消息消费者来接收来自Pulsar的消息。创建一个名为PulsarConsumer
的消费者类:
package cn.juwatech.consumer;import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class PulsarConsumer {private final PulsarClient pulsarClient;private Consumer<byte[]> consumer;@Autowiredpublic PulsarConsumer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;}@PostConstructprivate void initConsumer() {try {this.consumer = pulsarClient.newConsumer().topic("my-topic").subscriptionName("my-subscription").subscribe();startConsumer();} catch (PulsarClientException e) {e.printStackTrace();}}private void startConsumer() {new Thread(() -> {while (true) {try {Message<byte[]> msg = consumer.receive();String message = new String(msg.getData());System.out.println("Received message: " + message);consumer.acknowledge(msg);} catch (PulsarClientException e) {e.printStackTrace();}}}).start();}
}
5. 测试Pulsar生产者和消费者
最后,我们编写一个简单的测试类来验证生产者和消费者的工作。创建一个名为PulsarTest
的测试类:
package cn.juwatech;import cn.juwatech.producer.PulsarProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PulsarApplication implements CommandLineRunner {@Autowiredprivate PulsarProducer pulsarProducer;public static void main(String[] args) {SpringApplication.run(PulsarApplication.class, args);}@Overridepublic void run(String... args) throws Exception {pulsarProducer.sendMessage("Hello, Pulsar!");}
}
运行上述代码后,您应该会在控制台上看到消费者接收到的消息。
总结
通过以上步骤,我们成功地在Spring Boot项目中整合了Pulsar,实现了可扩展的消息处理功能。Pulsar的高性能和可扩展性使其非常适合分布式系统中的消息传递和流处理。在实际项目中,可以根据需求进一步优化和扩展Pulsar的使用,例如配置不同的主题和分区、实现更复杂的消息处理逻辑等。
相关文章:
整合Spring Boot和Pulsar实现可扩展的消息处理
整合Spring Boot和Pulsar实现可扩展的消息处理 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在现代分布式系统中,消息队列是实现异步通信和解耦…...

如何给WPS、Word、PPT等办公三件套添加收费字体---方正仿宋GBK
1.先下载需要的字体。 下载字体的网站比较多,基本上都是免费的。随便在网上搜索一个就可以了,下面是下载的链接。 方正仿宋GBK字体免费下载和在线预览-字体天下 www.fonts.net.cn/font-31602268591.html 注意:切记不要商用,以免…...

《重构》读书笔记【第1章 重构,第一个示例,第2章 重构原则】
文章目录 第1章 重构,第一个示例1.1 重构前1.2 重构后 第2章 重构原则2.1 何谓重构2.2 两顶帽子2.3 为何重构2.4 何时重构2.5 重构和开发过程 第1章 重构,第一个示例 我这里使用的IDE是IntelliJ IDEA 1.1 重构前 plays.js export const plays {&quo…...

学会整理电脑,基于小白用户(无关硬件升级)
如果你不想进行硬件升级,就要学会进行整理维护电脑 基于小白用户,每一个操作点我都会在后续整理出流程,软件推荐会选择占用小且实用的软件 主要从三个角度去讨论【如果有新的内容我会随时修改,也希望有补充告诉我,我…...

使用ioDraw,AI绘图只需几秒钟!
只需几秒钟,就能将文字或图片转化为精准的思维导图、流程图、折线图、柱状图、饼图等各种图表! 思维导图 思维导图工具使用入口 文字转思维导图 将文本大纲或想法转换成可视化的思维导图,以组织和结构化您的想法。 图片转思维导图 从现有…...

Websocket解析及用法(封装一个通用订阅发布主题的webSocket类)
1、什么是WebSocket? websocket的目标是通过一个长连接实现与服务器全双工,双向的通信。是一种在单个TCP连接上进行全双工通信的协议,使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 js中创建websocket…...
Foxit Reader(福昕阅读器)详细安装和使用教程
第一部分:Foxit Reader简介和基本信息 1.1 什么是Foxit Reader? Foxit Reader(福昕阅读器)是一款功能强大的PDF阅读和编辑软件,以其快速、轻巧和丰富的功能而闻名。它不仅支持常规的PDF阅读功能,还提供了…...

c++静态成员变量和静态成员函数
1)C入门级小知识,分享给将要学习或者正在学习C开发的同学。 2)内容属于原创,若转载,请说明出处。 3)提供相关问题有偿答疑和支持。 我们可以使用 static 关键字来把类成员定义为静态的。当我们声明类的成…...

视频共享融合赋能平台LntonCVS统一视频接入平台数字化升级医疗体系
医疗健康事关国计民生,然而,当前我国医疗水平的地区发展不平衡、医疗资源分布不均和医疗信息系统老化等问题,制约了整体服务能力和水平的提升。视频融合云平台作为推动数字医疗的关键工具,在医疗领域的广泛应用和普及,…...

Gin框架基础
1、一个简单的Gin示例 下载并安装Gin: go get -u github.com/gin-gonic/gin1.1 一个简单的例子 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 创建一个默认的路由引擎r : gin.Default()// 当客户端以GET方式访问 /hello…...

用GPT-4纠错GPT-4 OpenAI推出CriticGPT模型
根据OpenAI周四(6月27日)发布的新闻稿,该公司新推出了一个基于GPT-4的模型——CriticGPT,用于捕获ChatGPT代码输出中的错误。CriticGPT的作用相当于让人们用GPT-4来查找GPT-4的错误。该模型可以对ChatGPT响应结果做出批评评论&…...
SQL CASE WHEN语句的使用技巧
SQL CASE WHEN语句的使用技巧 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在SQL查询中,经常需要根据不同的条件进行分支处理,这时就…...

虹科技术丨跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力
来源:虹科技术丨跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力 原文链接:虹科技术 | 跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力 欢迎关注虹科,为您提供最新资讯! #PCAN #网关 #CA…...

【UE 网络】RPC远程过程调用 入门篇
目录 0 引言1 RPC基本概念1.1 定义1.2 分类 2 RPC的使用2.1 Client RPC2.2 Server RPC2.3 Multicast RPC 🙋♂️ 作者:海码007📜 专栏:UE虚幻引擎专栏💥 标题:【UE 网络】RPC远程过程调用 入门篇❣️ 寄语…...

安装maven与nexus
安装maven与nexus Maven官网下载地址:http://maven.apache.org cd /data/software/wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.8.1/binaries/apache-maven-3.8.8-bin.tar.gz# 解压 tar xf apache-maven-3.8.1-bin.tar.gz -C /opt/[rooth…...

如何用DCA1000持续采集雷达数据
摘要:本文介绍一下如何通过mmwave studio软件,搭配DCA1000数据采集卡,对AWR1843BOOST进行不间断的数据采集。本文要求读者已经掌握了有关基础知识。 本文开放获取,无需关注。 到SensorConfig页面下,一步步操作…...
怎么用JavaScript写爬虫
随着互联网技术的不断发展,爬虫(web crawler)已经成为当前最热门的爬取信息方式之一。通过爬虫技术,我们可以轻松地获取互联网上的数据,并用于数据分析、挖掘、建模等多个领域。而javascript语言则因其强大的前端开发工…...
Leetcode 3203. Find Minimum Diameter After Merging Two Trees
Leetcode 3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路2. 代码实现 题目链接:3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路 这一题的话算是一个拓扑树的题目?总之就是从树的叶子节点不断向上遍历ÿ…...
【抽代复习笔记】24-群(十八):循环群的两道例题
例1:证明: (1)三次交错群A3是循环群,它与(Z3,)同构,其中Z3 {[0],[1],[2]}; (2)G {1,i,-1,-i},G上的代数运算是数的乘法,则G是一个循环群&…...

Linux常见操作问题
1、登录刚创建的用户,无法操作。 注:etc/passwd文件是Linux操作系统中存储用户账户信息的文本文件,包含了系统中所有用户的基本信息,比如用户名、用户ID、用户组ID、用户家目录路径。 注:etc: 这个目录存放所有的系统…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
Cesium1.95中高性能加载1500个点
一、基本方式: 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...

优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...