kafka之java客户端实战
1. kafka的客户端
Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。
2. 基础客户端的使用
Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>
2.1 如何发消息
现在, 我们使用Kafka提供的Producer类,如何发送消息。
2.1.1 单项发送消息
代码:
public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();} }
执行结果:
2.1.2 同步发送
代码:
public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);}//消息处理完才停止发送者。producer.close();} }
执行结果:
2.1.2 异步发送
代码:
public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();} }
执行结果:
2.1.3 总结
从上述示例中,我们可以总结出,构建Producer分为三个步骤:
- 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
- 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
- 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。
2.2 如何消费消息
接下来可以使用Kafka提供的Consumer类,快速消费消息。
2.2.1 消费消息
代码:
public class MyConsumerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。 // consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}} }
2.2.2 总结
整体来说,Consumer同样是分为三个步骤:
- 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
- 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
- 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。
3. 客户端核心参数与客户端机制
3.1 消费者分组消费机制
3.2 生产者拦截器机制
3.3 消息序列化机制
3.4 消息分区路由机制
3.5 生产者消息缓存机制
3.6 发送应答机制
3.7 生产者消息幂等性
3.8 生产者消息事务
内容更新中
相关文章:

kafka之java客户端实战
1. kafka的客户端 Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Pa…...

图解渠道网关:不只是对接渠道的接口(一)
这是《百图解码支付系统设计与实现》专栏系列文章中的第(20)篇。点击上方关注,深入了解支付系统的方方面面。 主要讲清楚什么是渠道,有哪些类型的渠道,什么是渠道网关,渠道网关在支付系统中定位、核心功能…...

【js版数据结构学习之队列】
队列 一、简要认识队列二、队列的封装三、队列的应用1.栈和队列的转换2.全排列3.任务调度4.缓存管理 一、简要认识队列 结构:一种特殊的线性表 入队:在队尾插入一个元素 出队:在队头删除一个元素 特点:先入先出 空队列࿱…...
iOS Xcode 升级Xcode15报错: SDK does not contain ‘libarclite‘
iOS Xcode 升级Xcode15报错: SDK does not contain libarclite 一、仔细查看报错代码: SDK does not contain libarclite at the path /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/ lib/arc/libarclite_iphonesimulator.a; try in…...

python高级练习题库实验2(B)部分
文章目录 题目1代码实验结果题目2代码实验结果题目总结题目1 注册课程小游戏程序 研究下面的例子,并编写一个与这些例子完全相同的程序。使用for loop和break来解决问题。提示用户输入课程数量,是否选择,并且课程代码,最后还需显示已经完成的课程注册数量或者未完成的注册…...

vue项目编译非常慢,经常卡在某个百分点
1、注册插件 2、在项目根目录下的 babel.config.js 文件中加入下方配置 3、将import导入方式改为require导入方式,返回promise 4、如果动态加载组件import引入组件找不到组件(Error: Cannot find module) 使用 webpack 的 require.ensure() …...

开源知识库zyplayer-doc部署指南
1.前置条件 docker已经安装 mysql已经安装且数据库zyplayer-doc存在 服务器ip:192.168.168.99/ 数据库账户:root,密码:123456 2.拉取镜像 docker pull zyplayer/zyplayer-doc:latest 3.启动 docker run -d \--restart unless-stopped \--name zyplayer-doc \-p 8083:8083 …...

第90讲:MySQL数据库主从复制集群原理概念以及搭建流程
文章目录 1.MySQL主从复制集群的核心概念1.1.什么是主从复制集群1.2.主从复制集群中的专业术语1.3.主从复制集群工作原理1.4.主从复制中的小细节1.5.搭建主从复制集群的前提条件1.6.MySQL主从复制集群的架构信息 2.搭建MySQL多实例环境2.1.在mysql-1中搭建身为主库的MySQL实例2…...
PHP反序列化漏洞-魔术方法绕过
一、__wakeup()魔法函数绕过: 在PHP中,__wakeup()是一个魔术方法,用于在反序列化对象时自动调用。当反序列化字符串中的对象属性个数大于实际属性个数时,可以利用这个漏洞进行绕过。 触发条件: PHP版本为5.6.25或早期版本,或者PHP7版本小于7.0.10。反序列化字符串中的对…...
抖店和商品橱窗的区别?这两个千万别再搞混了!
我是电商珠珠 很多人都会将抖店和商品橱窗搞混,想开抖店的人开了商品橱窗,想开橱窗的人开通了抖店。 我做抖店三年了,这种情况屡见不鲜。 那么抖店和商品橱窗究竟有什么区别呢? 1、属性不同 商品橱窗是抖音所展现商品的一个功…...
个人总结钉钉7.5新品发布会
钉钉发布了 7.5 版本,最主要推出了围绕AI能力的各项升级,通过AI“超级助理”提升组织内部的沟通协作效率、管理决策智能化程度,以及相关的音视频、在线文档、Teambition功能的升级,以满足不同企业的多元化需求。截至发布会&#x…...

