当前位置: 首页 > news >正文

SpringBoot整合Kafka (一)

📑前言

本文主要讲了SpringBoot整合Kafka文章⛺️

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、生产者\n\n
        • 2.1、Kafka生产者消息监听
        • 2.2、生产者写入分区策略
          • 指定写入分区
          • 根据key写入分区
          • 随机选择分区
        • 2.3、带回调的生产者
  • 📑文章末尾![在这里插入图片描述](https://img-blog.csdnimg.cn/e3b1caa28e7a4ce08cb3f1c3b27c4eb6.png)


一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种\nconsumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。首先,让我们来看一下基础的消息(Message)相关术语:
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
1.2、YML配置
spring:kafka:bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数consumer: # 消费端反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
1.3、生产者简单生产
@Autowired
private KafkaTemplate kafkaTemplate;@Test
void contextLoads() {ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");System.out.println("发送完成");
}
1.4、消费者简单消费
@Component
public class TopicConsumer {@KafkaListener(topics = "test01-topic")public void readMsg(String msg){System.out.println("msg = " + msg);}
}

2、生产者\n\n

2.1、Kafka生产者消息监听
@Component
public class KafkaSendResultHandler implements ProducerListener {private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {log.info("Message send success : " + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, Exception exception) {log.info("Message send error : " + producerRecord.toString());}
}
2.2、生产者写入分区策略

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。

指定写入分区

100条数据全部写入到2号分区

for (int i = 0; i < 100; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:"+topic+",分区:" + partition);}});Thread.sleep(1000);
}
根据key写入分区
String key = "kafka-key";
System.out.println("根据key计算分区:" + (key.hashCode() % 3));
for (int i = 0; i < 10; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", key, "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:" + topic + ",分区:" + partition);}});Thread.sleep(1000);
}
随机选择分区
 for (int i = 0; i < 10; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:" + topic + ",分区:" + partition);}});Thread.sleep(1000);}
2.3、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理

for (int i = 0; i < 100; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:"+topic+",分区:" + partition);}});Thread.sleep(1000);
}

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾在这里插入图片描述

相关文章:

SpringBoot整合Kafka (一)

&#x1f4d1;前言 本文主要讲了SpringBoot整合Kafka文章⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&#xff1a;努力一点&#xff0c;优秀一点 目录 文章目录 &…...

随机分词与tokenizer(BPE->BBPE->Wordpiece->Unigram->sentencepiece->bytepiece)

0 tokenizer综述 根据不同的切分粒度可以把tokenizer分为: 基于词的切分&#xff0c;基于字的切分和基于subword的切分。 基于subword的切分是目前的主流切分方式。subword的切分包括: BPE(/BBPE), WordPiece 和 Unigram三种分词模型。其中WordPiece可以认为是一种特殊的BPE。完…...

成都工业学院Web技术基础(WEB)实验四:CSS3布局应用

写在前面 1、基于2022级计算机大类实验指导书 2、代码仅提供参考&#xff0c;前端变化比较大&#xff0c;按照要求&#xff0c;只能做到像&#xff0c;不能做到一模一样 3、图片和文字仅为示例&#xff0c;需要自行替换 4、如果代码不满足你的要求&#xff0c;请寻求其他的…...

TikTok科技趋势:平台如何引领数字社交革命?

TikTok作为一款颠覆性的短视频应用&#xff0c;不仅改变了用户的娱乐方式&#xff0c;更在数字社交领域引领了一场革命。本文将深入探讨TikTok在科技趋势方面的引领作用&#xff0c;分析其在数字社交革命中的关键角色&#xff0c;以及通过技术创新如何不断满足用户需求&#xf…...

【上海大学数字逻辑实验报告】六、时序电路

一、 实验目的 掌握同步二进制计数器和移位寄存器的原理。学会用分立元件构成2位同步二进制加计数器。学会在Quartus II上设计单向移位寄存器。学会在Quartus II上设计环形计数器。 二、 实验原理 同步计数器是指计数器中的各触发器的时钟脉冲输入端连接在一起&#xff0c;接…...

docker版zerotier-planet服务端搭建

1:ZeroTier 介绍2:为什么要自建PLANET 服务器3:开始安装 3.1:准备条件 3.1.1 安装git3.1.2 安装docker3.1.3 启动docker3.2:下载项目源码3.3:执行安装脚本3.4 下载 planet 文件3.5 新建网络 3.5.1 创建网络4.客户端配置 4.1 Windows 配置 4.2 加入网络4.2 Linux 客户端4.…...

【Spring教程28】Spring框架实战:从零开始学习SpringMVC 之 请求与请求参数详解

目录 1 设置请求映射路径1.1 环境准备 1.2 问题分析1.3 设置映射路径 2 请求参数2.1 环境准备2.2 参数传递2.2.1 GET发送单个参数2.2.2 GET发送多个参数2.2.3 GET请求中文乱码2.2.4 POST发送参数2.2.5 POST请求中文乱码 欢迎大家回到《Java教程之Spring30天快速入门》&#xff…...

node.js和浏览器之间的区别

node.js是什么 Node.js是一种基于Chrome V8引擎的JavaScript运行环境&#xff0c;可以在服务器端运行JavaScript代码 Node.js 在浏览器之外运行 V8 JavaScript 引擎。 这使得 Node.js 非常高效。 浏览器如何运行js代码 nodejs运行环境 在浏览器中&#xff0c;大部分时间你所…...

【python并发任务的几种方式】

文章目录 1 Process:2 Thread:3 ThreadPoolExecutor:4 各种方式的优缺点&#xff1a;5 线程与进程的结束方式5.1 线程结束的几种方式5.2 进程的结束方式 6 应用场景效率对比 在Python中&#xff0c;有几种方法可以处理并行执行任务。其中&#xff0c;Process、Thread和ThreadPo…...

使用ROS模板基于ECS和RDS创建WordPress环境

本文教程介绍如何使用ROS模板基于ECS和RDS&#xff08;Relational Database Service&#xff09;创建WordPress环境。 前提条件 如果您是首次使用ROS&#xff0c;必须先开通ROS服务。ROS服务免费&#xff0c;开通服务不会产生任何费用。 背景信息 WordPress是使用PHP语言开…...

龙迅LT2611UXC 双PORT LVDS转HDMI(2.0)+音频

描述&#xff1a; LT2611UXC是一个高性能的LVDS到HDMI2.0的转换器&#xff0c;用于STB&#xff0c;DVD应用程序。 LVDS输入可配置为单端口或双端口&#xff0c;有1个高速时钟通道&#xff0c;3~4个高速数据通道&#xff0c;最大运行1.2Gbps/通道&#xff0c;可支持高达9.6Gbp…...

websocket和SSE通信示例(无需安装任何插件)

websocket和SSE通信示例&#xff08;无需安装任何插件&#xff09; 源码示例&#xff08;两种方案任意切换&#xff09; data(){return {heartBeatInterval:5000,// 心跳间隔时间&#xff0c;单位为毫秒webSocket:null,heartBeatTimer:null,} }, mounted() {// this.initWebS…...

计算机网络(三)

&#xff08;十一&#xff09;路由算法 A、路由算法分类 动态路由和静态路由 静态路由&#xff1a;人工配制&#xff0c;路由信息更新慢&#xff0c;优先级高。这种在实际网络中要投入成本大&#xff0c;准确但是可行性弱。 动态路由&#xff1a;路由更新快&#xff0c;自动…...

HttpURLConnection OOM问题记录

使用HttpURLConnection 上传大文件&#xff0c;会出现内存溢出问题&#xff1a; 观察HttpURLConnection 源码&#xff1a; Overridepublic synchronized OutputStream getOutputStream() throws IOException {connecting true;SocketPermission p URLtoSocketPermission(th…...

WT588F02B单片机语音芯片在磁疗仪中的应用介绍

随着健康意识的普及和科技的发展&#xff0c;磁疗仪作为一种常见的理疗设备&#xff0c;受到了广大用户的关注。为了提升用户体验和操作便捷性&#xff0c;唯创知音WT588F02B单片机语音芯片被成功应用于磁疗仪中。这一结合将为磁疗仪带来智能化的语音交互功能&#xff0c;为用户…...

深度学习——第5章 神经网络基础知识

第5章 神经网络基础知识 目录 5.1 由逻辑回归出发 5.2 损失函数 5.3 梯度下降 5.4 计算图 5.5总结 在第1课《深度学习概述》中&#xff0c;我们介绍了神经网络的基本结构&#xff0c;了解了神经网络的基本单元组成是神经元。如何构建神经网络&#xff0c;如何训练、优化神…...

微信网页授权步骤说明

总览 引导用户进入授权页面同意授权&#xff0c;获取code通过code换取网页授权access_token&#xff08;与基础支持中的access_token不同&#xff09;如果需要&#xff0c;开发者可以刷新网页授权access_token&#xff0c;避免过期&#xff08;一般不需要&#xff09;通过网页…...

linux bash shell变量操作符 —— 筑梦之路

1. 变量子串 ${var} 返回变量var的内容&#xff0c;单独使用时有没有{}一样&#xff0c;混合多个变量和常量时&#xff0c;用{}界定变量名 ${#var} 返回变量var内容的长度 ${var:offset} 从变量var中的偏移量offset开始截取到字符串结尾的子字符串&#xff0c;offset从0开始 ${…...

2.61【Python生成器与迭代器】

Python迭代器与生成器 迭代器 什么是迭代器 首先迭代是指python中访问元素的一种方式&#xff0c;迭代器是一个可以记住遍历位置的对象&#xff0c;因此不会像列表那样一次性全部生成&#xff0c;而是可以等到用的时候才生成&#xff0c;因此节省了大量的内存资源 可迭代对…...

devecho stuido npm 失败

使用华为推荐的设置npm 代理方式仍然无效。还是得使用npm 命令去设置代理。地址参考&#xff1a; npm设置和取消代理的方法_npm查看代理-CSDN博客 最后使用自己的代理加载成功&#xff0c;使用华为推荐的代理不成功&#xff0c;不清楚什么原因。 华为推荐的环境配置如下&…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

前端开发面试题总结-JavaScript篇(一)

文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包&#xff08;Closure&#xff09;&#xff1f;闭包有什么应用场景和潜在问题&#xff1f;2.解释 JavaScript 的作用域链&#xff08;Scope Chain&#xff09; 二、原型与继承3.原型链是什么&#xff1f;如何实现继承&a…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...