kafka(三)springboot集成kafka(1)介绍
基于kafka新版本
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>
一、kafkaProducer
1、介绍
设计上比consumer要简单一些,因为不涉及组管理,即每个producer都是独立工作的。
(1)目前producer的主要功能是向某个topic的某个分区发送一条消息,这就涉及分区选择策略,在ProducerRecord中介绍。
(2)因为有ISR,因此在发送消息时,producer有多种选择来实现消息发送,如不等待任何副本的影响便返回、只等待leader副本响应返回等等。
2、发送流程
produce的发送主要流程概述如下:
-
拦截器对发送的消息拦截处理;
-
获取元数据信息;
-
序列化处理;
-
分区处理;
-
批次添加处理;
-
发送消息。

3、主要参数
3.1、acks:
有三个参数:0、1、all,数据可靠性的重要参数,可以保证消息不丢失。
Properties properties = new Properties();
properties.put(ProducerConfig.ACKS_CONFIG,"1");
3.2、 buffer.memory
指定了producer端用于缓冲消息的缓冲区大小,单位是字节。
3.3、compression.type
producer是否压缩消息。
3.4、batch.size
二、ProducerRecord
1、介绍
发送给Kafka Broker的key/value 值对,producer将待发送的消息封装进ProducerRecord实例类。
2、发送消息分区策略
(1)指定了分区:
当发送时指定了partition就使用该partition。即kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)指定了发送到哪个具体的分区。

(2)轮询
如果kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)没有指定发送到哪个具体的分区,即partition=null(并且key也为空时,如果此时key不为空的话就会采用另一种分区策略key哈希分区策略),并且使用了默认的分区器,那么消息将被随机的发送到主题的各个可用分区上,分区器使用轮询的算法将消息均衡的分布到各个分区。

(3)key哈希分区策略
根据消息的key进行哈希计算,并将消息发送到对应的分区。保证相同key的消息始终被发送到同一个分区,确保消息的顺序性。

(4)自定义分区策略(即自定义Partitioner)
用户可以根据自己的需求实现自定义的分区策略,通过实现org.apache.kafka.clients.producer.Partitioner接口来自定义分区选择逻辑。

