当前位置: 首页 > news >正文

kafka(四)消息类型

一、同步消息

1、生产者

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

二、异步消息

1、生产者

异步消息有两种:

1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}

三、顺序消息

以订单为例,

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}

相关文章:

kafka(四)消息类型

一、同步消息 1、生产者 同步发送的意思就是&#xff0c;一条消息发送之后&#xff0c;会阻塞当前线程&#xff0c;直至返回 ack。 由于 send 方法返回的是一个 Future 对象&#xff0c;根据 Futrue 对象的特点&#xff0c;我们也可以实现同 步发送的效果&#xff0c;只需在调…...

Emacs之显示blame插件:blamer、git-messenger(一百四十四)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…...

【10分钟速通webpack,全流程打包,编译,发包,全干货,附代码 】

需求 后端有个nodejs 基础库&#xff0c;用typescript编写&#xff0c;需要发包到代码仓库上&#xff0c;被其它业务引入。这其中就涉及了&#xff1a; 编译&#xff0c; 打包&#xff0c;发包。 工作流速览 前提依赖 webpack主体 npm install --save-dev webpack webpack…...

设计模式深入解析与实例应用

目录 工厂模式1.简单工厂模式2.工厂方法模式3.抽象工厂模式 策略模式责任链模式概述模板方法模式概述单例模式概述 工厂模式 工厂模式是一种创建型设计模式&#xff0c;它提供了一种创建对象的最佳实践&#xff0c;旨在将对象的创建过程与使用过程分离&#xff0c;以提高代码的…...

服务器数据恢复—异常断电导致RAID6阵列中磁盘出现坏扇区的数据恢复案例

服务器存储数据恢复环境&#xff1a; 一台存储中有一组由12块SAS硬盘组建的RAID6磁盘阵列&#xff0c;划分为一个卷&#xff0c;分配给几台Vmware ESXI主机做共享存储。该卷中存放了大量Windows虚拟机&#xff0c;这些虚拟机系统盘是统一大小&#xff0c;数据盘大小不确定&…...

前端工程化08-新的包管理工具pnpm

1、历史原因解读 pnpm这个东西发布的时间是比较早的&#xff0c;但是在最近一两年的时候才开始流行&#xff0c;甚至是可以说非常的盛行&#xff0c;那么这个包到底是个什么东西的&#xff0c;那么我们先说下&#xff0c;原来的包管理工具到底有那些问题&#xff1f;比如说我们…...

章十九、JavaVUE —— 框架、指令、声明周期、Vue-cli、组件路由、Element

目录 一、 框架 ● vue.js 框架 ● 特点 ● Vue 安装 二、 第一个vue程序 ● 创建项目 ​编辑 ● 导入 vue.js ● 创建vue对象&#xff0c;设置属性&#xff0c;使用模版渲染到页面 介绍 — Vue.js (vuejs.org) 三、 vue指令 ● v-text ● v-html ● v-…...

正则表达式阅读理解

这段正则表达式可以匹配什么呢&#xff1f; 超级复杂的一段正则表达式。 ((max|min)\\s*\\([^\\)]*(,[^\\)]*)*\\)|[a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][a-zA-Z0-9]*)?(\\*||%)?|[0-9](\\.[0-9])?|\\([^\\)]*(,[^\\)]*)*\\))(\\s*[-*/%]\\s*([a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][…...

Apache Calcite Linq4j学习

Lin4j简介 Linq4j是Apache Calcite项目中的一个模块&#xff0c;它提供了类似于LINQ&#xff08;Language-Integrated Query&#xff09;的功能&#xff0c;用于在Java中进行数据查询和操作。Linq4j可以将逻辑查询转换为物理查询&#xff0c;支持对集合进行筛选、映射、分组等…...

FPGA SATA高速存储设计

今天来讲一篇如何在fpga上实现sata ip&#xff0c;然后利用sata ip实现读写sata 盘的目的&#xff0c;如果需要再速度和容量上增加&#xff0c;那么仅仅需要增加sata ip个数就能够实现增加sata盘&#xff0c;如果仅仅实现data的读写整体来说sata ip设计比较简单&#xff0c;下面…...

MySQL----为什么选择使用MySQL

在我们日常做项目的过程中&#xff0c;不论是个人还是企业&#xff0c;大多数会选择使用MySQL数据库作为后端数据库存储&#xff0c;它到底有什么优势&#xff0c;能够做到如此广为流传呢&#xff1f; 优点 稳定性&#xff1a;MySQL具有良好的稳定性和可靠性&#xff0c;能够保…...

01.音视频小白系统入门(新专栏)

