当前位置: 首页 > news >正文

Kafka(三)生产者发送消息

文章目录

  • 生产者发送思路
  • 自定义序列化类
  • 配置生产者参数
    • 提升吞吐量
  • 发送消息
  • 关闭生产者
  • 结语
  • 示例源码仓库

生产者发送思路

如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是

  1. ack使用默认的all
  2. 开启重试
  3. 在一定时间内重试不成功,则入库,后续由定时任务继续发送
  4. 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
  5. 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。

自定义序列化类

消息格式为JSON, 使用Jackson将类序列化为JSON字符串

public class UserDTOSerializer implements Serializer<UserDTO> {@Override@SneakyThrowspublic byte[] serialize(final String s, final UserDTO userDTO) {ObjectMapper objectMapper = new ObjectMapper();return objectMapper.writeValueAsBytes(userDTO);}
}

配置生产者参数

有几点需要注意

  1. 开启压缩
  2. retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
  3. buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
  4. ack使用默认的all
    /*** 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整* 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致* @return*/public static Properties loadProducerConfig(String valueSerializer) {Properties result = new Properties();result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "l192.168.0.102:9093");result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");// 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量// 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);// 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);// 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量result.put(ProducerConfig.LINGER_MS_CONFIG, 10);return result;}

提升吞吐量

  • 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
  • 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
  • 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间

发送消息

@Log
public class MessageProducer {public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));private MessageFailedService messageFailedService = new MessageFailedService();/*** kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟* callback函数只有在最后一次重试之后才会调用, 如果你想在本地测试Kafka生产者的重试,详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm* @param userDTO*/public void sendMessage(final UserDTO userDTO) {ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(),  userDTO);try {PRODUCER.send( user, (recordMetadata, e) -> {if (Objects.nonNull(e)) {log.severe("message has sent failed");MessageFailedEntity messageFailedEntity = new MessageFailedEntity();messageFailedEntity.setMessageId(userDTO.getMessageId());ObjectMapper mapper = new ObjectMapper();try {messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));} catch (JsonProcessingException jsonProcessingException) {log.severe("message content json format failed");}messageFailedEntity.setMessageType(MessageType.EMAIL);messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);messageFailedEntity.setFailedReason(e.getMessage());// 如果sendMessage传进来的是个list,也同理,不能放到list.foreach外面// 如果放在主线程里,由于kafka producer是异步的,// kafka producer的执行速度可能慢于主线程,可能拿到的值是空的是有问题的,例如拿到的failedReason是空的messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);} else {log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );}});} catch (TimeoutException e) {log.info("send message to kafka timeout, message: ");// TODO: 自定义逻辑,比如发邮件通知kafka管理员}}
}

对上述代码做几点解释

  1. 我们使用异步的方式发送,如果发送成功,打印一条消息
  2. 关键在于重试,callback函数只有在最后一次重试之后才会调用。不会重试多少次就调用多少次callback, 这个问题我发邮件问过社区, 详情见这里的 邮件
public class ProducerMessageIdCache {private static final Map<String, Integer> MESSAGE_IDS = new ConcurrentHashMap<>();public static void add(String messageId) {MESSAGE_IDS.put(messageId, 0);}public static void remove(String messageId) {MESSAGE_IDS.remove(messageId);}public static boolean contains(String messageId) {return MESSAGE_IDS.containsKey(messageId);}// TODO 定时清理过期的messageId}

关闭生产者

实现ServletContextListener接口, 然后在web.xml的listener元素中配置

public class KafkaListener implements ServletContextListener {private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();@Overridepublic void contextInitialized(ServletContextEvent sce) {KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);}@Overridepublic void contextDestroyed(ServletContextEvent sce) {KAFKA_PRODUCERS.forEach(KafkaProducer::close);}
}
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaeehttps://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"version="6.0"><listener><listener-class>com.business.server.listener.KafkaListener</listener-class></listener>
</web-app>

结语

  1. 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
  2. Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
  3. 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式

