当前位置: 首页 > news >正文

Springboot整合Kafka消息队列服务实例

一、Kafka相关概念

1、关于Kafka的描述

Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

2、关于Kafka的功能特点

  1. 通过磁盘数据结构提供消息的持久化,消息存储也能够保持长时间稳定性;
  2. 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;
  3. 支持通过Kafka服务器和消费机集群来分区消息;
  4. 支持Hadoop并行数据加载;
  5. API包封装的非常好,简单易用,上手快 ;
  6. 分布式消息队列。Kafka对消息保存时根据Topic(主题)进行归类,发送消息者称为Producer(生产者),消息接受者称为Consumer(消费者);

3、Kafka消息功能

如下图所示,Kafka作为一个中间服务,代表一个broker(经纪人)角色,负责接收APP的消费与推送消息给其他相关APP。这里APP可分为Producer,Consumer。

消息的消费模式

点对点模式:点对点模式通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。

发布订阅模式:订阅模式是一个基于推送的消费传送模型,消息产生后,Kafka会推送给所有订阅相关Topic的订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

4、Kafka消息队列的作用

  • 应用程序之间解耦,生产者与消费者相互独立,各自异步执行。
  • 消息数据持久化存储,直到所有消息都被消费,规避消息数据丢失的风险。
  • 流量削峰,使用Kafka消息队列可以帮助server承接访问压力,尽可能避免应用程序崩溃。
  • 降低进程间的耦合度,系统部分应用组件发生崩溃时,不会影响到整体系统的运行。
  • 保证消息顺序执行,解决特定场景业务需求。

5、Kafka相关术语介绍

  • Broker

   一台kafka服务器就是一个broker(经纪人)。一个集群由多个broker组成。一个broker可以容纳多个topic(消息主题)。

  • Producer

    消息生产者,就是向kafka broker发消息的APP客户端。

  • Consumer

    消息消费者,向kafka broker取消息的APP客户端。

  • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,可以理解为一个队列。

  • Consumer Group

     每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。

  • Partition

一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。

二、liunx系统下搭建Kafka环境

       

--新建kafka应用目录。并下载到当前目录下
cd  /usr/localmkdir kafkacd kafka 
--下载wget https://downloads.apache.org/kafka/3.7.0/kafka-3.7.0-src.tgz--解压tar -zxvf  kafka-3.7.0-src.tgz--启动服务cd kafka-3.7.0./bin/kafka-server-start.sh    config/server.properties--查看服务ps -aux |grep kafka--开放kafka地址端口vim server.properties--添加下面注释advertised.listeners=PLAINTEXT://10.98.3.22:9092

三、Springboot2整合Kafka 服务

1、导入基础依赖

<!-- SpringBoot依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依赖 -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.4.RELEASE</version>
</dependency>

2、项目目录结构

3、生产者与消费者yml文件配置

#消费者配置
spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: test-consumer-group#生产者配置spring:kafka:bootstrap-servers: 127.0.0.1:9092

4、生成消息

@RestController
@RequestMapping("/kafka")
public class ProducerController {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public HttpResult sendMsg () {MsgLog msgLog = new MsgLog(1,"消息生成",1,"消息日志",new Date()) ;String msg = JSON.toJSONString(msgLog) ;// 这里Topic如果不存在,会自动创建kafkaTemplate.send("cicada-topic", msg);return HttpResult.create(HttpStatus.SUCCESS,msg);}
}
@Component
public class ConsumerMsg {private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);//此注解是监听主题为cicada-topic的消息队列@KafkaListener(topics = "cicada-topic")public void listenMsg (ConsumerRecord<?,String> record) {String value = record.value();LOGGER.info("ConsumerMsg====>>"+value);}
}

相关文章:

Springboot整合Kafka消息队列服务实例

一、Kafka相关概念 1、关于Kafka的描述 Kafka是由Apache开源&#xff0c;具有分布式、分区的、多副本的、多订阅者&#xff0c;基于Zookeeper协调的分布式处理平台&#xff0c;由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据&#xff0c;并高速的处…...

kotlin——MVVM框架下的大型项目优化

