当前位置: 首页 > news >正文

kafka消息发送几种方式

同步发送 or 异步发送

       消息发送根据是否需要处理发送的结果分为同步发送、异步发送。

同步发送:等待发送结果返回,这种方式是可靠的,因为异常能及时处理,但同步发送需要阻塞等待一条消息发送完才处理下一条,吞吐量差。


异步发送:发送是异步的,不关心发送的结果,吞吐量最高,但可能存在发送失败的情况。

    本质上kafka 客户端提供的发送接口都是异步的,因为发送接口返回的是一个Future对象。对于同步发送通过future.get获取发送结果。异步发送则忽略send 返回值。

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);try {SendResult sendResult = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

发送完成回调

有没有办法既要异步发送还要能处理发送失败的场景,这就是第三种,发送完成时,执行相应的回调方法。这是折中方案,兼顾效率且保证发送失败能被监控到。

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("send error ");
}else {System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
}}
});

发送异常

       有些发送异常可以通过重试几次后解决,比如网络异常,对于有些异常比如消息太大超出kafka配置的最大消息字节数,这类异常重试也会失败,所以这类异常KafkaProducer 不会进行任何重试。对于可重试异常可以配置重试次数

spring.kafka.producer.retries=10

SpringBoot 集成简单介绍

     参考上篇文章SpringBoot 集成配置(pom依赖、application配置),简单讲解SpringBoot 几个重要自动装配类。

KafkaAutoConfiguration

KafkaAutoConfiguration给我们自动配置了几个类

KafkaTemplate:可以通过KafkaTemplate进行发送消息,本质上内部还是使用的KafkaProducer发送消息的。

ProducerFactory:KafkaProducer工厂,通过createProducer()方法可以获取(KafkaProducer) 进行发送消息,避免直接new KafkaProducer

使用方式也很简单,由于直接KafkaAutoConfiguration已经定义了相关Bean, 使用时注入Bean即可

图片

@Autowired
private KafkaTemplate kafkaTemplate;@Autowired
private ProducerFactory producerFactory;

具体代码

同步发送、异步发送的方式直接使用 kafkaTemplate即可完成,同步发送结果处理:这里简单的打印出消息的topic partition offset 等信息如下图

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
SendResult sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
 

                                   

图片

发送回调kafkaTemplate没有对应api , 需要通过Producer发送,我们通过producerFactory获取。

ProducerRecord record = new ProducerRecord(topic,content);Producer producer = producerFactory.createProducer();producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("send error ");}else {System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );}}});

相关文章:

kafka消息发送几种方式

同步发送 or 异步发送 消息发送根据是否需要处理发送的结果分为同步发送、异步发送。 同步发送&#xff1a;等待发送结果返回&#xff0c;这种方式是可靠的&#xff0c;因为异常能及时处理&#xff0c;但同步发送需要阻塞等待一条消息发送完才处理下一条&#xff0c;吞吐量差。…...

K1计划100%收购 MariaDB; TDSQL成为腾讯云核心战略产品; Oracle@AWS/Google/Azure发布

重要更新 1. 腾讯全球数字生态大会与9月5日-6日举行&#xff0c;发布“5T”战略&#xff0c;包括TDSQL、TencentOS、TCE&#xff08;专有云 &#xff09;、TBDS&#xff08;大数据&#xff09;、TI &#xff08;人工智能开发平台&#xff09;等 ( [2] ) ; 并正式向原子开源基金…...

Kyutai 开源对话模型 Moshi;李飞飞空间智能公司已筹集超过 2.3 亿美元丨 RTE 开发者日报

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。 我们的社区编辑团队会整理分享 RTE&#xff08;Real-Time Engagement&#xff09; 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、…...

Go语言的io输入输出流

Go语言的输入输出流不如其他语言那么直观&#xff0c;由于是通过实现接口方法的隐式继承所以比较抽象&#xff0c;今天具体介绍一下go语言的输入输出流。 go语言输入输出在io库中&#xff0c;使用Reader接口&#xff0c;如下&#xff1a; type Reader interface {Read(p []by…...

表单里面input的type属性值有哪些?

在HTML的表单&#xff08;<form>&#xff09;中&#xff0c;<input>元素是一个常用的元素&#xff0c;用于收集用户输入。每个<input>元素都包含一个type属性&#xff0c;用于定义输入字段的类型。以下是<input>元素中常见的type属性值&#xff1a; 1…...

【Redis】之Geo

概述 Geo就是Geolocation的简写形式&#xff0c;代表地理坐标。在Redis中&#xff0c;构造了能够存储地址坐标信息的一种数据结构&#xff0c;帮助我们根据经纬度来检索数据。 命令行操作方法 GEOADD 可以用来添加一个或者多个地理坐标。 GEODIST 返回一个key中两个成员之…...

常用的k8s容器网络模式有哪些?

常用的k8s容器网络模式包括Bridge模式、Host模式、Overlay模式、Flannel模式、CNI&#xff08;ContainerNetworkInterface&#xff09;模式。K8s的容器网络模式多种多样&#xff0c;每种模式都有其特点和适用场景。Bridge模式适用于简单的容器通信场景&#xff1b;Host模式适用…...

4位整数的数位和