示例源码仓库

  1. Github地址
  2. 项目下business-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述
    注意Application context的路径, 启动之后访问端口+Application context, 例如
http://localhost:8999/business-server

相关文章:

Kafka(三)生产者发送消息

文章目录 生产者发送思路自定义序列化类配置生产者参数提升吞吐量 发送消息关闭生产者结语示例源码仓库 生产者发送思路 如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是 ack使用默认的all开启重试在一定时间内重试不成功&#xff0c;则入库&#xff…...

2020年五一杯数学建模C题饲料混合加工问题解题全过程文档及程序

2020年五一杯数学建模 C题 饲料混合加工问题 原题再现 饲料加工厂需要加工一批动物能量饲料。饲料加工需要原料&#xff0c;如加工猪饲料需要玉米、荞麦、稻谷等。加工厂从不同的产区收购了原料&#xff0c;原料在收购的过程中由于运输、保鲜以及产品本身属性等原因&#xff…...

公益SRC实战|SQL注入漏洞攻略

目录 一、信息收集 二、实战演示 三、使用sqlmap进行验证 四、总结 一、信息收集 1.查找带有ID传参的网站&#xff08;可以查找sql注入漏洞&#xff09; inurl:asp idxx 2.查找网站后台&#xff08;多数有登陆框&#xff0c;可以查找弱口令&#xff0c;暴力破解等漏洞&…...

Word软件手动安装Zotero插件

文章目录 Word软件手动安装Zotero插件方法一方法二 参考资料 Word软件手动安装Zotero插件 方法一 关闭word在zotero中依次点击编辑—首选项—引用—文字编辑软件—重新安装加载项Microsoft word 方法二 寻找Zotero.dotm存储位置&#xff0c; 例如D:\Program Files\Zotero\ext…...

idea 插件推荐第二期

文章目录 便捷开发CodeGlance Pro (代码缩略图)GenerateAllSetter&#xff08;快速生成对象所有set方法&#xff09;GsonFormatPlus&#xff1a;json转实体RestfulToolkitX&#xff08;找到controller快捷请求接口&#xff09; 美化activate-power-mode-x (敲击计数、动效)Nyan…...

plsql查询中文出现乱码

添加环境变量&#xff1a;如下 变量名&#xff1a;NLS_LANG 变量值&#xff1a;SIMPLIFIED CHINESE_CHINA.ZHS16GBK 变量名&#xff1a;TNS_ADMIN 变量值&#xff1a;D:\instantclient_11_2\network\admin 在Path中添加instantclient_11_2存放路径...

【Docker】五分钟完成Docker部署Java应用,你也可以的!!!

文章目录 前言一、部署步骤1.项目结构2.Dockerfile3.docker-compose.yml4.启动5.常用命令 总结 前言 本文基于Docker Compose部署Java应用&#xff0c;请确保你已经安装了Docker和Docker Compose。 十分钟就能上手docker&#xff1f;要不你也试试&#xff1f; 一、部署步骤 1…...

如何准备2024年的系统设计面试?

1 前言 如果你正在准备软件工程师或软件开发人员的面试,那么你可能知道由于其开放性质和广泛性,准备系统设计是多么困难,但同时你也不能忽略它。在软件工程界,如果你正在申请高级工程师/主管/架构师或更高级别的角色,系统设计是最受追捧的技能,也是整个过程中最重要的环节之一…...

【开源】基于JAVA的电子元器件管理系统

目录 一、摘要1.1 项目简介1.2 项目详细录屏 二、研究内容三、界面展示3.1 登录&注册&主页3.2 元器件单位模块3.3 元器件仓库模块3.4 元器件供应商模块3.5 元器件品类模块3.6 元器件明细模块3.7 元器件类型模块3.8 元器件采购模块3.9 元器件领用模块3.10 系统基础模块 …...

足底筋膜炎怎么治疗治愈

