Kafka权威指南(第2版)读书笔记
目录
- Kafka生产者——向Kafka写入数据
- 生产者概览
- 创建Kafka生产者
- bootstrap.servers
- key.serializer
- value.serializer
- 发送消息到Kafka
- 同步发送消息
Kafka生产者——向Kafka写入数据
不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。
生产者概览
一个应用程序会在很多情况下向Kafka写入消息:记录用户的活动(用于审计和分析)、记录指标、记录日志、记录从智能家电收集到的信息、与其他应用程序进行异步通信、缓冲即将写入数据库的数据,等等。不同的应用场景直接影响如何使用和配置生产者API。尽管生产者API使用起来很简单,但消息的发送过程还是有点儿复杂。下图展示了向Kafka发送消息的主要步骤:
先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。
接下来,如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。
broker在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个RecordMetaData对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。
创建Kafka生产者
要向Kafka写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必须设置的属性。
bootstrap.servers
broker的地址。可以由多个host:port组成,生产者用它们来建立初始的Kafka集群连接。它不需要包含所有的broker地址,因为生产者在建立初始连接之后可以从给定的broker那里找到其他broker的信息。不过还是建议至少提供两个broker地址,因为一旦其中一个停机,则生产者仍然可以连接到集群。
key.serializer
一个类名,用来序列化消息的键。broker希望接收到的消息的键和值都是字节数组。生产者可以把任意Java对象作为键和值发送给broker,但它需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常见的几种Java对象类型,就没有必要实现自己的序列化器。需要注意的是,必须设置key.serializer这个属性,尽管你可能只需要将值发送给Kafka。如果只需要发送值,则可以将Void作为键的类型,然后将这个属性设置为VoidSerializer。
value.serializer
一个类名,用来序列化消息的值。与设置key.serializer属性一样,需要将value.serializer设置成可以序列化消息值对象的类。
发送消息到Kafka
同步发送消息
同步发送消息很简单,当Kafka返回错误或重试次数达到上限时,生产者可以捕获到异常。这里需要考虑性能问题。根据Kafka集群繁忙程度的不同,broker可能需要2毫秒或更长的时间来响应请求。如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能。因此,同步发送方式通常不会被用在生产环境中。
KafkaProducer一般会出现两种错误。一种是可重试错误,这种错误可以通过重发消息来解决。例如,对于连接错误,只要再次建立连接就可以解决。对于“not leader for partition”(非分区首领)错误,只要重新为分区选举首领就可以解决,此时元数据也会被刷新。可以通过配置启用KafkaProducer的自动重试机制。如果在多次重试后仍无法解决问题,则应用程序会收到重试异常。另一种错误则无法通过重试解决,比如“Message size too large”(消息太大)。对于这种错误,KafkaProducer不会进行任何重试,而会立即抛出异常。
相关文章:

Kafka权威指南(第2版)读书笔记
目录 Kafka生产者——向Kafka写入数据生产者概览创建Kafka生产者bootstrap.serverskey.serializervalue.serializer 发送消息到Kafka同步发送消息 Kafka生产者——向Kafka写入数据 不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写…...
WORD转PDF脚本文件
1、在桌面新建一个文本文件,把下列代码复制到文本文件中。 On Error Resume Next Const wdExportFormatPDF 17 Set oWord WScript.CreateObject("Word.Application") Set fso WScript.CreateObject("Scripting.Filesystemobject") Set fdsf…...
electron 打包后的 exe 文件,运行后是空白窗口
一、代码相关问题 1. 页面加载失败 1.1 原因 在 Electron 应用中,若loadFile或loadURL方法指定的页面路径或 URL 错误,就无法正确加载页面,导致窗口空白。 1.2. 解决 仔细检查loadFile或loadURL方法中传入的路径或 URL 是否正确…...
数据库重连 - 方案
要解决 SQL Server 连接失效后导致的错误问题,可以考虑以下几种解决方案: 1. 连接池机制: 通过实现一个连接池,确保连接失效后可以重新建立连接,而不会直接导致整个程序出错。连接池可以帮助在连接中断时自动恢复连接,而不必每次手动重连。 例如,可以通过以下方式定期…...
从 PostgreSQL 中挽救损坏的表
~/tmp-dir.dab4fd85-8b47-4d9a-b15c-18312ef61075 pg_dump -U postgres -h locathost www_p1 > wow_p1.sqlpg_dump:错误:转储表 “page_views” 的内容失败:PQgetResult() 失败。pg_dump:详细信息:来自服务器的错误…...

【Vue3 入门到实战】1. 创建Vue3工程
目录 编辑 1. 学习目标 2. 环境准备与初始化 3. 项目文件结构 4. 写一个简单的效果 5. 总结 1. 学习目标 (1) 掌握如何创建vue3项目。 (2) 了解项目中的文件的作用。 (3) 编辑App.vue文件,并写一个简单的效果。 2. 环境准备与初始化 (1) 安装 Node.js 和 …...

