Kafka(三)生产者发送消息
文章目录
- 生产者发送思路
- 自定义序列化类
- 配置生产者参数
- 提升吞吐量
- 发送消息
- 关闭生产者
- 结语
- 示例源码仓库
生产者发送思路
如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是
- ack使用默认的all
- 开启重试
- 在一定时间内重试不成功,则入库,后续由定时任务继续发送
- 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
- 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。
自定义序列化类
消息格式为JSON, 使用Jackson将类序列化为JSON字符串
public class UserDTOSerializer implements Serializer<UserDTO> {@Override@SneakyThrowspublic byte[] serialize(final String s, final UserDTO userDTO) {ObjectMapper objectMapper = new ObjectMapper();return objectMapper.writeValueAsBytes(userDTO);}
}
配置生产者参数
有几点需要注意
- 开启压缩
- retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
- buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
- ack使用默认的all
/*** 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整* 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致* @return*/public static Properties loadProducerConfig(String valueSerializer) {Properties result = new Properties();result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "l192.168.0.102:9093");result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");// 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量// 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);// 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);// 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量result.put(ProducerConfig.LINGER_MS_CONFIG, 10);return result;}
提升吞吐量
- 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
- 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
- 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间
发送消息
@Log
public class MessageProducer {public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));private MessageFailedService messageFailedService = new MessageFailedService();/*** kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟* callback函数只有在最后一次重试之后才会调用, 如果你想在本地测试Kafka生产者的重试,详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm* @param userDTO*/public void sendMessage(final UserDTO userDTO) {ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(), userDTO);try {PRODUCER.send( user, (recordMetadata, e) -> {if (Objects.nonNull(e)) {log.severe("message has sent failed");MessageFailedEntity messageFailedEntity = new MessageFailedEntity();messageFailedEntity.setMessageId(userDTO.getMessageId());ObjectMapper mapper = new ObjectMapper();try {messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));} catch (JsonProcessingException jsonProcessingException) {log.severe("message content json format failed");}messageFailedEntity.setMessageType(MessageType.EMAIL);messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);messageFailedEntity.setFailedReason(e.getMessage());// 如果sendMessage传进来的是个list,也同理,不能放到list.foreach外面// 如果放在主线程里,由于kafka producer是异步的,// kafka producer的执行速度可能慢于主线程,可能拿到的值是空的是有问题的,例如拿到的failedReason是空的messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);} else {log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );}});} catch (TimeoutException e) {log.info("send message to kafka timeout, message: ");// TODO: 自定义逻辑,比如发邮件通知kafka管理员}}
}
对上述代码做几点解释
- 我们使用异步的方式发送,如果发送成功,打印一条消息
- 关键在于重试,callback函数只有在最后一次重试之后才会调用。不会重试多少次就调用多少次callback, 这个问题我发邮件问过社区, 详情见这里的 邮件
public class ProducerMessageIdCache {private static final Map<String, Integer> MESSAGE_IDS = new ConcurrentHashMap<>();public static void add(String messageId) {MESSAGE_IDS.put(messageId, 0);}public static void remove(String messageId) {MESSAGE_IDS.remove(messageId);}public static boolean contains(String messageId) {return MESSAGE_IDS.containsKey(messageId);}// TODO 定时清理过期的messageId}
关闭生产者
实现ServletContextListener接口, 然后在web.xml的listener元素中配置
public class KafkaListener implements ServletContextListener {private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();@Overridepublic void contextInitialized(ServletContextEvent sce) {KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);}@Overridepublic void contextDestroyed(ServletContextEvent sce) {KAFKA_PRODUCERS.forEach(KafkaProducer::close);}
}
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaeehttps://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"version="6.0"><listener><listener-class>com.business.server.listener.KafkaListener</listener-class></listener>
</web-app>
结语
- 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
- Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
- 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式
示例源码仓库
- Github地址
- 项目下business-server module代表生产者
- 运行时IDEA配置如下

