当前位置: 首页 > news >正文

【大数据学习 | kafka】简述kafka的消费者consumer

1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。

这里面要涉及到一个动作叫做拉取。

首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push

1.1 消费者组

在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。

正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。

这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2. 消费者实现

在实现消费者的时候我们需要知道几个消费者的配置重要参数

参数解释
bootstrap.servers集群地址
key.deserializerkey反序列化器
value.deserializervalue反序列化器
group.id消费者组id

首先创建消费者对象

消费者对象订阅相应的topic然后拉取其中的数据进行消费

整体代码如下

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");//设定组idpro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定key的反序列化器pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定value的反序列化器KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_a","topic_b");//一个消费者可以消费多个分区的数据consumer.subscribe(topics);//订阅这个topicwhile (true){//死循环要一直消费数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//间隔一秒钟消费一次数据,拉取一批数据过来Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。

# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况

首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行

启动两次,这个时候我们就有了两个消费者实例

生产者线程:分别向三个分区中发送1 2 3元素

package com.hainiu.kafka.consumer;/*** ClassName : test3_producer* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 23:40* Version 1.0*/import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test3_producer {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");producer.send(record1);producer.send(record2);
//        producer.send(record3);producer.close();}
}

可以看到有的消费者消费了两个分区的数据

如果启动三个消费者会发现每个人消费一个分区的数据

如果启动四个消费者

我们发现有一个消费者没有数据

3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的

3.2 多个组消费一个topic

同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的

修改同一份代码的组标识不同。启动两个实例查看里面的消费信息

   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");//分别修改消费者组的id不同
package com.hainiu.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_c");//订阅多个topic的数据变化consumer.subscribe(topics);while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}

相关文章:

【大数据学习 | kafka】简述kafka的消费者consumer

1. 消费者的结构 能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。 这里面要涉及到一个动作叫做拉取。 首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用&#xff0c;比如flume采集数据然后交给spark或者flink进行计算分析&#xff0c;但是flume采用的…...

系统架构设计师论文:论湖仓一体架构及其应用

