当前位置: 首页 > news >正文

消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

Apache Pulusar是一个分布式、多租户、高性能的发布/订阅(Pub/Sub)消息系统,最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点,提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力,适用于大规模分布式系统的实时数据处理和异步通信。
Pulsar的架构设计结合了消息队列和流处理的特点,既可以作为传统消息队列使用,也可以作为流处理平台支持实时数据处理。

主要特点:

  • 分布式架构:Pulsar采用分层架构,将消息存储与代理服务分离,提供了更好的水平扩展能力和故障隔离。
  • 多租户支持:Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。
  • 持久化和一致性:Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。
  • 灵活的消息模型:Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。
  • 多语言支持:Pulsar提供了多种编程语言的客户端库,如Java、Python、Go、C++等。
  • 丰富的生态:Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成,如Kafka Connect、Flink、Spark等。

1、核心概念

(1)、命名空间(Namespace)

命名空间是Pulsar中的一个逻辑单元,用于组织和管理主题(Topic)。每个命名空间可以包含多个主题,并且可以为不同的命名空间设置不同的配置,例如保留策略、订阅类型等。命名空间通常用于实现多租户隔离。

(2)、主题(Topic)

主题是Pulsar中的消息通道,生产者(Producer)将消息发送到主题,消费者(Consumer)从主题中消费消息。

Pulsar支持两种类型的主题:

  • 持久化主题(Persistent Topic):消息会被持久化存储,确保即使在broker故障的情况下也不会丢失。
  • 非持久化主题(Non-Persistent Topic):消息不会被持久化存储,适用于对延迟敏感但对可靠性要求较低的场景。

(3)、订阅(Subscription)

订阅是消费者与主题之间的绑定关系。Pulsar支持多种订阅类型,每种订阅类型决定了消息的分发方式:

  • 独占订阅(Exclusive Subscription):只有一个消费者可以订阅该主题,其他消费者无法订阅。
  • 共享订阅(Shared Subscription):多个消费者可以订阅同一个主题,消息会被轮询分发给不同的消费者。
  • 故障转移订阅(Failover Subscription):多个消费者可以订阅同一个主题,但只有一个是活跃的消费者,其他消费者作为备用。当活跃消费者失败时,备用消费者会接管消息消费。
  • Key_Shared 订阅:基于消息的key进行分区,确保相同key的消息总是被分发给同一个消费者。

(4)、消息(Message)

消息是Pulsar中的基本数据单位,由生产者发送到主题。

每个消息可以包含以下属性:

  • 消息体(Payload):消息的实际内容,可以是任意二进制数据。
  • 消息ID(Message ID):唯一标识每条消息的ID,用于确认消息的消费状态。
  • 属性(Properties):用户可以为消息添加自定义的键值对属性,方便后续处理。
  • 时间戳(Timestamp):消息的创建时间或发送时间。

(5)、分区(Partition)

Pulsar支持主题分区,即将一个主题划分为多个分区,每个分区可以独立地处理消息。分区可以提高主题的吞吐量和并发性,特别是在高负载场景下。Pulsar会自动将消息均匀分布到不同的分区中。

(6)、Broker

Broker是Pulsar的核心组件之一,负责接收生产者的消息并将其分发给消费者。注意,Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker负责管理主题、订阅和消费者的连接,并处理消息的路由和分发。

(7)、BookKeeper

BookKeeper是Pulsar的持久化存储层,负责将消息持久化到磁盘。BookKeeper采用分布式日志存储机制,提供了高可用性和强一致性保障。每个消息会被写入多个BookKeeper节点,确保即使部分节点故障也不会丢失数据。

(8)、ZooKeeper

ZooKeeper是Pulsar的元数据管理组件,用于存储集群的配置信息、主题和命名空间的元数据、以及Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保Pulsar集群的一致性和可靠性。

2、架构设计

Pulsar的架构设计采用了分层结构,将消息存储与代理服务分离,使得系统更加模块化和可扩展。

结构示例图:
在这里插入图片描述