二、 KafkaConsumer
三、生产者发送消息应用
1、同步发送消息
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
2、异步发送消息
2.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
2.2、带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法//(2)消息发送失败 exception != null 也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}
四、消费者接收消息应用
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {// 1. 创建消费者配置对象Properties properties = new Properties();// 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数)// broker的ip地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//配置消费者组(组名必须)properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 3. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 注册消费主题ArrayList<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 4.调用方法消费数据// 如果kafka集群没有新数据会造成空转// 填写参数为时间,如果没有拉取数据,线程睡眠一会while (true) {// 设置1s中消费的一批数据// Duration.ofSeconds(1)不会导致空转,拉取不到的时候睡眠1sConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 打印消费数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());}}//5.关闭资源
// consumer.close();不使用的原因是,已关闭进程,就不会再消费数据了,进程停止就以为着JVM为断电了,不再工作}
}相关文章:
kafka(三)springboot集成kafka(1)介绍
基于kafka新版本 <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency> </dependencies> 一、kafkaProducer 1、介绍…...
Markdown语法与基础使用
在撰写博客、文档或者其他类型的文字内容时,Markdown语法是一种简洁、易读易写的标记语言,被广泛应用于互联网上的文本编辑和排版中。下面将介绍Markdown语法的基础使用方法,帮助你更好地利用Markdown来编写内容。 1. 标题 Markdown支持使用…...
【排序】七大排序表格比较
排序 时间复杂度 空间复杂度 最坏时间复杂度 最好时间复杂度 稳定性 插入排序 O(n) O(1) O(n) O(n) 稳定 希尔排序 O(nlogn)-O(n)取决于增量序列 O(1) O(n^1.3) O(nlogn) 不稳定 选择排序 O(n) O(1) O(n) O(n) 不稳定 冒泡排序 O(n) O(1) O(n) O(n…...
arcgis 栅格数据处理2——栅格转地级市(栅格转矢量图)
1. 获取空间分析权限(解决无法执行所选工具问题) 选中“自定义”中的“扩展模块” 在弹出的模块中选中能选的模块,此处需要选择“spatial analysis”以进行下一步分析 3. 将栅格数据转为整数型(解决无法矢量化) 选…...
unity学习(53)——选择角色界面--分配服务器返回的信息
好久没写客户端了,一上手还不太适应 1.经过测试,成功登陆后,客户端请求list_request,成功返回,如下图: 可见此时model第三个位置的参数是1.也成功返回了所有已注册角色的信息。 2.之前已知创建的角色信息…...
矩阵爆破逆向-条件断点的妙用
不知道你是否使用过IDA的条件断点呢?在IDA进阶使用中,它的很多功能都有大作用,比如:ida-trace来跟踪调用流程。同时IDA的断点功能也十分强大,配合IDA-python的输出语句能够大杀特杀! 那么本文就介绍一下这个…...
logstash和elasticsearch的几种交互接口
Logstash与Elasticsearch是两个非常流行的开源工具,用于处理和存储大量的日志数据。它们之间的集成非常重要,因为Logstash用于收集、处理和转换日志数据,而Elasticsearch用于存储、搜索和分析这些数据。在本文中,我们将详细介绍Lo…...
Golang 开发实战day02 - Print Formatting
Golang 教程02 - Print,Formatting Strings Go语言提供了丰富的格式化字符串功能,用于将数据格式化为特定格式的字符串。本课程将详细介绍Go语言中Print和Formatting Strings的用法,并提供代码示例供大家参考。 1.Print 类型及使用 1.1 Pr…...
2023护网蓝初面试
目录 一、渗透测试的流程 二、常见的漏洞 三、中间件漏洞 四、SQL注入原理、种类?防御?预编译原理,宽字节注入原理 预编译原理: 宽字节注入原理: 五、XSS的种类有哪些?区别?修复…...
Unity编辑器功能Inspector快捷自动填充数据和可视化调试
我们有时候可能需要在面板增加一些引用,可能添加脚本后要手动拖动,这样如果有大量的脚本拖动也是不小的工作量 实例 例如:我的脚本需要添加一个Bone的列表,一个个拖动很麻烦。 实现脚本 我们可以用这样的脚本来实现。 public…...
【C/C++】常量指针与指针常量的深入解析与区分(什么是const int * 与 int * const ?)
目录 一、前言 二、const 的简单介绍 三、常量指针 🔍介绍与分析 📰小结与记忆口诀 四、指针常量 🔍介绍与分析 📰小结与记忆口诀 五、总结与提炼 六、共勉 一、前言 在【C/C】的编程中,指针与const关键字的组合…...
零、自然语言处理开篇
目录 0、NLP任务的基础——符号向量化 0.0 词袋模型 0.1 查表/One-hot编码 0.2 词嵌入模型/预训练模型 0.2.0 Word2Vec (0)CBOW (1)Skip-gram 0.2.1 GloVe 0.2.2 WordPiece 0.2.3 BERT 0.2.4 ERNIE NLP自然语言处理&am…...
Learn OpenGL 04 纹理
纹理环绕方式 纹理坐标的范围通常是从(0, 0)到(1, 1),那如果我们把纹理坐标设置在范围之外会发生什么?OpenGL默认的行为是重复这个纹理图像(我们基本上忽略浮点纹理坐标的整数部分),但OpenGL提供了更多的选择…...
了解开源可视化表单的主要优势
为什么可视化表单深受大家喜爱?这就需要了解开源可视化表单的优势和特点了。在流程化办公深入人心的今天,提高办公协作效率早已成为大家的发展目标,低代码技术平台、开源可视化表单是提升办公协作效率的得力助手,一起来看看它的优…...
Redis进阶--一篇文章带你走出Redis
目录 什么是Redis?? Redis有哪些使用场景? Redis是单线程还是多线程? 为什么Redis是单线程速度还是很快?? Redis持久化 RDB机制:(Redis DataBase) [是redis中默认的持久化方式] AOF机制:(Append Only File) Redis和MySQL如何保持数据一致????…...
【框架设计】MVC、MVP、MVVM对比图
1. MVC(Model-View-Controller) 2. MVP(Model-View-Presenter) 3. MVVM(Model-View-ViewModel)...
四桥臂三相逆变器动态电压恢复器(DVR)MATLAB仿真
微❤关注“电气仔推送”获得资料(专享优惠) 简介 四桥臂三相逆变器 电路 的一般形式如图 1,为 便于分析 ,将其等效成图所示的电路 。以直流母线电压Ud的 1/2处为参考点 ,逆变器三相和零线相 输 出可等效成…...
阿里云一键登录(号码认证服务)
前言 用户登录原来的登录方式如下 1. 手机号验证码 2. 账号密码 运营觉得操作过于复杂, 因此想引入阿里自动登录的逻辑, 也就是号码认证服务,所以才有了这篇问文章 注: 本文只是记录Java端的实现, app端的请自行查询文档实现 官方资料 文档 : 什么是号码认证服务_号码认证服务(…...
全量知识系统中的翻译器以及百度文库AI应用中心给出的答复
Q1. 下面是全量知识系统中的翻译器的规划(参考前一篇:全量知识系统 之 “百度翻译”。从“全量知识系统的翻译器”起。链接在下面)。下面的文字分5次发出。链接: 全量知识系统 之 “百度翻译”-CSDN博客 第一次回答:…...
抓包工具获取请求信息
Charles 下载安装 下载 官方下载地址:https://www.charlesproxy.com/latest-release/download.do 下载后傻瓜式安装就好,这个官方的需要激活,可以选择绿色版或者学习版 绿色版 绿色中文版:https://soft.kxdw.com/pc/Charles.z…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
莫兰迪高级灰总结计划简约商务通用PPT模版
莫兰迪高级灰总结计划简约商务通用PPT模版,莫兰迪调色板清新简约工作汇报PPT模版,莫兰迪时尚风极简设计PPT模版,大学生毕业论文答辩PPT模版,莫兰迪配色总结计划简约商务通用PPT模版,莫兰迪商务汇报PPT模版,…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
华为OD机试-最短木板长度-二分法(A卷,100分)
此题是一个最大化最小值的典型例题, 因为搜索范围是有界的,上界最大木板长度补充的全部木料长度,下界最小木板长度; 即left0,right10^6; 我们可以设置一个候选值x(mid),将木板的长度全部都补充到x,如果成功…...
