当前位置: 首页 > news >正文

kafka 超详细的消息订阅与消息消费几种方式

kafka 消息订阅与消息消费几种方式

本文主要内容

  • 消费者订阅几种方式

    • 订阅多个主题

    • 按正则表达式订阅

  • 消息消费几种方式

    • 按分区消费

    • 按主题消费

    • 不区分

笔者建议一开始学习Kafka最好不要用SpringBoot 集成方式,因为SpringBoot推崇用注解方式,比如@KafkaListener 等,就可以直接消费,这样不能直接接触kafka-client一些api, 且SpringBoot 给我们提供了很多默认配置,我们几乎零配置也可以使用,实际上kafka很多配置很重要的,不容忽视。

消费者订阅几种方式

KafkaConsumer 给我们提供了几种订阅消息方式,我们可以订阅多个消息。示例代码如下

kafkaConsumer.subscribe(Arrays.asList("topicA","topicB"));kafkaConsumer.subscribe(Pattern.compile("topic-*"));kafkaConsumer.assign(Arrays.asList(new TopicPartition("topicA",0)));

订阅多个主题

void subscribe(Collection<String> topics) 对应上面第一行代码,这是最常见的订阅方式

按正则表达式订阅

void subscribe(Pattern pattern) 符合正则的主题都会被消费

有人创建了新的主题,并且与正则匹配,消费者也可以消费到

这种方式需要能对多种消息处理,对于一些能通用处理,不感知具体业务数据的场景比较合适。比如B系统需要同步A系统数据,我们按正则订阅,当A系统有新的数据需要同步,这是只需要A发满足条件正则的消息,B系统无需任何改动。

订阅指定分区

void assign(Collection<TopicPartition> partitions);正常业务不会使用,如果订阅的分区不存在,会报错。一些特殊场景,比如需要精确控制消费者消费消息,自定义分区分配策略时 可能会用到assign 方法



消息消费

kafka 采用客户端 拉取模式进行消息消费

poll() 返回所订阅的主题上一组消息ConsumerRecords ,我们可以对消息进行按主题、按分区进行处理,当然可以统一处理,不分主题和分区

ConsumerRecords<String,String> records =    kafkaConsumer.poll(Duration.ofMillis(1000));

不区分主题、分区

for(ConsumerRecord<String,String> record : records){// 处理消息
}

按partition 处理

Set<TopicPartition> topicPartitions = records.partitions();
for(TopicPartition topicPartition : topicPartitions){List<ConsumerRecord<String, String>> tpRecords = records.records(topicPartition);
}

按主题

  Iterable<ConsumerRecord<String,String>> iter = records.records(topic);

思考

kafka 给我们提供了灵活的消息订阅以及消息消费方式,我们需要根据实际业务场景选择。无论哪种场景都离不开 主题分区 ,最主要的是分区,当我们选择了某种订阅方式如果主题分区  发生了变化 ,消息还能正常消费吗

选择了按正则订阅消息方式, 后面创建了新的主题,该消息能被正常消费吗

选择了指定分区订阅, 如果后面扩容了新的分区,新分区消息能消费吗?

List<PartitionInfo> partitionsFor(String topic) 能获取分区情况,如果需要按分区订阅,该方法一定用的上


按分区维度消费消息,对于手动提交消息位移场景非常有用

主题分类处理消息也很常见,因为不同主题消息格式可能是不一样的,根据主题区分,很容易将不同的消息分类处理。

相关文章:

kafka 超详细的消息订阅与消息消费几种方式

kafka 消息订阅与消息消费几种方式 本文主要内容 消费者订阅几种方式 订阅多个主题 按正则表达式订阅 消息消费几种方式 按分区消费 按主题消费 不区分 “ 笔者建议一开始学习Kafka最好不要用SpringBoot 集成方式,因为SpringBoot推崇用注解方式&#xff0c;比如KafkaList…...

