当前位置: 首页 > news >正文

Kafka - 异步/同步发送API

文章目录

  • 异步发送
    • 普通异步发送
      • 异步发送流程
      • Code
    • 带回调函数的异步发送
      • 带回调函数的异步发送流程
      • Code
  • 同步发送API

在这里插入图片描述


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();System.out.println(art.offset());System.out.println("over - " + i);}// 5. 关闭资源kafkaProducer.close();}}

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调// 该方法在Producer收到ack时调用,为异步调用kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {// 没有异常,输出信息到控制台System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

相关文章:

Kafka - 异步/同步发送API

文章目录 异步发送普通异步发送异步发送流程Code 带回调函数的异步发送带回调函数的异步发送流程Code 同步发送API 异步发送 普通异步发送 需求&#xff1a;创建Kafka生产者&#xff0c;采用异步的方式发送到Kafka broker 异步发送流程 Code <!-- https://mvnrepository…...

嵌套for循环在外层循环和内层循环中使用两个Executors.newCachedThreadPool缓存线程池执行操作

1. 首先&#xff0c;我们需要创建两个ExecutorService对象&#xff0c;这两个对象将作为我们的缓存线程池。 2. 然后&#xff0c;我们使用嵌套的for循环来执行我们的操作。在每个外层循环中&#xff0c;我们将创建一个新的任务并提交给外层线程池。在这个任务中&#xff0c;我…...

【uniapp+云函数调用】人脸识别,实人认证,适用于app,具体思路解析,已实现

2023.10.8 需求: uniapp开发的app项目中使用人脸识别 app项目都是第一次搞,更别提人脸识别了。目前已有的就是Dcloud账号已申请,实现需求的时间没那么紧迫 此篇会详细记录从0到1的过程 2023.10.24 今天开始探究实现的过程 可能会记录的有些冗余 效果图如下: uniapp开发指南…...

系列十六、bean有哪些生命周期的回调方法?有哪几种实现方式?

一、概述 bean的生命周期的回调方法主要分两种&#xff0c;一种是初始化时进行调用&#xff0c;另外一种是销毁时进行调用。但是不管是初始化还是销毁&#xff0c;都对应着三种方式。 二、实现方式 2.1、注解方式 PostConstruct PreDestroy Component public class UserSe…...

2023平台工程崭露头角,AI 带来新机遇与挑战

在今年&#xff0c;平台工程正在迅速在 IT 企业中崭露头角&#xff0c;成为软件开发团队的必要实践。根据 CloudBees 发布的最新报告《2023年平台工程&#xff1a;快速采纳和影响》&#xff0c;83%的受访者已经完全实施了平台工程&#xff0c;或正处于某种实施阶段。 平台工…...

如何使用python快速修改Excel表单中的大量数据

python修改Excel中的内容进阶加速版 前面有一篇文章讲到了使用python处理Excel中的数据文件&#xff0c;即修改Excel中的数据&#xff0c;但是那个版本的代码跑点小规模、小数据量的excel还行&#xff0c;一旦数据量达到万条级别&#xff0c;代码运行会非常慢&#xff01;因此&…...

✔ ★【备战实习(面经+项目+算法)】 10.27学习

✔ ★【备战实习&#xff08;面经项目算法&#xff09;】 坚持完成每天必做如何找到好工作1. 科学的学习方法&#xff08;专注&#xff01;效率&#xff01;记忆&#xff01;心流&#xff01;&#xff09;2. 每天认真完成必做项&#xff0c;踏实学习技术 认真完成每天必做&…...

视频分辨率/帧率/码率选择参考

1. 视频码率与分辨率的参考表 1080&#xff0a;720的分辨率&#xff0c;用5000K左右&#xff1b; 720&#xff0a;576的分辨率&#xff0c;用3500K左右&#xff1b; 640&#xff0a;480的分辨率&#xff0c;用1500K左右。 2. 计算公式 基本算法&#xff1a;码率&#xff08;kb…...

LeetCode75——Day18

文章目录 一、题目二、题解 一、题目 1732. Find the Highest Altitude There is a biker going on a road trip. The road trip consists of n 1 points at different altitudes. The biker starts his trip on point 0 with altitude equal 0. You are given an integer …...

Java NIO 高并发开发

Java NIO 高并发开发 前言 Java NIO&#xff08;New I/O&#xff09;相比于传统的Java I/O&#xff08;BIO&#xff09;在高并发开发方面具有以下优势&#xff1a; 非阻塞模式&#xff1a;Java NIO使用非阻塞的I/O操作&#xff0c;允许一个线程管理多个通道&#xff08;Channe…...

8.循环神经网络

#pic_center R 1 R_1 R1​ R 2 R^2 R2 目录 知识框架No.1 序列模型一、序列模型二、D2L代码注意点三、QA No.2 文本预处理一、D2L代码注意点二、QA No.3 语言模型一、语言模型二、D2L代码注意点三、QA No.4 循环神经网络 RNN一、RNN二、QA No.5 循环神经网络 RNN 的实现一、从零…...

