当前位置: 首页 > news >正文

消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

Apache Pulusar是一个分布式、多租户、高性能的发布/订阅(Pub/Sub)消息系统,最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点,提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力,适用于大规模分布式系统的实时数据处理和异步通信。
Pulsar的架构设计结合了消息队列和流处理的特点,既可以作为传统消息队列使用,也可以作为流处理平台支持实时数据处理。

主要特点:

  • 分布式架构:Pulsar采用分层架构,将消息存储与代理服务分离,提供了更好的水平扩展能力和故障隔离。
  • 多租户支持:Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。
  • 持久化和一致性:Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。
  • 灵活的消息模型:Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。
  • 多语言支持:Pulsar提供了多种编程语言的客户端库,如Java、Python、Go、C++等。
  • 丰富的生态:Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成,如Kafka Connect、Flink、Spark等。

1、核心概念

(1)、命名空间(Namespace)

命名空间是Pulsar中的一个逻辑单元,用于组织和管理主题(Topic)。每个命名空间可以包含多个主题,并且可以为不同的命名空间设置不同的配置,例如保留策略、订阅类型等。命名空间通常用于实现多租户隔离。

(2)、主题(Topic)

主题是Pulsar中的消息通道,生产者(Producer)将消息发送到主题,消费者(Consumer)从主题中消费消息。

Pulsar支持两种类型的主题:

  • 持久化主题(Persistent Topic):消息会被持久化存储,确保即使在broker故障的情况下也不会丢失。
  • 非持久化主题(Non-Persistent Topic):消息不会被持久化存储,适用于对延迟敏感但对可靠性要求较低的场景。

(3)、订阅(Subscription)

订阅是消费者与主题之间的绑定关系。Pulsar支持多种订阅类型,每种订阅类型决定了消息的分发方式:

  • 独占订阅(Exclusive Subscription):只有一个消费者可以订阅该主题,其他消费者无法订阅。
  • 共享订阅(Shared Subscription):多个消费者可以订阅同一个主题,消息会被轮询分发给不同的消费者。
  • 故障转移订阅(Failover Subscription):多个消费者可以订阅同一个主题,但只有一个是活跃的消费者,其他消费者作为备用。当活跃消费者失败时,备用消费者会接管消息消费。
  • Key_Shared 订阅:基于消息的key进行分区,确保相同key的消息总是被分发给同一个消费者。

(4)、消息(Message)

消息是Pulsar中的基本数据单位,由生产者发送到主题。

每个消息可以包含以下属性:

  • 消息体(Payload):消息的实际内容,可以是任意二进制数据。
  • 消息ID(Message ID):唯一标识每条消息的ID,用于确认消息的消费状态。
  • 属性(Properties):用户可以为消息添加自定义的键值对属性,方便后续处理。
  • 时间戳(Timestamp):消息的创建时间或发送时间。

(5)、分区(Partition)

Pulsar支持主题分区,即将一个主题划分为多个分区,每个分区可以独立地处理消息。分区可以提高主题的吞吐量和并发性,特别是在高负载场景下。Pulsar会自动将消息均匀分布到不同的分区中。

(6)、Broker

Broker是Pulsar的核心组件之一,负责接收生产者的消息并将其分发给消费者。注意,Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker负责管理主题、订阅和消费者的连接,并处理消息的路由和分发。

(7)、BookKeeper

BookKeeper是Pulsar的持久化存储层,负责将消息持久化到磁盘。BookKeeper采用分布式日志存储机制,提供了高可用性和强一致性保障。每个消息会被写入多个BookKeeper节点,确保即使部分节点故障也不会丢失数据。

(8)、ZooKeeper

ZooKeeper是Pulsar的元数据管理组件,用于存储集群的配置信息、主题和命名空间的元数据、以及Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保Pulsar集群的一致性和可靠性。

2、架构设计

Pulsar的架构设计采用了分层结构,将消息存储与代理服务分离,使得系统更加模块化和可扩展。

结构示例图:
在这里插入图片描述

