kafka学习-概念与简单实战
目录
1、核心概念
消息和批次
Topic和Partition
Replicas
Offset
broker和集群
生产者和消费者
2、开发实战
2.1、消息发送
介绍
代码实现
2.2、消息消费
介绍
代码实现
2.3、SpringBoot Kafka
pom
application.yaml
KafkaConfig
producer
consumer
1、核心概念
消息和批次
kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。
批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。
Topic和Partition
topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。
partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)
主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。
主题的分区策略有如下几种:
- 直接指定分区;
- 根据消息的key散列取模得出分区;
- 轮询指定分区。

Replicas
- 副本,每个分区都有多个副本。其中包含一个首领副本和多个跟随者副本。
- 首领副本用于响应生产者的消息写入请求与消费者的消息读取请求;
- 跟随者副本用于同步首领副本的数据,保持与首领副本一致的状态,有数据备份的功能。
- 一旦首领副本所在的服务器宕机,就会从跟随者中选出一个升级为首领副本。
Offset
偏移量。
生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。
消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。
broker和集群

broker为一个独立的kafka服务器;一个kafka集群里有多个broker。
broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。
集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。
每个集群都有一个broker是集群控制器,作用如下:
- 将分区分配给首领分区的broker;
- 监控broker,首领分区切换
生产者和消费者

生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:
- 直接指定分区;
- 根据消息的key散列取模得出分区;
- 轮询指定分区。
消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。
2、开发实战
2.1、消息发送
介绍
- 生产者主要有KafkaProducer和ProducerRecord两个对象:KafkaProducer用于发送消息,ProducerRecord用于封装kafka消息。
- 生产者生产消息后,需要broker的确认,可以选择同步或者异步确认:同步确认效率低;异步确认效率高,但需要设置回调对象。


代码实现
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {Map<String, Object> configs = new HashMap<>();// 设置连接Kafka的初始连接⽤到的服务器地址// 如果是集群,则可以通过此初始连接发现集群中的其他brokerconfigs.put("bootstrap.servers", "node1:9092");// 设置key和value的序列化器configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);// 用于封装Producer的消息ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", // 主题名称0, // 分区编号,现在只有⼀个分区,所以是00, // 数字作为key"message 0" // 字符串作为value);// 发送消息,同步等待消息的确认// producer.send(record).get(3_000, TimeUnit.MILLISECONDS);// 使用回调异步等待消息的确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题:" + metadata.topic() + "\n"+ "分区:" + metadata.partition() + "\n"+ "偏移量:" + metadata.offset() + "\n"+ "序列化的key字节:" + metadata.serializedKeySize() + "\n"+ "序列化的value字节:" + metadata.serializedValueSize() + "\n"+ "时间戳:" + metadata.timestamp());} else {System.out.println("有异常:" + exception.getMessage());}}});// 关闭连接producer.close();
}
2.2、消息消费
介绍
消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:

