当前位置: 首页 > news >正文

Kafka之消费者客户端

1、历史上的二个版本

与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:

  • 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端
  • 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端

2、必要的参数配置

  • bootstrap.servers

    用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。

    不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。

    建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。

  • group.id

    消费者隶属消费组的名称。

  • key.deserializer 和 value.deserializer

    与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。

    用来将字节数组中的key和value反序列化还原为原来的对象格式。

3、订阅主题与分区

一个消费者可以订阅一个或多个主题。

Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。

这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。

其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。

综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
在这里插入图片描述

4、消费消息

消息消费一般有两种方式:

  • 推模式:服务器主动将消息推送给消费者。
  • 拉模式:消费者主动向服务器发起请求来来取信息。

Kafka采用的消息消费模式是拉模式。

在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。

5、位移提交

在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。

对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。

对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”

通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
在这里插入图片描述
通过上图我们可以了解到如下信息:

  1. 正在消费的消息下标为3。
  2. 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
  3. 对于分区来说,下标4则作为下一个消息要写入的位置。
  4. 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。

Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。

根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
在这里插入图片描述
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。

  • 消息丢失

    假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。

  • 重复消费

    假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。

通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。

6、指定位移消息

首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?

为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:

  • latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
  • earliest:意为消费者会从起始处也就是0开始消费。
  • none:直接抛出NoOffsetForPartitionException异常。

7、再均衡

所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。

再均衡的过程中,消费组内的消费者无法读取消息。

再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。

8、消费者拦截器

我们可以通过消费者拦截器在poll返回消息之前消费位移提交之后进行一些特定的处理。

9、多线程实现

为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。

有三种多线程的实现方式:

  1. 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图: 在这里插入图片描述
  2. 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
  3. 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图: 在这里插入图片描述
    在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
    在这里插入图片描述
    基于这种实现方式提供以下两种实现方案:
    • 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
    • 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图: 在这里插入图片描述
      窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。

上一篇:Kafka之消费组与消费者

相关文章:

Kafka之消费者客户端

1、历史上的二个版本 与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本: 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端。新消费…...

使用Python进行数据分析入门

文章目录 Python环境搭建安装Anaconda验证安装 必备库介绍NumPyPandasMatplotlibSciPy 数据导入与清洗导入数据清洗数据 数据探索与分析描述性统计相关性分析 数据可视化绘制直方图 高级主题机器学习深度学习 总结 随着大数据时代的到来,数据分析变得越来越重要。Py…...

ubuntu20 从源码编译升级到版本5.15.263

author: hjjdebug date: 2024年 10月 25日 星期五 15:38:48 CST description: ubuntu20 从源码编译升级到版本5.15.263 我的内核是 5.15.105, 用apt 下载源码后其版本是5.15.263 为什么要从源码编译内核. 升级内核? 目的: 练练手. 消除内核神秘性. 还可以裁减内核,也是调试内核…...

php 程序开发分层与验证思想

在PHP程序开发中,合理的层级设计可以提高代码的可维护性、可扩展性和可测试性。以下是常见的层级设计模式及建议: 1. 分层架构 通常可以将PHP应用分为以下几层: 表示层(Presentation Layer): 负责与用户交…...

关于InternVL2的单卡、多卡推理

关于InternVL2的单卡、多卡推理 前言单卡推理多卡推理总结前言 本章节将介绍如何使用上一章节微调后的模型进行推理。推理又分为单卡和多卡,这里介绍的两种方式都是Hugging Face的transformers方法进行推理。模型的话可以使用上一章微调的任意一个非lora模型进行测试。 单卡推…...

Go语言设计Web框架

如何设计一个Web框架 项目规划 在开始设计Web框架之前,我们需要对整个项目进行规划。主要包括以下几个方面: 项目结构依赖管理路由设计控制器设计日志和配置管理 项目结构 首先,我们定义项目的目录结构: ├── cmd/ │ └…...

2024年10月28日练习(双指针算法)

一.11. 盛最多水的容器 - 力扣(LeetCode) 1.题目描述: 这个题目代表的意思就是数组上每个对应的值就相当于每条垂直线的高度,就相当于短板效应,两 个高度的线会取最短的长度因为那样水才不会漏。而两条线的数组的下标…...

Objective-C 音频爬虫:实时接收数据的 didReceiveData_ 方法

