当前位置: 首页 > news >正文

kafka之java客户端实战

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.2 同步发送

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);}//消息处理完才停止发送者。producer.close();}
}

执行结果:

 2.1.2 异步发送 

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 客户端核心参数与客户端机制

3.1 消费者分组消费机制

3.2 生产者拦截器机制

3.3 消息序列化机制

3.4 消息分区路由机制

3.5 生产者消息缓存机制

3.6 发送应答机制

3.7 生产者消息幂等性

3.8 生产者消息事务

内容更新中

相关文章:

kafka之java客户端实战

1. kafka的客户端 Kafka提供了两套客户端API&#xff0c;HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节&#xff0c;使用起来比较简单&#xff0c;是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节&#xff0c;Pa…...

图解渠道网关:不只是对接渠道的接口(一)

这是《百图解码支付系统设计与实现》专栏系列文章中的第&#xff08;20&#xff09;篇。点击上方关注&#xff0c;深入了解支付系统的方方面面。 主要讲清楚什么是渠道&#xff0c;有哪些类型的渠道&#xff0c;什么是渠道网关&#xff0c;渠道网关在支付系统中定位、核心功能…...

【js版数据结构学习之队列】

队列 一、简要认识队列二、队列的封装三、队列的应用1.栈和队列的转换2.全排列3.任务调度4.缓存管理 一、简要认识队列 结构&#xff1a;一种特殊的线性表 入队&#xff1a;在队尾插入一个元素 出队&#xff1a;在队头删除一个元素 特点&#xff1a;先入先出 空队列&#xff1…...

iOS Xcode 升级Xcode15报错: SDK does not contain ‘libarclite‘

iOS Xcode 升级Xcode15报错: SDK does not contain libarclite 一、仔细查看报错代码: SDK does not contain libarclite at the path /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/ lib/arc/libarclite_iphonesimulator.a; try in…...

python高级练习题库实验2(B)部分

文章目录 题目1代码实验结果题目2代码实验结果题目总结题目1 注册课程小游戏程序 研究下面的例子,并编写一个与这些例子完全相同的程序。使用for loop和break来解决问题。提示用户输入课程数量,是否选择,并且课程代码,最后还需显示已经完成的课程注册数量或者未完成的注册…...

vue项目编译非常慢,经常卡在某个百分点

1、注册插件 2、在项目根目录下的 babel.config.js 文件中加入下方配置 3、将import导入方式改为require导入方式&#xff0c;返回promise 4、如果动态加载组件import引入组件找不到组件&#xff08;Error: Cannot find module&#xff09; 使用 webpack 的 require.ensure() …...

开源知识库zyplayer-doc部署指南

1.前置条件 docker已经安装 mysql已经安装且数据库zyplayer-doc存在 服务器ip:192.168.168.99/ 数据库账户:root,密码:123456 2.拉取镜像 docker pull zyplayer/zyplayer-doc:latest 3.启动 docker run -d \--restart unless-stopped \--name zyplayer-doc \-p 8083:8083 …...

第90讲:MySQL数据库主从复制集群原理概念以及搭建流程

文章目录 1.MySQL主从复制集群的核心概念1.1.什么是主从复制集群1.2.主从复制集群中的专业术语1.3.主从复制集群工作原理1.4.主从复制中的小细节1.5.搭建主从复制集群的前提条件1.6.MySQL主从复制集群的架构信息 2.搭建MySQL多实例环境2.1.在mysql-1中搭建身为主库的MySQL实例2…...

PHP反序列化漏洞-魔术方法绕过

一、__wakeup()魔法函数绕过: 在PHP中,__wakeup()是一个魔术方法,用于在反序列化对象时自动调用。当反序列化字符串中的对象属性个数大于实际属性个数时,可以利用这个漏洞进行绕过。 触发条件: PHP版本为5.6.25或早期版本,或者PHP7版本小于7.0.10。反序列化字符串中的对…...

抖店和商品橱窗的区别?这两个千万别再搞混了!

我是电商珠珠 很多人都会将抖店和商品橱窗搞混&#xff0c;想开抖店的人开了商品橱窗&#xff0c;想开橱窗的人开通了抖店。 我做抖店三年了&#xff0c;这种情况屡见不鲜。 那么抖店和商品橱窗究竟有什么区别呢&#xff1f; 1、属性不同 商品橱窗是抖音所展现商品的一个功…...