C++ 第三讲:内存管理

C 第三讲&#xff1a;内存管理 1.C内存分布2.内存管理方式2.1C语言内存管理方式2.2C内存管理方式2.2.1new\delete操作内置类型2.2.2new\delete操作自定义类型 3.operator new与operator delete函数4.new和delete实现原理4.1内置类型4.2自定义类型 5.定位new5.1内存池的基本了解…...

LeeCode打卡第二十九天

LeeCode打卡第二十九天 第一题&#xff1a;岛屿数量&#xff08;LeeCode第200题&#xff09;: 给你一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的的二维网格&#xff0c;请你计算网格中岛屿的数量。岛屿总是被水包围&#xff0c;并且每座岛屿只…...

阿里云专业翻译api对接

最近我们一个商城项目涉及多语言切换&#xff0c;默认中文。用户切换语言可选英语和阿拉伯语言&#xff0c;前端APP和后端返回动态数据都要根据用户选择语言来展示。前端静态内容都做了三套语言&#xff0c;后端商品为了适用这种多语言我们也进行了改造。每一件商品名称&#x…...

基于Spring Boot的能源管理系统+建筑能耗+建筑能耗监测系统+节能监测系统+能耗监测+建筑能耗监测

介绍 建筑节能监测系统是基于计算机网络、物联网、大数据和数据可视化等多种技术融合形成的一套节能监测系统。 系统实现了对建筑电、水、热&#xff0c;气等能源、资源消耗情况的实时监测和预警、动态分析和评估&#xff0c;为用户建立了科学、系统的节能分析方法&#xff0c…...

大数据新视界 --大数据大厂之 Cassandra 分布式数据库:高可用数据存储的新选择

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...

ROS第五梯:ROS+VSCode+C++单步调试

解决问题&#xff1a;在ROS项目中进行断点调试。 第一步&#xff1a;创建一个ROS项目或者打开一个现有的ROS项目。 第二步&#xff1a;修改c_cpp_properties.json 增加一段命令: "compileCommands": "${workspaceFolder}/build/compile_commands.json"第三…...

SLA 概念和计算方法

SLA 概念和计算方法 SLA SLA&#xff1a;服务等级协议&#xff08;简称&#xff1a;SLA&#xff0c;全称&#xff1a;service level agreement&#xff09; 网站服务可用性的一个保证 9越多代表全年服务可用时间越长服务更可靠&#xff0c;停机时间越短&#xff0c;反之亦然…...

C++比大小游戏

目录 开头程序程序的流程图程序游玩的效果下一篇博客要说的东西 开头 大家好&#xff0c;我叫这是我58。 程序 #include <iostream> #include <Windows.h> using namespace std; int main() {int ir 1;char chparr[2] { 0 };int ip1 0;int ip2 0;int i 1;c…...

PCIe进阶之TL:Memory, I/O, and Configuration Request Rules TPH Rules

1 Memory, I/O, and Configuration Request Rules 下述规则适用于 Memory 请求、IO 请求和配置请求。 除了公共的 header 字段外,所有 Memory 请求、IO 请求和配置请求还包括以下字段: (1)Requester ID[15:0] 和 Tag[9:0],组成了 Transaction ID 。 (2)Last DW BE[3:0]…...

【初阶数据结构】一文讲清楚 “堆” 和 “堆排序” -- 树和二叉树(二)(内含TOP-K问题)

文章目录 前言1. 堆1.1 堆的概念1.2 堆的分类 2. 堆的实现2.1 堆的结构体设置2.2 堆的初始化2.3 堆的销毁2.4 添加数据到堆2.4.1 "向上调整"算法 2.5 从堆中删除数据2.5.1 “向下调整”算法 2.6 堆的其它各种方法接口函数 3. 堆排序3.1 堆排序的代码实现 4. TOP-K问题…...

sqli-lab靶场学习(二)——Less8-10(盲注、时间盲注)

