当前位置: 首页 > news >正文

Kafka权威指南(第2版)读书笔记

目录

  • Kafka生产者——向Kafka写入数据
    • 生产者概览
    • 创建Kafka生产者
      • bootstrap.servers
      • key.serializer
      • value.serializer
    • 发送消息到Kafka
      • 同步发送消息

Kafka生产者——向Kafka写入数据

不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。

生产者概览

一个应用程序会在很多情况下向Kafka写入消息:记录用户的活动(用于审计和分析)​、记录指标、记录日志、记录从智能家电收集到的信息、与其他应用程序进行异步通信、缓冲即将写入数据库的数据,等等。不同的应用场景直接影响如何使用和配置生产者API。尽管生产者API使用起来很简单,但消息的发送过程还是有点儿复杂。下图展示了向Kafka发送消息的主要步骤:
在这里插入图片描述
先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。

接下来,如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。

broker在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个RecordMetaData对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。

创建Kafka生产者

要向Kafka写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必须设置的属性。

bootstrap.servers

broker的地址。可以由多个host:port组成,生产者用它们来建立初始的Kafka集群连接。它不需要包含所有的broker地址,因为生产者在建立初始连接之后可以从给定的broker那里找到其他broker的信息。不过还是建议至少提供两个broker地址,因为一旦其中一个停机,则生产者仍然可以连接到集群。

key.serializer

一个类名,用来序列化消息的键。broker希望接收到的消息的键和值都是字节数组。生产者可以把任意Java对象作为键和值发送给broker,但它需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常见的几种Java对象类型,就没有必要实现自己的序列化器。需要注意的是,必须设置key.serializer这个属性,尽管你可能只需要将值发送给Kafka。如果只需要发送值,则可以将Void作为键的类型,然后将这个属性设置为VoidSerializer。

value.serializer

一个类名,用来序列化消息的值。与设置key.serializer属性一样,需要将value.serializer设置成可以序列化消息值对象的类。

发送消息到Kafka

同步发送消息

同步发送消息很简单,当Kafka返回错误或重试次数达到上限时,生产者可以捕获到异常。这里需要考虑性能问题。根据Kafka集群繁忙程度的不同,broker可能需要2毫秒或更长的时间来响应请求。如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能。因此,同步发送方式通常不会被用在生产环境中​。

KafkaProducer一般会出现两种错误。一种是可重试错误,这种错误可以通过重发消息来解决。例如,对于连接错误,只要再次建立连接就可以解决。对于“not leader for partition”​(非分区首领)错误,只要重新为分区选举首领就可以解决,此时元数据也会被刷新。可以通过配置启用KafkaProducer的自动重试机制。如果在多次重试后仍无法解决问题,则应用程序会收到重试异常。另一种错误则无法通过重试解决,比如“Message size too large”​(消息太大)​。对于这种错误,KafkaProducer不会进行任何重试,而会立即抛出异常。

相关文章:

Kafka权威指南(第2版)读书笔记

目录 Kafka生产者——向Kafka写入数据生产者概览创建Kafka生产者bootstrap.serverskey.serializervalue.serializer 发送消息到Kafka同步发送消息 Kafka生产者——向Kafka写入数据 不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写…...

WORD转PDF脚本文件

1、在桌面新建一个文本文件,把下列代码复制到文本文件中。 On Error Resume Next Const wdExportFormatPDF 17 Set oWord WScript.CreateObject("Word.Application") Set fso WScript.CreateObject("Scripting.Filesystemobject") Set fdsf…...

electron 打包后的 exe 文件,运行后是空白窗口

一、代码相关问题 1. 页面加载失败 1.1 原因 在 Electron 应用中,若loadFile或loadURL方法指定的页面路径或 URL 错误,就无法正确加载页面,导致窗口空白。 1.2. 解决 仔细检查loadFile或loadURL方法中传入的路径或 URL 是否正确&#xf…...

数据库重连 - 方案

要解决 SQL Server 连接失效后导致的错误问题,可以考虑以下几种解决方案: 1. 连接池机制: 通过实现一个连接池,确保连接失效后可以重新建立连接,而不会直接导致整个程序出错。连接池可以帮助在连接中断时自动恢复连接,而不必每次手动重连。 例如,可以通过以下方式定期…...

从 PostgreSQL 中挽救损坏的表

~/tmp-dir.dab4fd85-8b47-4d9a-b15c-18312ef61075 pg_dump -U postgres -h locathost www_p1 > wow_p1.sqlpg_dump:错误:转储表 “page_views” 的内容失败:PQgetResult() 失败。pg_dump:详细信息:来自服务器的错误…...

【Vue3 入门到实战】1. 创建Vue3工程

目录 ​编辑 1. 学习目标 2. 环境准备与初始化 3. 项目文件结构 4. 写一个简单的效果 5. 总结 1. 学习目标 (1) 掌握如何创建vue3项目。 (2) 了解项目中的文件的作用。 (3) 编辑App.vue文件,并写一个简单的效果。 2. 环境准备与初始化 (1) 安装 Node.js 和 …...

rtthread学习笔记系列(10/11) -- 系统定时器

文章目录 10. 系统定时器10.1 跳跃表[定时器跳表 (Skip List) 算法](https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/programming-manual/timer/timer?id定时器跳表-skip-list-算法) 10.2 硬件定时器10.2.1 初始化&&删除10.2.2 sta…...

mock服务-通过json定义接口自动实现mock服务

go-mock介绍 不管在前端还是后端开发过程中,当我们需要联调其他服务的接口,而这个服务还没法提供调用时,那我们就要用到mock服务,自己按接口文档定义一个临时接口返回指定数据,以供本地开发联调测试。 怎么快速启动一…...

