当前位置: 首页 > news >正文

Kafka权威指南(第2版)读书笔记

目录

  • Kafka生产者——向Kafka写入数据
    • 生产者概览
    • 创建Kafka生产者
      • bootstrap.servers
      • key.serializer
      • value.serializer
    • 发送消息到Kafka
      • 同步发送消息

Kafka生产者——向Kafka写入数据

不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。

生产者概览

一个应用程序会在很多情况下向Kafka写入消息:记录用户的活动(用于审计和分析)​、记录指标、记录日志、记录从智能家电收集到的信息、与其他应用程序进行异步通信、缓冲即将写入数据库的数据,等等。不同的应用场景直接影响如何使用和配置生产者API。尽管生产者API使用起来很简单,但消息的发送过程还是有点儿复杂。下图展示了向Kafka发送消息的主要步骤:
在这里插入图片描述
先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。

接下来,如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。

broker在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个RecordMetaData对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。

创建Kafka生产者

要向Kafka写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必须设置的属性。

bootstrap.servers

broker的地址。可以由多个host:port组成,生产者用它们来建立初始的Kafka集群连接。它不需要包含所有的broker地址,因为生产者在建立初始连接之后可以从给定的broker那里找到其他broker的信息。不过还是建议至少提供两个broker地址,因为一旦其中一个停机,则生产者仍然可以连接到集群。

key.serializer

一个类名,用来序列化消息的键。broker希望接收到的消息的键和值都是字节数组。生产者可以把任意Java对象作为键和值发送给broker,但它需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常见的几种Java对象类型,就没有必要实现自己的序列化器。需要注意的是,必须设置key.serializer这个属性,尽管你可能只需要将值发送给Kafka。如果只需要发送值,则可以将Void作为键的类型,然后将这个属性设置为VoidSerializer。

value.serializer

一个类名,用来序列化消息的值。与设置key.serializer属性一样,需要将value.serializer设置成可以序列化消息值对象的类。

发送消息到Kafka

同步发送消息

同步发送消息很简单,当Kafka返回错误或重试次数达到上限时,生产者可以捕获到异常。这里需要考虑性能问题。根据Kafka集群繁忙程度的不同,broker可能需要2毫秒或更长的时间来响应请求。如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能。因此,同步发送方式通常不会被用在生产环境中​。

KafkaProducer一般会出现两种错误。一种是可重试错误,这种错误可以通过重发消息来解决。例如,对于连接错误,只要再次建立连接就可以解决。对于“not leader for partition”​(非分区首领)错误,只要重新为分区选举首领就可以解决,此时元数据也会被刷新。可以通过配置启用KafkaProducer的自动重试机制。如果在多次重试后仍无法解决问题,则应用程序会收到重试异常。另一种错误则无法通过重试解决,比如“Message size too large”​(消息太大)​。对于这种错误,KafkaProducer不会进行任何重试,而会立即抛出异常。

相关文章:

Kafka权威指南(第2版)读书笔记

目录 Kafka生产者——向Kafka写入数据生产者概览创建Kafka生产者bootstrap.serverskey.serializervalue.serializer 发送消息到Kafka同步发送消息 Kafka生产者——向Kafka写入数据 不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写…...

WORD转PDF脚本文件

1、在桌面新建一个文本文件,把下列代码复制到文本文件中。 On Error Resume Next Const wdExportFormatPDF 17 Set oWord WScript.CreateObject("Word.Application") Set fso WScript.CreateObject("Scripting.Filesystemobject") Set fdsf…...

electron 打包后的 exe 文件,运行后是空白窗口

一、代码相关问题 1. 页面加载失败 1.1 原因 在 Electron 应用中,若loadFile或loadURL方法指定的页面路径或 URL 错误,就无法正确加载页面,导致窗口空白。 1.2. 解决 仔细检查loadFile或loadURL方法中传入的路径或 URL 是否正确&#xf…...

数据库重连 - 方案

要解决 SQL Server 连接失效后导致的错误问题,可以考虑以下几种解决方案: 1. 连接池机制: 通过实现一个连接池,确保连接失效后可以重新建立连接,而不会直接导致整个程序出错。连接池可以帮助在连接中断时自动恢复连接,而不必每次手动重连。 例如,可以通过以下方式定期…...

从 PostgreSQL 中挽救损坏的表

~/tmp-dir.dab4fd85-8b47-4d9a-b15c-18312ef61075 pg_dump -U postgres -h locathost www_p1 > wow_p1.sqlpg_dump:错误:转储表 “page_views” 的内容失败:PQgetResult() 失败。pg_dump:详细信息:来自服务器的错误…...

【Vue3 入门到实战】1. 创建Vue3工程

目录 ​编辑 1. 学习目标 2. 环境准备与初始化 3. 项目文件结构 4. 写一个简单的效果 5. 总结 1. 学习目标 (1) 掌握如何创建vue3项目。 (2) 了解项目中的文件的作用。 (3) 编辑App.vue文件,并写一个简单的效果。 2. 环境准备与初始化 (1) 安装 Node.js 和 …...

rtthread学习笔记系列(10/11) -- 系统定时器

