kafka(四)消息类型
一、同步消息
1、生产者
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
二、异步消息
1、生产者
异步消息有两种:
1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法//(2)消息发送失败 exception != null 也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}
三、顺序消息
以订单为例,
- 生产者将相同的key的订单状态事件推送到kafka的同一分区
- kafka 消费者接收消息
- 消费者将消息提交给线程池
- 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
- 单个线程不停的从阻塞队列获取订单状态消息消费
@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}
相关文章:

kafka(四)消息类型
一、同步消息 1、生产者 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调…...

Emacs之显示blame插件:blamer、git-messenger(一百四十四)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…...

【10分钟速通webpack,全流程打包,编译,发包,全干货,附代码 】
需求 后端有个nodejs 基础库,用typescript编写,需要发包到代码仓库上,被其它业务引入。这其中就涉及了: 编译, 打包,发包。 工作流速览 前提依赖 webpack主体 npm install --save-dev webpack webpack…...
设计模式深入解析与实例应用
目录 工厂模式1.简单工厂模式2.工厂方法模式3.抽象工厂模式 策略模式责任链模式概述模板方法模式概述单例模式概述 工厂模式 工厂模式是一种创建型设计模式,它提供了一种创建对象的最佳实践,旨在将对象的创建过程与使用过程分离,以提高代码的…...

服务器数据恢复—异常断电导致RAID6阵列中磁盘出现坏扇区的数据恢复案例
服务器存储数据恢复环境: 一台存储中有一组由12块SAS硬盘组建的RAID6磁盘阵列,划分为一个卷,分配给几台Vmware ESXI主机做共享存储。该卷中存放了大量Windows虚拟机,这些虚拟机系统盘是统一大小,数据盘大小不确定&…...

前端工程化08-新的包管理工具pnpm
1、历史原因解读 pnpm这个东西发布的时间是比较早的,但是在最近一两年的时候才开始流行,甚至是可以说非常的盛行,那么这个包到底是个什么东西的,那么我们先说下,原来的包管理工具到底有那些问题?比如说我们…...

章十九、JavaVUE —— 框架、指令、声明周期、Vue-cli、组件路由、Element
目录 一、 框架 ● vue.js 框架 ● 特点 ● Vue 安装 二、 第一个vue程序 ● 创建项目 编辑 ● 导入 vue.js ● 创建vue对象,设置属性,使用模版渲染到页面 介绍 — Vue.js (vuejs.org) 三、 vue指令 ● v-text ● v-html ● v-…...