像JSONDecodeError: Extra data: line 2 column 1 (char 134)这样的问题怎么解决

问题介绍 今天处理返回的 JSON 的时候,出现了下面这样的问题: 处理这种问题的时候,首先你要看一下当前的字符串格式是啥样的,比如我查看后发现是下面这样的: 会发现这个字符串中间没有逗号,也就是此时的J…...

C#版 软件开发6大原则与23种设计模式

开发原则和设计模式一直是软件开发中的圣经, 但是这仅仅适用于中大型的项目开发, 在小型项目的开发中, 这些规则会降低你的开发效率, 使你的工程变得繁杂. 所以只有适合你的才是最好的. 设计模式六大原则1. 单一职责原则(Single Responsibility Principle&#xff0…...

java8 springboot 集成javaFx 实现一个客户端程序

1. 先创建一个springboot 程序(此步骤不做流程展示) 2. 更改springboot的版本依赖和导入所需依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.7</versio…...

MySQL(高级特性篇) 06 章——索引的数据结构

一、为什么使用索引 索引是存储引擎用于快速找到数据记录的一种数据结构&#xff0c;就好比一本教科书的目录部分&#xff0c;通过目录找到对应文章的页码&#xff0c;便可快速定位到需要的文章。MySQL中也是一样的道理&#xff0c;进行数据查找时&#xff0c;首先查看查询条件…...

PanWeidb-使用BenchmarkSQL对磐维数据库进行压测

本文提供PanweiDb使用BenchmarkSQL进行性能测试的方法和测试数据报告。 BenchmarkSQL,一个JDBC基准测试工具,内嵌了TPC-C测试脚本,支持很多数据库,如PostgreSQL、Oracle和Mysql等。 TPC-C是专门针对联机交易处理系统(OLTP系统)的规范,一般情况下我们也把这类系统称为业…...

AR 在高校实验室安全教育中的应用

AR应用APP可以内置实验室安全功能介绍&#xff0c;学习并考试&#xff08;为满足教育部关于实验室人员准入条件&#xff09;&#xff0c;AR主模块。其中AR主模块应该包括图形标识码的扫描&#xff0c;生成相应模型&#xff0c;或者火灾、逃生等应急处置的路线及动画演示。考试采…...

微信小程序实现个人中心页面

文章目录 1. 官方文档教程2. 编写静态页面3. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/ 2. 编写静态页面 mine.wxml布局文件 <!--index.wxml--> <navigation-bar title"个人中心" ba…...

Spring Boot中的配置文件有哪些类型

在 Spring Boot 中&#xff0c;配置文件用于管理应用程序的设置和参数&#xff0c;通常存放在项目的 src/main/resources 目录下。Spring Boot 支持多种类型的配置文件&#xff0c;并通过这些文件来控制应用的行为和环境配置。 1. application.properties application.proper…...

Spring Boot 项目启动后自动加载系统配置的多种实现方式

Spring Boot 项目启动后自动加载系统配置的多种实现方式 在 Spring Boot 项目中&#xff0c;可以通过以下几种方式实现 在项目启动完成后自动加载系统配置缓存操作 的需求&#xff1a; 1. 使用 CommandLineRunner CommandLineRunner 是一个接口&#xff0c;可以用来在 Spring…...

如何在 CentOS 中生成 CSR

在本教程中&#xff0c;我们将向您展示如何在CentOS 7和6中生成CSR。您可以直接从服务器生成 CSR。 只需按照以下步骤操作&#xff1a; 第 1 步&#xff1a;使用安全外壳 &#xff08;SSH&#xff09; 登录您的服务器 步骤 2&#xff1a;创建私钥和 CSR 文件 在提示符处键入以…...

qml XmlListModel详解

1、概述 XmlListModel是QtQuick用于从XML数据创建只读模型的组件。它可以作为各种view元素的数据源&#xff0c;比如ListView、GridView、PathView等&#xff1b;也可以作为其他和model交互的元素的数据源。通过XmlRole定义角色&#xff0c;如name、age和height&#xff0c;并…...

C++并发编程之跨应用程序与驱动程序的单生产者单消费者队列

设计一个单生产者单消费者队列&#xff08;SPSC队列&#xff09;&#xff0c;不使用C STL库或操作系统原子操作函数&#xff0c;并且将其放入跨进程共享内存中以便在Ring3&#xff08;用户模式&#xff09;和Ring0&#xff08;内核模式&#xff09;之间传递数据&#xff0c;是一…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机&#xff0c;它可以执行Java字节码。Java虚拟机是Java平台的一部分&#xff0c;Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅

目录 前言 操作系统与驱动程序 是什么&#xff0c;为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中&#xff0c;我们在使用电子设备时&#xff0c;我们所输入执行的每一条指令最终大多都会作用到硬件上&#xff0c;比如下载一款软件最终会下载到硬盘上&am…...

数据结构:递归的种类(Types of Recursion)

目录 尾递归&#xff08;Tail Recursion&#xff09; 什么是 Loop&#xff08;循环&#xff09;&#xff1f; 复杂度分析 头递归&#xff08;Head Recursion&#xff09; 树形递归&#xff08;Tree Recursion&#xff09; 线性递归&#xff08;Linear Recursion&#xff09;…...

规则与人性的天平——由高考迟到事件引发的思考

当那位身着校服的考生在考场关闭1分钟后狂奔而至&#xff0c;他涨红的脸上写满绝望。铁门内秒针划过的弧度&#xff0c;成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定"&#xff0c;构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...