当前位置: 首页 > news >正文

kafka之java客户端实战

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.2 同步发送

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);}//消息处理完才停止发送者。producer.close();}
}

执行结果:

 2.1.2 异步发送 

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 客户端核心参数与客户端机制

3.1 消费者分组消费机制

3.2 生产者拦截器机制

3.3 消息序列化机制

3.4 消息分区路由机制

3.5 生产者消息缓存机制

3.6 发送应答机制

3.7 生产者消息幂等性

3.8 生产者消息事务

内容更新中

相关文章:

kafka之java客户端实战

1. kafka的客户端 Kafka提供了两套客户端API&#xff0c;HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节&#xff0c;使用起来比较简单&#xff0c;是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节&#xff0c;Pa…...

图解渠道网关:不只是对接渠道的接口(一)

这是《百图解码支付系统设计与实现》专栏系列文章中的第&#xff08;20&#xff09;篇。点击上方关注&#xff0c;深入了解支付系统的方方面面。 主要讲清楚什么是渠道&#xff0c;有哪些类型的渠道&#xff0c;什么是渠道网关&#xff0c;渠道网关在支付系统中定位、核心功能…...

【js版数据结构学习之队列】

队列 一、简要认识队列二、队列的封装三、队列的应用1.栈和队列的转换2.全排列3.任务调度4.缓存管理 一、简要认识队列 结构&#xff1a;一种特殊的线性表 入队&#xff1a;在队尾插入一个元素 出队&#xff1a;在队头删除一个元素 特点&#xff1a;先入先出 空队列&#xff1…...

iOS Xcode 升级Xcode15报错: SDK does not contain ‘libarclite‘

iOS Xcode 升级Xcode15报错: SDK does not contain libarclite 一、仔细查看报错代码: SDK does not contain libarclite at the path /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/ lib/arc/libarclite_iphonesimulator.a; try in…...

python高级练习题库实验2(B)部分

文章目录 题目1代码实验结果题目2代码实验结果题目总结题目1 注册课程小游戏程序 研究下面的例子,并编写一个与这些例子完全相同的程序。使用for loop和break来解决问题。提示用户输入课程数量,是否选择,并且课程代码,最后还需显示已经完成的课程注册数量或者未完成的注册…...

vue项目编译非常慢,经常卡在某个百分点

1、注册插件 2、在项目根目录下的 babel.config.js 文件中加入下方配置 3、将import导入方式改为require导入方式&#xff0c;返回promise 4、如果动态加载组件import引入组件找不到组件&#xff08;Error: Cannot find module&#xff09; 使用 webpack 的 require.ensure() …...

开源知识库zyplayer-doc部署指南

1.前置条件 docker已经安装 mysql已经安装且数据库zyplayer-doc存在 服务器ip:192.168.168.99/ 数据库账户:root,密码:123456 2.拉取镜像 docker pull zyplayer/zyplayer-doc:latest 3.启动 docker run -d \--restart unless-stopped \--name zyplayer-doc \-p 8083:8083 …...

第90讲:MySQL数据库主从复制集群原理概念以及搭建流程

文章目录 1.MySQL主从复制集群的核心概念1.1.什么是主从复制集群1.2.主从复制集群中的专业术语1.3.主从复制集群工作原理1.4.主从复制中的小细节1.5.搭建主从复制集群的前提条件1.6.MySQL主从复制集群的架构信息 2.搭建MySQL多实例环境2.1.在mysql-1中搭建身为主库的MySQL实例2…...

PHP反序列化漏洞-魔术方法绕过

一、__wakeup()魔法函数绕过: 在PHP中,__wakeup()是一个魔术方法,用于在反序列化对象时自动调用。当反序列化字符串中的对象属性个数大于实际属性个数时,可以利用这个漏洞进行绕过。 触发条件: PHP版本为5.6.25或早期版本,或者PHP7版本小于7.0.10。反序列化字符串中的对…...

抖店和商品橱窗的区别?这两个千万别再搞混了!

我是电商珠珠 很多人都会将抖店和商品橱窗搞混&#xff0c;想开抖店的人开了商品橱窗&#xff0c;想开橱窗的人开通了抖店。 我做抖店三年了&#xff0c;这种情况屡见不鲜。 那么抖店和商品橱窗究竟有什么区别呢&#xff1f; 1、属性不同 商品橱窗是抖音所展现商品的一个功…...

个人总结钉钉7.5新品发布会