目录 概要 优化思路 一、重构过长的Activity 二、优化臃肿的ViewModel 示例代码&#xff1a; 概要 在大型项目中&#xff0c;随着项目越做越大&#xff0c;activity和viewmodel的代码会越来越多&#xff0c;尽量保持Activity和ViewModel的代码精简和易于维护是非常重要的。个人…...

echarts实现折线图点击添加标记

文章目录 背景一、代码示例 背景 业务场景体现在功能层面主要两点&#xff0c; 折线图表设置点击事件点击事件与图标渲染标记绑定 对于节点没有被添加标记的可以&#xff0c;弹框提示添加标记&#xff0c;并提供标记内容输入框&#xff0c;已经添加过标记的点&#xff0c;点…...

循环赛日程表

描述 n 2 ^ k个选手 每个选手必须与其他n-1个选手各赛一次每个选手一天赛一次比赛打n-1天 思路 k 3时的解 我们先进行假设&#xff1a;每个选手第一天和自己比&#xff0c;然后分解成4个子问题&#xff1a; (1)14号的第14天&#xff0c;对手1~4号; (2)14号的第58天&a…...

计算机网络:运输层 - 概述

计算机网络&#xff1a;运输层 - 概述 运输层的任务端口号复用与分用UDP协议首部格式 TCP协议面向字节流 运输层的任务 物理层、数据链路层以及网络层&#xff0c;他们共同解决了将主机通过网络互联起来所面临的问题&#xff0c;实现了主机到主机的通信。 网络层的作用范围是…...

使用阿里开源的Spring Cloud Alibaba AI开发第一个大模型应用

背景 前段时间看到Spring推出了SpringAI&#xff0c;可以方便快速的接入ChatGPT等国外的大模型&#xff0c;现在阿里巴巴也紧追脚步推出了Spring Cloud Alibaba AI&#xff0c;Spring Cloud Alibaba AI 目前基于 Spring AI 0.8.1 版本 API 完成通义系列大模型的接入。通义接入…...

`THREE.PointsMaterial` 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性,比如颜色、大小和透明度。

demo案例 THREE.PointsMaterial 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性&#xff0c;比如颜色、大小和透明度。下面是对其构造函数的参数、属性和方法的详细讲解。 构造函数 const material new THREE.PointsMaterial(parameters);参数&am…...

Android-Android Studio-FAQ

1 需求 2 接口 3 Android Studio xml布局代码补全功能失效问题 最终解决方案就是尝试修改compileSdk 为不同SDK版本来解决问题&#xff0c;将原本34修改为32测试会发现xml代码补全功能有效了&#xff01; 参考资料 Android Studio xml布局代码补全功能失效问题_android studi…...

架构师指南:现代 Datalake 参考架构

这篇文章的缩写版本于 2024 年 3 月 26 日出现在 The New Stack 上。 旨在最大化其数据资产的企业正在采用可扩展、灵活和统一的数据存储和分析方法。这一趋势是由企业架构师推动的&#xff0c;他们的任务是制定符合不断变化的业务需求的基础设施。现代数据湖体系结构通过将数…...

通讯协议大全(UART,RS485,SPI,IIC)

参考自&#xff1a; 常见的通讯协议总结&#xff08;USART、IIC、SPI、485、CAN&#xff09;-CSDN博客 UART那么好用&#xff0c;为什么单片机还需要I2C和SPI&#xff1f;_哔哩哔哩_bilibili 5分钟看懂!串口RS232 RS485最本质的区别&#xff01;_哔哩哔哩_bilibili 喜欢几位…...

基于EXCEL数据表格创建省份专题地图

1 数据源 随着西藏于5月1日发布2022年一季度经济运行情况&#xff0c;31省份一季度GDP数据已全部出炉。 总量方面&#xff0c;粤苏鲁稳居前三&#xff1b;增速方面&#xff0c;23省份高于“全国线”&#xff0c;新疆表现最佳&#xff0c;增速达到7.0%。 表格表现数据不够直观…...

基于java+springboot+vue实现的电商应用系统(文末源码+Lw)241

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本电商应用系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&a…...

好文!12个策略解决 Kafka 数据丢失问题