rtthread学习笔记系列(10/11) -- 系统定时器
文章目录 10. 系统定时器10.1 跳跃表[定时器跳表 (Skip List) 算法](https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/programming-manual/timer/timer?id定时器跳表-skip-list-算法) 10.2 硬件定时器10.2.1 初始化&&删除10.2.2 sta…...

mock服务-通过json定义接口自动实现mock服务
go-mock介绍 不管在前端还是后端开发过程中,当我们需要联调其他服务的接口,而这个服务还没法提供调用时,那我们就要用到mock服务,自己按接口文档定义一个临时接口返回指定数据,以供本地开发联调测试。 怎么快速启动一…...

像JSONDecodeError: Extra data: line 2 column 1 (char 134)这样的问题怎么解决
问题介绍 今天处理返回的 JSON 的时候,出现了下面这样的问题: 处理这种问题的时候,首先你要看一下当前的字符串格式是啥样的,比如我查看后发现是下面这样的: 会发现这个字符串中间没有逗号,也就是此时的J…...
C#版 软件开发6大原则与23种设计模式
开发原则和设计模式一直是软件开发中的圣经, 但是这仅仅适用于中大型的项目开发, 在小型项目的开发中, 这些规则会降低你的开发效率, 使你的工程变得繁杂. 所以只有适合你的才是最好的. 设计模式六大原则1. 单一职责原则(Single Responsibility Principle࿰…...

java8 springboot 集成javaFx 实现一个客户端程序
1. 先创建一个springboot 程序(此步骤不做流程展示) 2. 更改springboot的版本依赖和导入所需依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.7</versio…...

MySQL(高级特性篇) 06 章——索引的数据结构
一、为什么使用索引 索引是存储引擎用于快速找到数据记录的一种数据结构,就好比一本教科书的目录部分,通过目录找到对应文章的页码,便可快速定位到需要的文章。MySQL中也是一样的道理,进行数据查找时,首先查看查询条件…...
PanWeidb-使用BenchmarkSQL对磐维数据库进行压测
本文提供PanweiDb使用BenchmarkSQL进行性能测试的方法和测试数据报告。 BenchmarkSQL,一个JDBC基准测试工具,内嵌了TPC-C测试脚本,支持很多数据库,如PostgreSQL、Oracle和Mysql等。 TPC-C是专门针对联机交易处理系统(OLTP系统)的规范,一般情况下我们也把这类系统称为业…...
AR 在高校实验室安全教育中的应用
AR应用APP可以内置实验室安全功能介绍,学习并考试(为满足教育部关于实验室人员准入条件),AR主模块。其中AR主模块应该包括图形标识码的扫描,生成相应模型,或者火灾、逃生等应急处置的路线及动画演示。考试采…...

微信小程序实现个人中心页面
文章目录 1. 官方文档教程2. 编写静态页面3. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/ 2. 编写静态页面 mine.wxml布局文件 <!--index.wxml--> <navigation-bar title"个人中心" ba…...
Spring Boot中的配置文件有哪些类型
在 Spring Boot 中,配置文件用于管理应用程序的设置和参数,通常存放在项目的 src/main/resources 目录下。Spring Boot 支持多种类型的配置文件,并通过这些文件来控制应用的行为和环境配置。 1. application.properties application.proper…...
Spring Boot 项目启动后自动加载系统配置的多种实现方式
Spring Boot 项目启动后自动加载系统配置的多种实现方式 在 Spring Boot 项目中,可以通过以下几种方式实现 在项目启动完成后自动加载系统配置缓存操作 的需求: 1. 使用 CommandLineRunner CommandLineRunner 是一个接口,可以用来在 Spring…...
如何在 CentOS 中生成 CSR
在本教程中,我们将向您展示如何在CentOS 7和6中生成CSR。您可以直接从服务器生成 CSR。 只需按照以下步骤操作: 第 1 步:使用安全外壳 (SSH) 登录您的服务器 步骤 2:创建私钥和 CSR 文件 在提示符处键入以…...

qml XmlListModel详解
1、概述 XmlListModel是QtQuick用于从XML数据创建只读模型的组件。它可以作为各种view元素的数据源,比如ListView、GridView、PathView等;也可以作为其他和model交互的元素的数据源。通过XmlRole定义角色,如name、age和height,并…...
C++并发编程之跨应用程序与驱动程序的单生产者单消费者队列
设计一个单生产者单消费者队列(SPSC队列),不使用C STL库或操作系统原子操作函数,并且将其放入跨进程共享内存中以便在Ring3(用户模式)和Ring0(内核模式)之间传递数据,是一…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...