【大数据学习 | kafka】简述kafka的消费者consumer
1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。
这里面要涉及到一个动作叫做拉取。
首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push
1.1 消费者组
在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据,压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。
正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。
这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
2. 消费者实现
在实现消费者的时候我们需要知道几个消费者的配置重要参数
| 参数 | 解释 |
|---|---|
| bootstrap.servers | 集群地址 |
| key.deserializer | key反序列化器 |
| value.deserializer | value反序列化器 |
| group.id | 消费者组id |
首先创建消费者对象
消费者对象订阅相应的topic然后拉取其中的数据进行消费
整体代码如下
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");//设定组idpro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定key的反序列化器pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定value的反序列化器KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_a","topic_b");//一个消费者可以消费多个分区的数据consumer.subscribe(topics);//订阅这个topicwhile (true){//死循环要一直消费数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//间隔一秒钟消费一次数据,拉取一批数据过来Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。
# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况
首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行
启动两次,这个时候我们就有了两个消费者实例
生产者线程:分别向三个分区中发送1 2 3元素
package com.hainiu.kafka.consumer;/*** ClassName : test3_producer* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 23:40* Version 1.0*/import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test3_producer {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");producer.send(record1);producer.send(record2);
// producer.send(record3);producer.close();}
}
可以看到有的消费者消费了两个分区的数据
如果启动三个消费者会发现每个人消费一个分区的数据
如果启动四个消费者
我们发现有一个消费者没有数据
3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的
3.2 多个组消费一个topic
同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的
修改同一份代码的组标识不同。启动两个实例查看里面的消费信息
pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");//分别修改消费者组的id不同
package com.hainiu.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_c");//订阅多个topic的数据变化consumer.subscribe(topics);while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}
相关文章:
【大数据学习 | kafka】简述kafka的消费者consumer
1. 消费者的结构 能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。 这里面要涉及到一个动作叫做拉取。 首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的…...
系统架构设计师论文:论湖仓一体架构及其应用
试题四 论湖仓一体架构及其应用 随着5G、大数据、人工智能、物联网等技术的不断成熟,各行各业的业务场景日益复杂,企业数据呈现出大规模、多样性的特点,特别是非结构化数据呈现出爆发式增长趋势。在这一背景下,企业数据管理不再局限于传统的结构化 OLTP (On-Line Transact…...
电磁兼容(EMC):GB 4343.1喀呖声 详解
目录 1. 喀呖声的危害 2. 喀呖声 Click定义 3. 中频参考电平 4. 开关操作 5. 最小观察时间 6. 喀呖声率 7. 喀呖声限值 8. 上四分位法 1. 喀呖声的危害 喀呖声作为一种电压骚扰,其危害主要体现在以下几个方面: 对电子设备的干扰:喀呖…...
纯血鸿蒙Native层支持说明
本文所有描述均参考鸿蒙官方文档:传送门 1.对C库的支持 C标准函数库在C语言程序设计中,提供符合标准的头文件,以及常用的库函数实现(如I/O输入输出和字符串控制)。 HarmonyOS采用musl作为C标准库,musl库…...
learn C++ NO.31——类型转换
C语言中的类型转换 在C语言中,当赋值符号两边的类型不匹配的时候,或者是形参类型和实参类型不匹配时,返回值类型与接受返回值类型不匹配时,都会需要类型转换。C语言的类型转换有两种:显示类型转换和隐式类型转换。 显…...
重学 Android 自定义 View 系列(三):自定义步数进度条
前言 本篇文章主要是实现仿QQ步数View,很老的一个View了,但技术永不落后,开搂! 最终效果如下: 1. 结构分析 QQStepView 主要由三个元素组成: 显示一个圆环进度条,通过外环和内环的角度变化来…...
海南华志亿星电子商务有限公司赋能抖音商家成长
在当今瞬息万变的电商时代,抖音凭借其短视频与直播电商的独特模式,迅速崛起并引领潮流。在这场电商变革中,海南华志亿星电子商务有限公司以其卓越的服务质量和创新的运营模式,在抖音电商领域大放异彩,成为众多商家的首…...
数据结构-并查集专题(1)
一、前言 因为要开始准备年底的校赛和明年年初的ACM、蓝桥杯、天梯赛,于是开始按专题梳理一下对应的知识点,先从简单入门又值得记录的内容开始,并查集首当其冲。 二、我的模板 虽然说是借用了jiangly鸽鸽的板子,但是自己也小做…...
共享汽车管理新纪元:SpringBoot框架应用
4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式,是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示: 图4-1系统工作原理…...
道可云人工智能元宇宙每日资讯|《中国生成式人工智能应用与实践展望》白皮书发布
道可云元宇宙每日简报(2024年11月6日)讯,今日元宇宙新鲜事有: 《重庆市“机器人”应用行动计划(2024—2027年)》发布 近日,重庆市经济和信息化委员会、重庆市教育委员会等八部门印发《重庆市“…...
kaggle学习 eloData项目(1)-数据校验
文章目录 kaggle学习 eloData项目(1)-数据校验(1) 数据基本情况查看(2) 数据校验(3) 数据探究 小结 kaggle学习 eloData项目(1)-数据校验 不能懈怠࿰…...
ORACLE RAC用DNS服务器的配置
一、搭建本地YUM源 二、安装DNS全部组建 yum -y install bind* 三、规划您RAC集群所有IP #public 192.168.16.111 rac1.ntt.com rac1 192.168.16.112 rac2.ntt.com rac2 192.168.16.121 rac3.ntt.com rac3 192.168.16.122 rac4.ntt.com rac4 #private 10.10.10.111 rac1-pr…...
vue3 + vite 实现版本更新检查(检测到版本更新时提醒用户刷新页面)
背景 当一个页面很久没刷新,又突然点到页面。由于一些文件是因为动态加载的,当重编后(如前后端发版后),这些文件会发生变化,就会出现加载不到的情况。进而导致正在使用的用户,点击页面发现加载…...
【CSP】爆零的独特姿势
硝烟散,繁花尽,第一次CSP折戟沉沙。 代码拿回来,花几分钟订正下,就是300分。 然而,实战只有100分,还是偷懒得的幸运,觉得第一题题目太简单懒得用文件IO调试... ... 啥也不说了,上图。…...
Git仓库
Git初始 概念 一个免费开源,分布式的代码版本控制系统,帮助开发团队维护代码 作用 记录代码内容,,切换代码版本,多人开发时高效合并代码内容 如何学: 个人本机使用:Git基础命令和概念 多…...
【科研日常】论文投稿的几大状态
Manuscript Submitted(Submitted to Journal):表示论文已经投稿成功,等待期刊工作人员检查论文格式排版、重复率是否符合要求,符合要求的文章会分配给期刊编辑进行处理。 Awaiting Admin Processing:意为等…...
SSLHandshakeException错误解决方案
1、错误提示 调用Http工具报如下异常信息: cn.hutool.core.io.IORuntimeException: SSLHandshakeException: Received fatal alert: handshake_failure2、查询问题 一开始我以为是代码bug,网络bug甚至是配置环境未生效,找了一大圈…...
python数据结构基础(7)
本节学习最后一种数据结构---图,在很多问题中应用图可以帮助构建思维空间,快速理清思路,解决复杂问题. 图就是一些顶点的集合,这些顶点通过一系列边链接起来.根据边的有向和无向,图分为有向图和无向图.有时图的边上带有权重,本节暂时不将权重作为重点. 计算机通过邻接表或者邻…...
【系统集成项目管理工程师】英语词汇对照表-项目管理类
英语单词(项目管理类)中文解释Activity活动Accept验收Acceptable Quality Level可接受的质量水平Acceptance Standard验收标准Acquisition Plan Review采购计划评审Action处理Active On the Arrow双代号网络图Activity Based Costing (ABC)基于活动的成本…...
购物车-多元素组合动画css
学习 渡一课程 多元素组合动画 练习。 在我们开发购物车功能时,经常会有点击添加按钮,就会有一个小圆点掉进购物车的动画,如下图所示,今天我们通过css来实现。 首先实现多元素组合动画 直接上代码,可以复制到本地使用…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...
【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权
摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...