钉钉发布了 7.5 版本&#xff0c;最主要推出了围绕AI能力的各项升级&#xff0c;通过AI“超级助理”提升组织内部的沟通协作效率、管理决策智能化程度&#xff0c;以及相关的音视频、在线文档、Teambition功能的升级&#xff0c;以满足不同企业的多元化需求。截至发布会&#x…...

连接超时的问题

连接超时的问题 通用第三方工具连接超时 connect timeout 方案一&#xff1a; /etc/ssh/sshd_config node1上操作&#xff0c;图是错的 方案二&#xff1a; windows上Hosts文件域名解析有问题 比如&#xff1a; 192.168.xx.100 node1 192.168.xx.161 node1 两个都解析成node…...

python贪吃蛇游戏

为了实现这个游戏&#xff0c;需要用到Python的pygame模块&#xff0c;它是一个专门用于开发游戏的模块&#xff0c;提供了很多方便的功能&#xff0c;比如窗口、图形、音效、事件处理等。 用pygame来创建一个窗口&#xff0c;设置游戏的背景色&#xff0c;画出蛇和食物&#…...

【Spring】Spring AOP

文章目录 前言1. 什么是 AOP2. 什么是 Spring AOP3. Spring AOP 的使用引入 AOP 依赖编写 AOP 程序 4. Spring AOP 详解4.1 Spring AOP 的概念4.1.1 切点4.1.2 连接点4.1.3 通知4.1.4 切面 4.2 通知类型4.3 切点4.4 切面优先级 Order注解4.5 切点表达式4.5.1 execution 切点表达…...

软件开发架构

【 一 】软件开发架构图 【 1】ATM和选课系统 三层的开发架构 前段展示台 后端逻辑层 数据处理层 【二】软件开发架构的步骤流程 需求分析&#xff1a;在软件开发架构设计之前&#xff0c;需要对应用系统进行需求分析&#xff0c;明确用户需求、功能模块、业务流程等内容。…...

计图大模型推理库部署指南,CPU跑大模型,具有高性能、配置要求低、中文支持好、可移植等特点

Excerpt 计图大模型推理库,具有高性能、配置要求低、中文支持好、可移植等特点 计图大模型推理库,具有高性能、配置要求低、中文支持好、可移植等特点 计图大模型推理库 - 笔记本没有显卡也能跑大模型 本大模型推理库JittorLLMs有以下几个特点: 成本低:相比同类框架,本库…...

CSS||Emmet语法

1、简介 ​ Emmet语法的前身是Zen coding,它使用缩写,来提高html/css的编写速度, Vscode内部已经集成该语法。 ​ 快速生成HTML结构语法 ​ 快速生成CSS样式语法 2、快速生成HTML结构语法 生成标签 直接输入标签名 按tab键即可 比如 div 然后tab 键&#xff0c; 就可以生成 <…...

Android中的anr定位指导与建议

1.背景 8月份安卓出现了一次直播间卡死(ANR)问题&#xff0c;且由于排查难度较大&#xff0c;持续了较长时间。本文针对如何快速定位安卓端出现ANR问题进行总结和探讨. 这里大致补充一下当时的情况,当时看到情景的是从某一个特定的场景下进入直播间后整个直播间界面立刻就卡住…...

YOLOV7剪枝流程

YOLOV7剪枝流程 1、训练 1&#xff09;划分数据集进行训练前的准备&#xff0c;按正常的划分流程即可 2&#xff09;修改train.py文件 第一次处在参数列表里添加剪枝的参数&#xff0c;正常训练时设置为False&#xff0c;剪枝后微调时设置为True parser.add_argument(--pr…...

【React】组件性能优化、高阶组件

文章目录 React性能优化SCUReact更新机制keys的优化render函数被调用shouldComponentUpdatePureComponentshallowEqual方法高阶组件memo 获取DOM方式refs如何使用refref的类型 受控和非受控组件认识受控组件非受控组件 React的高阶组件认识高阶函数高阶组件的定义应用一 – pro…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist

现象&#xff1a; android studio报错&#xff1a; [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决&#xff1a; 不要动CMakeLists.…...

git: early EOF

macOS报错&#xff1a; Initialized empty Git repository in /usr/local/Homebrew/Library/Taps/homebrew/homebrew-core/.git/ remote: Enumerating objects: 2691797, done. remote: Counting objects: 100% (1760/1760), done. remote: Compressing objects: 100% (636/636…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...