kafka学习-概念与简单实战
目录
1、核心概念
消息和批次
Topic和Partition
Replicas
Offset
broker和集群
生产者和消费者
2、开发实战
2.1、消息发送
介绍
代码实现
2.2、消息消费
介绍
代码实现
2.3、SpringBoot Kafka
pom
application.yaml
KafkaConfig
producer
consumer
1、核心概念
消息和批次
kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。
批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。
Topic和Partition
topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。
partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)
主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。
主题的分区策略有如下几种:
- 直接指定分区;
- 根据消息的key散列取模得出分区;
- 轮询指定分区。
Replicas
- 副本,每个分区都有多个副本。其中包含一个首领副本和多个跟随者副本。
- 首领副本用于响应生产者的消息写入请求与消费者的消息读取请求;
- 跟随者副本用于同步首领副本的数据,保持与首领副本一致的状态,有数据备份的功能。
- 一旦首领副本所在的服务器宕机,就会从跟随者中选出一个升级为首领副本。
Offset
偏移量。
生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。
消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。
broker和集群
broker为一个独立的kafka服务器;一个kafka集群里有多个broker。
broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。
集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。
每个集群都有一个broker是集群控制器,作用如下:
- 将分区分配给首领分区的broker;
- 监控broker,首领分区切换
生产者和消费者
生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:
- 直接指定分区;
- 根据消息的key散列取模得出分区;
- 轮询指定分区。
消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。
2、开发实战
2.1、消息发送
介绍
- 生产者主要有KafkaProducer和ProducerRecord两个对象:KafkaProducer用于发送消息,ProducerRecord用于封装kafka消息。
- 生产者生产消息后,需要broker的确认,可以选择同步或者异步确认:同步确认效率低;异步确认效率高,但需要设置回调对象。
代码实现
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {Map<String, Object> configs = new HashMap<>();// 设置连接Kafka的初始连接⽤到的服务器地址// 如果是集群,则可以通过此初始连接发现集群中的其他brokerconfigs.put("bootstrap.servers", "node1:9092");// 设置key和value的序列化器configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);// 用于封装Producer的消息ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", // 主题名称0, // 分区编号,现在只有⼀个分区,所以是00, // 数字作为key"message 0" // 字符串作为value);// 发送消息,同步等待消息的确认// producer.send(record).get(3_000, TimeUnit.MILLISECONDS);// 使用回调异步等待消息的确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题:" + metadata.topic() + "\n"+ "分区:" + metadata.partition() + "\n"+ "偏移量:" + metadata.offset() + "\n"+ "序列化的key字节:" + metadata.serializedKeySize() + "\n"+ "序列化的value字节:" + metadata.serializedValueSize() + "\n"+ "时间戳:" + metadata.timestamp());} else {System.out.println("有异常:" + exception.getMessage());}}});// 关闭连接producer.close();
}
2.2、消息消费
介绍
消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:
代码实现
public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。configs.put("bootstrap.servers", "node1:9092");// key和value的反序列化器configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "consumer.demo");// 创建消费者对象KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);final Pattern pattern = Pattern.compile("topic_[0-9]");// 消费者订阅主题或分区// consumer.subscribe(pattern);// consumer.subscribe(pattern, new ConsumerRebalanceListener() {final List<String> topics = Arrays.asList("topic_1");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println("剥夺的分区:" + tp.partition());}); }@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println(tp.partition());});}});// 拉取订阅主题的消息final ConsumerRecords<Integer, String> records = consumer.poll(3_000);// 获取topic_1主题的消息final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");// 遍历topic_1主题的消息topic1Iterable.forEach(record -> {System.out.println("========================================");System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的值:" + record.value());System.out.println("消息的主题:" + record.topic());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的偏移量:" + record.offset());});// 关闭消费者consumer.close();
}
2.3、SpringBoot Kafka
pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>
application.yaml
spring:kafka:bootstrap-servers: node1:9092 # 用于建立初始连接的broker地址producer:key-serializer: org.apache.kafka.common.serialization.IntegerSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 16384 # 默认的批处理记录数buffer-memory: 33554432 # 32MB的总发送缓存consumer:key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: spring-kafka-02-consumer # consumer的消费组idenable-auto-commit: true # 是否自动提交消费者偏移量auto-commit-interval: 100 # 每隔100ms向broker提交一次偏移量auto-offset-reset: earliest # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量
KafkaConfig
@Configuration
public class KafkaConfig {@Beanpublic NewTopic topic1() {return new NewTopic("ntp-01", 5, (short) 1);}@Beanpublic NewTopic topic2() {return new NewTopic("ntp-02", 3, (short) 1);}
}
producer
@RestController
public class KafkaSyncProducerController {@Autowiredprivate KafkaTemplate template;@RequestMapping("send/sync/{message}")public String sendSync(@PathVariable String message) {ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));try {// 同步等待broker的响应Object o = future.get();SendResult<Integer, String> result = (SendResult<Integer, String>) o;System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}
}@RestController
public class KafkaAsyncProducerController {@Autowiredprivate KafkaTemplate<Integer, String> template;@RequestMapping("send/async/{message}")public String asyncSend(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);ListenableFuture<SendResult<Integer, String>> future = template.send(record);// 添加回调,异步等待响应future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送失败: " + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());}});return "success";}
}
consumer
@Component
public class MyConsumer {@KafkaListener(topics = "topic-spring-02")public void onMessage(ConsumerRecord<Integer, String> record) {Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);if (optional.isPresent()) {System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());}}
}
以上内容为个人学习理解,如有问题,欢迎在评论区指出。
部分内容截取自网络,如有侵权,联系作者删除。
相关文章:

kafka学习-概念与简单实战
目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer 1、核心…...

爬虫进阶-反爬破解5(selenium的优势和点击操作+chrome的远程调试能力+通过Chrome隔离实现一台电脑登陆多个账号)
目录 一、selenium的优势和点击操作 二、chrome的远程调试能力 三、通过Chrome隔离实现一台电脑登陆多个账号 一、selenium的优势和点击操作 1.环境搭建 工具:Chrome浏览器chromedriverselenium win用户:chromedriver.exe放在python.exe旁边 MacO…...

音视频编码格式-AAC ADT
例子:config 1408 1408(16进制) : 0001 0100 0000 1000 audioObjectType(5bit)为 00010 , 即 2, profie (audioObjectType -1 ) AAC LC samplingFrequencyIndex (4bit) 为 1000 , 即 8 , 对应的采样频率为 16000 channelConfiguration (…...
【计算机网络】网络编程接口 Socket API 解读(3)
Socket 是网络协议栈暴露给编程人员的 API,相比复杂的计算机网络协议,API 对关键操作和配置数据进行了抽象,简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具,和其他平台(比如 os-x …...
kafka知识小结
1.为什么分区数只能增加,不能减少? 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。 另外实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留则又需…...
算法刷题记录-DP(LeetCode)
746. Min Cost Climbing Stairs 代码 int minCostClimbingStairs(vector<int>& cost) {if (cost.size()<2){return 0;}int cache[cost.size()1];cache[0]0;cache[1]0;for (int i 2; i < cost.size(); i) {cache[i] min(cache[i-2]cost[i-2],cache[i-1]cost[i…...
Springboot整合Neo4J图数据库
1.引入依赖 JDK11, neo4J4.4.23 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent …...
Unity 2018发布在iOS 16.3偶尔出现画面不动的问题
1)Unity 2018发布在iOS 16.3偶尔出现画面不动的问题 2)IL2CPP在Xcode下增量编译问题 3)帧同步实现PuppetMaster布娃娃系统的问题 这是第351篇UWA技术知识分享的推送,精选了UWA社区的热门话题,涵盖了UWA问答、社区帖子等…...

蠕虫病毒流量分析案例
背景 某供排水集团的网络管理员对其网络的健康状况持认可态度,表示网络运行正常,没有发现异常行为。然而,由于网络环境变得越来越复杂,仅凭借传统的网络经验已经不能全面了解网络情况。因此,我们为供排水集团安装了Ne…...

Transformer(一)—— Attention Batch Normalization
Transformer详解 一、RNN循环神经网络二、seq2seq模型三、Attention(注意力机制)四、Transformer4.1 self attention4.2 self-attention的变形——Multi-head Self-attention4.3 Masked Attention4.4 Positional Encoding4.5 Batch Normalization4.6 Lay…...

2023高教社杯数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策
# 1 赛题 在生鲜商超中,一般蔬菜类商品的保鲜期都比较短,且品相随销售时间的增加而变差, 大部分品种如当日未售出,隔日就无法再售。因此, 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬菜…...

【C++漂流记】一文搞懂类与对象的封装
本篇文章主要说明了类与对象中封装的有关知识,包括属性和行为作为整体、访问权限、class与struct的区别、成员属性的私有化,希望这篇文章可以帮助你更好的了解类与对象这方面的知识。 文章目录 一、属性和行为作为整体二、访问权限三、class与struct的区…...

ctfshow 反序列化
PHP反序列化前置知识 序列化和反序列化 对象是不能在字节流中传输的,序列化就是把对象转化为字符串以便存储和传输,反序列化就是将字符串转化为对象 魔术方法 __construct() //构造,当对象new时调用 __wakeup() //执行unserialize()时&am…...

数据结构:线性表之-单向链表(无头)
目录 什么是单向链表 顺序表和链表的区别和联系 顺序表: 链表: 链表表示(单项)和实现 1.1 链表的概念及结构 1.2单链表(无头)的实现 所用文件 将有以下功能: 链表定义 创建新链表元素 尾插 头插 尾删 头删 查找-给一个节点的…...

为IT服务台构建自定义Zia操作
Zia是manageengine的商业人工智能助手,是ServiceDesk Plus Cloud的虚拟会话支持代理。使用Zia,您可以优化帮助台管理,还可以缩小最终用户与其帮助台之间的差距,Zia通过执行预配置的操作来帮助用户完成他们的服务台任务。 例如&…...
【C/C++】BMP格式32位转24位
问题 如题 解决方法 bmp文件格式参考:【C/C++】BITMAP格式分析_vc++ bitmap头文件_sunriver2000的博客-CSDN博客BITMAP文件大体上分成四个部分,如下表所示。文件部分长度(字节)位图文件头 Bitmap File Header14位图信息数据头 Bitmap Info Header40调色板 Palette4*n (n≥…...

合宙Air724UG LuatOS-Air LVGL API控件-滑动条 (Slider)
滑动条 (Slider) 滑动条看起来和进度条是有些是有些像,但不同的是滑动条可以进行数值选择。 示例代码 -- 回调函数 slider_event_cb function(obj, event)if event lvgl.EVENT_VALUE_CHANGED then local val (lvgl.slider_get_value(obj) or "0")..&…...

SQLAlchemy 封装的工具类,数据库pgsql(数据库连接池)
1.SQLAlchemy是什么? SQLAlchemy 是 Python 著名的 ORM 工具包。通过 ORM,开发者可以用面向对象的方式来操作数据库,不再需要编写 SQL 语句。 SQLAlchemy 支持多种数据库,除 sqlite 外,其它数据库需要安装第三方驱动。…...

【Git】Git 基础
Git 基础 参考 Git 中文文档 — https://git-scm.com/book/zh/v2 1.介绍 Git 是目前世界上最先进的分布式版本控制系统,有这么几个特点: 分布式:是用来保存工程源代码历史状态的命令行工具保存点:保存点可以追溯源码中的文件…...

腾讯云AI绘画:探究AI创意与技术的新边界
目录 一、2023的“网红词汇”——AI绘画二、智能文生图1、智能文生图的应用场景2、风格和配置的多样性3、输入一段话,腾讯云AI绘画给你生成一张图4、文本描述生成图像,惊艳全场 三、智能图生图:重新定义图像美学1、智能图生图的多元应用场景2…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...

RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...

MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...