当前位置: 首页 > news >正文

Kafka - 3.x Producer 生产者最佳实践

文章目录

  • 生产经验_生产者提高吞吐量
    • 核心参数
    • Code
  • 生产经验_数据可靠性
    • 消息的发送流程
    • ACK应答机制
    • ack应答级别
    • 应答机制 小结
    • Code
  • 生产经验_数据去重
    • 数据传递语义
    • 幂等性
      • 幂等性原理
      • 开启幂等性配置(默认开启)
    • 生产者事务
      • kafka事务原理
      • 事务代码流程
  • 生产经验_数据有序
  • 生产经验_数据乱序

在这里插入图片描述


生产经验_生产者提高吞吐量

核心参数

在这里插入图片描述

Code

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// batch.size:批次大小,默认16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-" + i));}// 5. 关闭资源kafkaProducer.close();}
}

在这里插入图片描述


生产经验_数据可靠性

消息的发送流程

回顾下消息的发送流程如下:

在这里插入图

ACK应答机制

在这里插入图片描述

背景Kafka提供的解决方案
Leader收到数据,所有Follower开始同步数据,但有一个Follower因故障无法同步,导致Leader一直等待直到同步完成才发送ACK。- Leader维护了一个动态的In-Sync Replica Set (ISR)和Leader保持同步的Follower集合。
- 当ISR中的Follower完成数据同步后,Leader向Producer发送ACK。
- 如果某个Follower长时间(replica.lag.time.max.ms)未向Leader同步数据,则该Follower将被移出ISR。
- 在Leader发生故障时,将从ISR中选举新的Leader。

ack应答级别

在这里插入图片描述

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置

acks描述
0提供最低延迟,Leader副本接收消息后返回ack,尚未写入磁盘。可能导致数据丢失,特别是在Leader发生故障时。
1Leader副本将消息写入磁盘后返回ack,但如果Leader在Follower副本同步数据之前发生故障,可能会丢失数据。
-1或者 (all) ,Leader和所有Follower副本都将消息写入磁盘后才返回ack。如果在Follower副本同步完成后,Leader副本在发送ack之前发生故障,可能会导致数据重复。

应答机制 小结

在这里插入图片描述


Code

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerAck {public static void main(String[] args) throws InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数retries,默认是int最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-ack" + i));}// 5. 关闭资源kafkaProducer.close();}
}

在这里插入图片描述

生产经验_数据去重

数据传递语义

在这里插入图片描述


幂等性

幂等性原理

在这里插入图片描述

开启幂等性配置(默认开启)

在prudocer的配置对象中,添加参数enable.idempotence,参数值默认为true,设置为false就关闭了。


生产者事务

kafka事务原理

在这里插入图片描述

事务代码流程

// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

生产经验_数据有序

在这里插入图片描述


生产经验_数据乱序

在这里插入图片描述


相关文章:

Kafka - 3.x Producer 生产者最佳实践

文章目录 生产经验_生产者提高吞吐量核心参数Code 生产经验_数据可靠性消息的发送流程ACK应答机制ack应答级别应答机制 小结Code 生产经验_数据去重数据传递语义幂等性幂等性原理开启幂等性配置&#xff08;默认开启&#xff09; 生产者事务kafka事务原理事务代码流程 生产经验…...

对于多分类问题,使用深度学习(Keras)进行迁移学习提升性能

本文是仿照前面的文章,使用Keras迁移学习提升性能,原文是针对二分类问题,使用迁移学习的方式来提升准确率,本文用迁移学习的方式来提升多分类问题的准确率。 同时,在前面的文章中,使用普通的小型3层卷积网络+2层全连接层实现了多分类的85%左右的准确率, 此处将用迁移学…...

Python----break关键字对while...else结构的影响

案例&#xff1a; 女朋友生气&#xff0c;要求道歉5遍&#xff1a;老婆大人&#xff0c;我错了。道歉到第三遍的时候&#xff0c;媳妇埋怨这一遍说的不真诚&#xff0c;是不是就是要退出循环了&#xff1f;这个退出有两种可能性&#xff1a; ① 更生气&#xff0c;不打算原谅…...

js实现将文本生成二维码(腾讯云cos)

示例 页面代码 import { getQCodeUrl } from /utils/cosInstance; import { PageContainer } from ant-design/pro-components; import { Access, useAccess } from umijs/max; import { Button, Image } from antd; import { useState } from react;const AccessPage: Reac…...

机架式服务器介绍

大家都知道服务器分为机架式服务器、刀片式服务器、塔式服务器三类&#xff0c;今天小编就分别讲一讲这三种服务器&#xff0c;第一篇先来讲一讲机架式服务器的介绍。 机架式服务器定义&#xff1a;机架式服务器是安装在标准机柜中的服务器&#xff0c;一般采用19英寸的标准尺寸…...

解决github有时能访问有时不能访问的问题2

下载地址 https://steampp.net/...

Go实现网络通信