[C++随想录] map和set的使用

map和set的使用 set初始化finderasecountlower_bound && upper_boundequal_ range mapinsert[ ]运算符 multiset && multimap set — — key模拟 map — — key_value模型 set 初始化 void set_test1() {set<int>s;s.insert(10);s.insert(12);s.insert(…...

公网IP怎么设置?公网ip有哪些优点和缺点?

随着互联网的普及&#xff0c;越来越多的人开始关注网络安全和隐私保护。其中&#xff0c;公网IP的设置成为了一个备受关注的话题。本文将详细介绍公网IP的设置方法以及公网IP的优点和缺点。 一、公网IP设置方法 1. 路由器设置 在家庭或企业网络中&#xff0c;路由器通常是最重…...

蓝桥杯第 2 场算法双周赛 第2题 铺地板【算法赛】c++ 数学思维

题目 铺地板https://www.lanqiao.cn/problems/5887/learning/?contest_id145 问题描述 小蓝家要装修了&#xff0c;小蓝爸爸买来了很多块&#xff08;你可以理解为数量无限&#xff09;2323 规格的地砖&#xff0c;小蓝家的地板是 nm 规格的&#xff0c;小蓝想问你&#xf…...

APScheduler-调度器 BackgroundScheduler

当你有主程序需要执行&#xff0c;让定时任务在后台执行时&#xff0c;可以用BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler import time # 仅运行定时任务 scheduler BackgroundScheduler() # interval example, 间隔执行,…...

浅谈UI自动化测试

随着软件行业的不断发展&#xff0c;建立一个完善的自动化测试体系变得至关重要。目前&#xff0c;自动化测试主要涵盖接口自动化测试和UI自动化测试两个主要领域。就目前而言&#xff0c;企业在UI自动化测试方面的覆盖率仍然相对较低。 接口自动化测试可以模拟和执行应用程序…...

golang 工程组件 grpc-gateway—yaml定义http规则,和自定义实现网关路由

yaml定义http规则&#xff0c;和自定义实现网关路由 proto定义http规则总归是麻烦的&#xff0c;因为proto文件还是定义消息&#xff0c;grpc接口好一些。配置http规则有更好的方式。我们可以使用yaml文件定义接口的http规则。 同时有些接口不是只是让网关转发这么简单 有时需…...

在NLP中一下常见的任务,可以用作baseline;MRPC,CoLA,STS-B,RTE

1.MRPC&#xff08;Microsoft Research Paraphrase Corpus&#xff09;任务 是一个用于文本匹配和相似度判断的任务。在MRPC任务中&#xff0c;给定一对句子&#xff0c;模型需要判断它们是否是语义上等价的。MRPC任务的训练集和测试集由约5700对英语句子组成。每个句子对都有…...

【计算机网络笔记】Cookie技术

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…...

在虚拟环境中,通过pip安装tensorflow

目录 激活python虚拟环境&#xff0c;更新pip 通过pip 安装tensorflow 确定python版本&#xff1a; ​编辑安装tensorflow: ​编辑 为什么使用pip安装tensorflow? 激活python虚拟环境&#xff0c;更新pip 命令为python -m pip install --upgrade pip 通过pip 安装tensorf…...

【机器视觉】单目测距——运动结构恢复

ps&#xff1a;图是随便找的&#xff0c;为了凑个封面 前言 在前面对光流法进行进一步改进&#xff0c;希望将2D光流推广至3D场景流时&#xff0c;发现2D转3D过程中存在尺度歧义问题&#xff0c;需要补全摄像头拍摄图像中缺失的深度信息&#xff0c;否则解空间不收敛&#xf…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

Yolov8 目标检测蒸馏学习记录

yolov8系列模型蒸馏基本流程&#xff0c;代码下载&#xff1a;这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中&#xff0c;**知识蒸馏&#xff08;Knowledge Distillation&#xff09;**被广泛应用&#xff0c;作为提升模型…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…...

NPOI操作EXCEL文件 ——CAD C# 二次开发

缺点:dll.版本容易加载错误。CAD加载插件时&#xff0c;没有加载所有类库。插件运行过程中用到某个类库&#xff0c;会从CAD的安装目录找&#xff0c;找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库&#xff0c;就用插件程序加载进…...

C语言中提供的第三方库之哈希表实现

一. 简介 前面一篇文章简单学习了C语言中第三方库&#xff08;uthash库&#xff09;提供对哈希表的操作&#xff0c;文章如下&#xff1a; C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

未授权访问事件频发,我们应当如何应对?

在当下&#xff0c;数据已成为企业和组织的核心资产&#xff0c;是推动业务发展、决策制定以及创新的关键驱动力。然而&#xff0c;未授权访问这一隐匿的安全威胁&#xff0c;正如同高悬的达摩克利斯之剑&#xff0c;时刻威胁着数据的安全&#xff0c;一旦触发&#xff0c;便可…...