Less8 第八关依然是先看一般状态 http://localhost/sqli-labs/Less-8/?id1 然后用单引号闭合&#xff1a; http://localhost/sqli-labs/Less-8/?id1 这关的问题在于报错是不显示&#xff0c;那没办法通过上篇文章的updatexml大法处理。对于这种情况&#xff0c;需要用“盲…...

Dijkstra算法和BFS算法(单源最短路径)

基于你设计的带权有向图&#xff0c;从某一结点出发&#xff0c;执行Dijkstra算法求单源最短路径。用文字描述每一轮执行的过程 文字描述&#xff1a;用BFS算法求单源最短路径的过程 Dijkstra 算法 BFS算法 广度优先算法...

在WordPress中最佳Elementor主题推荐:专家级指南

对于已经在WordPress和Elementor上有丰富经验的用户来说&#xff0c;选择功能强大且高度灵活的主题&#xff0c;能大大提升网站的表现和定制能力。今天&#xff0c;我们来介绍六款适合用户的专家级Elementor主题&#xff1a;Sydney、Blocksy、Rife Free、Customify、Deep和Laye…...

关于RabbitMQ消息丢失的解决方案

RabbitMQ如何保证消息的可靠性传输 一、消息丢失的原因 1. 生产者端 网络问题&#xff1a; 原因&#xff1a;生产者与RabbitMQ服务器之间的网络连接不稳定或中断&#xff0c;导致消息在传输过程中丢失。解决方案&#xff1a;确保网络连接稳定&#xff0c;监控网络状态&#x…...

c语言动态内存分配

前言 我们已经掌握的内存开辟⽅式有&#xff1a; int val 20;//在栈空间上开辟四个字节 char arr[10] {0};//在栈空间上开辟10个字节的连续空间 但是上述的开辟空间的⽅式有两个特点&#xff1a; • 空间开辟⼤⼩是固定的。 • 数组在申明的时候&#xff0c;必须指定数组的…...

零基础制作一个ST-LINK V2 附PCB文件原理图 AD格式

资料下载地址&#xff1a;零基础制作一个ST-LINK V2 附PCB文件原理图 AD格式 ST-LINK/V2是一款可以在线仿真以及下载STM8以及STM32的开发工具。支持所有带SWIM接口的STM8系列单片机;支持所有带JTAG / SWD接口的STM32系列单片机。 基本属性 ST-LINK/V2是ST意法半导体为评估、开…...

nginx基础篇(一)

文章目录 学习链接概图一、Nginx简介1.1 背景介绍名词解释 1.2 常见服务器对比IISTomcatApacheLighttpd其他的服务器 1.3 Nginx的优点(1)速度更快、并发更高(2)配置简单&#xff0c;扩展性强(3)高可靠性(4)热部署(5)成本低、BSD许可证 1.4 Nginx的功能特性及常用功能基本HTTP服…...

监控系列之-Grafana面板展示及制作

一 Grafana设置添加数据源 1、设置Grafana中文显示 最后保存退出&#xff0c;数据源添加完毕 2、导入node_exporter主机监控面板 此处 有外网的情况下&#xff0c;直接输入对应面板的ID号&#xff0c;然后点击加载即可&#xff1b;无无外网的话&#xff0c;则考虑使用上传仪表…...

值传递和地址传递

值传递 我们从下面这段代码开始&#xff1a; point(char*pt); void main(){char b[4]{m,n,o,p},*ptb;point(pt);printf("%c\n",*pt); } point(char *p){p3; }这段代码定义了一个函数 point 和一个主函数 main。 在 main 函数中&#xff0c;定义了一个字符数组 b 并…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?

Redis 的发布订阅&#xff08;Pub/Sub&#xff09;模式与专业的 MQ&#xff08;Message Queue&#xff09;如 Kafka、RabbitMQ 进行比较&#xff0c;核心的权衡点在于&#xff1a;简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...