Pulsar的主要组件及其作用:

  • Broker:负责接收生产者的消息并将其分发给消费者。Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker还负责管理主题、订阅和消费者的连接。

  • BookKeeper:即上图BK Client。负责将消息持久化到磁盘,提供高可用性和强一致性保障。BookKeeper采用分布式日志存储机制,确保消息的安全性和可靠性。

  • Bookie:Bookie是BookKeeper的存储节点组成,持久化地存储消息。BookKeeper采用分布式日志存储的方式,将消息以日志的形式存储在多个Bookie节点上。这种设计确保了消息的可靠性和持久性,即使在节点故障的情况下也能保证消息不丢失。

  • ZooKeeper:负责存储集群的元数据,包括主题、命名空间、Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保集群的一致性和可靠性。

  • Proxy(可选):Pulsar提供了一个可选的代理层(Proxy),允许客户端通过HTTP或WebSocket协议与Pulsar集群进行通信。Proxy可以简化客户端的连接管理,并提供跨区域访问的能力。

  • Function:Pulsar提供了一个轻量级的流处理框架(Pulsar Functions),允许用户编写简单的流处理逻辑并将其部署到Pulsar集群中。Pulsar Functions可以用于实时数据处理、事件驱动计算等场景。

  • SQL:Pulsar提供了一个SQL查询引擎(Pulsar SQL),允许用户通过SQL语句查询Pulsar中的消息数据。Pulsar SQL可以用于数据分析、监控和告警等场景。

3、特性与优势

(1)、高吞吐量和低延迟

Pulsar采用了分层架构,将消息存储与代理服务分离,使得系统能够同时具备高吞吐量和低延迟。Broker负责处理消息的路由和分发,而BookKeeper负责持久化存储,两者相互协作,确保消息的高效传递。

(2)、多租户支持

Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。每个租户可以拥有自己的命名空间,并可以根据需要设置不同的配置,例如保留策略、订阅类型等。
即:类似Nacos的命名空间,实现配置,服务等隔离。

(3)、持久化和一致性

Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。每个消息会被写入多个Bookie节点,确保即使部分节点故障也不会丢失数据。Pulsar还支持事务和幂等性,确保消息的可靠传递。

(4)、灵活的消息模型

Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。用户可以根据实际需求选择合适的订阅类型,满足不同的业务场景。Pulsar还支持消息的重播、回溯和跳过等功能,方便用户进行调试和故障排查。

(5)、多语言支持

Pulsar提供了多种编程语言的客户端库,包括Java、Python、Go、C++等。用户可以根据自己的技术栈选择合适的客户端库,快速集成Pulsar到应用程序中。

(6)、丰富的生态

Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成。例如,Pulsar可以与Kafka Connect、Flink、Spark等工具集成,实现数据的实时处理和分析。Pulsar还提供了Pulsar Functions和Pulsar SQL等功能,进一步扩展了其应用场景。

4、应用场景

(1)、实时数据处理

Pulsar的高吞吐量和低延迟特性使其非常适合用于实时数据处理场景。例如,电商网站可以使用Pulsar来处理订单、支付、库存等实时数据,确保数据的及时性和准确性。

(2)、物联网(IoT)

Pulsar的分布式架构和多租户支持使其非常适合用于物联网场景。物联网设备可以将传感器数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行历史数据分析。

(3)、微服务架构

Pulsar可以作为微服务之间的消息总线,实现服务间的异步通信。微服务可以通过Pulsar发送和接收消息,避免阻塞主线程,提高系统的响应速度和稳定性。

(4)、日志收集和监控

Pulsar可以用于日志收集和监控场景,将应用的日志数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的持久化存储,确保日志数据不会丢失。

(5)、事件驱动架构

Pulsar支持事件驱动架构,用户可以将事件发送到Pulsar,Pulsar可以将这些事件分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行事件的回放和调试。

5、代码示例

(1)、生产者示例

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.MessageId;public class PulsarProducerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {// 2、创建生产者try (Producer<byte[]> producer = client.newProducer().topic("persistent://public/default/example-topic")    // 指定主题.create()) {// 3、发送消息for (int i = 0; i < 10; i++) {String message = "Hello, Pulsar! " + i;MessageId msgId = producer.send(message.getBytes());    // 发送消息System.out.println(" [x] Sent message: " + message + ", msgId: " + msgId);}}}}
}

(2)、消费者示例

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;public class PulsarConsumerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {// 2、创建消费者try (Consumer<byte[]> consumer = client.newConsumer().topic("persistent://public/default/example-topic")   // 监听的主题.subscriptionName("example-subscription").subscriptionType(SubscriptionType.Shared).subscribe()) {// 3、接收和消费消息while (true) {       // 利用循环接收消息Message<byte[]> msg = consumer.receive();      // 具体接收消息try {System.out.println(" [x] Received message: " + new String(msg.getData()));consumer.acknowledge(msg);  // 4、确认消息已消费} catch (Exception e) {consumer.negativeAcknowledge(msg);  // 5、处理失败,重新投递}}}}}
}