正则表达式阅读理解
这段正则表达式可以匹配什么呢? 超级复杂的一段正则表达式。 ((max|min)\\s*\\([^\\)]*(,[^\\)]*)*\\)|[a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][a-zA-Z0-9]*)?(\\*||%)?|[0-9](\\.[0-9])?|\\([^\\)]*(,[^\\)]*)*\\))(\\s*[-*/%]\\s*([a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][…...
Apache Calcite Linq4j学习
Lin4j简介 Linq4j是Apache Calcite项目中的一个模块,它提供了类似于LINQ(Language-Integrated Query)的功能,用于在Java中进行数据查询和操作。Linq4j可以将逻辑查询转换为物理查询,支持对集合进行筛选、映射、分组等…...

FPGA SATA高速存储设计
今天来讲一篇如何在fpga上实现sata ip,然后利用sata ip实现读写sata 盘的目的,如果需要再速度和容量上增加,那么仅仅需要增加sata ip个数就能够实现增加sata盘,如果仅仅实现data的读写整体来说sata ip设计比较简单,下面…...
MySQL----为什么选择使用MySQL
在我们日常做项目的过程中,不论是个人还是企业,大多数会选择使用MySQL数据库作为后端数据库存储,它到底有什么优势,能够做到如此广为流传呢? 优点 稳定性:MySQL具有良好的稳定性和可靠性,能够保…...
01.音视频小白系统入门(新专栏)
目录 一、基础知识 二、音频 三、视频 四、流媒体服务器 五、收获 音视频技术在远程办公、在线教育、远程医疗等领域的应用广泛。 学习音视频技术有助于提升职业竞争力,满足市场需求。 掌握音视频基础知识对未来发展至关重要,基础不牢会导致后续学习…...

C++:enum枚举共用体union
enum枚举 C继承C的枚举用法 (1)典型枚举类型定义,枚举变量定义和使用 (2)枚举类型中的枚举值常量不能和其他外部常量名称冲突: 举例1宏定义,举例2另一个枚举 // 定义一个名为Color的枚举类型 enum Color {RED, // 红色,默认值…...

动手学深度学习(Pytorch版)代码实践 -计算机视觉-47转置卷积
47转置卷积 import torch from torch import nn from d2l import torch as d2l# 输入矩阵X和卷积核矩阵K实现基本的转置卷积运算 def trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[0] h - 1, X.shape[1] w - 1))for i in range(X.shape[0]):for j in range(X.shap…...

LinkedIn被封原因和解封方法
对于初识领英和对领英生态规则不熟悉的人来说,很容易造成领英账号被封号(被限制登录)的情况,那么如何才能避免和解决领英帐号被封号(被限制登录)的难题呢? 领英帐号被封号或被限制登录主要会有两类情况。 首先要搞清楚, Linkedi…...
redis sentinel 部署
安装Redis 建议版本不要太低 > 6.2,我这里是redis 7.2.5 curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg echo "deb [signed-by/usr/share/keyrings/redis-archive-keyring.gpg] http…...

spring boot (shiro)+ websocket测试连接不上的简单检测处理
1、用前端连接测试的demo一切正常,但是到了项目中连接不上了 一开始以为是地址错,但是换了apifox测试也是不可以。 2、考虑是shiro进行了拦截了,所以就访问不到了地址,那么就放行。 3、再次用apifox测试,成功了。 当然…...
Jenkins - Python 虚拟环境
Jenkins - Python 虚拟环境 引言Python 虚拟环境创建 Python 虚拟环境安装 virtualenv(可选)创建虚拟环境激活虚拟环境安装依赖包退出虚拟环境(可选)注意 Python 虚拟环境实践 引言 Automation 脚本通常会部署到 Jenkins 上运行&…...

每日一道算法题 面试题 08.08. 有重复字符串的排列组合
题目 面试题 08.08. 有重复字符串的排列组合 - 力扣(LeetCode) Python class Solution:def permutation(self, S: str) -> List[str]:# 以索引记录字符是否用过lelen(S)idx[_ for _ in range(le) ]# 组合得到的字符串combine[]*leans[]# 递归def fu…...
Apache Kylin资源管理全指南:优化你的大数据架构
标题:Apache Kylin资源管理全指南:优化你的大数据架构 摘要 Apache Kylin是一个开源的分布式分析引擎,旨在为大规模数据集提供高性能的SQL查询能力。在Kylin中进行有效的资源管理对于确保查询性能和系统稳定性至关重要。本文将详细介绍如何…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...

STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...

免费数学几何作图web平台
光锐软件免费数学工具,maths,数学制图,数学作图,几何作图,几何,AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...

nnUNet V2修改网络——暴力替换网络为UNet++
更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...
LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》
🧠 LangChain 中 TextSplitter 的使用详解:从基础到进阶(附代码) 一、前言 在处理大规模文本数据时,特别是在构建知识库或进行大模型训练与推理时,文本切分(Text Splitting) 是一个…...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...

Vue3 PC端 UI组件库我更推荐Naive UI
一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用,前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率,还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库(Naive UI、Element …...

欢乐熊大话蓝牙知识17:多连接 BLE 怎么设计服务不会乱?分层思维来救场!
多连接 BLE 怎么设计服务不会乱?分层思维来救场! 作者按: 你是不是也遇到过 BLE 多连接时,调试现场像网吧“掉线风暴”? 温度传感器连上了,心率带丢了;一边 OTA 更新,一边通知卡壳。…...