kafka(四)消息类型
一、同步消息
1、生产者
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
二、异步消息
1、生产者
异步消息有两种:
1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法//(2)消息发送失败 exception != null 也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}
三、顺序消息
以订单为例,
- 生产者将相同的key的订单状态事件推送到kafka的同一分区
- kafka 消费者接收消息
- 消费者将消息提交给线程池
- 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
- 单个线程不停的从阻塞队列获取订单状态消息消费

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}
相关文章:
kafka(四)消息类型
一、同步消息 1、生产者 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调…...
Emacs之显示blame插件:blamer、git-messenger(一百四十四)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…...
【10分钟速通webpack,全流程打包,编译,发包,全干货,附代码 】
需求 后端有个nodejs 基础库,用typescript编写,需要发包到代码仓库上,被其它业务引入。这其中就涉及了: 编译, 打包,发包。 工作流速览 前提依赖 webpack主体 npm install --save-dev webpack webpack…...
设计模式深入解析与实例应用
目录 工厂模式1.简单工厂模式2.工厂方法模式3.抽象工厂模式 策略模式责任链模式概述模板方法模式概述单例模式概述 工厂模式 工厂模式是一种创建型设计模式,它提供了一种创建对象的最佳实践,旨在将对象的创建过程与使用过程分离,以提高代码的…...
服务器数据恢复—异常断电导致RAID6阵列中磁盘出现坏扇区的数据恢复案例
服务器存储数据恢复环境: 一台存储中有一组由12块SAS硬盘组建的RAID6磁盘阵列,划分为一个卷,分配给几台Vmware ESXI主机做共享存储。该卷中存放了大量Windows虚拟机,这些虚拟机系统盘是统一大小,数据盘大小不确定&…...
前端工程化08-新的包管理工具pnpm
1、历史原因解读 pnpm这个东西发布的时间是比较早的,但是在最近一两年的时候才开始流行,甚至是可以说非常的盛行,那么这个包到底是个什么东西的,那么我们先说下,原来的包管理工具到底有那些问题?比如说我们…...
章十九、JavaVUE —— 框架、指令、声明周期、Vue-cli、组件路由、Element
目录 一、 框架 ● vue.js 框架 ● 特点 ● Vue 安装 二、 第一个vue程序 ● 创建项目 编辑 ● 导入 vue.js ● 创建vue对象,设置属性,使用模版渲染到页面 介绍 — Vue.js (vuejs.org) 三、 vue指令 ● v-text ● v-html ● v-…...
正则表达式阅读理解
这段正则表达式可以匹配什么呢? 超级复杂的一段正则表达式。 ((max|min)\\s*\\([^\\)]*(,[^\\)]*)*\\)|[a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][a-zA-Z0-9]*)?(\\*||%)?|[0-9](\\.[0-9])?|\\([^\\)]*(,[^\\)]*)*\\))(\\s*[-*/%]\\s*([a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][…...
Apache Calcite Linq4j学习
Lin4j简介 Linq4j是Apache Calcite项目中的一个模块,它提供了类似于LINQ(Language-Integrated Query)的功能,用于在Java中进行数据查询和操作。Linq4j可以将逻辑查询转换为物理查询,支持对集合进行筛选、映射、分组等…...
FPGA SATA高速存储设计
今天来讲一篇如何在fpga上实现sata ip,然后利用sata ip实现读写sata 盘的目的,如果需要再速度和容量上增加,那么仅仅需要增加sata ip个数就能够实现增加sata盘,如果仅仅实现data的读写整体来说sata ip设计比较简单,下面…...
MySQL----为什么选择使用MySQL
在我们日常做项目的过程中,不论是个人还是企业,大多数会选择使用MySQL数据库作为后端数据库存储,它到底有什么优势,能够做到如此广为流传呢? 优点 稳定性:MySQL具有良好的稳定性和可靠性,能够保…...
01.音视频小白系统入门(新专栏)
目录 一、基础知识 二、音频 三、视频 四、流媒体服务器 五、收获 音视频技术在远程办公、在线教育、远程医疗等领域的应用广泛。 学习音视频技术有助于提升职业竞争力,满足市场需求。 掌握音视频基础知识对未来发展至关重要,基础不牢会导致后续学习…...
C++:enum枚举共用体union
enum枚举 C继承C的枚举用法 (1)典型枚举类型定义,枚举变量定义和使用 (2)枚举类型中的枚举值常量不能和其他外部常量名称冲突: 举例1宏定义,举例2另一个枚举 // 定义一个名为Color的枚举类型 enum Color {RED, // 红色,默认值…...
动手学深度学习(Pytorch版)代码实践 -计算机视觉-47转置卷积
47转置卷积 import torch from torch import nn from d2l import torch as d2l# 输入矩阵X和卷积核矩阵K实现基本的转置卷积运算 def trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[0] h - 1, X.shape[1] w - 1))for i in range(X.shape[0]):for j in range(X.shap…...
LinkedIn被封原因和解封方法
对于初识领英和对领英生态规则不熟悉的人来说,很容易造成领英账号被封号(被限制登录)的情况,那么如何才能避免和解决领英帐号被封号(被限制登录)的难题呢? 领英帐号被封号或被限制登录主要会有两类情况。 首先要搞清楚, Linkedi…...
redis sentinel 部署
安装Redis 建议版本不要太低 > 6.2,我这里是redis 7.2.5 curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg echo "deb [signed-by/usr/share/keyrings/redis-archive-keyring.gpg] http…...
spring boot (shiro)+ websocket测试连接不上的简单检测处理
1、用前端连接测试的demo一切正常,但是到了项目中连接不上了 一开始以为是地址错,但是换了apifox测试也是不可以。 2、考虑是shiro进行了拦截了,所以就访问不到了地址,那么就放行。 3、再次用apifox测试,成功了。 当然…...
Jenkins - Python 虚拟环境
Jenkins - Python 虚拟环境 引言Python 虚拟环境创建 Python 虚拟环境安装 virtualenv(可选)创建虚拟环境激活虚拟环境安装依赖包退出虚拟环境(可选)注意 Python 虚拟环境实践 引言 Automation 脚本通常会部署到 Jenkins 上运行&…...
每日一道算法题 面试题 08.08. 有重复字符串的排列组合
题目 面试题 08.08. 有重复字符串的排列组合 - 力扣(LeetCode) Python class Solution:def permutation(self, S: str) -> List[str]:# 以索引记录字符是否用过lelen(S)idx[_ for _ in range(le) ]# 组合得到的字符串combine[]*leans[]# 递归def fu…...
Apache Kylin资源管理全指南:优化你的大数据架构
标题:Apache Kylin资源管理全指南:优化你的大数据架构 摘要 Apache Kylin是一个开源的分布式分析引擎,旨在为大规模数据集提供高性能的SQL查询能力。在Kylin中进行有效的资源管理对于确保查询性能和系统稳定性至关重要。本文将详细介绍如何…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...
手机平板能效生态设计指令EU 2023/1670标准解读
手机平板能效生态设计指令EU 2023/1670标准解读 以下是针对欧盟《手机和平板电脑生态设计法规》(EU) 2023/1670 的核心解读,综合法规核心要求、最新修正及企业合规要点: 一、法规背景与目标 生效与强制时间 发布于2023年8月31日(OJ公报&…...
如何应对敏捷转型中的团队阻力
应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中,明确沟通敏捷转型目的尤为关键,团队成员只有清晰理解转型背后的原因和利益,才能降低对变化的…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
软件工程 期末复习
瀑布模型:计划 螺旋模型:风险低 原型模型: 用户反馈 喷泉模型:代码复用 高内聚 低耦合:模块内部功能紧密 模块之间依赖程度小 高内聚:指的是一个模块内部的功能应该紧密相关。换句话说,一个模块应当只实现单一的功能…...
macOS 终端智能代理检测
🧠 终端智能代理检测:自动判断是否需要设置代理访问 GitHub 在开发中,使用 GitHub 是非常常见的需求。但有时候我们会发现某些命令失败、插件无法更新,例如: fatal: unable to access https://github.com/ohmyzsh/oh…...