在互联网技术领域,数据的获取和处理是至关重要的。尤其是对于音频内容的获取,实时性和效率是衡量一个爬虫性能的重要指标。本文将深入探讨在Objective-C中实现音频爬虫时,如何高效地使用didReceiveData:方法来实时接收数据,并通过…...

提升网站流量和自然排名的SEO基本知识与策略分析

内容概要 在当今数字化时代,SEO(搜索引擎优化)成为加强网站可见度和提升流量的重要工具。SEO的基础知识包括理解搜索引擎的工作原理,以及如何通过优化网站内容和结构来提高自然排名。白帽SEO和黑帽SEO代表了两种截然不同的策略&a…...

雷池社区版compose文件配置讲解--fvm

在现代网络安全中,选择合适的 Web 应用防火墙至关重要。雷池(SafeLine)社区版免费切好用。为网站提供全面的保护,帮助网站抵御各种网络攻击。 docker-compose.yml 文件是 Docker Compose 的核心文件,用于定义和管理多…...

基于51单片机的智能断路器proteus仿真

地址: https://pan.baidu.com/s/16lfGgrgVr9V7JehonMNVQA 提取码:1234 仿真图: 芯片/模块的特点: AT89C52/AT89C51简介: AT89C52/AT89C51是一款经典的8位单片机,是意法半导体(STMicroelectro…...

(N-154)基于springboot酒店预订管理系统

开发工具:IDEA 服务器:Tomcat9.0, jdk1.8 项目构建:maven 数据库:mysql5.7 前端技术:AdminLTEBootstrapLayUIHTMLjQuery 服务端技术:springbootmybatis-plusthymeleaf 本项目分前台和后台…...

elasticsearch 8.x 插件安装(三)之拼音插件

elasticsearch 8.x 插件安装(三)之拼音插件 elasticsearch插件安装合集 elasticsearch插件安装(一)之ik分词器安装(含MySQL更新) elasticsearch 8.x插件(二)之同义词安装如何解决…...

快速遍历包含合并单元格的Word表格

Word中的合并表格如下,现在需要根据子类(例如:果汁)查找对应的品类,如果这是Excel表格,那么即使包含合并单元格,也很容易处理,但是使用Word VBA进行查找,就需要一些技巧。…...

手机收银云进销存管理软件,商品档案Excel格式批量导入导出,一键导入Excel的商品档案

如果您有Excel的商品档案,那么就可以批量导入到我们的手机云进销存软件系统里,就不需要人工手工一个个商品的新建商品档案,大大提高工作效率。如果您看下面的步骤不会操作,可以联系我们技术支持,来帮您把商品档案导入。…...

html 中识别\n自动换行

CSS实现&#xff1a;white-space <div style"white-space: pre-wrap;" v-html"str"> </div>white-space: normal|nowrap|pre|pre-line|pre-wrap|initial|inherit;值描述换行符空格和制表符文字换行行尾空格normal默认。空白会被浏览器忽略。合…...

用QWebSocketServer写websocket服务端

1. 引入必要的头文件 #include <QCoreApplication> #include <QWebSocketServer> #include <QWebSocket> #include <QDebug> #include <QObject>QCoreApplication&#xff1a;用于创建控制台应用的事件循环。QWebSocketServer&#xff1a;提供 …...

云原生后端:现代应用架构的核心力量

云原生后端&#xff1a;现代应用架构的核心力量 云原生后端是基于云环境进行设计和开发的一种理念&#xff0c;利用云服务和云原生技术构建的服务端应用。它旨在提供灵活、高效、弹性和可扩展的解决方案&#xff0c;成为推动应用现代化的核心力量。本文将详细探讨云原生后端的…...

arcgis中dem转模型导入3dmax

文末分享素材 效果 1、准备数据 (1)DEM (2)DOM 2、打开arcscene软件 3、加载DEM、DOM数据 4、设置DOM的高度为DEM...

Python自动化测试中的Mock与单元测试实战

在软件开发过程中&#xff0c;自动化测试是确保代码质量和稳定性的关键一环。而Python作为一门灵活且强大的编程语言&#xff0c;提供了丰富的工具和库来支持自动化测试。本文将深入探讨如何结合Mock与单元测试&#xff0c;利用Python进行自动化测试&#xff0c;以提高代码的可…...

【kafka】Golang实现分布式Masscan任务调度系统

要求&#xff1a; 输出两个程序&#xff0c;一个命令行程序&#xff08;命令行参数用flag&#xff09;和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽&#xff0c;然后将消息推送到kafka里面。 服务端程序&#xff1a; 从kafka消费者接收…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...