Kafka - 异步/同步发送API
文章目录
- 异步发送
- 普通异步发送
- 异步发送流程
- Code
- 带回调函数的异步发送
- 带回调函数的异步发送流程
- Code
- 同步发送API

异步发送
普通异步发送
需求:创建Kafka生产者,采用异步的方式发送到Kafka broker
异步发送流程
Code
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();System.out.println(art.offset());System.out.println("over - " + i);}// 5. 关闭资源kafkaProducer.close();}}
输出
31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9
忽略我这个offset … 我都发了好多次了…
看控制台的吧
带回调函数的异步发送
回调函数callback()会在producer收到ack时调用,为异步调用。
该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。
- 如果Exception为null,说明消息发送成功,
- 如果Exception不为null,说明消息发送失败
带回调函数的异步发送流程
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
Code
package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调// 该方法在Producer收到ack时调用,为异步调用kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {// 没有异常,输出信息到控制台System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}}
控制台
同步发送API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}
相关文章:

Kafka - 异步/同步发送API
文章目录 异步发送普通异步发送异步发送流程Code 带回调函数的异步发送带回调函数的异步发送流程Code 同步发送API 异步发送 普通异步发送 需求:创建Kafka生产者,采用异步的方式发送到Kafka broker 异步发送流程 Code <!-- https://mvnrepository…...
嵌套for循环在外层循环和内层循环中使用两个Executors.newCachedThreadPool缓存线程池执行操作
1. 首先,我们需要创建两个ExecutorService对象,这两个对象将作为我们的缓存线程池。 2. 然后,我们使用嵌套的for循环来执行我们的操作。在每个外层循环中,我们将创建一个新的任务并提交给外层线程池。在这个任务中,我…...

【uniapp+云函数调用】人脸识别,实人认证,适用于app,具体思路解析,已实现
2023.10.8 需求: uniapp开发的app项目中使用人脸识别 app项目都是第一次搞,更别提人脸识别了。目前已有的就是Dcloud账号已申请,实现需求的时间没那么紧迫 此篇会详细记录从0到1的过程 2023.10.24 今天开始探究实现的过程 可能会记录的有些冗余 效果图如下: uniapp开发指南…...
系列十六、bean有哪些生命周期的回调方法?有哪几种实现方式?
一、概述 bean的生命周期的回调方法主要分两种,一种是初始化时进行调用,另外一种是销毁时进行调用。但是不管是初始化还是销毁,都对应着三种方式。 二、实现方式 2.1、注解方式 PostConstruct PreDestroy Component public class UserSe…...

2023平台工程崭露头角,AI 带来新机遇与挑战
在今年,平台工程正在迅速在 IT 企业中崭露头角,成为软件开发团队的必要实践。根据 CloudBees 发布的最新报告《2023年平台工程:快速采纳和影响》,83%的受访者已经完全实施了平台工程,或正处于某种实施阶段。 平台工…...
如何使用python快速修改Excel表单中的大量数据
python修改Excel中的内容进阶加速版 前面有一篇文章讲到了使用python处理Excel中的数据文件,即修改Excel中的数据,但是那个版本的代码跑点小规模、小数据量的excel还行,一旦数据量达到万条级别,代码运行会非常慢!因此&…...

✔ ★【备战实习(面经+项目+算法)】 10.27学习
✔ ★【备战实习(面经项目算法)】 坚持完成每天必做如何找到好工作1. 科学的学习方法(专注!效率!记忆!心流!)2. 每天认真完成必做项,踏实学习技术 认真完成每天必做&…...

