Kafka-代码示例
一、构建开发环境
File > New > Project

选择一个最简单的模板

项目和坐标命名

配置maven路径

添加maven依赖
<dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.1</version></dependency> </dependencies>
加载刚刚添加的依赖

此时发现项目还没有包目录,如果遇到这种情况,点击新建目录就会自动提示了

二、创建一个新的topic
kafka-topics --create --topic kafka-study --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
#查看topic详情
kafka-topics --describe --zookeeper cdh1:2181 --topic kafka-study
#查看 topic 指定分区 offset
kafka-run-class kafka.tools.GetOffsetShell --topic kafka-study --time -1 --broker-list cdh1:9092
![]()
![]()
三、编写生产者
kafka源码中有生产者和消费者的示例,我们简单修改下就直接用了
package org.example.kafkaStudy;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;public class KafkaProducerDemo {public static void main(String[] args) {try{//topic名称String topicName = "kafka-study";//broker列表String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";//向topic打多少数据int numRecords = 10000;//是否异步推送数据boolean isAsync = true;int key = 0;int sentRecords = 0;//创建生产者KafkaProducer<Integer, String> producer = createKafkaProducer(bootstrapServers,-1,null,false);//判断是否达到生产要求while (sentRecords < numRecords) {if (isAsync) {//异步推送asyncSend(producer,topicName, key, "test" + key,sentRecords);} else {//同步推送syncSend(producer,topicName, key, "test" + key,sentRecords);}key++;sentRecords++;}producer.close();} catch (Throwable e) {e.printStackTrace();}}private static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords)throws ExecutionException, InterruptedException {try {// 发送记录,然后调用get,这会阻止等待来自broker的ackRecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, key, value)).get();Utils.maybePrintRecord(sentRecords, key, value, metadata);return metadata;} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException| OutOfOrderSequenceException | SerializationException e) {Utils.printErr(e.getMessage());} catch (KafkaException e) {Utils.printErr(e.getMessage());}return null;}private static void asyncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords) {//异步发送记录,设置一个回调以通知结果。//请注意,即使使用linger.ms=0设置了一个batch.size 当缓冲区内存已满或元数据不可用时,发送操作仍将被阻止producer.send(new ProducerRecord<>(topicName, key, value), new ProducerCallback(key, value,sentRecords));}private static KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers ,int transactionTimeoutMs,String transactionalId,boolean enableIdempotency) {Properties props = new Properties();// 生产者连接到broker需要引导服务器配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 设置序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);if (transactionTimeoutMs > 0) {// 事务协调器主动中止正在进行的事务之前的最长时间props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);}if (transactionalId != null) {// 事务id必须是静态且唯一的,它用于在流程重启过程中标识相同的生产者实例props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);}// 在分区级别启用重复保护props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);return new KafkaProducer<>(props);}static class ProducerCallback implements Callback {private final int key;private final int sentRecords;private final String value;public ProducerCallback(int key, String value,int sentRecords) {this.key = key;this.sentRecords = sentRecords;this.value = value;}/*** 用户可以实现一种回调方法,以提供请求完成的异步处理。当发送到服务器的记录得到确认时,将调用此方法。当回调中的异常不为null时,* 元数据将包含除topicPartition之外的所有字段的特殊-1值,该值将有效。** @param metadata 发送的记录的元数据(即分区和偏移量)。如果发生错误,将返回除topicPartition之外的所有字段的值为-1的空元数据。* @param exception 处理此记录时引发的异常。如果没有发生错误,则为空。*/public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {Utils.printErr(exception.getMessage());if (!(exception instanceof RetriableException)) {// 我们无法从这些异常中恢复过来}} else {Utils.maybePrintRecord(sentRecords, key, value, metadata);}}}
}
四、编写消费者
package org.example.kafkaStudy;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import static java.util.Collections.singleton;public class KafkaConsumerDemo {public static void main(String[] args) {//topic名称String topicName = "kafka-study";//组名称String groupName = "my-group-1";//broker列表String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";//向topci打多少数据int numRecords = 10000;int remainingRecords = 10000;// 消费来自 topic = kafka-study 的数据KafkaConsumer<Integer, String> consumer = createKafkaConsumer(bootstrapServers,groupName,false);//订阅主题列表以获取动态分配的分区此类实现了我们在此处传递的再平衡侦听器,以接收此类事件的通知consumer.subscribe(singleton(topicName));Utils.printOut("Subscribed to %s", topicName);while (remainingRecords > 0) {try {// 如果需要,轮询会更新分区分配并调用配置的重新平衡侦听器,然后尝试使用上次提交的偏移量或auto.offset.reset按顺序获取记录。// 如果有记录或超时返回空记录集,则重置策略会立即返回。下一次轮询必须在session.timeout.ms中调用,以避免组重新平衡ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<Integer, String> record : records) {Utils.maybePrintRecord(numRecords, record);}remainingRecords -= records.count();} catch (AuthorizationException | UnsupportedVersionExceptione) {// 我们无法从这些异常中恢复过来Utils.printErr(e.getMessage());} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {// 在没有auto.reset.policy的情况下,找不到偏移量或偏移量无效Utils.printOut("Invalid or no offset found, using latest");consumer.seekToEnd(e.partitions());consumer.commitSync();} catch (KafkaException e) {// 记录异常并尝试继续Utils.printErr(e.getMessage());}}consumer.close();Utils.printOut("Fetched %d records", numRecords - remainingRecords);}private static KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers,String groupId , boolean readCommitted) {Properties props = new Properties();// 消费者连接到broker需要引导服务器配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 当我们使用订阅(topic)进行组管理时,需要消费者groupIdprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//设置静态成员资格以提高可用性(例如滚动重启)
// instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));//启用EOS时禁用自动提交,因为偏移量与事务一起提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");//读取数据用到的反序列化器,需要和生产者对应props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);if (readCommitted) {// 跳过正在进行和已中止的事务props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");}// 在偏移无效或没有偏移的情况下设置重置偏移策略props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return new KafkaConsumer<>(props);}
}
五、运行程序
生产者日志打印
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(0, test0), partition(kafka-study-0), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(1, test1), partition(kafka-study-0), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(5, test5), partition(kafka-study-0), offset(2)......
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9940, test9940), partition(kafka-study-0), offset(4979)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9960, test9960), partition(kafka-study-0), offset(4987)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9970, test9970), partition(kafka-study-0), offset(4991)kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(2, test2), partition(kafka-study-1), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(3, test3), partition(kafka-study-1), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(4, test4), partition(kafka-study-1), offset(2).......
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9950, test9950), partition(kafka-study-1), offset(4966)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9980, test9980), partition(kafka-study-1), offset(4986)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9990, test9990), partition(kafka-study-1), offset(4991)
我们再次用命令看下每个分区的offset
![]()
消费者日志打印
main - Subscribed to kafka-study
main - Sample: record(0, test0), partition(kafka-study-0), offset(0)
main - Sample: record(1000, test1000), partition(kafka-study-0), offset(506)
main - Sample: record(2000, test2000), partition(kafka-study-0), offset(1020)
main - Sample: record(3000, test3000), partition(kafka-study-0), offset(1554)
main - Sample: record(7000, test7000), partition(kafka-study-0), offset(3550)
main - Sample: record(4000, test4000), partition(kafka-study-1), offset(1929)
main - Sample: record(5000, test5000), partition(kafka-study-1), offset(2422)
main - Sample: record(6000, test6000), partition(kafka-study-1), offset(2932)
main - Sample: record(8000, test8000), partition(kafka-study-1), offset(3963)
main - Sample: record(9000, test9000), partition(kafka-study-1), offset(4467)
main - Fetched 10000 records
六、问题说明
从日志中我们可以看到,在异步生产和消费时offset并不是逐个递增上去的,这是为什么呢?
在前面博客中我们提到,生产者在异步的情况下会启用批处理,即:Kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批处理。批处理可以配置为积累不超过固定数量的消息,并且等待时间不超过一些固定的延迟限制(例如64k或10毫秒)。这允许积累更多的消息来发送,并且在服务器上几乎没有更大的I/O操作。这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。当然如果你选择的是同步推送或者异步中单条消息特别大会导致批处理优化使用不到。
消费者也是从brokers一批一批的拉取数据来消费的
我们也可以看下broker的日志中数据的索引情况

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.log | head -10

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.index | head -10