试题四 论湖仓一体架构及其应用 随着5G、大数据、人工智能、物联网等技术的不断成熟,各行各业的业务场景日益复杂,企业数据呈现出大规模、多样性的特点,特别是非结构化数据呈现出爆发式增长趋势。在这一背景下,企业数据管理不再局限于传统的结构化 OLTP (On-Line Transact…...

电磁兼容(EMC):GB 4343.1喀呖声 详解

目录 1. 喀呖声的危害 2. 喀呖声 Click定义 3. 中频参考电平 4. 开关操作 5. 最小观察时间 6. 喀呖声率 7. 喀呖声限值 8. 上四分位法 1. 喀呖声的危害 喀呖声作为一种电压骚扰&#xff0c;其危害主要体现在以下几个方面&#xff1a; 对电子设备的干扰&#xff1a;喀呖…...

纯血鸿蒙Native层支持说明

本文所有描述均参考鸿蒙官方文档&#xff1a;传送门 1.对C库的支持 C标准函数库在C语言程序设计中&#xff0c;提供符合标准的头文件&#xff0c;以及常用的库函数实现&#xff08;如I/O输入输出和字符串控制&#xff09;。 HarmonyOS采用musl作为C标准库&#xff0c;musl库…...

learn C++ NO.31——类型转换

C语言中的类型转换 在C语言中&#xff0c;当赋值符号两边的类型不匹配的时候&#xff0c;或者是形参类型和实参类型不匹配时&#xff0c;返回值类型与接受返回值类型不匹配时&#xff0c;都会需要类型转换。C语言的类型转换有两种&#xff1a;显示类型转换和隐式类型转换。 显…...

重学 Android 自定义 View 系列(三):自定义步数进度条

前言 本篇文章主要是实现仿QQ步数View&#xff0c;很老的一个View了&#xff0c;但技术永不落后&#xff0c;开搂&#xff01; 最终效果如下&#xff1a; 1. 结构分析 QQStepView 主要由三个元素组成&#xff1a; 显示一个圆环进度条&#xff0c;通过外环和内环的角度变化来…...

海南华志亿星电子商务有限公司赋能抖音商家成长

在当今瞬息万变的电商时代&#xff0c;抖音凭借其短视频与直播电商的独特模式&#xff0c;迅速崛起并引领潮流。在这场电商变革中&#xff0c;海南华志亿星电子商务有限公司以其卓越的服务质量和创新的运营模式&#xff0c;在抖音电商领域大放异彩&#xff0c;成为众多商家的首…...

数据结构-并查集专题(1)

一、前言 因为要开始准备年底的校赛和明年年初的ACM、蓝桥杯、天梯赛&#xff0c;于是开始按专题梳理一下对应的知识点&#xff0c;先从简单入门又值得记录的内容开始&#xff0c;并查集首当其冲。 二、我的模板 虽然说是借用了jiangly鸽鸽的板子&#xff0c;但是自己也小做…...

共享汽车管理新纪元:SpringBoot框架应用

4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式&#xff0c;是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示&#xff1a; 图4-1系统工作原理…...

道可云人工智能元宇宙每日资讯|《中国生成式人工智能应用与实践展望》白皮书发布

道可云元宇宙每日简报&#xff08;2024年11月6日&#xff09;讯&#xff0c;今日元宇宙新鲜事有&#xff1a; 《重庆市“机器人”应用行动计划&#xff08;2024—2027年&#xff09;》发布 近日&#xff0c;重庆市经济和信息化委员会、重庆市教育委员会等八部门印发《重庆市“…...

kaggle学习 eloData项目(1)-数据校验

文章目录 kaggle学习 eloData项目&#xff08;1&#xff09;-数据校验&#xff08;1&#xff09; 数据基本情况查看&#xff08;2&#xff09; 数据校验&#xff08;3&#xff09; 数据探究 小结 kaggle学习 eloData项目&#xff08;1&#xff09;-数据校验 不能懈怠&#xff0…...

ORACLE RAC用DNS服务器的配置

一、搭建本地YUM源 二、安装DNS全部组建 yum -y install bind* 三、规划您RAC集群所有IP #public 192.168.16.111 rac1.ntt.com rac1 192.168.16.112 rac2.ntt.com rac2 192.168.16.121 rac3.ntt.com rac3 192.168.16.122 rac4.ntt.com rac4 #private 10.10.10.111 rac1-pr…...

vue3 + vite 实现版本更新检查(检测到版本更新时提醒用户刷新页面)

背景 当一个页面很久没刷新&#xff0c;又突然点到页面。由于一些文件是因为动态加载的&#xff0c;当重编后&#xff08;如前后端发版后&#xff09;&#xff0c;这些文件会发生变化&#xff0c;就会出现加载不到的情况。进而导致正在使用的用户&#xff0c;点击页面发现加载…...

【CSP】爆零的独特姿势

硝烟散&#xff0c;繁花尽&#xff0c;第一次CSP折戟沉沙。 代码拿回来&#xff0c;花几分钟订正下&#xff0c;就是300分。 然而&#xff0c;实战只有100分&#xff0c;还是偷懒得的幸运&#xff0c;觉得第一题题目太简单懒得用文件IO调试... ... 啥也不说了&#xff0c;上图。…...

Git仓库

Git初始 概念 一个免费开源&#xff0c;分布式的代码版本控制系统&#xff0c;帮助开发团队维护代码 作用 记录代码内容&#xff0c;&#xff0c;切换代码版本&#xff0c;多人开发时高效合并代码内容 如何学&#xff1a; 个人本机使用&#xff1a;Git基础命令和概念 多…...

【科研日常】论文投稿的几大状态

Manuscript Submitted&#xff08;Submitted to Journal&#xff09;&#xff1a;表示论文已经投稿成功&#xff0c;等待期刊工作人员检查论文格式排版、重复率是否符合要求&#xff0c;符合要求的文章会分配给期刊编辑进行处理。 Awaiting Admin Processing&#xff1a;意为等…...

SSLHandshakeException错误解决方案

1、错误提示 调用Http工具报如下异常信息&#xff1a; cn.hutool.core.io.IORuntimeException: SSLHandshakeException: Received fatal alert: handshake_failure2、查询问题 一开始我以为是代码bug&#xff0c;网络bug甚至是配置环境未生效&#xff0c;找了一大圈&#xf…...

python数据结构基础(7)

本节学习最后一种数据结构---图,在很多问题中应用图可以帮助构建思维空间,快速理清思路,解决复杂问题. 图就是一些顶点的集合,这些顶点通过一系列边链接起来.根据边的有向和无向,图分为有向图和无向图.有时图的边上带有权重,本节暂时不将权重作为重点. 计算机通过邻接表或者邻…...

【系统集成项目管理工程师】英语词汇对照表-项目管理类

英语单词&#xff08;项目管理类&#xff09;中文解释Activity活动Accept验收Acceptable Quality Level可接受的质量水平Acceptance Standard验收标准Acquisition Plan Review采购计划评审Action处理Active On the Arrow双代号网络图Activity Based Costing (ABC)基于活动的成本…...

购物车-多元素组合动画css

学习 渡一课程 多元素组合动画 练习。 在我们开发购物车功能时&#xff0c;经常会有点击添加按钮&#xff0c;就会有一个小圆点掉进购物车的动画&#xff0c;如下图所示&#xff0c;今天我们通过css来实现。 首先实现多元素组合动画 直接上代码&#xff0c;可以复制到本地使用…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

vscode里如何用git

打开vs终端执行如下&#xff1a; 1 初始化 Git 仓库&#xff08;如果尚未初始化&#xff09; git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

Spring AI与Spring Modulith核心技术解析

Spring AI核心架构解析 Spring AI&#xff08;https://spring.io/projects/spring-ai&#xff09;作为Spring生态中的AI集成框架&#xff0c;其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似&#xff0c;但特别为多语…...

R语言速释制剂QBD解决方案之三

本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...

LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用

中达瑞和自2005年成立以来&#xff0c;一直在光谱成像领域深度钻研和发展&#xff0c;始终致力于研发高性能、高可靠性的光谱成像相机&#xff0c;为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...