Springboot整合Kafka消息队列服务实例
一、Kafka相关概念
1、关于Kafka的描述
Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
2、关于Kafka的功能特点
- 通过磁盘数据结构提供消息的持久化,消息存储也能够保持长时间稳定性;
- 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;
- 支持通过Kafka服务器和消费机集群来分区消息;
- 支持Hadoop并行数据加载;
- API包封装的非常好,简单易用,上手快 ;
- 分布式消息队列。Kafka对消息保存时根据Topic(主题)进行归类,发送消息者称为Producer(生产者),消息接受者称为Consumer(消费者);
3、Kafka消息功能
如下图所示,Kafka作为一个中间服务,代表一个broker(经纪人)角色,负责接收APP的消费与推送消息给其他相关APP。这里APP可分为Producer,Consumer。

消息的消费模式
点对点模式:点对点模式通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。
发布订阅模式:订阅模式是一个基于推送的消费传送模型,消息产生后,Kafka会推送给所有订阅相关Topic的订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。
4、Kafka消息队列的作用
- 应用程序之间解耦,生产者与消费者相互独立,各自异步执行。
- 消息数据持久化存储,直到所有消息都被消费,规避消息数据丢失的风险。
- 流量削峰,使用Kafka消息队列可以帮助server承接访问压力,尽可能避免应用程序崩溃。
- 降低进程间的耦合度,系统部分应用组件发生崩溃时,不会影响到整体系统的运行。
- 保证消息顺序执行,解决特定场景业务需求。
5、Kafka相关术语介绍
- Broker
一台kafka服务器就是一个broker(经纪人)。一个集群由多个broker组成。一个broker可以容纳多个topic(消息主题)。
- Producer
消息生产者,就是向kafka broker发消息的APP客户端。
- Consumer
消息消费者,向kafka broker取消息的APP客户端。
- Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,可以理解为一个队列。
- Consumer Group
每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。
- Partition
一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。
二、liunx系统下搭建Kafka环境
--新建kafka应用目录。并下载到当前目录下
cd /usr/localmkdir kafkacd kafka
--下载wget https://downloads.apache.org/kafka/3.7.0/kafka-3.7.0-src.tgz--解压tar -zxvf kafka-3.7.0-src.tgz--启动服务cd kafka-3.7.0./bin/kafka-server-start.sh config/server.properties--查看服务ps -aux |grep kafka--开放kafka地址端口vim server.properties--添加下面注释advertised.listeners=PLAINTEXT://10.98.3.22:9092

三、Springboot2整合Kafka 服务
1、导入基础依赖
<!-- SpringBoot依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依赖 -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.4.RELEASE</version>
</dependency>
2、项目目录结构

