当前位置: 首页 > news >正文

kafka(四)消息类型

一、同步消息

1、生产者

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

二、异步消息

1、生产者

异步消息有两种:

1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}

三、顺序消息

以订单为例,

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}

相关文章:

kafka(四)消息类型

一、同步消息 1、生产者 同步发送的意思就是&#xff0c;一条消息发送之后&#xff0c;会阻塞当前线程&#xff0c;直至返回 ack。 由于 send 方法返回的是一个 Future 对象&#xff0c;根据 Futrue 对象的特点&#xff0c;我们也可以实现同 步发送的效果&#xff0c;只需在调…...

Emacs之显示blame插件:blamer、git-messenger(一百四十四)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…...

【10分钟速通webpack,全流程打包,编译,发包,全干货,附代码 】

需求 后端有个nodejs 基础库&#xff0c;用typescript编写&#xff0c;需要发包到代码仓库上&#xff0c;被其它业务引入。这其中就涉及了&#xff1a; 编译&#xff0c; 打包&#xff0c;发包。 工作流速览 前提依赖 webpack主体 npm install --save-dev webpack webpack…...

设计模式深入解析与实例应用

目录 工厂模式1.简单工厂模式2.工厂方法模式3.抽象工厂模式 策略模式责任链模式概述模板方法模式概述单例模式概述 工厂模式 工厂模式是一种创建型设计模式&#xff0c;它提供了一种创建对象的最佳实践&#xff0c;旨在将对象的创建过程与使用过程分离&#xff0c;以提高代码的…...

服务器数据恢复—异常断电导致RAID6阵列中磁盘出现坏扇区的数据恢复案例

服务器存储数据恢复环境&#xff1a; 一台存储中有一组由12块SAS硬盘组建的RAID6磁盘阵列&#xff0c;划分为一个卷&#xff0c;分配给几台Vmware ESXI主机做共享存储。该卷中存放了大量Windows虚拟机&#xff0c;这些虚拟机系统盘是统一大小&#xff0c;数据盘大小不确定&…...

前端工程化08-新的包管理工具pnpm

1、历史原因解读 pnpm这个东西发布的时间是比较早的&#xff0c;但是在最近一两年的时候才开始流行&#xff0c;甚至是可以说非常的盛行&#xff0c;那么这个包到底是个什么东西的&#xff0c;那么我们先说下&#xff0c;原来的包管理工具到底有那些问题&#xff1f;比如说我们…...

章十九、JavaVUE —— 框架、指令、声明周期、Vue-cli、组件路由、Element

目录 一、 框架 ● vue.js 框架 ● 特点 ● Vue 安装 二、 第一个vue程序 ● 创建项目 ​编辑 ● 导入 vue.js ● 创建vue对象&#xff0c;设置属性&#xff0c;使用模版渲染到页面 介绍 — Vue.js (vuejs.org) 三、 vue指令 ● v-text ● v-html ● v-…...

正则表达式阅读理解

这段正则表达式可以匹配什么呢&#xff1f; 超级复杂的一段正则表达式。 ((max|min)\\s*\\([^\\)]*(,[^\\)]*)*\\)|[a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][a-zA-Z0-9]*)?(\\*||%)?|[0-9](\\.[0-9])?|\\([^\\)]*(,[^\\)]*)*\\))(\\s*[-*/%]\\s*([a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][…...

Apache Calcite Linq4j学习

Lin4j简介 Linq4j是Apache Calcite项目中的一个模块&#xff0c;它提供了类似于LINQ&#xff08;Language-Integrated Query&#xff09;的功能&#xff0c;用于在Java中进行数据查询和操作。Linq4j可以将逻辑查询转换为物理查询&#xff0c;支持对集合进行筛选、映射、分组等…...

FPGA SATA高速存储设计

今天来讲一篇如何在fpga上实现sata ip&#xff0c;然后利用sata ip实现读写sata 盘的目的&#xff0c;如果需要再速度和容量上增加&#xff0c;那么仅仅需要增加sata ip个数就能够实现增加sata盘&#xff0c;如果仅仅实现data的读写整体来说sata ip设计比较简单&#xff0c;下面…...

MySQL----为什么选择使用MySQL

