Kafka - 异步/同步发送API
文章目录
- 异步发送
- 普通异步发送
- 异步发送流程
- Code
- 带回调函数的异步发送
- 带回调函数的异步发送流程
- Code
- 同步发送API
异步发送
普通异步发送
需求:创建Kafka生产者,采用异步的方式发送到Kafka broker
异步发送流程

Code
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();System.out.println(art.offset());System.out.println("over - " + i);}// 5. 关闭资源kafkaProducer.close();}}
输出
31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9
忽略我这个offset … 我都发了好多次了…
看控制台的吧

带回调函数的异步发送
回调函数callback()会在producer收到ack时调用,为异步调用。
该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。
- 如果Exception为null,说明消息发送成功,
- 如果Exception不为null,说明消息发送失败
带回调函数的异步发送流程

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
Code
package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调// 该方法在Producer收到ack时调用,为异步调用kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {// 没有异常,输出信息到控制台System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}}

控制台

同步发送API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}

相关文章:
Kafka - 异步/同步发送API
文章目录 异步发送普通异步发送异步发送流程Code 带回调函数的异步发送带回调函数的异步发送流程Code 同步发送API 异步发送 普通异步发送 需求:创建Kafka生产者,采用异步的方式发送到Kafka broker 异步发送流程 Code <!-- https://mvnrepository…...
嵌套for循环在外层循环和内层循环中使用两个Executors.newCachedThreadPool缓存线程池执行操作
1. 首先,我们需要创建两个ExecutorService对象,这两个对象将作为我们的缓存线程池。 2. 然后,我们使用嵌套的for循环来执行我们的操作。在每个外层循环中,我们将创建一个新的任务并提交给外层线程池。在这个任务中,我…...
【uniapp+云函数调用】人脸识别,实人认证,适用于app,具体思路解析,已实现
2023.10.8 需求: uniapp开发的app项目中使用人脸识别 app项目都是第一次搞,更别提人脸识别了。目前已有的就是Dcloud账号已申请,实现需求的时间没那么紧迫 此篇会详细记录从0到1的过程 2023.10.24 今天开始探究实现的过程 可能会记录的有些冗余 效果图如下: uniapp开发指南…...
系列十六、bean有哪些生命周期的回调方法?有哪几种实现方式?
一、概述 bean的生命周期的回调方法主要分两种,一种是初始化时进行调用,另外一种是销毁时进行调用。但是不管是初始化还是销毁,都对应着三种方式。 二、实现方式 2.1、注解方式 PostConstruct PreDestroy Component public class UserSe…...
2023平台工程崭露头角,AI 带来新机遇与挑战
在今年,平台工程正在迅速在 IT 企业中崭露头角,成为软件开发团队的必要实践。根据 CloudBees 发布的最新报告《2023年平台工程:快速采纳和影响》,83%的受访者已经完全实施了平台工程,或正处于某种实施阶段。 平台工…...
如何使用python快速修改Excel表单中的大量数据
python修改Excel中的内容进阶加速版 前面有一篇文章讲到了使用python处理Excel中的数据文件,即修改Excel中的数据,但是那个版本的代码跑点小规模、小数据量的excel还行,一旦数据量达到万条级别,代码运行会非常慢!因此&…...
✔ ★【备战实习(面经+项目+算法)】 10.27学习
✔ ★【备战实习(面经项目算法)】 坚持完成每天必做如何找到好工作1. 科学的学习方法(专注!效率!记忆!心流!)2. 每天认真完成必做项,踏实学习技术 认真完成每天必做&…...
视频分辨率/帧率/码率选择参考
1. 视频码率与分辨率的参考表 1080*720的分辨率,用5000K左右; 720*576的分辨率,用3500K左右; 640*480的分辨率,用1500K左右。 2. 计算公式 基本算法:码率(kb…...
LeetCode75——Day18
文章目录 一、题目二、题解 一、题目 1732. Find the Highest Altitude There is a biker going on a road trip. The road trip consists of n 1 points at different altitudes. The biker starts his trip on point 0 with altitude equal 0. You are given an integer …...
Java NIO 高并发开发
Java NIO 高并发开发 前言 Java NIO(New I/O)相比于传统的Java I/O(BIO)在高并发开发方面具有以下优势: 非阻塞模式:Java NIO使用非阻塞的I/O操作,允许一个线程管理多个通道(Channe…...
8.循环神经网络
#pic_center R 1 R_1 R1 R 2 R^2 R2 目录 知识框架No.1 序列模型一、序列模型二、D2L代码注意点三、QA No.2 文本预处理一、D2L代码注意点二、QA No.3 语言模型一、语言模型二、D2L代码注意点三、QA No.4 循环神经网络 RNN一、RNN二、QA No.5 循环神经网络 RNN 的实现一、从零…...
[C++随想录] map和set的使用
map和set的使用 set初始化finderasecountlower_bound && upper_boundequal_ range mapinsert[ ]运算符 multiset && multimap set — — key模拟 map — — key_value模型 set 初始化 void set_test1() {set<int>s;s.insert(10);s.insert(12);s.insert(…...
公网IP怎么设置?公网ip有哪些优点和缺点?
随着互联网的普及,越来越多的人开始关注网络安全和隐私保护。其中,公网IP的设置成为了一个备受关注的话题。本文将详细介绍公网IP的设置方法以及公网IP的优点和缺点。 一、公网IP设置方法 1. 路由器设置 在家庭或企业网络中,路由器通常是最重…...
蓝桥杯第 2 场算法双周赛 第2题 铺地板【算法赛】c++ 数学思维
题目 铺地板https://www.lanqiao.cn/problems/5887/learning/?contest_id145 问题描述 小蓝家要装修了,小蓝爸爸买来了很多块(你可以理解为数量无限)2323 规格的地砖,小蓝家的地板是 nm 规格的,小蓝想问你…...
APScheduler-调度器 BackgroundScheduler
当你有主程序需要执行,让定时任务在后台执行时,可以用BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler import time # 仅运行定时任务 scheduler BackgroundScheduler() # interval example, 间隔执行,…...
浅谈UI自动化测试
随着软件行业的不断发展,建立一个完善的自动化测试体系变得至关重要。目前,自动化测试主要涵盖接口自动化测试和UI自动化测试两个主要领域。就目前而言,企业在UI自动化测试方面的覆盖率仍然相对较低。 接口自动化测试可以模拟和执行应用程序…...
golang 工程组件 grpc-gateway—yaml定义http规则,和自定义实现网关路由
yaml定义http规则,和自定义实现网关路由 proto定义http规则总归是麻烦的,因为proto文件还是定义消息,grpc接口好一些。配置http规则有更好的方式。我们可以使用yaml文件定义接口的http规则。 同时有些接口不是只是让网关转发这么简单 有时需…...
在NLP中一下常见的任务,可以用作baseline;MRPC,CoLA,STS-B,RTE
1.MRPC(Microsoft Research Paraphrase Corpus)任务 是一个用于文本匹配和相似度判断的任务。在MRPC任务中,给定一对句子,模型需要判断它们是否是语义上等价的。MRPC任务的训练集和测试集由约5700对英语句子组成。每个句子对都有…...
【计算机网络笔记】Cookie技术
系列文章目录 什么是计算机网络? 什么是网络协议? 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能(1)——速率、带宽、延迟 计算机网络性能(2)…...
在虚拟环境中,通过pip安装tensorflow
目录 激活python虚拟环境,更新pip 通过pip 安装tensorflow 确定python版本: 编辑安装tensorflow: 编辑 为什么使用pip安装tensorflow? 激活python虚拟环境,更新pip 命令为python -m pip install --upgrade pip 通过pip 安装tensorf…...
接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...
免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
wpf在image控件上快速显示内存图像
wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像(比如分辨率3000*3000的图像)的办法,尤其是想把内存中的裸数据(只有图像的数据,不包…...
【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...
Appium下载安装配置保姆教程(图文详解)
目录 一、Appium软件介绍 1.特点 2.工作原理 3.应用场景 二、环境准备 安装 Node.js 安装 Appium 安装 JDK 安装 Android SDK 安装Python及依赖包 三、安装教程 1.Node.js安装 1.1.下载Node 1.2.安装程序 1.3.配置npm仓储和缓存 1.4. 配置环境 1.5.测试Node.j…...
shell脚本质数判断
shell脚本质数判断 shell输入一个正整数,判断是否为质数(素数)shell求1-100内的质数shell求给定数组输出其中的质数 shell输入一个正整数,判断是否为质数(素数) 思路: 1:1 2:1 2 3:1 2 3 4:1 2 3 4 5:1 2 3 4 5-------> 3:2 4:2 3 5:2 3…...
鸿蒙Navigation路由导航-基本使用介绍
1. Navigation介绍 Navigation组件是路由导航的根视图容器,一般作为Page页面的根容器使用,其内部默认包含了标题栏、内容区和工具栏,其中内容区默认首页显示导航内容(Navigation的子组件)或非首页显示(Nav…...