3、生产者与消费者yml文件配置
#消费者配置
spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: test-consumer-group#生产者配置spring:kafka:bootstrap-servers: 127.0.0.1:9092
4、生成消息
@RestController
@RequestMapping("/kafka")
public class ProducerController {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public HttpResult sendMsg () {MsgLog msgLog = new MsgLog(1,"消息生成",1,"消息日志",new Date()) ;String msg = JSON.toJSONString(msgLog) ;// 这里Topic如果不存在,会自动创建kafkaTemplate.send("cicada-topic", msg);return HttpResult.create(HttpStatus.SUCCESS,msg);}
}
@Component
public class ConsumerMsg {private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);//此注解是监听主题为cicada-topic的消息队列@KafkaListener(topics = "cicada-topic")public void listenMsg (ConsumerRecord<?,String> record) {String value = record.value();LOGGER.info("ConsumerMsg====>>"+value);}
}
相关文章:
Springboot整合Kafka消息队列服务实例
一、Kafka相关概念 1、关于Kafka的描述 Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处…...
kotlin——MVVM框架下的大型项目优化
目录 概要 优化思路 一、重构过长的Activity 二、优化臃肿的ViewModel 示例代码: 概要 在大型项目中,随着项目越做越大,activity和viewmodel的代码会越来越多,尽量保持Activity和ViewModel的代码精简和易于维护是非常重要的。个人…...
echarts实现折线图点击添加标记
文章目录 背景一、代码示例 背景 业务场景体现在功能层面主要两点, 折线图表设置点击事件点击事件与图标渲染标记绑定 对于节点没有被添加标记的可以,弹框提示添加标记,并提供标记内容输入框,已经添加过标记的点,点…...
循环赛日程表
描述 n 2 ^ k个选手 每个选手必须与其他n-1个选手各赛一次每个选手一天赛一次比赛打n-1天 思路 k 3时的解 我们先进行假设:每个选手第一天和自己比,然后分解成4个子问题: (1)14号的第14天,对手1~4号; (2)14号的第58天&a…...
计算机网络:运输层 - 概述
计算机网络:运输层 - 概述 运输层的任务端口号复用与分用UDP协议首部格式 TCP协议面向字节流 运输层的任务 物理层、数据链路层以及网络层,他们共同解决了将主机通过网络互联起来所面临的问题,实现了主机到主机的通信。 网络层的作用范围是…...
使用阿里开源的Spring Cloud Alibaba AI开发第一个大模型应用
背景 前段时间看到Spring推出了SpringAI,可以方便快速的接入ChatGPT等国外的大模型,现在阿里巴巴也紧追脚步推出了Spring Cloud Alibaba AI,Spring Cloud Alibaba AI 目前基于 Spring AI 0.8.1 版本 API 完成通义系列大模型的接入。通义接入…...
`THREE.PointsMaterial` 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性,比如颜色、大小和透明度。
demo案例 THREE.PointsMaterial 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性,比如颜色、大小和透明度。下面是对其构造函数的参数、属性和方法的详细讲解。 构造函数 const material new THREE.PointsMaterial(parameters);参数&am…...
Android-Android Studio-FAQ
1 需求 2 接口 3 Android Studio xml布局代码补全功能失效问题 最终解决方案就是尝试修改compileSdk 为不同SDK版本来解决问题,将原本34修改为32测试会发现xml代码补全功能有效了! 参考资料 Android Studio xml布局代码补全功能失效问题_android studi…...
架构师指南:现代 Datalake 参考架构
这篇文章的缩写版本于 2024 年 3 月 26 日出现在 The New Stack 上。 旨在最大化其数据资产的企业正在采用可扩展、灵活和统一的数据存储和分析方法。这一趋势是由企业架构师推动的,他们的任务是制定符合不断变化的业务需求的基础设施。现代数据湖体系结构通过将数…...
通讯协议大全(UART,RS485,SPI,IIC)
参考自: 常见的通讯协议总结(USART、IIC、SPI、485、CAN)-CSDN博客 UART那么好用,为什么单片机还需要I2C和SPI?_哔哩哔哩_bilibili 5分钟看懂!串口RS232 RS485最本质的区别!_哔哩哔哩_bilibili 喜欢几位…...
基于EXCEL数据表格创建省份专题地图
1 数据源 随着西藏于5月1日发布2022年一季度经济运行情况,31省份一季度GDP数据已全部出炉。 总量方面,粤苏鲁稳居前三;增速方面,23省份高于“全国线”,新疆表现最佳,增速达到7.0%。 表格表现数据不够直观…...
基于java+springboot+vue实现的电商应用系统(文末源码+Lw)241
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本电商应用系统就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息&a…...
好文!12个策略解决 Kafka 数据丢失问题
哥们儿!有遇到Kafka数据丢失问题的问题吗,你是如何解决的?今天的文章,V哥来详细解释一下,整理了12种解决策略,希望可以帮助你解决项目中的问题:以下是一些常见的解决方案和最佳实践。 生产者确认…...
Android 第三方框架:网络:OkHttp:源码分析:拦截器
文章目录 涉及到的设计模式 责任链模式:ArrayList策略模式:Interceptor和XXXInterceptor源码分析API总结涉及到的设计模式 责任链模式:ArrayList ArrayList 用ArrayList作为保存顺序的数据结构 把系统提供的各种Interceptor和自定义的Interceptor放入ArrayList中 RealI…...
FlowUs AI的使用教程和使用体验
FlowUs AI 使用教程 FlowUs AI特点使其成为提升个人和团队生产力的有力工具,无论是在学术研究、内容创作、技术开发还是日常办公中都能发挥重要作用。现在来看看如何使用FlowUs AI吧! 注册与登录:首先,确保您已经注册并登录FlowU…...
SwiftUI 6.0(iOS 18)ScrollView 全新的滚动位置(ScrollPosition)揭秘
概览 在只有方寸之间大小的手持设备上要想体面的向用户展示海量信息,滚动视图(ScrollView)无疑是绝佳的“东牀之选”。 在 SwiftUI 历史的长河中,总觉得苹果对于 ScrollView 视图功能的升级是在“挤牙膏”。这不,在本…...
阿贝云免费虚拟主机和免费云服务器评测
阿贝云是一家提供免费虚拟主机和免费云服务器的服务商,为用户提供了一个便捷的搭建网站和应用的平台。他们的服务受到了很多用户的好评。用户可以轻松地在阿贝云上创建自己的网站,并享受免费的虚拟主机和云服务器。通过阿贝云的服务,用户可以…...
不懂就问,开通小程序地理位置接口有那么难吗?
小程序地理位置接口有什么功能? 若提审后被驳回,理由是“当前提审小程序代码包中地理位置相关接口( chooseAddress、getLocation )暂未开通,建议完成接口开通后或移除接口相关内容后再进行后续版本提审”,那么遇到这种情况&#x…...
Python 全栈系列256 异步任务与队列消息控制(填坑)
说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后,产生的一个问题恰好也是因为异步产生的。 内容 1 问题描述 我有一个队列 stream1, 对应的worker1需要周期性的获取数据,对输入的数据进行模式识别后分流。worker1我设施为10秒运行…...
从零开始的Ollama指南:部署私域大模型
大模型相关目录 大模型,包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步,扬帆起航。 大模型应用向开发路径:AI代理工作流大模型应用开发实用开源项目汇总大模…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