Go 语言提供了强大的网络编程能力&#xff0c;包括 TCP、UDP、HTTP、WebSocket 等协议的支持。下面是 Go 语言中常用的网络操作&#xff1a; TCP 通信 使用 net 包进行 TCP 通信&#xff0c;可以创建 TCP 客户端和服务器。 客户端使用 net.Dial 方法连接到指定的 TCP 地址&am…...

在antd里面渲染MarkDown并且自定义一个锚点目录TOC(重点解决导航目录不跟随文档滚动的问题)

一、整体思路 由于有很多很长的文档需要渲染&#xff0c;我觉得用MarkDown的方式会比较适合管理&#xff0c;所以这两天测试了一下在antd里面集成MarkDown的渲染模块。 总体思路参考&#xff1a; https://blog.csdn.net/Sakuraaaa_/article/details/128400497 感恩大佬的倾情付…...

Linux MMC子系统 - 2.eMMC 5.1总线协议浅析

By: Ailson Jack Date: 2023.10.27 个人博客&#xff1a;http://www.only2fire.com/ 本文在我博客的地址是&#xff1a;http://www.only2fire.com/archives/161.html&#xff0c;排版更好&#xff0c;便于学习&#xff0c;也可以去我博客逛逛&#xff0c;兴许有你想要的内容呢。…...

时序预测 | Python实现ARIMA-LSTM自回归移动差分模型结合长短期记忆神经网络时间序列预测

时序预测 | Python实现ARIMA-LSTM自回归移动差分模型结合长短期记忆神经网络时间序列预测 目录 时序预测 | Python实现ARIMA-LSTM自回归移动差分模型结合长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 时序预测 | Python实现ARIMA-LSTM自…...

【Linux】部署单机OA项目及搭建spa前后端分离项目

目录 部署OA项目 ​编辑 搭建spa前后端分离项目 后端 前端 配置坏境变量 部署OA项目 在虚拟机中&#xff0c;将项目打包成war文件放置到tomcat根目录下的webapps文件目录下 再在主机数据库中连接数据库&#xff0c;并定义数据库名导入相关的表 继续进入tomcat目录下双击点…...

2023中国计算机大会:蚂蚁集团连发两支百万级科研基金

10月26日&#xff0c;中国计算机学会&#xff08;CCF&#xff09;主办的第二十届中国计算机大会(CNCC2023)在沈阳举行。在“CCF-蚂蚁科研基金及产学研合作交流活动”上&#xff0c;蚂蚁集团发布了2023年度“CCF-蚂蚁科研基金”绿色计算及隐私计算两支百万级专项基金&#xff0c…...

Knife4j使用教程(三) -- 实体类的配置注解(@ApiModel与@ApiModelProperty 的 认识与使用)

目录 1. @ApiModel与@ApiModelProperty的区分 2. @ApiModel注解 3. @ApiModelProperty注解 3.1 value属性 3.2 name属性...

计算机网络【CN】IPV4报文格式

版本&#xff08;4bit&#xff09;&#xff1a;IPV4/IPV6首部长度&#xff08;4bit&#xff09;&#xff1a;标识首部的长度 单位是4B最小为&#xff1a;20B最大为&#xff1a;60&#xff08;15*4&#xff09;B总长度&#xff08;16bit&#xff09;&#xff1a;整个数据报&…...

SQL server数据库单用户模式如何退出

根据网上的说法&#xff0c;用下面的方式尝试即可退出 进入ssms数据库管理软件执行下面的sql语句 -- 第一步执行USE [master] GO SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO--建一个存储过程&#xff0c;断开所有用户连接。 create proc [dbo].[killspid] (dbn…...

QT mqtt 在子线程中使用

qtmqtt 在子线程中使用_qt在子线程里mqtt无法new-CSDN博客文章浏览阅读524次。解决问题&#xff1a;QMqttClient - connection not made from another thread在qt中使用多线的qtmqtt客户端发送接收数据_qt在子线程里mqtt无法newhttps://blog.csdn.net/qq_35708970/article/deta…...

Tomcat运维以及优化

Tomcat常用运维命令 # 查看版本/opt/data/app/tomcat-9.0.82/bin/catalina.sh version## 启动 /opt/data/app/tomcat-9.0.82/bin/startup.sh # 停止 /opt/data/app/tomcat-9.0.82/bin/shutdown.sh调整JVM 参数 方式1 vim /opt/data/app/tomcat-9.0.82/bin/catalina.sh # OS…...

C++设计模式_14_Facade门面模式

本篇介绍的Facade门面模式属于“接口隔离”模式的一种&#xff0c;以下进行详细介绍。 文章目录 1. “接口隔离”模式1. 1 典型模式 2. 系统间耦合的复杂度3. 动机(Motivation)4. 模式定义5. Facade门面模式的代码实现6. 结构7. 要点总结8. 其他参考 1. “接口隔离”模式 在组…...

正点原子嵌入式linux驱动开发——外置RTC芯片PCF8563