视频分辨率/帧率/码率选择参考
1. 视频码率与分辨率的参考表 1080*720的分辨率,用5000K左右; 720*576的分辨率,用3500K左右; 640*480的分辨率,用1500K左右。 2. 计算公式 基本算法:码率(kb…...
LeetCode75——Day18
文章目录 一、题目二、题解 一、题目 1732. Find the Highest Altitude There is a biker going on a road trip. The road trip consists of n 1 points at different altitudes. The biker starts his trip on point 0 with altitude equal 0. You are given an integer …...

Java NIO 高并发开发
Java NIO 高并发开发 前言 Java NIO(New I/O)相比于传统的Java I/O(BIO)在高并发开发方面具有以下优势: 非阻塞模式:Java NIO使用非阻塞的I/O操作,允许一个线程管理多个通道(Channe…...
8.循环神经网络
#pic_center R 1 R_1 R1 R 2 R^2 R2 目录 知识框架No.1 序列模型一、序列模型二、D2L代码注意点三、QA No.2 文本预处理一、D2L代码注意点二、QA No.3 语言模型一、语言模型二、D2L代码注意点三、QA No.4 循环神经网络 RNN一、RNN二、QA No.5 循环神经网络 RNN 的实现一、从零…...

[C++随想录] map和set的使用
map和set的使用 set初始化finderasecountlower_bound && upper_boundequal_ range mapinsert[ ]运算符 multiset && multimap set — — key模拟 map — — key_value模型 set 初始化 void set_test1() {set<int>s;s.insert(10);s.insert(12);s.insert(…...

公网IP怎么设置?公网ip有哪些优点和缺点?
随着互联网的普及,越来越多的人开始关注网络安全和隐私保护。其中,公网IP的设置成为了一个备受关注的话题。本文将详细介绍公网IP的设置方法以及公网IP的优点和缺点。 一、公网IP设置方法 1. 路由器设置 在家庭或企业网络中,路由器通常是最重…...

蓝桥杯第 2 场算法双周赛 第2题 铺地板【算法赛】c++ 数学思维
题目 铺地板https://www.lanqiao.cn/problems/5887/learning/?contest_id145 问题描述 小蓝家要装修了,小蓝爸爸买来了很多块(你可以理解为数量无限)2323 规格的地砖,小蓝家的地板是 nm 规格的,小蓝想问你…...
APScheduler-调度器 BackgroundScheduler
当你有主程序需要执行,让定时任务在后台执行时,可以用BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler import time # 仅运行定时任务 scheduler BackgroundScheduler() # interval example, 间隔执行,…...

浅谈UI自动化测试
随着软件行业的不断发展,建立一个完善的自动化测试体系变得至关重要。目前,自动化测试主要涵盖接口自动化测试和UI自动化测试两个主要领域。就目前而言,企业在UI自动化测试方面的覆盖率仍然相对较低。 接口自动化测试可以模拟和执行应用程序…...
golang 工程组件 grpc-gateway—yaml定义http规则,和自定义实现网关路由
yaml定义http规则,和自定义实现网关路由 proto定义http规则总归是麻烦的,因为proto文件还是定义消息,grpc接口好一些。配置http规则有更好的方式。我们可以使用yaml文件定义接口的http规则。 同时有些接口不是只是让网关转发这么简单 有时需…...
在NLP中一下常见的任务,可以用作baseline;MRPC,CoLA,STS-B,RTE
1.MRPC(Microsoft Research Paraphrase Corpus)任务 是一个用于文本匹配和相似度判断的任务。在MRPC任务中,给定一对句子,模型需要判断它们是否是语义上等价的。MRPC任务的训练集和测试集由约5700对英语句子组成。每个句子对都有…...

【计算机网络笔记】Cookie技术
系列文章目录 什么是计算机网络? 什么是网络协议? 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能(1)——速率、带宽、延迟 计算机网络性能(2)…...

在虚拟环境中,通过pip安装tensorflow
目录 激活python虚拟环境,更新pip 通过pip 安装tensorflow 确定python版本: 编辑安装tensorflow: 编辑 为什么使用pip安装tensorflow? 激活python虚拟环境,更新pip 命令为python -m pip install --upgrade pip 通过pip 安装tensorf…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...

Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...