Pulsar的主要组件及其作用:

  • Broker:负责接收生产者的消息并将其分发给消费者。Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker还负责管理主题、订阅和消费者的连接。

  • BookKeeper:即上图BK Client。负责将消息持久化到磁盘,提供高可用性和强一致性保障。BookKeeper采用分布式日志存储机制,确保消息的安全性和可靠性。

  • Bookie:Bookie是BookKeeper的存储节点组成,持久化地存储消息。BookKeeper采用分布式日志存储的方式,将消息以日志的形式存储在多个Bookie节点上。这种设计确保了消息的可靠性和持久性,即使在节点故障的情况下也能保证消息不丢失。

  • ZooKeeper:负责存储集群的元数据,包括主题、命名空间、Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保集群的一致性和可靠性。

  • Proxy(可选):Pulsar提供了一个可选的代理层(Proxy),允许客户端通过HTTP或WebSocket协议与Pulsar集群进行通信。Proxy可以简化客户端的连接管理,并提供跨区域访问的能力。

  • Function:Pulsar提供了一个轻量级的流处理框架(Pulsar Functions),允许用户编写简单的流处理逻辑并将其部署到Pulsar集群中。Pulsar Functions可以用于实时数据处理、事件驱动计算等场景。

  • SQL:Pulsar提供了一个SQL查询引擎(Pulsar SQL),允许用户通过SQL语句查询Pulsar中的消息数据。Pulsar SQL可以用于数据分析、监控和告警等场景。

3、特性与优势

(1)、高吞吐量和低延迟

Pulsar采用了分层架构,将消息存储与代理服务分离,使得系统能够同时具备高吞吐量和低延迟。Broker负责处理消息的路由和分发,而BookKeeper负责持久化存储,两者相互协作,确保消息的高效传递。

(2)、多租户支持

Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。每个租户可以拥有自己的命名空间,并可以根据需要设置不同的配置,例如保留策略、订阅类型等。
即:类似Nacos的命名空间,实现配置,服务等隔离。

(3)、持久化和一致性

Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。每个消息会被写入多个Bookie节点,确保即使部分节点故障也不会丢失数据。Pulsar还支持事务和幂等性,确保消息的可靠传递。

(4)、灵活的消息模型

Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。用户可以根据实际需求选择合适的订阅类型,满足不同的业务场景。Pulsar还支持消息的重播、回溯和跳过等功能,方便用户进行调试和故障排查。

(5)、多语言支持

Pulsar提供了多种编程语言的客户端库,包括Java、Python、Go、C++等。用户可以根据自己的技术栈选择合适的客户端库,快速集成Pulsar到应用程序中。

(6)、丰富的生态

Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成。例如,Pulsar可以与Kafka Connect、Flink、Spark等工具集成,实现数据的实时处理和分析。Pulsar还提供了Pulsar Functions和Pulsar SQL等功能,进一步扩展了其应用场景。

4、应用场景

(1)、实时数据处理

Pulsar的高吞吐量和低延迟特性使其非常适合用于实时数据处理场景。例如,电商网站可以使用Pulsar来处理订单、支付、库存等实时数据,确保数据的及时性和准确性。

(2)、物联网(IoT)

Pulsar的分布式架构和多租户支持使其非常适合用于物联网场景。物联网设备可以将传感器数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行历史数据分析。

(3)、微服务架构

Pulsar可以作为微服务之间的消息总线,实现服务间的异步通信。微服务可以通过Pulsar发送和接收消息,避免阻塞主线程,提高系统的响应速度和稳定性。

(4)、日志收集和监控

Pulsar可以用于日志收集和监控场景,将应用的日志数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的持久化存储,确保日志数据不会丢失。

(5)、事件驱动架构

Pulsar支持事件驱动架构,用户可以将事件发送到Pulsar,Pulsar可以将这些事件分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行事件的回放和调试。

5、代码示例

(1)、生产者示例

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.MessageId;public class PulsarProducerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {// 2、创建生产者try (Producer<byte[]> producer = client.newProducer().topic("persistent://public/default/example-topic")    // 指定主题.create()) {// 3、发送消息for (int i = 0; i < 10; i++) {String message = "Hello, Pulsar! " + i;MessageId msgId = producer.send(message.getBytes());    // 发送消息System.out.println(" [x] Sent message: " + message + ", msgId: " + msgId);}}}}
}

(2)、消费者示例

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;public class PulsarConsumerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {// 2、创建消费者try (Consumer<byte[]> consumer = client.newConsumer().topic("persistent://public/default/example-topic")   // 监听的主题.subscriptionName("example-subscription").subscriptionType(SubscriptionType.Shared).subscribe()) {// 3、接收和消费消息while (true) {       // 利用循环接收消息Message<byte[]> msg = consumer.receive();      // 具体接收消息try {System.out.println(" [x] Received message: " + new String(msg.getData()));consumer.acknowledge(msg);  // 4、确认消息已消费} catch (Exception e) {consumer.negativeAcknowledge(msg);  // 5、处理失败,重新投递}}}}}
}