连接超时的问题
连接超时的问题 通用第三方工具连接超时 connect timeout 方案一: /etc/ssh/sshd_config node1上操作,图是错的 方案二: windows上Hosts文件域名解析有问题 比如: 192.168.xx.100 node1 192.168.xx.161 node1 两个都解析成node…...

python贪吃蛇游戏
为了实现这个游戏,需要用到Python的pygame模块,它是一个专门用于开发游戏的模块,提供了很多方便的功能,比如窗口、图形、音效、事件处理等。 用pygame来创建一个窗口,设置游戏的背景色,画出蛇和食物&#…...

【Spring】Spring AOP
文章目录 前言1. 什么是 AOP2. 什么是 Spring AOP3. Spring AOP 的使用引入 AOP 依赖编写 AOP 程序 4. Spring AOP 详解4.1 Spring AOP 的概念4.1.1 切点4.1.2 连接点4.1.3 通知4.1.4 切面 4.2 通知类型4.3 切点4.4 切面优先级 Order注解4.5 切点表达式4.5.1 execution 切点表达…...

软件开发架构
【 一 】软件开发架构图 【 1】ATM和选课系统 三层的开发架构 前段展示台 后端逻辑层 数据处理层 【二】软件开发架构的步骤流程 需求分析:在软件开发架构设计之前,需要对应用系统进行需求分析,明确用户需求、功能模块、业务流程等内容。…...
计图大模型推理库部署指南,CPU跑大模型,具有高性能、配置要求低、中文支持好、可移植等特点
Excerpt 计图大模型推理库,具有高性能、配置要求低、中文支持好、可移植等特点 计图大模型推理库,具有高性能、配置要求低、中文支持好、可移植等特点 计图大模型推理库 - 笔记本没有显卡也能跑大模型 本大模型推理库JittorLLMs有以下几个特点: 成本低:相比同类框架,本库…...

CSS||Emmet语法
1、简介 Emmet语法的前身是Zen coding,它使用缩写,来提高html/css的编写速度, Vscode内部已经集成该语法。 快速生成HTML结构语法 快速生成CSS样式语法 2、快速生成HTML结构语法 生成标签 直接输入标签名 按tab键即可 比如 div 然后tab 键, 就可以生成 <…...

Android中的anr定位指导与建议
1.背景 8月份安卓出现了一次直播间卡死(ANR)问题,且由于排查难度较大,持续了较长时间。本文针对如何快速定位安卓端出现ANR问题进行总结和探讨. 这里大致补充一下当时的情况,当时看到情景的是从某一个特定的场景下进入直播间后整个直播间界面立刻就卡住…...

YOLOV7剪枝流程
YOLOV7剪枝流程 1、训练 1)划分数据集进行训练前的准备,按正常的划分流程即可 2)修改train.py文件 第一次处在参数列表里添加剪枝的参数,正常训练时设置为False,剪枝后微调时设置为True parser.add_argument(--pr…...

【React】组件性能优化、高阶组件
文章目录 React性能优化SCUReact更新机制keys的优化render函数被调用shouldComponentUpdatePureComponentshallowEqual方法高阶组件memo 获取DOM方式refs如何使用refref的类型 受控和非受控组件认识受控组件非受控组件 React的高阶组件认识高阶函数高阶组件的定义应用一 – pro…...

51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...

微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...
在 Spring Boot 项目里,MYSQL中json类型字段使用
前言: 因为程序特殊需求导致,需要mysql数据库存储json类型数据,因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...

数据结构:递归的种类(Types of Recursion)
目录 尾递归(Tail Recursion) 什么是 Loop(循环)? 复杂度分析 头递归(Head Recursion) 树形递归(Tree Recursion) 线性递归(Linear Recursion)…...
规则与人性的天平——由高考迟到事件引发的思考
当那位身着校服的考生在考场关闭1分钟后狂奔而至,他涨红的脸上写满绝望。铁门内秒针划过的弧度,成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定",构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...

C# winform教程(二)----checkbox
一、作用 提供一个用户选择或者不选的状态,这是一个可以多选的控件。 二、属性 其实功能大差不差,除了特殊的几个外,与button基本相同,所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...
从实验室到产业:IndexTTS 在六大核心场景的落地实践
一、内容创作:重构数字内容生产范式 在短视频创作领域,IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色,生成的 “各位吴彦祖们大家好” 语音相似度达 97%,单条视频播放量突破百万…...

Tauri2学习笔记
教程地址:https://www.bilibili.com/video/BV1Ca411N7mF?spm_id_from333.788.player.switch&vd_source707ec8983cc32e6e065d5496a7f79ee6 官方指引:https://tauri.app/zh-cn/start/ 目前Tauri2的教程视频不多,我按照Tauri1的教程来学习&…...