个人总结钉钉7.5新品发布会

钉钉发布了 7.5 版本&#xff0c;最主要推出了围绕AI能力的各项升级&#xff0c;通过AI“超级助理”提升组织内部的沟通协作效率、管理决策智能化程度&#xff0c;以及相关的音视频、在线文档、Teambition功能的升级&#xff0c;以满足不同企业的多元化需求。截至发布会&#x…...

连接超时的问题

连接超时的问题 通用第三方工具连接超时 connect timeout 方案一&#xff1a; /etc/ssh/sshd_config node1上操作&#xff0c;图是错的 方案二&#xff1a; windows上Hosts文件域名解析有问题 比如&#xff1a; 192.168.xx.100 node1 192.168.xx.161 node1 两个都解析成node…...

python贪吃蛇游戏

为了实现这个游戏&#xff0c;需要用到Python的pygame模块&#xff0c;它是一个专门用于开发游戏的模块&#xff0c;提供了很多方便的功能&#xff0c;比如窗口、图形、音效、事件处理等。 用pygame来创建一个窗口&#xff0c;设置游戏的背景色&#xff0c;画出蛇和食物&#…...

【Spring】Spring AOP

文章目录 前言1. 什么是 AOP2. 什么是 Spring AOP3. Spring AOP 的使用引入 AOP 依赖编写 AOP 程序 4. Spring AOP 详解4.1 Spring AOP 的概念4.1.1 切点4.1.2 连接点4.1.3 通知4.1.4 切面 4.2 通知类型4.3 切点4.4 切面优先级 Order注解4.5 切点表达式4.5.1 execution 切点表达…...

软件开发架构

【 一 】软件开发架构图 【 1】ATM和选课系统 三层的开发架构 前段展示台 后端逻辑层 数据处理层 【二】软件开发架构的步骤流程 需求分析&#xff1a;在软件开发架构设计之前&#xff0c;需要对应用系统进行需求分析&#xff0c;明确用户需求、功能模块、业务流程等内容。…...

计图大模型推理库部署指南,CPU跑大模型,具有高性能、配置要求低、中文支持好、可移植等特点

Excerpt 计图大模型推理库,具有高性能、配置要求低、中文支持好、可移植等特点 计图大模型推理库,具有高性能、配置要求低、中文支持好、可移植等特点 计图大模型推理库 - 笔记本没有显卡也能跑大模型 本大模型推理库JittorLLMs有以下几个特点: 成本低:相比同类框架,本库…...

CSS||Emmet语法

1、简介 ​ Emmet语法的前身是Zen coding,它使用缩写,来提高html/css的编写速度, Vscode内部已经集成该语法。 ​ 快速生成HTML结构语法 ​ 快速生成CSS样式语法 2、快速生成HTML结构语法 生成标签 直接输入标签名 按tab键即可 比如 div 然后tab 键&#xff0c; 就可以生成 <…...

Android中的anr定位指导与建议

1.背景 8月份安卓出现了一次直播间卡死(ANR)问题&#xff0c;且由于排查难度较大&#xff0c;持续了较长时间。本文针对如何快速定位安卓端出现ANR问题进行总结和探讨. 这里大致补充一下当时的情况,当时看到情景的是从某一个特定的场景下进入直播间后整个直播间界面立刻就卡住…...

YOLOV7剪枝流程

YOLOV7剪枝流程 1、训练 1&#xff09;划分数据集进行训练前的准备&#xff0c;按正常的划分流程即可 2&#xff09;修改train.py文件 第一次处在参数列表里添加剪枝的参数&#xff0c;正常训练时设置为False&#xff0c;剪枝后微调时设置为True parser.add_argument(--pr…...

【React】组件性能优化、高阶组件

文章目录 React性能优化SCUReact更新机制keys的优化render函数被调用shouldComponentUpdatePureComponentshallowEqual方法高阶组件memo 获取DOM方式refs如何使用refref的类型 受控和非受控组件认识受控组件非受控组件 React的高阶组件认识高阶函数高阶组件的定义应用一 – pro…...

3步搞定:m4s-converter让你的B站缓存视频重获新生