注意Application context的路径, 启动之后访问端口+Application context, 例如
http://localhost:8999/business-server
相关文章:
Kafka(三)生产者发送消息
文章目录 生产者发送思路自定义序列化类配置生产者参数提升吞吐量 发送消息关闭生产者结语示例源码仓库 生产者发送思路 如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是 ack使用默认的all开启重试在一定时间内重试不成功,则入库ÿ…...
2020年五一杯数学建模C题饲料混合加工问题解题全过程文档及程序
2020年五一杯数学建模 C题 饲料混合加工问题 原题再现 饲料加工厂需要加工一批动物能量饲料。饲料加工需要原料,如加工猪饲料需要玉米、荞麦、稻谷等。加工厂从不同的产区收购了原料,原料在收购的过程中由于运输、保鲜以及产品本身属性等原因ÿ…...
公益SRC实战|SQL注入漏洞攻略
目录 一、信息收集 二、实战演示 三、使用sqlmap进行验证 四、总结 一、信息收集 1.查找带有ID传参的网站(可以查找sql注入漏洞) inurl:asp idxx 2.查找网站后台(多数有登陆框,可以查找弱口令,暴力破解等漏洞&…...
Word软件手动安装Zotero插件
文章目录 Word软件手动安装Zotero插件方法一方法二 参考资料 Word软件手动安装Zotero插件 方法一 关闭word在zotero中依次点击编辑—首选项—引用—文字编辑软件—重新安装加载项Microsoft word 方法二 寻找Zotero.dotm存储位置, 例如D:\Program Files\Zotero\ext…...
idea 插件推荐第二期
文章目录 便捷开发CodeGlance Pro (代码缩略图)GenerateAllSetter(快速生成对象所有set方法)GsonFormatPlus:json转实体RestfulToolkitX(找到controller快捷请求接口) 美化activate-power-mode-x (敲击计数、动效)Nyan…...
plsql查询中文出现乱码
添加环境变量:如下 变量名:NLS_LANG 变量值:SIMPLIFIED CHINESE_CHINA.ZHS16GBK 变量名:TNS_ADMIN 变量值:D:\instantclient_11_2\network\admin 在Path中添加instantclient_11_2存放路径...
【Docker】五分钟完成Docker部署Java应用,你也可以的!!!
文章目录 前言一、部署步骤1.项目结构2.Dockerfile3.docker-compose.yml4.启动5.常用命令 总结 前言 本文基于Docker Compose部署Java应用,请确保你已经安装了Docker和Docker Compose。 十分钟就能上手docker?要不你也试试? 一、部署步骤 1…...
如何准备2024年的系统设计面试?
1 前言 如果你正在准备软件工程师或软件开发人员的面试,那么你可能知道由于其开放性质和广泛性,准备系统设计是多么困难,但同时你也不能忽略它。在软件工程界,如果你正在申请高级工程师/主管/架构师或更高级别的角色,系统设计是最受追捧的技能,也是整个过程中最重要的环节之一…...
【开源】基于JAVA的电子元器件管理系统
目录 一、摘要1.1 项目简介1.2 项目详细录屏 二、研究内容三、界面展示3.1 登录&注册&主页3.2 元器件单位模块3.3 元器件仓库模块3.4 元器件供应商模块3.5 元器件品类模块3.6 元器件明细模块3.7 元器件类型模块3.8 元器件采购模块3.9 元器件领用模块3.10 系统基础模块 …...
足底筋膜炎怎么治疗治愈
足底筋膜炎又称为跖筋膜炎,跖筋膜主要在足弓下方,它维持足弓稳定性,对于喜欢长期长跑、跳远,或者越野运动,或者部队中的士兵进行拉练,还有需要久坐或者久站的人群中,容易发生跖筋膜炎。治疗方法…...
Keil工程忽略文件.gitignore、自动删除脚本:keilkilll.bat、自动生成目录文件列表脚本
Keil工程忽略文件:.gitignore 忽略规则 *.rar *.o *.d *.crf *.htm *.dep *.map *.bak *.lnp *.lst *.ini *.iex *.sct *.scvd *.dbg* *.uvguix.* *Log.*#忽略.gitignore根目录下的文件夹,根据自己的需要修改 RTE/ Templates/ Examples/ OBJ/#不能忽略…...
软考高级职称哪个好考?明确给你答案
软考考试分为初、中、高三级,其中高级5个方向分别为系统分析师、信息系统项目管理师、网络规划设计师、系统架构设计师、系统规划与管理师。软考高级职称考什么好?有很多人是因为要评高级职称而选择参考软考高级资格考试,那么软考高级里哪个资…...
智能客服外包服务适用于哪些行业?
在当今快节奏的商业环境下,企业需要更高效、更智能且更灵活的客户服务解决方案。而智能客服外包服务正是满足这一需求的利器。不仅可以帮助企业提升客户服务的品质和效率,还能降低企业的运营成本。智能客服外包服务适用于哪些行业呢? 1.电子…...
数字化企业各业务模块模型
1.计划 1.1采购计划执行情况 序号 采购计划号 采购订单号 业务员 供应商 物料 数量 金额 计划入库日期 实际入库日期 状态 针对企业执行中或者未关闭的采购计划进行统计与分析,主要目的在于引领企业员工与领导关注长期在途的采购…...
WPF动画小知识
一、动画合集 创建一个Storyboard演示画板,在画板里对动画进行定义与处理。 常见动画类型 提醒:更多介绍可查看microsoft提供的相关文档 DoubleAnimation //普通Double型控制动画 DoubleAnimationUsingKeyFrames //Dou…...
数据结构 顺序表和链表
1.线性表 线性表(linear list)是n个具有相同特性的数据元素的有限序列 线性表是一种在实际中广泛使用的数据结构,常见的线性表:顺序表、链表、栈、队列、字符串.. 线性表在逻辑上是线性结构,也就说是连续的一条直线…...
LMI相机配置步骤,使用Gocator2550相机
在此之前可以先浏览我编写的相机SDK通用类和LMISDK,进行配套观看 https://blog.csdn.net/m0_51559565/article/details/134404394 //LMI相机SDK https://blog.csdn.net/m0_51559565/article/details/134403745 //相机通用类1.启动LMI加速器 LMI加速器用于将相机…...
掌握Python中的控制流语句:break, continue, quit的应用技巧详解
引言 在Python编程中,控制流语句是非常重要的一部分,它们可以帮助我们控制程序的执行流程。其中,break、continue和quit是常用的控制流语句,它们可以在循环中起到关键作用。本文将详细介绍这些控制流语句的应用技巧,帮…...
TS手动编译和自动编译方法
把 TS 文件编译成 JS 文件 安装 npm i -g typescript检查是否安装成功 tsc -v方法一 先通过 tsc 把 .ts 文件编译成 .js 文件,再通过 node 把 .js 文件运行 方法二 通过监视配置页面 初始化 tsc --init自动生成一个tsconfig.json 文件 点击进入tsconfig.js…...
【Hello Go】Go语言运算符
Go语言运算符 算术运算符关系运算符逻辑运算符位运算符赋值运算符其他运算符运算符优先级 算术运算符 如果之前没有其他语言基础的小伙伴可以参考下我之前写的C语言运算符讲解 这里主要讲解下Go和C运算符的不同点 – 运算符 Go语言中只有后置 和后置– var a int 5a--fmt.P…...
Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
Kafka入门-生产者
生产者 生产者发送流程: 延迟时间为0ms时,也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