代码实现
public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。configs.put("bootstrap.servers", "node1:9092");// key和value的反序列化器configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "consumer.demo");// 创建消费者对象KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);final Pattern pattern = Pattern.compile("topic_[0-9]");// 消费者订阅主题或分区// consumer.subscribe(pattern);// consumer.subscribe(pattern, new ConsumerRebalanceListener() {final List<String> topics = Arrays.asList("topic_1");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println("剥夺的分区:" + tp.partition());}); }@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println(tp.partition());});}});// 拉取订阅主题的消息final ConsumerRecords<Integer, String> records = consumer.poll(3_000);// 获取topic_1主题的消息final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");// 遍历topic_1主题的消息topic1Iterable.forEach(record -> {System.out.println("========================================");System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的值:" + record.value());System.out.println("消息的主题:" + record.topic());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的偏移量:" + record.offset());});// 关闭消费者consumer.close();
}
2.3、SpringBoot Kafka
pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>
application.yaml
spring:kafka:bootstrap-servers: node1:9092 # 用于建立初始连接的broker地址producer:key-serializer: org.apache.kafka.common.serialization.IntegerSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 16384 # 默认的批处理记录数buffer-memory: 33554432 # 32MB的总发送缓存consumer:key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: spring-kafka-02-consumer # consumer的消费组idenable-auto-commit: true # 是否自动提交消费者偏移量auto-commit-interval: 100 # 每隔100ms向broker提交一次偏移量auto-offset-reset: earliest # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量
KafkaConfig
@Configuration
public class KafkaConfig {@Beanpublic NewTopic topic1() {return new NewTopic("ntp-01", 5, (short) 1);}@Beanpublic NewTopic topic2() {return new NewTopic("ntp-02", 3, (short) 1);}
}
producer
@RestController
public class KafkaSyncProducerController {@Autowiredprivate KafkaTemplate template;@RequestMapping("send/sync/{message}")public String sendSync(@PathVariable String message) {ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));try {// 同步等待broker的响应Object o = future.get();SendResult<Integer, String> result = (SendResult<Integer, String>) o;System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}
}@RestController
public class KafkaAsyncProducerController {@Autowiredprivate KafkaTemplate<Integer, String> template;@RequestMapping("send/async/{message}")public String asyncSend(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);ListenableFuture<SendResult<Integer, String>> future = template.send(record);// 添加回调,异步等待响应future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送失败: " + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());}});return "success";}
}
consumer
@Component
public class MyConsumer {@KafkaListener(topics = "topic-spring-02")public void onMessage(ConsumerRecord<Integer, String> record) {Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);if (optional.isPresent()) {System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());}}
}
以上内容为个人学习理解,如有问题,欢迎在评论区指出。
部分内容截取自网络,如有侵权,联系作者删除。
相关文章:
kafka学习-概念与简单实战
目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer 1、核心…...
爬虫进阶-反爬破解5(selenium的优势和点击操作+chrome的远程调试能力+通过Chrome隔离实现一台电脑登陆多个账号)
目录 一、selenium的优势和点击操作 二、chrome的远程调试能力 三、通过Chrome隔离实现一台电脑登陆多个账号 一、selenium的优势和点击操作 1.环境搭建 工具:Chrome浏览器chromedriverselenium win用户:chromedriver.exe放在python.exe旁边 MacO…...
音视频编码格式-AAC ADT
例子:config 1408 1408(16进制) : 0001 0100 0000 1000 audioObjectType(5bit)为 00010 , 即 2, profie (audioObjectType -1 ) AAC LC samplingFrequencyIndex (4bit) 为 1000 , 即 8 , 对应的采样频率为 16000 channelConfiguration (…...
【计算机网络】网络编程接口 Socket API 解读(3)
Socket 是网络协议栈暴露给编程人员的 API,相比复杂的计算机网络协议,API 对关键操作和配置数据进行了抽象,简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具,和其他平台(比如 os-x …...
kafka知识小结
1.为什么分区数只能增加,不能减少? 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。 另外实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留则又需…...
算法刷题记录-DP(LeetCode)
746. Min Cost Climbing Stairs 代码 int minCostClimbingStairs(vector<int>& cost) {if (cost.size()<2){return 0;}int cache[cost.size()1];cache[0]0;cache[1]0;for (int i 2; i < cost.size(); i) {cache[i] min(cache[i-2]cost[i-2],cache[i-1]cost[i…...
Springboot整合Neo4J图数据库
1.引入依赖 JDK11, neo4J4.4.23 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent …...
Unity 2018发布在iOS 16.3偶尔出现画面不动的问题
1)Unity 2018发布在iOS 16.3偶尔出现画面不动的问题 2)IL2CPP在Xcode下增量编译问题 3)帧同步实现PuppetMaster布娃娃系统的问题 这是第351篇UWA技术知识分享的推送,精选了UWA社区的热门话题,涵盖了UWA问答、社区帖子等…...
蠕虫病毒流量分析案例
背景 某供排水集团的网络管理员对其网络的健康状况持认可态度,表示网络运行正常,没有发现异常行为。然而,由于网络环境变得越来越复杂,仅凭借传统的网络经验已经不能全面了解网络情况。因此,我们为供排水集团安装了Ne…...
Transformer(一)—— Attention Batch Normalization
Transformer详解 一、RNN循环神经网络二、seq2seq模型三、Attention(注意力机制)四、Transformer4.1 self attention4.2 self-attention的变形——Multi-head Self-attention4.3 Masked Attention4.4 Positional Encoding4.5 Batch Normalization4.6 Lay…...
2023高教社杯数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策
# 1 赛题 在生鲜商超中,一般蔬菜类商品的保鲜期都比较短,且品相随销售时间的增加而变差, 大部分品种如当日未售出,隔日就无法再售。因此, 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬菜…...
【C++漂流记】一文搞懂类与对象的封装
本篇文章主要说明了类与对象中封装的有关知识,包括属性和行为作为整体、访问权限、class与struct的区别、成员属性的私有化,希望这篇文章可以帮助你更好的了解类与对象这方面的知识。 文章目录 一、属性和行为作为整体二、访问权限三、class与struct的区…...
ctfshow 反序列化
PHP反序列化前置知识 序列化和反序列化 对象是不能在字节流中传输的,序列化就是把对象转化为字符串以便存储和传输,反序列化就是将字符串转化为对象 魔术方法 __construct() //构造,当对象new时调用 __wakeup() //执行unserialize()时&am…...
数据结构:线性表之-单向链表(无头)
目录 什么是单向链表 顺序表和链表的区别和联系 顺序表: 链表: 链表表示(单项)和实现 1.1 链表的概念及结构 1.2单链表(无头)的实现 所用文件 将有以下功能: 链表定义 创建新链表元素 尾插 头插 尾删 头删 查找-给一个节点的…...
为IT服务台构建自定义Zia操作
Zia是manageengine的商业人工智能助手,是ServiceDesk Plus Cloud的虚拟会话支持代理。使用Zia,您可以优化帮助台管理,还可以缩小最终用户与其帮助台之间的差距,Zia通过执行预配置的操作来帮助用户完成他们的服务台任务。 例如&…...
【C/C++】BMP格式32位转24位
问题 如题 解决方法 bmp文件格式参考:【C/C++】BITMAP格式分析_vc++ bitmap头文件_sunriver2000的博客-CSDN博客BITMAP文件大体上分成四个部分,如下表所示。文件部分长度(字节)位图文件头 Bitmap File Header14位图信息数据头 Bitmap Info Header40调色板 Palette4*n (n≥…...
合宙Air724UG LuatOS-Air LVGL API控件-滑动条 (Slider)
滑动条 (Slider) 滑动条看起来和进度条是有些是有些像,但不同的是滑动条可以进行数值选择。 示例代码 -- 回调函数 slider_event_cb function(obj, event)if event lvgl.EVENT_VALUE_CHANGED then local val (lvgl.slider_get_value(obj) or "0")..&…...
SQLAlchemy 封装的工具类,数据库pgsql(数据库连接池)
1.SQLAlchemy是什么? SQLAlchemy 是 Python 著名的 ORM 工具包。通过 ORM,开发者可以用面向对象的方式来操作数据库,不再需要编写 SQL 语句。 SQLAlchemy 支持多种数据库,除 sqlite 外,其它数据库需要安装第三方驱动。…...
【Git】Git 基础
Git 基础 参考 Git 中文文档 — https://git-scm.com/book/zh/v2 1.介绍 Git 是目前世界上最先进的分布式版本控制系统,有这么几个特点: 分布式:是用来保存工程源代码历史状态的命令行工具保存点:保存点可以追溯源码中的文件…...
腾讯云AI绘画:探究AI创意与技术的新边界
目录 一、2023的“网红词汇”——AI绘画二、智能文生图1、智能文生图的应用场景2、风格和配置的多样性3、输入一段话,腾讯云AI绘画给你生成一张图4、文本描述生成图像,惊艳全场 三、智能图生图:重新定义图像美学1、智能图生图的多元应用场景2…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...
Python实现简单音频数据压缩与解压算法
Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中,压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言,提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...
xmind转换为markdown
文章目录 解锁思维导图新姿势:将XMind转为结构化Markdown 一、认识Xmind结构二、核心转换流程详解1.解压XMind文件(ZIP处理)2.解析JSON数据结构3:递归转换树形结构4:Markdown层级生成逻辑 三、完整代码 解锁思维导图新…...