目录 一、基础知识 二、音频 三、视频 四、流媒体服务器 五、收获 音视频技术在远程办公、在线教育、远程医疗等领域的应用广泛。 学习音视频技术有助于提升职业竞争力&#xff0c;满足市场需求。 掌握音视频基础知识对未来发展至关重要&#xff0c;基础不牢会导致后续学习…...

C++:enum枚举共用体union

enum枚举 C继承C的枚举用法 (1)典型枚举类型定义&#xff0c;枚举变量定义和使用 (2)枚举类型中的枚举值常量不能和其他外部常量名称冲突&#xff1a; 举例1宏定义&#xff0c;举例2另一个枚举 // 定义一个名为Color的枚举类型 enum Color {RED, // 红色&#xff0c;默认值…...

动手学深度学习(Pytorch版)代码实践 -计算机视觉-47转置卷积

47转置卷积 import torch from torch import nn from d2l import torch as d2l# 输入矩阵X和卷积核矩阵K实现基本的转置卷积运算 def trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[0] h - 1, X.shape[1] w - 1))for i in range(X.shape[0]):for j in range(X.shap…...

LinkedIn被封原因和解封方法

对于初识领英和对领英生态规则不熟悉的人来说&#xff0c;很容易造成领英账号被封号(被限制登录)的情况&#xff0c;那么如何才能避免和解决领英帐号被封号(被限制登录)的难题呢&#xff1f; 领英帐号被封号或被限制登录主要会有两类情况。 首先要搞清楚&#xff0c; Linkedi…...

redis sentinel 部署

安装Redis 建议版本不要太低 > 6.2&#xff0c;我这里是redis 7.2.5 curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg echo "deb [signed-by/usr/share/keyrings/redis-archive-keyring.gpg] http…...

spring boot (shiro)+ websocket测试连接不上的简单检测处理

1、用前端连接测试的demo一切正常&#xff0c;但是到了项目中连接不上了 一开始以为是地址错&#xff0c;但是换了apifox测试也是不可以。 2、考虑是shiro进行了拦截了&#xff0c;所以就访问不到了地址&#xff0c;那么就放行。 3、再次用apifox测试&#xff0c;成功了。 当然…...

Jenkins - Python 虚拟环境

Jenkins - Python 虚拟环境 引言Python 虚拟环境创建 Python 虚拟环境安装 virtualenv&#xff08;可选&#xff09;创建虚拟环境激活虚拟环境安装依赖包退出虚拟环境&#xff08;可选&#xff09;注意 Python 虚拟环境实践 引言 Automation 脚本通常会部署到 Jenkins 上运行&…...

每日一道算法题 面试题 08.08. 有重复字符串的排列组合

题目 面试题 08.08. 有重复字符串的排列组合 - 力扣&#xff08;LeetCode&#xff09; Python class Solution:def permutation(self, S: str) -> List[str]:# 以索引记录字符是否用过lelen(S)idx[_ for _ in range(le) ]# 组合得到的字符串combine[]*leans[]# 递归def fu…...

Apache Kylin资源管理全指南:优化你的大数据架构

标题&#xff1a;Apache Kylin资源管理全指南&#xff1a;优化你的大数据架构 摘要 Apache Kylin是一个开源的分布式分析引擎&#xff0c;旨在为大规模数据集提供高性能的SQL查询能力。在Kylin中进行有效的资源管理对于确保查询性能和系统稳定性至关重要。本文将详细介绍如何…...

深入理解JavaScript设计模式之单例模式

目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式&#xff08;Singleton Pattern&#…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具

文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...

微服务商城-商品微服务

数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

Linux nano命令的基本使用

参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时&#xff0c;显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

抽象类和接口(全)

一、抽象类 1.概念&#xff1a;如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象&#xff0c;这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法&#xff0c;包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中&#xff0c;⼀个类如果被 abs…...

阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)

cd /home 进入home盘 安装虚拟环境&#xff1a; 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境&#xff1a; virtualenv myenv 3、激活虚拟环境&#xff08;激活环境可以在当前环境下安装包&#xff09; source myenv/bin/activate 此时&#xff0c;终端…...

第八部分:阶段项目 6:构建 React 前端应用

现在&#xff0c;是时候将你学到的 React 基础知识付诸实践&#xff0c;构建一个简单的前端应用来模拟与后端 API 的交互了。在这个阶段&#xff0c;你可以先使用模拟数据&#xff0c;或者如果你的后端 API&#xff08;阶段项目 5&#xff09;已经搭建好&#xff0c;可以直接连…...

LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》

&#x1f9e0; LangChain 中 TextSplitter 的使用详解&#xff1a;从基础到进阶&#xff08;附代码&#xff09; 一、前言 在处理大规模文本数据时&#xff0c;特别是在构建知识库或进行大模型训练与推理时&#xff0c;文本切分&#xff08;Text Splitting&#xff09; 是一个…...