当前位置: 首页 > article >正文

Kafka基础篇

Kafaka安装和使用以及整和一、 安装docker1创建docker-compose.yml文件2测试二、 kafaka基础知识1kafaka核心架构2) 工作流程三、Spring Boot 整合Kafka1. 导入依赖 配置yml文件2. API讲解2.1 KafkaListener2.2 KafkaTemplate2.3 实现手动提交偏移量一、 安装docker1创建docker-compose.yml文件mkdirkafka-democdkafka-demotouchdocker-compose.ymldockercompose up-ddocker-compose.ymlversion:3services:kafka:image:apache/kafka:latest# 镜像container_name:kafka# 容器名ports:# 映射端口-9092:9092environment:KAFKA_NODE_ID:1# 当钱节点# KRaft 模式KAFKA_PROCESS_ROLES:broker,controllerKAFKA_LISTENERS:PLAINTEXT://:9092,CONTROLLER://:9093# 监听端口# controller Kafka集群节点之间通信# plaintext 普通客户端端口KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://localhost:9092# 客户端如访问kafakaKAFKA_CONTROLLER_LISTENER_NAMES:CONTROLLERKAFKA_LISTENER_SECURITY_PROTOCOL_MAP:CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXTKAFKA_CONTROLLER_QUORUM_VOTERS:1kafka:9093# nodeIdhost:portKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS:02测试dockerexec-itkafkabash# 进入容器# 创建topic/opt/kafka/bin/kafka-topics.sh\--create\--topictest-topic\--bootstrap-server localhost:9092# 查看topic/opt/kafka/bin/kafka-topics.sh\--list\--bootstrap-server localhost:9092# 启动生产者/opt/kafka/bin/kafka-console-producer.sh\--topictest-topic\--bootstrap-server localhost:9092# 输入hello kafka启动一个新终端dockerexec-itkafkabash/opt/kafka/bin/kafka-console-consumer.sh\--topictest-topic\--from-beginning\--bootstrap-server localhost:9092可以看到二、 kafaka基础知识1kafaka核心架构Producer(生产者)负责发送消息可以是订单系统 、 日志系统 、webAppConsumer(消费者)从主题订阅新消息的Kafka 客户端。消费者通过检查消息偏移量来区分消息是否已读。Topic(主题)Kafka 消息通过主题进行分类。类似 数据库的表例如order-topic log-topic user-topicPartition分区一个 Topic 可以拆成多个 Partition。分区后可以可以并行写、并行读、横向扩展本质就是多个日志文件Offset偏移量Kafka 中消息长这样Partition-00-hello1-world2-kafka这个编号就是offset0、1、2作用标识消息位置记录消费进度BrokerKafka 集群中的一台服务器。组成 Kafka 集群提供高可用提供分布式能力Broker-1 Broker-2 Broker-3Consumer Group消费者组多个消费者构成的消费者组同时消费多个分区以实现高并发。每一个消费者属于一个特定的消费者组。消费者组中一个消费者可以消费多个分区。一个分区只能被指定给一个消费者。Replica(副本)Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余这些地方就是所谓的副本。副本还分为领导者副本和追随者副本各自有不同的角色划分。副本是在分区层级下的即每个分区可配置多个副本实现高可用。2) 工作流程# 1.生产者发送消息Producer ↓ Topic# 2.Kafka写入PartitionTopic ├── P0 ├── P1 └── P2# 3.Consumer 拉取消息Consumer -主动拉取三、Spring Boot 整合Kafka1. 导入依赖 配置yml文件dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencyspring:kafka:bootstrap-servers:192.168.59.128:9092# kafka集群地址producer:#生产者配置retries:3acks:all# 要求所有副本都确认收到消息之后才算发送成功batch-size:16384# 批量发送大小 16KB累积到该大小后批零发送提升吞吐量buffer-memory:33554432# 生产者缓冲区总大小 32MB用于缓存待发送消息key-serializer:org.apache.kafka.common.serialization.StringSerializer# KEY的序列化器value-serializer:org.apache.kafka.common.serialization.StringSerializer# 消息的序列化器properties:linger.ms:1# 消息在发送前最多等待1ms,配和batch-size 实现微批量consumer:# 消费者配置group-id:pet-life-consumer-group# 消费者组ID同一组的消费者回分摊消费分区auto-offset-reset:earliest# 无初始偏移量时从最早的消息开始消费另一个常用值是 latestenable-auto-commit:true# 自动提交消费位移简化消费端逻辑auto-commit-interval:1000# 自动提交的间隔为 1000ms即 1 秒#消息 Key 和 Value 的反序列化器与生产者对应key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializermax-poll-records:500# 单次 poll() 调用最多拉取 500 条消息listener:missing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误避免启动失败concurrency:3# 消费监听器并发线程数为 3即同时启动 3 个消费者实例消费分区2. API讲解2.1KafkaListenerpublicinterfaceKafkaListener{Stringid()default;//Listener 唯一 ID。StringcontainerFactory()default;//指定监听容器工厂String[]topics()default{};//指定监听的topicStringtopicPattern()default;//正则匹配topicTopicPartition[]topicPartitions()default{};//精确撇脂topic及其分区StringcontainerGroup()default;StringerrorHandler()default;//异常处理器StringgroupId()default;//指定消费者组booleanidIsGroup()defaulttrue;// 只id 和groupId是否相同StringclientIdPrefix()default;//Kafka Client ID 前缀StringbeanRef()default__listener;Stringconcurrency()default;// 并发消费线程数量StringautoStartup()default;// 是否自动消费String[]properties()default{};//额外 Kafka 配置。booleansplitIterables()defaulttrue;StringcontentTypeConverter()default;Stringbatch()default;Stringfilter()default;Stringinfo()default;StringcontainerPostProcessor()default;}示例KafkaListener(topicstest)publicvoidlisten(Stringmessage){System.out.println(Received message: message);}2.2KafkaTemplateKafkaTemplate本质上就是对 Kafka Producer的高级封装发送消息的API如下方法作用send(topic, data)普通发送send(topic, key, data)带 keysend(topic, partition, key, data)指定分区send(record)完整 ProducerRecordsend(message)Spring MessagesendDefault()默认 TopicexecuteInTransaction()事务消息其中带 key 和普通发送的区别 Kafka会对key进行哈希运算对于同一个key会进入同一分区能够保证**局部顺序性**完整ProducerRecordpublicclassProducerRecordK,V{privatefinalStringtopic;// 主题privatefinalIntegerpartition;//分区privatefinalHeadersheaders;// 元数据头部用于在消息体之外传递额外信息privatefinalKkey;// keyprivatefinalVvalue;//消息privatefinalLongtimestamp;// 时间戳}场景示例链路追踪传入 traceId、spanId实现分布式追踪消息去重传入唯一 messageId消费端幂等判断内容标识标记消息的 contentType、encoding路由标记标记消息来源 source、目标 target自定义标记任意业务标识如 priority、retryCount接收header的方法 // 方式1用 Header 注解取单个值KafkaListener(topicstest-topic)publicvoidlisten(Stringmessage,Header(traceId)StringtraceId,Header(source)Stringsource,Acknowledgmentack){System.out.println(traceIdtraceId, sourcesource, bodymessage);ack.acknowledge();}// 方式2接收完整 Headers 对象KafkaListener(topicstest-topic)publicvoidlisten(Stringmessage,Headersheaders,Acknowledgmentack){headers.forEach(h-System.out.println(keyh.key(), valuenewString(h.value())));ack.acknowledge();}// 方式3接收 ConsumerRecord最完整包含分区、偏移量等所有信息KafkaListener(topicstest-topic)publicvoidlisten(ConsumerRecordString,Stringrecord,Acknowledgmentack){System.out.println(topicrecord.topic(), partitionrecord.partition(), offsetrecord.offset(), headersrecord.headers(), valuerecord.value());ack.acknowledge();}2.3 实现手动提交偏移量避免业务还没有跑完就提交的偏移量。如果执行过程中出现故障但是这条消息已经消费过了造成数据丢失。修改yml文件spring:kafka:consumer:# 消费者配置enable-auto-commit:false# 关闭自动提交配合 ack-mode: manual 手动提交偏移量listener:ack-mode:manualmissing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误避免启动失败KafkaListener(topicPatterntest-.*,groupIdtest-group)publicvoidlisten(Stringmessage,Acknowledgmentack){try{System.out.println(Received message: message);// 业务逻辑处理...}finally{ack.acknowledge();// 手动提交偏移量}}

