Kafka权威指南(第2版)读书笔记
目录
- Kafka生产者——向Kafka写入数据
- 生产者概览
- 创建Kafka生产者
- bootstrap.servers
- key.serializer
- value.serializer
- 发送消息到Kafka
- 同步发送消息
Kafka生产者——向Kafka写入数据
不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。
生产者概览
一个应用程序会在很多情况下向Kafka写入消息:记录用户的活动(用于审计和分析)、记录指标、记录日志、记录从智能家电收集到的信息、与其他应用程序进行异步通信、缓冲即将写入数据库的数据,等等。不同的应用场景直接影响如何使用和配置生产者API。尽管生产者API使用起来很简单,但消息的发送过程还是有点儿复杂。下图展示了向Kafka发送消息的主要步骤:

先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。
接下来,如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。
broker在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个RecordMetaData对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。
创建Kafka生产者
要向Kafka写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必须设置的属性。
bootstrap.servers
broker的地址。可以由多个host:port组成,生产者用它们来建立初始的Kafka集群连接。它不需要包含所有的broker地址,因为生产者在建立初始连接之后可以从给定的broker那里找到其他broker的信息。不过还是建议至少提供两个broker地址,因为一旦其中一个停机,则生产者仍然可以连接到集群。
key.serializer
一个类名,用来序列化消息的键。broker希望接收到的消息的键和值都是字节数组。生产者可以把任意Java对象作为键和值发送给broker,但它需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常见的几种Java对象类型,就没有必要实现自己的序列化器。需要注意的是,必须设置key.serializer这个属性,尽管你可能只需要将值发送给Kafka。如果只需要发送值,则可以将Void作为键的类型,然后将这个属性设置为VoidSerializer。
value.serializer
一个类名,用来序列化消息的值。与设置key.serializer属性一样,需要将value.serializer设置成可以序列化消息值对象的类。
发送消息到Kafka
同步发送消息
同步发送消息很简单,当Kafka返回错误或重试次数达到上限时,生产者可以捕获到异常。这里需要考虑性能问题。根据Kafka集群繁忙程度的不同,broker可能需要2毫秒或更长的时间来响应请求。如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能。因此,同步发送方式通常不会被用在生产环境中。
KafkaProducer一般会出现两种错误。一种是可重试错误,这种错误可以通过重发消息来解决。例如,对于连接错误,只要再次建立连接就可以解决。对于“not leader for partition”(非分区首领)错误,只要重新为分区选举首领就可以解决,此时元数据也会被刷新。可以通过配置启用KafkaProducer的自动重试机制。如果在多次重试后仍无法解决问题,则应用程序会收到重试异常。另一种错误则无法通过重试解决,比如“Message size too large”(消息太大)。对于这种错误,KafkaProducer不会进行任何重试,而会立即抛出异常。
相关文章:
Kafka权威指南(第2版)读书笔记
目录 Kafka生产者——向Kafka写入数据生产者概览创建Kafka生产者bootstrap.serverskey.serializervalue.serializer 发送消息到Kafka同步发送消息 Kafka生产者——向Kafka写入数据 不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写…...
WORD转PDF脚本文件
1、在桌面新建一个文本文件,把下列代码复制到文本文件中。 On Error Resume Next Const wdExportFormatPDF 17 Set oWord WScript.CreateObject("Word.Application") Set fso WScript.CreateObject("Scripting.Filesystemobject") Set fdsf…...
electron 打包后的 exe 文件,运行后是空白窗口
一、代码相关问题 1. 页面加载失败 1.1 原因 在 Electron 应用中,若loadFile或loadURL方法指定的页面路径或 URL 错误,就无法正确加载页面,导致窗口空白。 1.2. 解决 仔细检查loadFile或loadURL方法中传入的路径或 URL 是否正确…...
数据库重连 - 方案
要解决 SQL Server 连接失效后导致的错误问题,可以考虑以下几种解决方案: 1. 连接池机制: 通过实现一个连接池,确保连接失效后可以重新建立连接,而不会直接导致整个程序出错。连接池可以帮助在连接中断时自动恢复连接,而不必每次手动重连。 例如,可以通过以下方式定期…...
从 PostgreSQL 中挽救损坏的表
~/tmp-dir.dab4fd85-8b47-4d9a-b15c-18312ef61075 pg_dump -U postgres -h locathost www_p1 > wow_p1.sqlpg_dump:错误:转储表 “page_views” 的内容失败:PQgetResult() 失败。pg_dump:详细信息:来自服务器的错误…...
【Vue3 入门到实战】1. 创建Vue3工程
目录 编辑 1. 学习目标 2. 环境准备与初始化 3. 项目文件结构 4. 写一个简单的效果 5. 总结 1. 学习目标 (1) 掌握如何创建vue3项目。 (2) 了解项目中的文件的作用。 (3) 编辑App.vue文件,并写一个简单的效果。 2. 环境准备与初始化 (1) 安装 Node.js 和 …...
rtthread学习笔记系列(10/11) -- 系统定时器
文章目录 10. 系统定时器10.1 跳跃表[定时器跳表 (Skip List) 算法](https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/programming-manual/timer/timer?id定时器跳表-skip-list-算法) 10.2 硬件定时器10.2.1 初始化&&删除10.2.2 sta…...
mock服务-通过json定义接口自动实现mock服务
go-mock介绍 不管在前端还是后端开发过程中,当我们需要联调其他服务的接口,而这个服务还没法提供调用时,那我们就要用到mock服务,自己按接口文档定义一个临时接口返回指定数据,以供本地开发联调测试。 怎么快速启动一…...
像JSONDecodeError: Extra data: line 2 column 1 (char 134)这样的问题怎么解决
问题介绍 今天处理返回的 JSON 的时候,出现了下面这样的问题: 处理这种问题的时候,首先你要看一下当前的字符串格式是啥样的,比如我查看后发现是下面这样的: 会发现这个字符串中间没有逗号,也就是此时的J…...
C#版 软件开发6大原则与23种设计模式
开发原则和设计模式一直是软件开发中的圣经, 但是这仅仅适用于中大型的项目开发, 在小型项目的开发中, 这些规则会降低你的开发效率, 使你的工程变得繁杂. 所以只有适合你的才是最好的. 设计模式六大原则1. 单一职责原则(Single Responsibility Principle࿰…...
java8 springboot 集成javaFx 实现一个客户端程序
1. 先创建一个springboot 程序(此步骤不做流程展示) 2. 更改springboot的版本依赖和导入所需依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.7</versio…...
MySQL(高级特性篇) 06 章——索引的数据结构
一、为什么使用索引 索引是存储引擎用于快速找到数据记录的一种数据结构,就好比一本教科书的目录部分,通过目录找到对应文章的页码,便可快速定位到需要的文章。MySQL中也是一样的道理,进行数据查找时,首先查看查询条件…...
PanWeidb-使用BenchmarkSQL对磐维数据库进行压测
本文提供PanweiDb使用BenchmarkSQL进行性能测试的方法和测试数据报告。 BenchmarkSQL,一个JDBC基准测试工具,内嵌了TPC-C测试脚本,支持很多数据库,如PostgreSQL、Oracle和Mysql等。 TPC-C是专门针对联机交易处理系统(OLTP系统)的规范,一般情况下我们也把这类系统称为业…...
AR 在高校实验室安全教育中的应用
AR应用APP可以内置实验室安全功能介绍,学习并考试(为满足教育部关于实验室人员准入条件),AR主模块。其中AR主模块应该包括图形标识码的扫描,生成相应模型,或者火灾、逃生等应急处置的路线及动画演示。考试采…...
微信小程序实现个人中心页面
文章目录 1. 官方文档教程2. 编写静态页面3. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/ 2. 编写静态页面 mine.wxml布局文件 <!--index.wxml--> <navigation-bar title"个人中心" ba…...
Spring Boot中的配置文件有哪些类型
在 Spring Boot 中,配置文件用于管理应用程序的设置和参数,通常存放在项目的 src/main/resources 目录下。Spring Boot 支持多种类型的配置文件,并通过这些文件来控制应用的行为和环境配置。 1. application.properties application.proper…...
Spring Boot 项目启动后自动加载系统配置的多种实现方式
Spring Boot 项目启动后自动加载系统配置的多种实现方式 在 Spring Boot 项目中,可以通过以下几种方式实现 在项目启动完成后自动加载系统配置缓存操作 的需求: 1. 使用 CommandLineRunner CommandLineRunner 是一个接口,可以用来在 Spring…...
如何在 CentOS 中生成 CSR
在本教程中,我们将向您展示如何在CentOS 7和6中生成CSR。您可以直接从服务器生成 CSR。 只需按照以下步骤操作: 第 1 步:使用安全外壳 (SSH) 登录您的服务器 步骤 2:创建私钥和 CSR 文件 在提示符处键入以…...
qml XmlListModel详解
1、概述 XmlListModel是QtQuick用于从XML数据创建只读模型的组件。它可以作为各种view元素的数据源,比如ListView、GridView、PathView等;也可以作为其他和model交互的元素的数据源。通过XmlRole定义角色,如name、age和height,并…...
C++并发编程之跨应用程序与驱动程序的单生产者单消费者队列
设计一个单生产者单消费者队列(SPSC队列),不使用C STL库或操作系统原子操作函数,并且将其放入跨进程共享内存中以便在Ring3(用户模式)和Ring0(内核模式)之间传递数据,是一…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
计算机基础知识解析:从应用到架构的全面拆解
目录 前言 1、 计算机的应用领域:无处不在的数字助手 2、 计算机的进化史:从算盘到量子计算 3、计算机的分类:不止 “台式机和笔记本” 4、计算机的组件:硬件与软件的协同 4.1 硬件:五大核心部件 4.2 软件&#…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