上一章学习了STM32MP1内置RTC外设&#xff0c;了解了Linux系统下RTC驱动框架。一般的应用场合使用SOC内置的RTC就可以了&#xff0c;而且成本也低&#xff0c;但是在一些对于时间精度要求比较高的场合&#xff0c;SOC内置的RTC就不适用了。这个时候需要根据自己的应用要求选择合…...

自动驾驶感知算法面经(20+)

原文链接: https://zhuanlan.zhihu.com/p/656952371 本人2022年4月和2023年7月两次跳槽找工作&#xff0c;面经总结在这里&#xff0c;希望可以帮到需要的朋友。 项目相关的问题主要和经历有关&#xff0c;参考性不大。 2023年7月 1. 文远知行 自动标注算法岗位 项目经历问…...

告别付费IP!手把手教你用ZCU102 PS端DP接口点亮显示器(附参数调试心得)

解锁ZCU102 PS端DisplayPort潜力&#xff1a;零成本实现高效显示输出的实战指南 在嵌入式视觉系统开发中&#xff0c;显示输出往往是项目落地的最后一道关卡。当我在多个Zynq UltraScale MPSoC项目中反复遭遇HDMI IP核的授权困扰和PL端实现的复杂性后&#xff0c;意外发现PS端集…...

LangChain 1.0 中间件实战:5个钩子函数让你的Agent像专业工程师一样思考

LangChain 1.0中间件深度实践&#xff1a;5个钩子函数打造工程级Agent思维 当我们在2023年首次接触LangChain时&#xff0c;它还是一个以Chain为核心的实验性框架。如今&#xff0c;LangChain 1.0的发布标志着AI Agent开发正式进入生产就绪阶段。本文将带您深入探索其最具革命性…...

Wan2.1 VAE与微信小程序开发结合:打造个人AI头像生成工具

Wan2.1 VAE与微信小程序开发结合&#xff1a;打造个人AI头像生成工具 你有没有想过&#xff0c;用一张自己的照片&#xff0c;就能快速生成几十种不同风格的艺术头像&#xff1f;无论是动漫风、油画感&#xff0c;还是赛博朋克&#xff0c;都能一键搞定。以前这可能需要专业的…...

【课后习题答案】SystemVerilog for Verification 3rd Edition第五章(绿皮书第三版)

1 解答class MemTrans;// a. 8位logic类型的data_inlogic [7:0] data_in;// b. 4位logic类型的addresslogic [3:0] address;// c. 打印data_in和address的void函数function void print();$display("data_in 0x%h, address 0x%h", data_in, address);endfunction// …...

基于 SpringBoot 的自助图书借阅管理系统源码讲解

以下是一个基于 SpringBoot 的自助图书借阅管理系统的 核心源码讲解&#xff0c;涵盖用户管理、图书管理、借阅管理、设备对接等关键模块&#xff0c;代码结构清晰&#xff0c;可直接用于学习或二次开发。一、项目结构src/main/java/com/library/ ├── config/ # 配…...

忍者像素绘卷参数详解:如何通过提示词触发‘火之意志’专属风格权重

忍者像素绘卷参数详解&#xff1a;如何通过提示词触发火之意志专属风格权重 1. 认识忍者像素绘卷 忍者像素绘卷是一款基于Z-Image-Turbo深度优化的图像生成工具&#xff0c;它将传统忍者文化与16-Bit复古游戏美学完美结合。这款工具特别适合创作具有热血动漫风格的像素艺术作…...

BilibiliDown终极指南:如何快速掌握B站视频批量下载技巧

BilibiliDown终极指南&#xff1a;如何快速掌握B站视频批量下载技巧 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader &#x1f633; 项目地址: https://gitcode.com/gh_mirrors…...

运放跟随器:电路设计中最容易被低估的‘保镖‘(隔离驱动全解析)

运放跟随器&#xff1a;电路设计中最容易被低估的"保镖"&#xff08;隔离&驱动全解析&#xff09; 在硬件工程师的日常设计中&#xff0c;运放跟随器常常被视为一个"可有可无"的组件——毕竟它的电压增益仅为1&#xff0c;看起来似乎只是将输入信号原封…...

InternLM2-Chat-1.8B助力在线教育:个性化作业批改与学习反馈生成

InternLM2-Chat-1.8B助力在线教育&#xff1a;个性化作业批改与学习反馈生成 1. 引言&#xff1a;当作业批改遇上AI 想象一下&#xff0c;一位老师深夜还在批改几十份、甚至上百份学生作业。面对相似的错误&#xff0c;需要一遍遍写下相同的评语&#xff1b;面对有潜力的答案…...

AntdUI实战:用WinForm和.NET 6给老旧内部管理系统“换肤”的完整记录

AntdUI实战&#xff1a;用WinForm和.NET 6给老旧内部管理系统“换肤”的完整记录 当企业内部的WinForm系统运行超过十年&#xff0c;那些灰底蓝框的界面早已与现代审美格格不入。去年接手某制造业ERP系统改造时&#xff0c;我面对的是一个基于.NET Framework 4.0的"古董&q…...