哥们儿&#xff01;有遇到Kafka数据丢失问题的问题吗&#xff0c;你是如何解决的&#xff1f;今天的文章&#xff0c;V哥来详细解释一下&#xff0c;整理了12种解决策略&#xff0c;希望可以帮助你解决项目中的问题&#xff1a;以下是一些常见的解决方案和最佳实践。 生产者确认…...

Android 第三方框架:网络:OkHttp:源码分析:拦截器

文章目录 涉及到的设计模式 责任链模式:ArrayList策略模式:Interceptor和XXXInterceptor源码分析API总结涉及到的设计模式 责任链模式:ArrayList ArrayList 用ArrayList作为保存顺序的数据结构 把系统提供的各种Interceptor和自定义的Interceptor放入ArrayList中 RealI…...

FlowUs AI的使用教程和使用体验

FlowUs AI 使用教程 FlowUs AI特点使其成为提升个人和团队生产力的有力工具&#xff0c;无论是在学术研究、内容创作、技术开发还是日常办公中都能发挥重要作用。现在来看看如何使用FlowUs AI吧&#xff01; 注册与登录&#xff1a;首先&#xff0c;确保您已经注册并登录FlowU…...

SwiftUI 6.0(iOS 18)ScrollView 全新的滚动位置(ScrollPosition)揭秘

概览 在只有方寸之间大小的手持设备上要想体面的向用户展示海量信息&#xff0c;滚动视图&#xff08;ScrollView&#xff09;无疑是绝佳的“东牀之选”。 在 SwiftUI 历史的长河中&#xff0c;总觉得苹果对于 ScrollView 视图功能的升级是在“挤牙膏”。这不&#xff0c;在本…...

阿贝云免费虚拟主机和免费云服务器评测

阿贝云是一家提供免费虚拟主机和免费云服务器的服务商&#xff0c;为用户提供了一个便捷的搭建网站和应用的平台。他们的服务受到了很多用户的好评。用户可以轻松地在阿贝云上创建自己的网站&#xff0c;并享受免费的虚拟主机和云服务器。通过阿贝云的服务&#xff0c;用户可以…...

不懂就问,开通小程序地理位置接口有那么难吗?

小程序地理位置接口有什么功能&#xff1f; 若提审后被驳回&#xff0c;理由是“当前提审小程序代码包中地理位置相关接口( chooseAddress、getLocation )暂未开通&#xff0c;建议完成接口开通后或移除接口相关内容后再进行后续版本提审”&#xff0c;那么遇到这种情况&#x…...

Python 全栈系列256 异步任务与队列消息控制(填坑)

说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后&#xff0c;产生的一个问题恰好也是因为异步产生的。 内容 1 问题描述 我有一个队列 stream1, 对应的worker1需要周期性的获取数据&#xff0c;对输入的数据进行模式识别后分流。worker1我设施为10秒运行…...

从零开始的Ollama指南:部署私域大模型

大模型相关目录 大模型&#xff0c;包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步&#xff0c;扬帆起航。 大模型应用向开发路径&#xff1a;AI代理工作流大模型应用开发实用开源项目汇总大模…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络&#xff0c;将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具&#xff0c;支持 Chrome、Firefox、Safari 等主流浏览器&#xff0c;提供多语言 API&#xff08;Python、JavaScript、Java、.NET&#xff09;。它的特点包括&a…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案

目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后&#xff0c;迭代器会失效&#xff0c;因为顺序迭代器在内存中是连续存储的&#xff0c;元素删除后&#xff0c;后续元素会前移。 但一些场景中&#xff0c;我们又需要在执行删除操作…...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)

一、OpenBCI_GUI 项目概述 &#xff08;一&#xff09;项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台&#xff0c;其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言&#xff0c;首次接触 OpenBCI 设备时&#xff0c;往…...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...

RabbitMQ 各类交换机

为什么要用交换机&#xff1f; 交换机用来路由消息。如果直发队列&#xff0c;这个消息就被处理消失了&#xff0c;那别的队列也需要这个消息怎么办&#xff1f;那就要用到交换机 交换机类型 1&#xff0c;fanout&#xff1a;广播 特点 广播所有消息​​&#xff1a;将消息…...