6、Pulsar总结

Apache Pulsar是一个功能强大、架构灵活的消息系统,特别适合大规模分布式系统的实时数据处理和异步通信。它的分层架构、多租户支持、持久化和一致性保障、灵活的消息模型等特点,使其在性能、可靠性和可扩展性方面表现出色。Pulsar还拥有丰富的生态系统,支持与其他工具和服务集成,适用于多种应用场景。

乘风破浪会有时,直挂云帆济沧海!!!

相关文章:

消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

Apache Pulusar是一个分布式、多租户、高性能的发布/订阅&#xff08;Pub/Sub&#xff09;消息系统&#xff0c;最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点&#xff0c;提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力&#xff0c;适用于大规模分布式系…...

扬帆数据结构算法之舟,启航C++探索征途——LeetCode深度磨砺:顺序表技术精进实践

人无完人&#xff0c;持之以恒&#xff0c;方能见真我&#xff01;&#xff01;&#xff01; 共同进步&#xff01;&#xff01; 文章目录 顺序表练习1.移除数组中指定的元素方法1&#xff08;顺序表&#xff09;方法2&#xff08;双指针&#xff09; 2.删除有序数组中的重复项…...

基于本地事务表+MQ实现分布式事务

基于本地事务表MQ实现分布式事务 引言1、原理2、本地消息表优缺点3、代码实现3.1、代码执行流程3.2、项目结构3.3、项目源码 引言 本地消息表的方案最初由ebay的工程师提出&#xff0c;核心思想是将分布式事务拆分成本地事务进行处理。本地消息表实现最终一致性。本文主要学习…...

数据结构:二叉树—面试题(一)

目录 1、相同的树 2、另一棵树的子树 3、翻转二叉树 4、平衡二叉树 5、对称二叉树 6、二叉树遍历 7、二叉树的分层遍历 1、相同的树 习题链接https://leetcode.cn/problems/same-tree/description/https://leetcode.cn/problems/same-tree/description/ 描述&#xff1a…...

【Wordpress网站制作】切换语言的问题

前言 自学笔记&#xff0c;解决问题为主&#xff0c;欢迎补充。 本文重点&#xff1a;如何将页面语言从默认的【英语】修改成【中文】。 问题描述 安装完wordpress&#xff0c;在【Setting】→【General】的语言中&#xff0c;选项只有英语。无法切换成中文 方法1: 在 wp-c…...

【第二天】零基础入门刷题Python-算法篇-数据结构与算法的介绍-五种常见的排序算法(持续更新)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Python数据结构与算法的详细介绍1.Python中的常用的排序算法1.排序算法的介绍2.五种详细的排序算法代码 总结 前言 提示&#xff1a;这里可以添加本文要记…...

Neural networks 神经网络

发展时间线 基础概念 多层神经网络结构 神经网络中一个网络层的数学表达 TensorFlow实践 创建网络层 神经网络的创建、训练与推理 推理 推理可以理解为执行一次前向传播 前向传播 前向传播直观数学表达 前向传播直观数学表达的Python实现 前向传播向量化实现 相关数学知识…...

汽车免拆诊断案例 | 2007 款日产天籁车起步加速时偶尔抖动

故障现象  一辆2007款日产天籁车&#xff0c;搭载VQ23发动机&#xff08;气缸编号如图1所示&#xff0c;点火顺序为1-2-3-4-5-6&#xff09;&#xff0c;累计行驶里程约为21万km。车主反映&#xff0c;该车起步加速时偶尔抖动&#xff0c;且行驶中加速无力。 图1 VQ23发动机…...

代码随想录day3

203:移除链表元素&#xff1a;注意虚拟头节点的使用 ListNode* removeElements(ListNode* head, int val) {ListNode* result new ListNode();result->next head;ListNode* current result;while(current ! nullptr && current->next ! nullptr){if(current-…...

Spring 面试题【每日20道】【其一】

1、Spring 当中什么是循环依赖&#xff08;常问&#xff09;&#xff1f; 中等 在Spring框架中&#xff0c;循环依赖&#xff08;Circular Dependency&#xff09;是指两个或多个bean互相之间直接或间接地依赖对方的注入。例如&#xff1a; A bean依赖于B bean。B bean又依赖…...