输入一个4位数的整数&#xff0c;求其各数位上的数字之和。 输入格式: 输入在一行中给出1个4位的正整数n。 输出格式: 在一行中输出n的各数位上的数字之和。 输入样例: 1234输出样例: 10 代码如下&#xff1a; #include<stdio.h> int main() {int n;int a,b,c,d;scanf(&…...

XHTML学习

XHTML学习 1.XHTML 简介2.XHTML - 元素标准3.XHTML - 属性标准 1.XHTML 简介 XHTML是一个严格遵循 XML语法规则的 HTML 标准。它是 HTML4 的一种重构版本&#xff0c;结合了 HTML 的灵活性和 XML 的严格性&#xff0c;如今XHTML已经得到了所有主流浏览器的支持 与 HTML 相比最…...

KTH7823——16 位高精度低延时霍尔磁编码器可编程 ABZ 和 PWM 输出模式角度传感器

KTH7823 是一款高精度绝对角度霍尔传感器芯片&#xff0c;最高 16 位分辨率绝对角度输出&#xff0c;可 实现在轴向和离轴场合下的无接触式磁场角度测量。不论转速范围在 0-120000rpm 之间&#xff0c; KTH7823 都能快速准确地输出角度信息&#xff0c;适用于需要精准角…...

JDBC笔记

文章目录 准备MySQL数据的建立和建表 idea 建工程和模块设置属性配置文件编写JDBC代码URL的设置JDBC 代码配置文件 准备MySQL 数据的建立和建表 idea 建工程和模块 设置属性配置文件 编写JDBC代码 URL的设置 JDBC 代码 package com.yanyu;import java.sql.*; import java.util…...

小众语言ruby在苹果中的初步应用

前言 感觉Ruby在苹果系统中充当一种脚本语言来使用。 1、直接输入ruby没有反应 2、可显示结果的命令 ruby -e "puts Goodbye, cruel world!" 效果如下图&#xff1a; 说明苹果系统中ruby已经安装完毕&#xff0c;或者就是自带的。 3、编辑运行第一个ruby程序 输入…...

Nature: 一种基于宏基因组序列空间生成无参考的蛋白质家族的计算方法

通过全局宏基因组学揭示功能性暗物质 Unraveling the functional dark matter through global metagenomics Article, 2023-10-11 Nature [IF: 64.8] DOI: https://doi.org/10.1038/s41586-023-06583-7 原文链接&#xff1a;https://www.nature.com/articles/s41586-023-06…...

play-with-docker使用指南

Play-with-Docker(PWD)是一个在线平台,提供免费的 Docker 实验环境。它允许用户在浏览器中创建和管理 Docker 容器,适合学习和实验。国内访问需要借助于魔法工具,否则可能无法访问哦。 网站地址:https://labs.play-with-docker.com/ 一、登录play-with-docker 点击页面上…...

常见中间件漏洞靶场(tomcat)

1.CVE-2017-12615 开启环境 查看端口 查看IP 在哥斯拉里生成一个木马 访问页面修改文件后缀和文件内容 放包拿去连接 2.后台弱⼝令部署war包 打开环境 将前边的1.jsp压缩成1.zip然后改名为1.war 访问页面进行上传 在拿去连接 3.CVE-2020-1938 打开环境 访问一下 来到kali …...

一文读懂SpringCLoud

一、前言 只有光头才能变强 认识我的朋友可能都知道我这阵子去实习啦&#xff0c;去的公司说是用SpringCloud(但我觉得使用的力度并不大啊~~)… 所以&#xff0c;这篇主要来讲讲SpringCloud的一些基础的知识。(我就是现学现卖了&#xff0c;主要当做我学习SpringCloud的笔记吧&…...

tcpdump使用方法

一、centos上可以采用下面的命令进行安装。 yum install tcpdump 二、实例&#xff1a; 1、监视指定网络接口的数据包 即监听指定网卡的数据包&#xff0c;若不指定网卡&#xff0c;默认tcpdump只会监视第一个网络接口。如监听 eth0网卡&#xff0c;如下&#xff1a; tcpd…...

密码字典txt python密码字典代码

由于生成的密码数量非常庞大&#xff0c;这个过程可能需要非常长的时间来完成&#xff0c;并且会占用大量的磁盘空间。 链接&#xff1a; 密码字典下载地址610.4M 提取码: w8bi...

ubuntu安装emqx

目录 1.预先下载好emqx压缩包 2.使用tar命令解压 3.进入bin目录 5.放开访问端口18083 6.从通过ip地址访问emqx后台 7.默认用户名密码为admin/public 8.登录后台 9.资源包绑定在此博文可自取 1.预先下载好emqx压缩包 2.使用tar命令解压 sudo tar -xzvf emqx-5.0.8-el8-…...

F28335 时钟及控制系统

1 F28335 系统时钟来源 1.1 振荡器OSC与锁相环PLL 时钟信号对于DSP来说是非常重要的,它为DSP工作提供一个稳定的机器周期从而使系统能够正常运行。时钟系统犹如人的心脏,一旦有问题整个系统就崩溃。DSP 属于数字信号处理器, 它正常工作也必须为其提供时钟信号。那么这个时钟…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

从WWDC看苹果产品发展的规律

WWDC 是苹果公司一年一度面向全球开发者的盛会&#xff0c;其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具&#xff0c;对过去十年 WWDC 主题演讲内容进行了系统化分析&#xff0c;形成了这份…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...