足底筋膜炎又称为跖筋膜炎&#xff0c;跖筋膜主要在足弓下方&#xff0c;它维持足弓稳定性&#xff0c;对于喜欢长期长跑、跳远&#xff0c;或者越野运动&#xff0c;或者部队中的士兵进行拉练&#xff0c;还有需要久坐或者久站的人群中&#xff0c;容易发生跖筋膜炎。治疗方法…...

Keil工程忽略文件.gitignore、自动删除脚本:keilkilll.bat、自动生成目录文件列表脚本

Keil工程忽略文件&#xff1a;.gitignore 忽略规则 *.rar *.o *.d *.crf *.htm *.dep *.map *.bak *.lnp *.lst *.ini *.iex *.sct *.scvd *.dbg* *.uvguix.* *Log.*#忽略.gitignore根目录下的文件夹&#xff0c;根据自己的需要修改 RTE/ Templates/ Examples/ OBJ/#不能忽略…...

软考高级职称哪个好考?明确给你答案

软考考试分为初、中、高三级&#xff0c;其中高级5个方向分别为系统分析师、信息系统项目管理师、网络规划设计师、系统架构设计师、系统规划与管理师。软考高级职称考什么好&#xff1f;有很多人是因为要评高级职称而选择参考软考高级资格考试&#xff0c;那么软考高级里哪个资…...

智能客服外包服务适用于哪些行业?

在当今快节奏的商业环境下&#xff0c;企业需要更高效、更智能且更灵活的客户服务解决方案。而智能客服外包服务正是满足这一需求的利器。不仅可以帮助企业提升客户服务的品质和效率&#xff0c;还能降低企业的运营成本。智能客服外包服务适用于哪些行业呢&#xff1f; 1.电子…...

数字化企业各业务模块模型

1.计划 1.1采购计划执行情况 序号 采购计划号 采购订单号 业务员 供应商 物料 数量 金额 计划入库日期 实际入库日期 状态 针对企业执行中或者未关闭的采购计划进行统计与分析&#xff0c;主要目的在于引领企业员工与领导关注长期在途的采购…...

WPF动画小知识

一、动画合集 创建一个Storyboard演示画板&#xff0c;在画板里对动画进行定义与处理。 常见动画类型 提醒&#xff1a;更多介绍可查看microsoft提供的相关文档 DoubleAnimation //普通Double型控制动画 DoubleAnimationUsingKeyFrames //Dou…...

数据结构 顺序表和链表

1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列、字符串.. 线性表在逻辑上是线性结构&#xff0c;也就说是连续的一条直线…...

LMI相机配置步骤,使用Gocator2550相机

在此之前可以先浏览我编写的相机SDK通用类和LMISDK&#xff0c;进行配套观看 https://blog.csdn.net/m0_51559565/article/details/134404394 //LMI相机SDK https://blog.csdn.net/m0_51559565/article/details/134403745 //相机通用类1.启动LMI加速器 LMI加速器用于将相机…...

掌握Python中的控制流语句:break, continue, quit的应用技巧详解

引言 在Python编程中&#xff0c;控制流语句是非常重要的一部分&#xff0c;它们可以帮助我们控制程序的执行流程。其中&#xff0c;break、continue和quit是常用的控制流语句&#xff0c;它们可以在循环中起到关键作用。本文将详细介绍这些控制流语句的应用技巧&#xff0c;帮…...

TS手动编译和自动编译方法

把 TS 文件编译成 JS 文件 安装 npm i -g typescript检查是否安装成功 tsc -v方法一 先通过 tsc 把 .ts 文件编译成 .js 文件&#xff0c;再通过 node 把 .js 文件运行 方法二 通过监视配置页面 初始化 tsc --init自动生成一个tsconfig.json 文件 点击进入tsconfig.js…...

【Hello Go】Go语言运算符

Go语言运算符 算术运算符关系运算符逻辑运算符位运算符赋值运算符其他运算符运算符优先级 算术运算符 如果之前没有其他语言基础的小伙伴可以参考下我之前写的C语言运算符讲解 这里主要讲解下Go和C运算符的不同点 – 运算符 Go语言中只有后置 和后置– var a int 5a--fmt.P…...