leetcode刷题记录(八十九)——35. 搜索插入位置

&#xff08;一&#xff09;问题描述 35. 搜索插入位置 - 力扣&#xff08;LeetCode&#xff09;35. 搜索插入位置 - 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位…...

Flutter 与 React 前端框架对比:深入分析与实战示例

Flutter 与 React 前端框架对比&#xff1a;深入分析与实战示例 在现代前端开发中&#xff0c;Flutter 和 React 是两个非常流行的框架。Flutter 是 Google 推出的跨平台开发框架&#xff0c;支持从一个代码库生成 iOS、Android、Web 和桌面应用&#xff1b;React 则是 Facebo…...

基于Docker的Spark分布式集群

目录 1. 说明 2. 服务器规划 3. 步骤 3.1 要点 3.2 配置文件 3.2 访问Spark Master 4. 使用测试 5. 参考 1. 说明 以docker容器方式实现apache spark计算集群&#xff0c;能灵活的增减配置与worker数目。 2. 服务器规划 服务器 (1master, 3workers) ip开放端口备注ce…...

Web 代理、爬行器和爬虫

目录 Web 在线网页代理服务器的使用方法Web 在线网页代理服务器使用流程详解注意事项 Web 请求和响应中的代理方式Web 开发中的请求方法借助代理进行文件下载的示例 Web 服务器请求代理方式代理、网关和隧道的概念参考文献说明 爬虫的工作原理及案例网络爬虫概述爬虫工作原理 W…...

MySQL 事件调度器

MySQL 事件调度器确实是一个更方便且内置的解决方案&#xff0c;可以在 MySQL 服务器端自动定期执行表优化操作&#xff0c;无需依赖外部工具或应用程序代码。这种方式也能减少数据库维护的复杂性&#xff0c;尤其适用于在数据库频繁更新或删除时进行自动化优化。 使用 MySQL …...

直线拟合例子 ,岭回归拟合直线

目录 直线拟合,算出离群点 岭回归拟合直线&#xff1a; 直线拟合,算出离群点 import cv2 import numpy as np# 输入的点 points np.array([[51, 149],[122, 374],[225, 376],[340, 382],[463, 391],[535, 298],[596, 400],[689, 406],[821, 407] ], dtypenp.float32)# 使用…...

Flutter android debug 编译报错问题。插件编译报错

下面相关内容 都以 Mac 电脑为例子。 一、问题 起因&#xff1a;&#xff08;更新 Android studio 2024.2.2.13、 Flutter SDK 3.27.2&#xff09; 最近 2025年 1 月 左右&#xff0c;我更新了 Android studio 和 Flutter SDK 再运行就会出现下面的问题。当然 下面的提示只是其…...

关于IPD流程的学习理解和使用

IPD&#xff08;Integrated Product Development&#xff0c;集成产品开发&#xff09;是一种系统化的产品开发流程和方法论&#xff0c;旨在通过跨职能团队的协作和并行工程&#xff0c;缩短产品开发周期&#xff0c;提高产品质量&#xff0c;降低开发成本。IPD 最初由美国 PR…...

C# 类(Class)

C# 类&#xff08;Class&#xff09; 概述 在C#编程语言中&#xff0c;类&#xff08;Class&#xff09;是面向对象编程&#xff08;OOP&#xff09;的核心概念之一。类是一种用户定义的数据类型&#xff0c;它包含了一组属性&#xff08;数据&#xff09;和方法&#xff08;…...

Jenkins pipline怎么设置定时跑脚本

目录 示例&#xff1a;在Jenkins Pipeline中设置定时触发 使用pipeline指令设置定时触发 使用Declarative Pipeline设置定时触发 使用Scripted Pipeline设置定时触发 解释Cron表达式 保存和应用配置 小结 在Jenkins中&#xff0c;定时跑脚本&#xff08;例如定时执行Pip…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度​

一、引言&#xff1a;多云环境的技术复杂性本质​​ 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时&#xff0c;​​基础设施的技术债呈现指数级积累​​。网络连接、身份认证、成本管理这三大核心挑战相互嵌套&#xff1a;跨云网络构建数据…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

Springboot社区养老保险系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;社区养老保险系统小程序被用户普遍使用&#xff0c;为方…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库&#xff0c;分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷&#xff0c;但是文件存放起来数据比较冗余&#xff0c;用二进制能够更好管理咱们M…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程&#xff0c;系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...