在我们日常做项目的过程中&#xff0c;不论是个人还是企业&#xff0c;大多数会选择使用MySQL数据库作为后端数据库存储&#xff0c;它到底有什么优势&#xff0c;能够做到如此广为流传呢&#xff1f; 优点 稳定性&#xff1a;MySQL具有良好的稳定性和可靠性&#xff0c;能够保…...

01.音视频小白系统入门(新专栏)

目录 一、基础知识 二、音频 三、视频 四、流媒体服务器 五、收获 音视频技术在远程办公、在线教育、远程医疗等领域的应用广泛。 学习音视频技术有助于提升职业竞争力&#xff0c;满足市场需求。 掌握音视频基础知识对未来发展至关重要&#xff0c;基础不牢会导致后续学习…...

C++:enum枚举共用体union

enum枚举 C继承C的枚举用法 (1)典型枚举类型定义&#xff0c;枚举变量定义和使用 (2)枚举类型中的枚举值常量不能和其他外部常量名称冲突&#xff1a; 举例1宏定义&#xff0c;举例2另一个枚举 // 定义一个名为Color的枚举类型 enum Color {RED, // 红色&#xff0c;默认值…...

动手学深度学习(Pytorch版)代码实践 -计算机视觉-47转置卷积

47转置卷积 import torch from torch import nn from d2l import torch as d2l# 输入矩阵X和卷积核矩阵K实现基本的转置卷积运算 def trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[0] h - 1, X.shape[1] w - 1))for i in range(X.shape[0]):for j in range(X.shap…...

LinkedIn被封原因和解封方法

对于初识领英和对领英生态规则不熟悉的人来说&#xff0c;很容易造成领英账号被封号(被限制登录)的情况&#xff0c;那么如何才能避免和解决领英帐号被封号(被限制登录)的难题呢&#xff1f; 领英帐号被封号或被限制登录主要会有两类情况。 首先要搞清楚&#xff0c; Linkedi…...

redis sentinel 部署

安装Redis 建议版本不要太低 > 6.2&#xff0c;我这里是redis 7.2.5 curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg echo "deb [signed-by/usr/share/keyrings/redis-archive-keyring.gpg] http…...

spring boot (shiro)+ websocket测试连接不上的简单检测处理

1、用前端连接测试的demo一切正常&#xff0c;但是到了项目中连接不上了 一开始以为是地址错&#xff0c;但是换了apifox测试也是不可以。 2、考虑是shiro进行了拦截了&#xff0c;所以就访问不到了地址&#xff0c;那么就放行。 3、再次用apifox测试&#xff0c;成功了。 当然…...

Jenkins - Python 虚拟环境

Jenkins - Python 虚拟环境 引言Python 虚拟环境创建 Python 虚拟环境安装 virtualenv&#xff08;可选&#xff09;创建虚拟环境激活虚拟环境安装依赖包退出虚拟环境&#xff08;可选&#xff09;注意 Python 虚拟环境实践 引言 Automation 脚本通常会部署到 Jenkins 上运行&…...

每日一道算法题 面试题 08.08. 有重复字符串的排列组合

题目 面试题 08.08. 有重复字符串的排列组合 - 力扣&#xff08;LeetCode&#xff09; Python class Solution:def permutation(self, S: str) -> List[str]:# 以索引记录字符是否用过lelen(S)idx[_ for _ in range(le) ]# 组合得到的字符串combine[]*leans[]# 递归def fu…...

Apache Kylin资源管理全指南:优化你的大数据架构

标题&#xff1a;Apache Kylin资源管理全指南&#xff1a;优化你的大数据架构 摘要 Apache Kylin是一个开源的分布式分析引擎&#xff0c;旨在为大规模数据集提供高性能的SQL查询能力。在Kylin中进行有效的资源管理对于确保查询性能和系统稳定性至关重要。本文将详细介绍如何…...

HttpOnly Cookie 深度解析

一、什么是 HttpOnly Cookie HttpOnly 是一个可以附加在 Set-Cookie 响应头上的标志位&#xff08;flag&#xff09;。当一个 Cookie 被标记为 HttpOnly 后&#xff0c;客户端脚本&#xff08;如 JavaScript&#xff09;将无法通过 document.cookie 等 API 访问该 Cookie&…...