6、Pulsar总结

Apache Pulsar是一个功能强大、架构灵活的消息系统,特别适合大规模分布式系统的实时数据处理和异步通信。它的分层架构、多租户支持、持久化和一致性保障、灵活的消息模型等特点,使其在性能、可靠性和可扩展性方面表现出色。Pulsar还拥有丰富的生态系统,支持与其他工具和服务集成,适用于多种应用场景。

乘风破浪会有时,直挂云帆济沧海!!!

相关文章:

消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

Apache Pulusar是一个分布式、多租户、高性能的发布/订阅&#xff08;Pub/Sub&#xff09;消息系统&#xff0c;最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点&#xff0c;提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力&#xff0c;适用于大规模分布式系…...

扬帆数据结构算法之舟,启航C++探索征途——LeetCode深度磨砺:顺序表技术精进实践

人无完人&#xff0c;持之以恒&#xff0c;方能见真我&#xff01;&#xff01;&#xff01; 共同进步&#xff01;&#xff01; 文章目录 顺序表练习1.移除数组中指定的元素方法1&#xff08;顺序表&#xff09;方法2&#xff08;双指针&#xff09; 2.删除有序数组中的重复项…...

基于本地事务表+MQ实现分布式事务

基于本地事务表MQ实现分布式事务 引言1、原理2、本地消息表优缺点3、代码实现3.1、代码执行流程3.2、项目结构3.3、项目源码 引言 本地消息表的方案最初由ebay的工程师提出&#xff0c;核心思想是将分布式事务拆分成本地事务进行处理。本地消息表实现最终一致性。本文主要学习…...

数据结构:二叉树—面试题(一)

目录 1、相同的树 2、另一棵树的子树 3、翻转二叉树 4、平衡二叉树 5、对称二叉树 6、二叉树遍历 7、二叉树的分层遍历 1、相同的树 习题链接https://leetcode.cn/problems/same-tree/description/https://leetcode.cn/problems/same-tree/description/ 描述&#xff1a…...

【Wordpress网站制作】切换语言的问题

前言 自学笔记&#xff0c;解决问题为主&#xff0c;欢迎补充。 本文重点&#xff1a;如何将页面语言从默认的【英语】修改成【中文】。 问题描述 安装完wordpress&#xff0c;在【Setting】→【General】的语言中&#xff0c;选项只有英语。无法切换成中文 方法1: 在 wp-c…...

【第二天】零基础入门刷题Python-算法篇-数据结构与算法的介绍-五种常见的排序算法(持续更新)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Python数据结构与算法的详细介绍1.Python中的常用的排序算法1.排序算法的介绍2.五种详细的排序算法代码 总结 前言 提示&#xff1a;这里可以添加本文要记…...

Neural networks 神经网络

发展时间线 基础概念 多层神经网络结构 神经网络中一个网络层的数学表达 TensorFlow实践 创建网络层 神经网络的创建、训练与推理 推理 推理可以理解为执行一次前向传播 前向传播 前向传播直观数学表达 前向传播直观数学表达的Python实现 前向传播向量化实现 相关数学知识…...

汽车免拆诊断案例 | 2007 款日产天籁车起步加速时偶尔抖动

故障现象  一辆2007款日产天籁车&#xff0c;搭载VQ23发动机&#xff08;气缸编号如图1所示&#xff0c;点火顺序为1-2-3-4-5-6&#xff09;&#xff0c;累计行驶里程约为21万km。车主反映&#xff0c;该车起步加速时偶尔抖动&#xff0c;且行驶中加速无力。 图1 VQ23发动机…...

代码随想录day3

203:移除链表元素&#xff1a;注意虚拟头节点的使用 ListNode* removeElements(ListNode* head, int val) {ListNode* result new ListNode();result->next head;ListNode* current result;while(current ! nullptr && current->next ! nullptr){if(current-…...

Spring 面试题【每日20道】【其一】

1、Spring 当中什么是循环依赖&#xff08;常问&#xff09;&#xff1f; 中等 在Spring框架中&#xff0c;循环依赖&#xff08;Circular Dependency&#xff09;是指两个或多个bean互相之间直接或间接地依赖对方的注入。例如&#xff1a; A bean依赖于B bean。B bean又依赖…...

leetcode刷题记录(八十九)——35. 搜索插入位置

&#xff08;一&#xff09;问题描述 35. 搜索插入位置 - 力扣&#xff08;LeetCode&#xff09;35. 搜索插入位置 - 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位…...