3步搞定&#xff1a;m4s-converter让你的B站缓存视频重获新生 【免费下载链接】m4s-converter 一个跨平台小工具&#xff0c;将bilibili缓存的m4s格式音视频文件合并成mp4 项目地址: https://gitcode.com/gh_mirrors/m4/m4s-converter 你是否曾经遇到过这样的困境&#…...

如何在VSCode中快速预览PDF文件:vscode-pdfviewer完整使用指南

如何在VSCode中快速预览PDF文件&#xff1a;vscode-pdfviewer完整使用指南 【免费下载链接】vscode-pdfviewer Show PDF preview in VSCode. 项目地址: https://gitcode.com/gh_mirrors/vs/vscode-pdfviewer 你是否经常需要在VSCode中查看PDF文档&#xff0c;但又不想频…...

python旅游出行指南系统

目录同行可拿货,招校园代理 ,本人源头供货商项目概述核心功能技术实现代码示例&#xff08;路线规划&#xff09;扩展方向适用场景源码获取详细视频演示 &#xff1a;同行可合作点击我获取源码->获取博主联系方式->进我个人主页-->同行可拿货,招校园代理 ,本人源头供货…...

ZStack控制台报错Failed to connect to console排查指南

1. 问题现场还原&#xff1a;不是连接失败&#xff0c;而是控制台页面直接报错弹窗Zstack 打开控制台报错——这六个字背后藏着一个在私有云运维一线高频出现、却常被误判为“网络不通”或“浏览器问题”的典型故障。我第一次遇到它是在给某制造企业做ZStack 4.5.2升级后的验收…...

VIVE Focus3 Unity开发避坑指南:JDK11.0.22与Wave SDK 4.2集成要点

1. 这不是SDK安装教程&#xff0c;而是新手在Focus3上摔的前七跤Unity新手刚拿到VIVE Focus3设备&#xff0c;满心欢喜点开VIVE Developer Portal下载SDK 4.2&#xff0c;解压、导入、Build、Run——然后卡在黑屏、报错、手势没反应、手柄漂移、甚至Unity编辑器直接崩溃。我带过…...

SaaS系统数据范围权限设计:从RBAC/ABAC到高性能实现

1. 项目概述&#xff1a;当数据安全遇上规模化增长在构建和运营一个面向多租户的大型SaaS&#xff08;软件即服务&#xff09;系统时&#xff0c;数据安全与隔离是悬在每一位架构师和开发者头上的“达摩克利斯之剑”。这不仅仅是技术问题&#xff0c;更是商业信任的基石。想象一…...

Python数据库迁移实战:从SQLAlchemy到Alembic的完整指南

Python数据库迁移实战&#xff1a;从SQLAlchemy到Alembic的完整指南 引言 数据库迁移是后端开发中不可或缺的一部分。作为从Python转向Rust的后端开发者&#xff0c;我发现Python的数据库迁移工具非常成熟&#xff0c;尤其是Alembic配合SQLAlchemy的组合。本文将从实战角度出发…...

ESXi 9.0.0 HPE原厂定制版深度解析|专属硬件适配+零报错部署指南,HPE服务器运维最优解

随着vSphere 9.0虚拟化架构全面普及&#xff0c;企业HPE慧与服务器的底层虚拟化部署迎来全新升级需求。普通通用版ESXi镜像在HPE ProLiant、Apollo系列服务器中&#xff0c;常出现网卡不认、RAID驱动缺失、iLO管理异常、硬件兼容报错等问题&#xff0c;严重影响生产部署效率与系…...

上班族开例会懒得记要点?2026年这3款AI总结工具,会后自动整理纪要

做互联网运营四年&#xff0c;开会已经成了每天的常态。部门周例会、项目复盘会、线上培训课、远程沟通会&#xff0c;大大小小的视频会议一场接一场。以前最让我头疼的不是参会&#xff0c;而是会后整理纪要。开会时既要认真听讨论、跟进工作进度&#xff0c;又要低头飞速记笔…...

数据库锁机制:表锁、行锁(Oracle 默认)、共享锁、排他锁、乐观锁、悲观锁、死锁、Hive 中的锁

数据库锁机制是控制并发访问数据的关键技术。本文系统介绍了锁的概念、分类和应用场景&#xff1a;1&#xff09;锁通过限制并发访问确保数据一致性&#xff0c;类比厕所门锁机制&#xff1b;2&#xff09;按粒度分为表锁&#xff08;适合批量操作&#xff09;、行锁&#xff0…...