Kafka - 3.x Producer 生产者最佳实践
文章目录
- 生产经验_生产者提高吞吐量
- 核心参数
- Code
- 生产经验_数据可靠性
- 消息的发送流程
- ACK应答机制
- ack应答级别
- 应答机制 小结
- Code
- 生产经验_数据去重
- 数据传递语义
- 幂等性
- 幂等性原理
- 开启幂等性配置(默认开启)
- 生产者事务
- kafka事务原理
- 事务代码流程
- 生产经验_数据有序
- 生产经验_数据乱序
生产经验_生产者提高吞吐量
核心参数

Code
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// batch.size:批次大小,默认16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-" + i));}// 5. 关闭资源kafkaProducer.close();}
}

生产经验_数据可靠性
消息的发送流程
回顾下消息的发送流程如下:

ACK应答机制

| 背景 | Kafka提供的解决方案 |
|---|---|
| Leader收到数据,所有Follower开始同步数据,但有一个Follower因故障无法同步,导致Leader一直等待直到同步完成才发送ACK。 | - Leader维护了一个动态的In-Sync Replica Set (ISR)和Leader保持同步的Follower集合。 - 当ISR中的Follower完成数据同步后,Leader向Producer发送ACK。 - 如果某个Follower长时间(replica.lag.time.max.ms)未向Leader同步数据,则该Follower将被移出ISR。 - 在Leader发生故障时,将从ISR中选举新的Leader。 |
ack应答级别

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置
| acks | 描述 |
|---|---|
| 0 | 提供最低延迟,Leader副本接收消息后返回ack,尚未写入磁盘。可能导致数据丢失,特别是在Leader发生故障时。 |
| 1 | Leader副本将消息写入磁盘后返回ack,但如果Leader在Follower副本同步数据之前发生故障,可能会丢失数据。 |
| -1 | 或者 (all) ,Leader和所有Follower副本都将消息写入磁盘后才返回ack。如果在Follower副本同步完成后,Leader副本在发送ack之前发生故障,可能会导致数据重复。 |
应答机制 小结

Code
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerAck {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数retries,默认是int最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-ack" + i));}// 5. 关闭资源kafkaProducer.close();}
}

生产经验_数据去重
数据传递语义

幂等性
幂等性原理

开启幂等性配置(默认开启)
在prudocer的配置对象中,添加参数enable.idempotence,参数值默认为true,设置为false就关闭了。
生产者事务
kafka事务原理

事务代码流程
// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
生产经验_数据有序

生产经验_数据乱序