从《西部世界》到现实:AI智能体如何重塑游戏NPC与虚拟社会?

从《西部世界》到现实&#xff1a;AI智能体如何重塑游戏NPC与虚拟社会&#xff1f; 当《西部世界》中的NPC开始拥有记忆、情感和自主决策能力时&#xff0c;观众惊叹于科幻与现实的边界正在模糊。如今&#xff0c;大型语言模型&#xff08;LLM&#xff09;驱动的AI智能体正将这…...

DownKyi完全指南:三步解锁B站8K视频下载的终极方案

DownKyi完全指南&#xff1a;三步解锁B站8K视频下载的终极方案 【免费下载链接】downkyi 哔哩下载姬downkyi&#xff0c;哔哩哔哩网站视频下载工具&#xff0c;支持批量下载&#xff0c;支持8K、HDR、杜比视界&#xff0c;提供工具箱&#xff08;音视频提取、去水印等&#xff…...

快速免费解锁网易云音乐NCM格式:ncmdumpGUI完整使用指南

快速免费解锁网易云音乐NCM格式&#xff1a;ncmdumpGUI完整使用指南 【免费下载链接】ncmdumpGUI C#版本网易云音乐ncm文件格式转换&#xff0c;Windows图形界面版本 项目地址: https://gitcode.com/gh_mirrors/nc/ncmdumpGUI 你是否曾在网易云音乐下载了心爱的歌曲&am…...

别再死记硬背了!用MATLAB手把手教你画根轨迹图(附代码与避坑指南)

MATLAB实战&#xff1a;从零绘制根轨迹图的完整指南与避坑技巧 在控制系统的设计与分析中&#xff0c;根轨迹图是理解系统动态特性的重要工具。传统教学中&#xff0c;学生往往被要求死记硬背绘制规则&#xff0c;却难以理解其实际应用价值。本文将彻底改变这一现状——通过MAT…...

Windows Cleaner终极指南:3分钟彻底解决C盘爆红问题!

Windows Cleaner终极指南&#xff1a;3分钟彻底解决C盘爆红问题&#xff01; 【免费下载链接】WindowsCleaner Windows Cleaner——专治C盘爆红及各种不服&#xff01; 项目地址: https://gitcode.com/gh_mirrors/wi/WindowsCleaner 还在为Windows系统越用越慢而烦恼吗&…...

【C语言】printf格式化输出:你真的理解“四舍五入”的陷阱吗?

1. 从printf的"四舍五入"陷阱说起 那天我在调试一个财务计算程序时&#xff0c;发现金额显示总差那么几分钱。比如3.145元应该显示为3.15&#xff0c;但程序输出却是3.14。这让我想起刚学C语言时踩过的坑——printf的格式化输出并不像数学课教的四舍五入那样简单。 先…...

用51单片机和HC-SR04超声波模块DIY一个倒车雷达(附完整代码和立创EDA原理图)

51单片机与HC-SR04超声波模块实战&#xff1a;打造高精度倒车雷达系统 在汽车电子和智能硬件领域&#xff0c;倒车雷达作为基础安全装置&#xff0c;其DIY实现不仅能帮助理解超声波测距原理&#xff0c;更是掌握嵌入式系统开发的绝佳实践。本文将手把手教你使用经典的STC89C52单…...

从仿生结构到步态算法:8自由度并联腿机器狗行走全解析

1. 8自由度并联腿机器狗的结构奥秘 第一次拆解机器狗时&#xff0c;我对着那些复杂的连杆结构发了半小时呆。直到发现它的腿部运动原理和公园里的跷跷板惊人相似——这个发现让我瞬间理解了8自由度并联腿的精妙之处。这种结构就像给机器人装上了"机械肌腱"&#xff0…...

gwadd:轻量级Git仓库组管理工具,提升多项目开发效率

1. 项目概述&#xff1a;一个被低估的Git仓库管理利器如果你和我一样&#xff0c;日常工作中需要频繁地在多个Git仓库之间穿梭&#xff0c;处理各种依赖、子模块&#xff0c;或者仅仅是同步一堆相关的项目代码&#xff0c;那么你一定对那种重复、繁琐的切换和操作感到头疼。今天…...