从这里我们可以看到,生产者是一批一批往broker推送的,broker以更大的批次往磁盘写,从而降低推送的频次,也降低与磁盘交互的频次。
相关文章:
Kafka-代码示例
一、构建开发环境 File > New > Project 选择一个最简单的模板 项目和坐标命名 配置maven路径 添加maven依赖 <dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kaf…...
LLVM - 编译器前端-llvm 基本块、指令、函数 的关系
一:基础概念: 在 LLVM 中,基本块、指令和函数是构建中间表示(IR)的核心概念,它们之间有着紧密的关系,首先了解下基本概念。 1. 基本块(Basic Block) 定义:基本块是一个不包含任何跳转指令的线性代码段,执行顺序是从头到尾。每个基本块至少有一个入口和一个出口。特…...
探索人工智能在自然语言处理中的应用
探索人工智能在自然语言处理中的应用 前言1. 机器翻译2. 情感分析3. 智能客服4. 文本生成未来展望 结语 前言 在信息爆炸的时代,自然语言处理(NLP)作为人工智能(AI)的一个重要分支,正以前所未有的速度改变着…...
IFC模型文本的含义
以下代码是一个STEP文件(ISO-10303-21标准),它是一种用于表示产品数据的国际标准。STEP文件通常用于在不同的计算机辅助设计(CAD)系统之间交换数据。下面是对这段代码的逐行解释: HEADER部分: …...
构建高效评奖系统:SpringBoot在教育领域的应用
摘要 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了学生评奖评优管理系统的开发全过程。通过分析学生评奖评优管理系统管理的不足,创建了一个计算机管理学生评奖评优管理系统的方案。文章介绍了学生评奖…...
「二叉树进阶题解:构建、遍历与结构转化全解析」
文章目录 根据二叉树创建字符串思路代码 二叉树的层序遍历思路代码 二叉树的最近公共祖先思路代码 二叉搜索树与双向链表思路代码 从前序与中序遍历序列构造二叉树思路代码 总结 根据二叉树创建字符串 题目: 样例: 可以看见,唯一特殊的就…...
在使用代理IP时,需要注意以下几点:
1. 代理IP的质量和稳定性直接影响爬虫的效果。因此,我们需要定期更新代理IP列表,并筛选出可用的代理IP。 2. 有些代理IP可能存在被目标网站封禁的风险。因此,我们需要合理使用代理IP,避免过度频繁地访问目标网站。 3. 在使用代…...
深入理解Java基础概念的高级应用(1/5)
目录 1. Java内存模型:堆、栈与方法区 示例代码:对象存储位置 2. 类加载器的工作原理 示例代码:自定义类加载器 3. JVM如何执行字节码 字节码指令示例 4. Java基础数据类型的存储与操作 自动装箱与拆箱 示例代码:基础类型…...
高可用HA软件
高可用HA(High Availability)软件在分布式系统架构设计中至关重要,它们能够减少系统停机时间,确保应用程序持久、不间断地提供服务。以下是四款常用的高可用HA软件介绍: Keepalived Keepalived起初是为LVS(…...
《近似线性可分支持向量机的原理推导》 拉格朗日函数 公式解析
本文是将文章《近似线性可分支持向量机的原理推导》中的公式单独拿出来做一个详细的解析,便于初学者更好的理解。 公式 9-41 解释: L ( w , b , ξ , α , μ ) 1 2 ∥ w ∥ 2 C ∑ i 1 N ξ i − ∑ i 1 N α i ( y i ( w T x i b ) − ( 1 − ξ …...
9.指针和字符串string类型
指针和字符串string类型 1.指针2.字符串string类型 1.指针 C完全兼容C语言指针,C多出一个this指针 交换两数 #include <iostream>using namespace std;void swap(int *a,int *b){int temp;temp *a;*a *b;*b temp; }int main() {//交换前int a 50;int b …...
八,Linux基础环境搭建(CentOS7)- 安装Mysql和Hive
Linux基础环境搭建(CentOS7)- 安装Mysql和Hive 大家注意以下的环境搭建版本号,如果版本不匹配有可能出现问题! 一、Mysql下载及安装 MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Orac…...
海量数据面试题
⭐️前言⭐️ 本篇文章主要针对在面试时可能涉及到的海量数据的面试题,该类型面试题常常考虑通过位图、布隆过滤器或者哈希的方式来解决。 🍉欢迎点赞 👍 收藏 ⭐留言评论 🍉博主将持续更新学习记录收获,友友们有任何…...
基于SSM积分商城管理系统的设计与实现(源码+lw+部署文档+讲解等)
前言 伴随着基础网络设施的不断进步和终端电子设备的高度普及,互联网用户规模越来越大。现在人们越来越离不开计算机网络、互联网所带来的好处了,现如今不同的网站系统遍地都是,现在已经不同于以往的传统的管理方式了,只有跟上时代…...
MLP预售开启,革新去中心化通信生态:智能手机与AI Agent齐上阵
2024年10月22日,Matrix Layer Protocol(MLP)宣布其备受期待的第一期产品正式进入预售阶段。随着Web3世界的不断发展,去中心化技术已经深入到我们日常生活的方方面面。作为Web3世界中炙手可热的创新项目,Matrix Layer P…...
js获取浏览器指纹
Canvas指纹法 来源:https://www.cnblogs.com/leijing0607/p/8044218.html 从根本上来说,每一种浏览器都会使用不同的图像处理引擎,不同的导出选项,不同的压缩等级,所以每一台电脑绘制出的图形都会有些许不同…...
乐尚代驾的项目问题
订单状态如果在流转的过程中卡住了,怎么办? 卡住的原因有可能是: 网络问题 网络不稳定或中断可能导致订单状态更新的请求无法及时发送或接收。例如,司机端在更新代驾车辆信息时,如果网络出现故障,可能无法…...
uniapp app.onshow 和 onMounted一样用吗
在uni-app中,onShow和onMounted并不完全相同,它们分别属于应用生命周期和组件生命周期。 应用生命周期中的onShow 在uni-app中,onShow是应用生命周期的一部分,它会在应用启动或从后台进入前台时触发。这意味着它不仅仅局限于页…...
基于Mysql、JavaScript、PHP、ajax开发的MBTI性格测试网站(前端+后端)
源码地址:https://download.csdn.net/download/2302_79553009/89933699 项目简介 本项目旨在构建一个基于MBTI(迈尔斯-布里格斯性格分类指标)理论的在线平台——“16Personalities”。该平台利用PHP、MySQL、JavaScript等技术栈开发…...
【问题解决】连接mysql时报错caching_sha2_password can not load
一, 问题 在连接Mysql时报错, caching_sha2_password can not load 二,问题原因 报错信息 "caching_sha2_password can not load" 通常出现在尝试连接到使用 MySQL 8.0 或更高版本的数据库时,因为从 MySQL 8.0 开始&a…...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
Rust 开发环境搭建
环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行: rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu 2、Hello World fn main() { println…...
MySQL的pymysql操作
本章是MySQL的最后一章,MySQL到此完结,下一站Hadoop!!! 这章很简单,完整代码在最后,详细讲解之前python课程里面也有,感兴趣的可以往前找一下 一、查询操作 我们需要打开pycharm …...
用递归算法解锁「子集」问题 —— LeetCode 78题解析
文章目录 一、题目介绍二、递归思路详解:从决策树开始理解三、解法一:二叉决策树 DFS四、解法二:组合式回溯写法(推荐)五、解法对比 递归算法是编程中一种非常强大且常见的思想,它能够优雅地解决很多复杂的…...