Flutter 与 React 前端框架对比:深入分析与实战示例

Flutter 与 React 前端框架对比&#xff1a;深入分析与实战示例 在现代前端开发中&#xff0c;Flutter 和 React 是两个非常流行的框架。Flutter 是 Google 推出的跨平台开发框架&#xff0c;支持从一个代码库生成 iOS、Android、Web 和桌面应用&#xff1b;React 则是 Facebo…...

基于Docker的Spark分布式集群

目录 1. 说明 2. 服务器规划 3. 步骤 3.1 要点 3.2 配置文件 3.2 访问Spark Master 4. 使用测试 5. 参考 1. 说明 以docker容器方式实现apache spark计算集群&#xff0c;能灵活的增减配置与worker数目。 2. 服务器规划 服务器 (1master, 3workers) ip开放端口备注ce…...

Web 代理、爬行器和爬虫

目录 Web 在线网页代理服务器的使用方法Web 在线网页代理服务器使用流程详解注意事项 Web 请求和响应中的代理方式Web 开发中的请求方法借助代理进行文件下载的示例 Web 服务器请求代理方式代理、网关和隧道的概念参考文献说明 爬虫的工作原理及案例网络爬虫概述爬虫工作原理 W…...

MySQL 事件调度器

MySQL 事件调度器确实是一个更方便且内置的解决方案&#xff0c;可以在 MySQL 服务器端自动定期执行表优化操作&#xff0c;无需依赖外部工具或应用程序代码。这种方式也能减少数据库维护的复杂性&#xff0c;尤其适用于在数据库频繁更新或删除时进行自动化优化。 使用 MySQL …...

直线拟合例子 ,岭回归拟合直线

目录 直线拟合,算出离群点 岭回归拟合直线&#xff1a; 直线拟合,算出离群点 import cv2 import numpy as np# 输入的点 points np.array([[51, 149],[122, 374],[225, 376],[340, 382],[463, 391],[535, 298],[596, 400],[689, 406],[821, 407] ], dtypenp.float32)# 使用…...

Flutter android debug 编译报错问题。插件编译报错

下面相关内容 都以 Mac 电脑为例子。 一、问题 起因&#xff1a;&#xff08;更新 Android studio 2024.2.2.13、 Flutter SDK 3.27.2&#xff09; 最近 2025年 1 月 左右&#xff0c;我更新了 Android studio 和 Flutter SDK 再运行就会出现下面的问题。当然 下面的提示只是其…...

关于IPD流程的学习理解和使用

IPD&#xff08;Integrated Product Development&#xff0c;集成产品开发&#xff09;是一种系统化的产品开发流程和方法论&#xff0c;旨在通过跨职能团队的协作和并行工程&#xff0c;缩短产品开发周期&#xff0c;提高产品质量&#xff0c;降低开发成本。IPD 最初由美国 PR…...

C# 类(Class)

C# 类&#xff08;Class&#xff09; 概述 在C#编程语言中&#xff0c;类&#xff08;Class&#xff09;是面向对象编程&#xff08;OOP&#xff09;的核心概念之一。类是一种用户定义的数据类型&#xff0c;它包含了一组属性&#xff08;数据&#xff09;和方法&#xff08;…...

Jenkins pipline怎么设置定时跑脚本

目录 示例&#xff1a;在Jenkins Pipeline中设置定时触发 使用pipeline指令设置定时触发 使用Declarative Pipeline设置定时触发 使用Scripted Pipeline设置定时触发 解释Cron表达式 保存和应用配置 小结 在Jenkins中&#xff0c;定时跑脚本&#xff08;例如定时执行Pip…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

Android15默认授权浮窗权限

我们经常有那种需求&#xff0c;客户需要定制的apk集成在ROM中&#xff0c;并且默认授予其【显示在其他应用的上层】权限&#xff0c;也就是我们常说的浮窗权限&#xff0c;那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...

技术栈RabbitMq的介绍和使用

目录 1. 什么是消息队列&#xff1f;2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...

多模态图像修复系统:基于深度学习的图片修复实现

多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...

HybridVLA——让单一LLM同时具备扩散和自回归动作预测能力:训练时既扩散也回归,但推理时则扩散

前言 如上一篇文章《dexcap升级版之DexWild》中的前言部分所说&#xff0c;在叠衣服的过程中&#xff0c;我会带着团队对比各种模型、方法、策略&#xff0c;毕竟针对各个场景始终寻找更优的解决方案&#xff0c;是我个人和我司「七月在线」的职责之一 且个人认为&#xff0c…...