相关文章:

Kafka基础篇

Kafaka安装和使用以及整和一、 安装(docker)1)创建docker-compose.yml文件2)测试二、 kafaka基础知识1)kafaka核心架构2) 工作流程三、Spring Boot 整合Kafka1. 导入依赖 ,配置yml文件2. API讲解2.1&#x…...

手机店还会存在吗

这两年买手机,有个很常见的小场景:人先进店,把样机拿起来拍几张照片,摸一下边框,试试重量,再问店员有没有现货。问完价格以后,很多人会低头打开电商平台。 门店最尴尬的地方就在这里。它承担了体…...

Langchain的学习(一)

目录 一,实操 编码 Runnable Runnable 是什么 核心方法(所有 Runnable 都有) 最关键能力:用 | 组合(LCEL) 常用内置 Runnable 总结 二,聊天模型-核心能力 定义模型 init_chat_model 本地部署 调用工具 定义工具-Tool version1 schema: version2(基于…...

ETime:高效推动你的时间

我做了一个开源时间工作台:ETime 如果你也试过很多时间管理工具,可能会遇到同一种疲惫:记录本身变成了另一件需要坚持的事。 ETime 想解决的不是“怎样把每一分钟都管起来”,而是更朴素的一件事:让开始更轻&#xff…...

别再让一条宽带拖慢整个公司!手把手教你用H3C防火墙配置双WAN口负载均衡(附HCL模拟器配置)

中小企业网络优化实战:H3C防火墙双WAN负载均衡配置指南 当视频会议频繁卡顿、文件传输速度像蜗牛爬行时,单条宽带已成为制约企业效率的瓶颈。对于50-200人规模的中小企业,双WAN负载均衡技术能以极低成本实现带宽翻倍,本文将用一台…...

别再手动拖拽了!用Java POI + XSSFDrawing,5行代码搞定Excel单元格图片批量插入(附完整源码)

5行代码实现Excel图片批量插入:Java POI XSSFDrawing高效开发指南 1. 为什么需要自动化Excel图片插入? 在日常报表开发中,我们经常遇到需要将大量图片(如用户头像、产品图)嵌入Excel单元格的场景。传统手动操作存在三…...

MiniMax Agent 正式更名 Mavis 上线多智能体协作

如果你用过AI助手,大概都有过这种感受:一个AI同时干太多事,要么顾此失彼,要么卡在某个环节原地转圈。 MiniMax显然也看到了这个问题。 5 月 13 日,他们正式宣布旗下Agent产品全面升级,并给它起了个新名字—…...

Day33-1: Serilog(日志中间件)VS OperLogHelper(操作日志帮助类)

一、一句话分清它们的作用 1. Serilog(日志中间件) 作用:记录系统运行日志 → 给程序员看的 控制台打印文件保存报错、异常、请求信息用于排查问题、调试、监控 2. OperLogHelper(操作日志帮助类) 作用&#xff1…...

5分钟搞定U盘验货!这款绿色工具真香到离谱

兄弟们,你有没有买过那种“1TB只要39块还包邮”的U盘? 醒醒!那玩意儿大概率是扩容盘——实际容量可能只有64GB,超出部分写进去的数据全是空气,轻则文件损坏,重则项目代码全丢,救都救不回来&…...

【Java杂项】为什么 b += 1 可以,但 b = b + 1 会报错?类型提升与复合赋值详解

【Java杂项】为什么 b 1 可以,但 b b 1 会报错?复合赋值与类型提升讲清楚前言一、先给结论:它不是简单的文本替换二、先看认知冲突2.1 普通赋值为什么报错2.2 复合赋值为什么能通过三、类型提升到底是什么3.1 常见类型提升结果3.2 为什么小…...

人类的自然关系与AI的形式化关系

“人类的自然关系”与“AI的形式化关系”是理解下一代人机环境系统智能的两个核心哲学维度。它们分别代表了智能系统在物理世界中的生存根基与在数字世界中的运行逻辑。我们可以从以下三个层面来深度解析这两者的区别与融合:人类的自然关系:从“征服掠夺…...

一文搞懂工业机器人通讯协议:TCP/IP、Modbus与专用协议对比

在我十年的工控开发生涯中,通讯问题永远是项目延期的第一大原因。我见过太多团队花了几个月时间做运动控制和视觉算法,最后却卡在了机器人通讯上:要么是数据传输不稳定,要么是速度跟不上产线节拍,要么是换个品牌机器人就要全部重写代码。 很多新手工程师觉得通讯就是&quo…...

态是相关,势是因果,感是具身,知是离身

态是相关,势是因果,感是具身,知是离身,用四个高度概括的词,切中了“人机环境系统智能”中态势感知四个核心维度的本质属性。我们可以结合之前的探讨,来深入拆解一下这句“十六字真言”:态是相关…...

C#上位机开发工业机器人:从零搭建第一个机器人控制程序

作为一名在工控行业摸爬滚打了十年的老工程师,我见过太多自动化工程师卡在"机器人上位机开发"这一关。很多人C#基础不错,也懂机器人原理,但就是不知道怎么把两者结合起来,写出一个能在生产环境运行的控制程序。 今天这篇文章,我会带着你从零开始,搭建一个完整…...

Google Cloud Dataflow 背后的流式处理模型

原文:towardsdatascience.com/the-stream-processing-model-behind-google-cloud-dataflow-0d927c9506a0?sourcecollection_archive---------3-----------------------#2024-04-27 在无界数据处理中的正确性、延迟和成本平衡 https://medium.com/vutrinh274?sour…...

5分钟搞定!NewGAN-Manager终极配置指南:让Football Manager游戏体验焕然一新

5分钟搞定!NewGAN-Manager终极配置指南:让Football Manager游戏体验焕然一新 【免费下载链接】NewGAN-Manager A tool to generate and manage xml configs for the Newgen Facepack. 项目地址: https://gitcode.com/gh_mirrors/ne/NewGAN-Manager …...

【MySQL百日打怪升级第8天】SELECT执行流程

【第8天】每天一个MySQL知识点,百日打怪升级 SQL基础:SELECT执行流程 大家好,我是一名拥有10年以上经验的DBA老兵。 做这个系列,源于一个朴素的愿望:把踩过的坑、总结的经验系统化输出,希望能帮到刚入行或…...

堆叠集成方法

原文:towardsdatascience.com/the-stacking-ensemble-method-984f5134463a 发现堆叠在机器学习中的力量——一种将多个模型组合成一个单一强大预测器的技术。本文从基础知识到高级技术探讨了堆叠,揭示了它是如何结合不同模型的优势以提高准确性的。无论你…...

离谱!上海交大一学生私吞 5000 奖金,还用豆包 P 假收据骗队友。网友:学历虽高但人品太低

①5 月 18 日,上海交大一则学生违纪通报冲上热搜,实锤了前几天网上曝光的一名学生侵占团队竞赛奖金、造假欺骗队友的恶劣行为。②在 2025 下半年,樊同学(上交大智慧能源学院女生)与 K 同学(电院男生&#x…...

ABAP 采购带组件收货BAPI

一、背景 有一项业务比较特殊,金靶的回收加工,既会有物料的消耗,也会收进上一批加工洗出来的物料,并且组件物料会带有批次,MIGO过账时需要填写批次,那么对应BAPI,也需要加入这一部分批次。如果…...

荣耀MagicOS 10系统游戏模式:如何启用幻影稳帧功能并调整游戏画面的流畅度与画质平衡?

用手机玩游戏,最怕遇到卡顿和画面不清晰。想开高帧率保证流畅,画质就可能下降;想开高画质享受视觉盛宴,又容易掉帧卡顿。这真是让不少玩家头疼的问题。如果你的荣耀手机升级到了MagicOS 10系统,那么恭喜你,…...

Perplexity不是越低越好!资深NLP架构师亲授:3类典型查询场景下的阈值黄金区间

更多请点击: https://kaifayun.com 第一章:Perplexity不是越低越好!资深NLP架构师亲授:3类典型查询场景下的阈值黄金区间 Perplexity(困惑度)常被误认为语言模型性能的“万能标尺”,但实际部署…...

一小时搞懂Python函数:原理+实践

目录 🙄什么是Python函数(了解函数的概念) 🤔为什么需要它?(背景和痛点) 😮函数的分类(函数有哪些?) 内置函数 标准库函数 第三方库函数 定…...

互联网大厂 Java 求职者面试:音视频场景下的技术挑战

互联网大厂 Java 求职者面试:音视频场景下的技术挑战在一次互联网大厂的面试中,面试官和候选人燕双非之间展开了一场精彩的对话。燕双非是一位幽默风趣的程序员,尽管他在技术上并不是特别扎实,但他总是能用他的幽默化解紧张氛围。…...

软件设计师下午题训练2-3题+2020下上午题错题解析 练习真题训练15

一、训练题2 1、2021上 (1) (2) a:团购点编号 b:客户电话 供货 主键 :(供货商编号,团购点编号) 外键:供货商编号、团购点编号 订单 主键:订单编号…...

PHP SimpleXML:深入解析与高效使用

PHP SimpleXML:深入解析与高效使用 引言 PHP 是一种广泛使用的服务器端脚本语言,它以其灵活性和强大的功能而闻名。在处理 XML 数据时,PHP 提供了多种方法,其中 SimpleXML 是一个简单且强大的库,它允许开发者轻松地解析和操作 XML 数据。本文将深入探讨 PHP SimpleXML 的…...

远洋边缘计算实战:基于 Linux 的客滚船高并发网络 QoS 调度与隔离策略

摘要:客滚船直连卫星网络面对几百名旅客并发时存在瘫痪与越权风险。本文记录了基于 Linux 构建标准工业级边缘网关多链路 QoS 调度与隔离的实操复盘。导语:在主导一艘国际客滚船的网络重构项目时,我们面临一个典型的高并发调度与合规挑战&…...

RAG检索体系①【第十一篇】:混合检索架构(BM25+向量+过滤),工业级召回落地方案

生产级 RAG 避坑实战合集【第十一篇】文章简介:前十篇我们彻底打通数据层改写层:文档清洗、Chunk切块、元数据、生命周期、Query双层改写。绝大多数人做完这些,直接无脑上单向量检索。线上投产全部翻车。本文直击行业痛点:纯向量检…...

c++11的初见

列表初始化 c11以后支持{ }的列表初始可以使用{ }括住数据来进行初始化&#xff0c;使用{ }初始化时可以省略号{ }中的数据要匹配构造&#xff1b;使用{ }可以统一初始化方式。#include<iostream> #include<vector> using namespace std; int main(){vector<pai…...

YOLO26优化:TIP2026 FourierSR | FourierSR引入YOLO C3k2:解决感受野局限,实现高效全局特征交互

💡💡💡现有 YOLO C3k2 模块主要基于卷积与跨阶段部分连接,虽能平衡计算与精度,但仍存在以下问题: 感受野受限:堆叠的小核卷积(如 33)感受野有限,难以捕获全局上下文,对尺度变化大或远距离依赖的目标(如小目标、遮挡目标)特征提取能力不足。 特征混合效率低:通…...