文章目录 10. 系统定时器10.1 跳跃表[定时器跳表 (Skip List) 算法](https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/programming-manual/timer/timer?id定时器跳表-skip-list-算法) 10.2 硬件定时器10.2.1 初始化&&删除10.2.2 sta…...

mock服务-通过json定义接口自动实现mock服务

go-mock介绍 不管在前端还是后端开发过程中,当我们需要联调其他服务的接口,而这个服务还没法提供调用时,那我们就要用到mock服务,自己按接口文档定义一个临时接口返回指定数据,以供本地开发联调测试。 怎么快速启动一…...

像JSONDecodeError: Extra data: line 2 column 1 (char 134)这样的问题怎么解决

问题介绍 今天处理返回的 JSON 的时候,出现了下面这样的问题: 处理这种问题的时候,首先你要看一下当前的字符串格式是啥样的,比如我查看后发现是下面这样的: 会发现这个字符串中间没有逗号,也就是此时的J…...

C#版 软件开发6大原则与23种设计模式

开发原则和设计模式一直是软件开发中的圣经, 但是这仅仅适用于中大型的项目开发, 在小型项目的开发中, 这些规则会降低你的开发效率, 使你的工程变得繁杂. 所以只有适合你的才是最好的. 设计模式六大原则1. 单一职责原则(Single Responsibility Principle&#xff0…...

java8 springboot 集成javaFx 实现一个客户端程序

1. 先创建一个springboot 程序(此步骤不做流程展示) 2. 更改springboot的版本依赖和导入所需依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.7</versio…...

MySQL(高级特性篇) 06 章——索引的数据结构

一、为什么使用索引 索引是存储引擎用于快速找到数据记录的一种数据结构&#xff0c;就好比一本教科书的目录部分&#xff0c;通过目录找到对应文章的页码&#xff0c;便可快速定位到需要的文章。MySQL中也是一样的道理&#xff0c;进行数据查找时&#xff0c;首先查看查询条件…...

PanWeidb-使用BenchmarkSQL对磐维数据库进行压测

本文提供PanweiDb使用BenchmarkSQL进行性能测试的方法和测试数据报告。 BenchmarkSQL,一个JDBC基准测试工具,内嵌了TPC-C测试脚本,支持很多数据库,如PostgreSQL、Oracle和Mysql等。 TPC-C是专门针对联机交易处理系统(OLTP系统)的规范,一般情况下我们也把这类系统称为业…...

AR 在高校实验室安全教育中的应用

AR应用APP可以内置实验室安全功能介绍&#xff0c;学习并考试&#xff08;为满足教育部关于实验室人员准入条件&#xff09;&#xff0c;AR主模块。其中AR主模块应该包括图形标识码的扫描&#xff0c;生成相应模型&#xff0c;或者火灾、逃生等应急处置的路线及动画演示。考试采…...

微信小程序实现个人中心页面

文章目录 1. 官方文档教程2. 编写静态页面3. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/ 2. 编写静态页面 mine.wxml布局文件 <!--index.wxml--> <navigation-bar title"个人中心" ba…...

Spring Boot中的配置文件有哪些类型

在 Spring Boot 中&#xff0c;配置文件用于管理应用程序的设置和参数&#xff0c;通常存放在项目的 src/main/resources 目录下。Spring Boot 支持多种类型的配置文件&#xff0c;并通过这些文件来控制应用的行为和环境配置。 1. application.properties application.proper…...

Spring Boot 项目启动后自动加载系统配置的多种实现方式

Spring Boot 项目启动后自动加载系统配置的多种实现方式 在 Spring Boot 项目中&#xff0c;可以通过以下几种方式实现 在项目启动完成后自动加载系统配置缓存操作 的需求&#xff1a; 1. 使用 CommandLineRunner CommandLineRunner 是一个接口&#xff0c;可以用来在 Spring…...

如何在 CentOS 中生成 CSR

在本教程中&#xff0c;我们将向您展示如何在CentOS 7和6中生成CSR。您可以直接从服务器生成 CSR。 只需按照以下步骤操作&#xff1a; 第 1 步&#xff1a;使用安全外壳 &#xff08;SSH&#xff09; 登录您的服务器 步骤 2&#xff1a;创建私钥和 CSR 文件 在提示符处键入以…...

qml XmlListModel详解

1、概述 XmlListModel是QtQuick用于从XML数据创建只读模型的组件。它可以作为各种view元素的数据源&#xff0c;比如ListView、GridView、PathView等&#xff1b;也可以作为其他和model交互的元素的数据源。通过XmlRole定义角色&#xff0c;如name、age和height&#xff0c;并…...

C++并发编程之跨应用程序与驱动程序的单生产者单消费者队列

设计一个单生产者单消费者队列&#xff08;SPSC队列&#xff09;&#xff0c;不使用C STL库或操作系统原子操作函数&#xff0c;并且将其放入跨进程共享内存中以便在Ring3&#xff08;用户模式&#xff09;和Ring0&#xff08;内核模式&#xff09;之间传递数据&#xff0c;是一…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念&#xff0c;确保一个租户&#xff08;在这个系统中可能是一个公司或一个独立的客户&#xff09;的数据对其他租户是不可见的。在 RuoYi 框架&#xff08;您当前项目所使用的基础框架&#xff09;中&#xff0c;这通常是通过在数据表中增加一个…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...