相关文章:
Kafka - 3.x Producer 生产者最佳实践
文章目录 生产经验_生产者提高吞吐量核心参数Code 生产经验_数据可靠性消息的发送流程ACK应答机制ack应答级别应答机制 小结Code 生产经验_数据去重数据传递语义幂等性幂等性原理开启幂等性配置(默认开启) 生产者事务kafka事务原理事务代码流程 生产经验…...
对于多分类问题,使用深度学习(Keras)进行迁移学习提升性能
本文是仿照前面的文章,使用Keras迁移学习提升性能,原文是针对二分类问题,使用迁移学习的方式来提升准确率,本文用迁移学习的方式来提升多分类问题的准确率。 同时,在前面的文章中,使用普通的小型3层卷积网络+2层全连接层实现了多分类的85%左右的准确率, 此处将用迁移学…...
Python----break关键字对while...else结构的影响
案例: 女朋友生气,要求道歉5遍:老婆大人,我错了。道歉到第三遍的时候,媳妇埋怨这一遍说的不真诚,是不是就是要退出循环了?这个退出有两种可能性: ① 更生气,不打算原谅…...
js实现将文本生成二维码(腾讯云cos)
示例 页面代码 import { getQCodeUrl } from /utils/cosInstance; import { PageContainer } from ant-design/pro-components; import { Access, useAccess } from umijs/max; import { Button, Image } from antd; import { useState } from react;const AccessPage: Reac…...
机架式服务器介绍
大家都知道服务器分为机架式服务器、刀片式服务器、塔式服务器三类,今天小编就分别讲一讲这三种服务器,第一篇先来讲一讲机架式服务器的介绍。 机架式服务器定义:机架式服务器是安装在标准机柜中的服务器,一般采用19英寸的标准尺寸…...
解决github有时能访问有时不能访问的问题2
下载地址 https://steampp.net/...
Go实现网络通信
Go 语言提供了强大的网络编程能力,包括 TCP、UDP、HTTP、WebSocket 等协议的支持。下面是 Go 语言中常用的网络操作: TCP 通信 使用 net 包进行 TCP 通信,可以创建 TCP 客户端和服务器。 客户端使用 net.Dial 方法连接到指定的 TCP 地址&am…...
在antd里面渲染MarkDown并且自定义一个锚点目录TOC(重点解决导航目录不跟随文档滚动的问题)
一、整体思路 由于有很多很长的文档需要渲染,我觉得用MarkDown的方式会比较适合管理,所以这两天测试了一下在antd里面集成MarkDown的渲染模块。 总体思路参考: https://blog.csdn.net/Sakuraaaa_/article/details/128400497 感恩大佬的倾情付…...
Linux MMC子系统 - 2.eMMC 5.1总线协议浅析
By: Ailson Jack Date: 2023.10.27 个人博客:http://www.only2fire.com/ 本文在我博客的地址是:http://www.only2fire.com/archives/161.html,排版更好,便于学习,也可以去我博客逛逛,兴许有你想要的内容呢。…...
时序预测 | Python实现ARIMA-LSTM自回归移动差分模型结合长短期记忆神经网络时间序列预测
时序预测 | Python实现ARIMA-LSTM自回归移动差分模型结合长短期记忆神经网络时间序列预测 目录 时序预测 | Python实现ARIMA-LSTM自回归移动差分模型结合长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 时序预测 | Python实现ARIMA-LSTM自…...
【Linux】部署单机OA项目及搭建spa前后端分离项目
目录 部署OA项目 编辑 搭建spa前后端分离项目 后端 前端 配置坏境变量 部署OA项目 在虚拟机中,将项目打包成war文件放置到tomcat根目录下的webapps文件目录下 再在主机数据库中连接数据库,并定义数据库名导入相关的表 继续进入tomcat目录下双击点…...
2023中国计算机大会:蚂蚁集团连发两支百万级科研基金
10月26日,中国计算机学会(CCF)主办的第二十届中国计算机大会(CNCC2023)在沈阳举行。在“CCF-蚂蚁科研基金及产学研合作交流活动”上,蚂蚁集团发布了2023年度“CCF-蚂蚁科研基金”绿色计算及隐私计算两支百万级专项基金,…...
Knife4j使用教程(三) -- 实体类的配置注解(@ApiModel与@ApiModelProperty 的 认识与使用)
目录 1. @ApiModel与@ApiModelProperty的区分 2. @ApiModel注解 3. @ApiModelProperty注解 3.1 value属性 3.2 name属性...
计算机网络【CN】IPV4报文格式
版本(4bit):IPV4/IPV6首部长度(4bit):标识首部的长度 单位是4B最小为:20B最大为:60(15*4)B总长度(16bit):整个数据报&…...
SQL server数据库单用户模式如何退出
根据网上的说法,用下面的方式尝试即可退出 进入ssms数据库管理软件执行下面的sql语句 -- 第一步执行USE [master] GO SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO--建一个存储过程,断开所有用户连接。 create proc [dbo].[killspid] (dbn…...
QT mqtt 在子线程中使用
qtmqtt 在子线程中使用_qt在子线程里mqtt无法new-CSDN博客文章浏览阅读524次。解决问题:QMqttClient - connection not made from another thread在qt中使用多线的qtmqtt客户端发送接收数据_qt在子线程里mqtt无法newhttps://blog.csdn.net/qq_35708970/article/deta…...
Tomcat运维以及优化
Tomcat常用运维命令 # 查看版本/opt/data/app/tomcat-9.0.82/bin/catalina.sh version## 启动 /opt/data/app/tomcat-9.0.82/bin/startup.sh # 停止 /opt/data/app/tomcat-9.0.82/bin/shutdown.sh调整JVM 参数 方式1 vim /opt/data/app/tomcat-9.0.82/bin/catalina.sh # OS…...
C++设计模式_14_Facade门面模式
本篇介绍的Facade门面模式属于“接口隔离”模式的一种,以下进行详细介绍。 文章目录 1. “接口隔离”模式1. 1 典型模式 2. 系统间耦合的复杂度3. 动机(Motivation)4. 模式定义5. Facade门面模式的代码实现6. 结构7. 要点总结8. 其他参考 1. “接口隔离”模式 在组…...
正点原子嵌入式linux驱动开发——外置RTC芯片PCF8563
上一章学习了STM32MP1内置RTC外设,了解了Linux系统下RTC驱动框架。一般的应用场合使用SOC内置的RTC就可以了,而且成本也低,但是在一些对于时间精度要求比较高的场合,SOC内置的RTC就不适用了。这个时候需要根据自己的应用要求选择合…...
自动驾驶感知算法面经(20+)
原文链接: https://zhuanlan.zhihu.com/p/656952371 本人2022年4月和2023年7月两次跳槽找工作,面经总结在这里,希望可以帮到需要的朋友。 项目相关的问题主要和经历有关,参考性不大。 2023年7月 1. 文远知行 自动标注算法岗位 项目经历问…...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
Golang——9、反射和文件操作
反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一:使用Read()读取文件2.3、方式二:bufio读取文件2.4、方式三:os.ReadFile读取2.5、写…...
Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...