理解 JMeter 聚合报告(Aggregate Report)

Aggregate Report 是 JMeter 常用的一个 Listener&#xff0c;中文被翻译为“聚合报告”。今天再次有同行问到这个报告中的各项数据表示什么意思&#xff0c;顺便在这里公布一下&#xff0c;以备大家查阅。 如果大家都是做Web应用的性能测试&#xff0c;例如只有一个登录的请求…...

深度学习之pytorch第一课

学习使用pytorch&#xff0c;然后进行简单的线性模型的训练与保存 学习代码如下&#xff1a; import numpy as np import torch import torch.nn as nn x_value [i for i in range(11)] x_train np.array(x_value,dtypenp.float32) print(x_train.shape) x_train x_train.r…...

企业传统纸质设备维修方式的痛点以及解决方案

传统的纸质设备维修方式有很多痛点&#xff1a; 数据更新和访问的低效率&#xff1a;传统的纸质记录方法在更新和检索数据时效率极低。这种方式无法实时更新设备的维修状态&#xff0c;导致管理层和维修人员无法及时获取最新信息&#xff0c;影响决策的速度和质量。 记录的易…...

vue2 - SuperMap3D实现自定义标记点位和自定义弹窗功能

文章目录 🍉开发环境🍉实现思路🍉代码封装🍍1:src/utils 下创建 extendMap文件如下🍍2:src/utils/extendMap/model/createMap.js 文件相关代码🍍3:src/utils/extendMap/model/bubble.js 文件相关代码🍍4:src/utils/extendMap/model\dragEntity.js 文件相关代…...

vue中通过.style.animationDuration属性,根据数据长度动态设定元素的纵向滚动时长的demo

根据数据长度动态设定元素的animation 先看看效果&#xff0c;是一个纯原生div标签加上css实现的表格纵向滚动动画&#xff1a; 目录 根据数据长度动态设定元素的animationHTMLjs逻辑1、判断是数据长度是否达到滚动要求2、根据数据长度设置滚动速度 Demo完整代码 HTML 1、确…...

(五)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB

一、七种算法&#xff08;DBO、LO、SWO、COA、LSO、KOA、GRO&#xff09;简介 1、蜣螂优化算法DBO 蜣螂优化算法&#xff08;Dung beetle optimizer&#xff0c;DBO&#xff09;由Jiankai Xue和Bo Shen于2022年提出&#xff0c;该算法主要受蜣螂的滚球、跳舞、觅食、偷窃和繁…...

深度学习之基于Pytorch框架的MNIST手写数字识别

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 MNIST是一个手写数字识别的数据集&#xff0c;是深度学习中最常用的数据集之一。基于Pytorch框架的MNIST手写数字识…...

zabbix的服务器端 server端安装部署

zabbix的服务器端 server 主机iplocalhost&#xff08;centos 7&#xff09;192.168.10.128 zabbix官网部署教程 但是不全&#xff0c;建议搭配这篇文章一起看 zabbixAgent部署 安装mysql 所有配置信息和Zabbix收集到的数据都被存储在数据库中。 下载对应的yum源 yum ins…...

css3 初步了解

1、css3的含义及简介 简而言之&#xff0c;css3 就是 css的最新标准&#xff0c;使用css3都要遵循这个标准&#xff0c;CSS3 已完全向后兼容&#xff0c;所以你就不必改变现有的设计&#xff0c; 2、一些比较重要的css3 模块 选择器 1、标签选择器&#xff0c;也称为元素选择…...

【实战经验】MT4外汇交易指南:新手如何制定交易计划?

在外汇交易中&#xff0c;制定一个合理的交易计划至关重要。一个良好的交易计划可以帮助您规避风险、提高交易效率&#xff0c;甚至在市场波动时保持冷静。作为资深外汇交易专家&#xff0c;我将分享一些制定交易计划的重要性、技术分析工具的应用以及风险管理策略等方面的内容…...