当前位置: 首页 > news >正文

Spring Boot学习(三十三):集成kafka

前言

下面是zookeeper和kafka的官网下载地址,大家可以学习下载

zookeeper下载地址:http://zookeeper.apache.org/releases.html

kafka下载地址:http://kafka.apache.org/downloads.html

1、添加依赖

在 pom.xml 文件中添加kafka依赖,依赖如下

	<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、配置Kafka信息

在 application.properties(或 application.yml)文件中配置 Kafka 的相关信息,下面是一个简单的示例:

#kafka地址,多个地址使用,分隔
spring.kafka.bootstrap-servers=127.0.0.1:9092
#消费者组ID
spring.kafka.consumer.group-id=myGroup
#序列化和反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3、发送消息

因为我们是springboot项目,已经集成了KafkaTemplate,我们可以直接使用KafkaTemplate来发送消息

下面,我编写一个发送消息的生产者

/*** 消息生产者*/
@Component
@Slf4j
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;/*** 发送消息* @param topic 主题* @param msg   消息*/public void send(String topic,String msg){kafkaTemplate.send(topic,msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("发送消息失败:{}", ex);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("发送消息成功:{}");}});}/*** 发送消息* @param topic* @param msg*/public void send(String topic, Object msg) {send(topic, JSONObject.toJSONString(msg));}}

编写好生产者之后,我们就可以使用生产者发送消息,如下

	@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("send")public void sendMsg(){kafkaProducer.send("my-topic","hello world");}

如果想定制KafkaTemplate,那么可以在配置类进行配置,如下所示

@Configuration
public class KafakaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;/*** 配置属性* @return*/@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}/*** 定制KafkaTemplate* @return*/@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setDefaultTopic("myGroup");return kafkaTemplate;}}

4、消费消息

使用 @KafkaListener 注解创建 Kafka 消费者,并监听指定的主题。接收到消息后,可以通过方法参数来接收消息:

@Slf4j
@Component
public class KafkaConsumer {/*** 消费my-topic主题的消息* @param message*/@KafkaListener(topics = "my-topic",groupId = "myGroup1")public void  receiveMessage(String message){log.info("消费消息:"+message);}
}

同一消费者组,只会有一个消费者进行消费,如果想配置多个消费者同时处理,可以使用 @KafkaListener 注解来配置多个消费者。每个消费者需要配置不同的 group-id,监听主题一致,如下所示,就会有两个消费者同时消费

@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = "my-topic",groupId = "myGroup1")public void  receiveMessage(String message){log.info("消费消息:"+message);}@KafkaListener(topics = "my-topic",groupId = "myGroup2")public void  receiveMessage2(String message){log.info("消费消息:"+message);}}

相关文章:

Spring Boot学习(三十三):集成kafka

前言 下面是zookeeper和kafka的官网下载地址&#xff0c;大家可以学习下载 zookeeper下载地址&#xff1a;http://zookeeper.apache.org/releases.html kafka下载地址&#xff1a;http://kafka.apache.org/downloads.html 1、添加依赖 在 pom.xml 文件中添加kafka依赖&am…...

MOSFET

MOSFET 电子元器件百科 文章目录 MOSFET前言一、MOSFET是什么二、MOSFET类别三、MOSFET应用实例四、MOSFET作用原理总结前言 MOSFET是一种常见的半导体器件,通过栅极电场控制通道区的导通特性,以控制电流流动。它在现代电子电路中发挥着重要的作用,并广泛应用于各种应用领域…...

DriveWorks——参数化设计非标定制利器

DriveWorks基本介绍 DriveWorks是一套被 SOLIDWORKS 认可为金牌合作伙伴产品的设计自动化软件。DriveWorks 可自动创建特定于订单的销售文档和 SOLIDWORKS 制造数据。减少重复性任务&#xff0c;消除错误&#xff0c;增加销售额&#xff0c;并在创纪录的时间内交付定制产品。 为…...

DevEco Studio集成ArkUI-X

语雀知识库地址&#xff1a;语雀HarmonyOS知识库 飞书知识库地址&#xff1a;飞书HarmonyOS知识库 在上篇文章(HarmonyOS应用开发工具DevEco Studio安装与使用)中我说到官方推出了4.0 Beta版本的IDE&#xff0c;这篇文章就来介绍这个版本的安装与使用 该版本集成了HarmonyOS多…...

网络视频服务器的作用是什么?

随着互联网的快速发展和网络带宽的提升&#xff0c;网络视频已经成为人们日常生活中不可或缺的一部分。网络视频服务器作为支持和传输网络视频的关键基础设施&#xff0c;发挥着重要的作用。本文将以网络视频服务器的作用为方向&#xff0c;探讨其在现代社会中的重要性。 首先…...

解决vue3使用iconpark控制台预警提示问题

前言 最近在项目中使用 iconpark-icon 来管理图标&#xff0c;一切都很顺利&#xff0c;引入链接后&#xff0c;图标正常显示&#xff0c;没有报错。但是控制台却发出了预警信息。 [Vue warn]: Failed to resolve component: iconpark-icon If this is a native custom eleme…...

VMware 虚拟机 NAT 模式网络配置

配置的核心点在于 网关要一致&#xff0c;才能访问外网 比如下面的网关都是&#xff1a;192.168.145.2 问题总结&#xff1a; 当时重启电脑后如果连不上外网了&#xff0c;检查下 windows 服务中 NAT服务是否已经启动...

5-redis高级-哨兵

1 哨兵 1.1 python 操作哨兵 1 哨兵 # 主从---》一主多从-主库用来写-从库用来读-主库挂了--》整个系统就不能写数据了#主从复制存在的问题&#xff1a;1 主从复制&#xff0c;主节点发生故障&#xff0c;需要做故障转移&#xff0c;可以手动转移&#xff1a;让其中一个slave变…...

鸿蒙HarmonyOS4.0开发应用学习笔记

黑马程序员鸿蒙4.0视频学习笔记&#xff0c;供自己回顾使用。1.安装开发工具DevEco Studio 鸿蒙harmony开发文档指南 DevEco Studio下载地址 选择或者安装环境 选择和下载SDK 安装总览 编辑器界面 2.TypeScript语法 2.1变量声明 //string 、number、boolean、any、u…...

联通宽带+老毛子Padavan固件 开启IP v6

联通宽带开启IP v6 参考&#xff1a; 联通宽带开启 IPV6 的方法_联通ipv6怎么开通-CSDN博客 个人宽带如何开启IPv6网络访问 - 知乎 (zhihu.com) 首先&#xff0c;你要确定当前你所在的地区运营商已经开通了IPV6&#xff0c;可以使用手机流量 IP查询(ipw.cn) | IPv6测试 | IPv…...

唯创知音WT2003Hx系列单片机语音芯片:家庭理疗产品的智能声音伴侣

随着科技的不断创新&#xff0c;家庭理疗产品正迎来一场智能化的变革。唯创知音的WT2003Hx系列单片机语音芯片以其强大的功能和高品质音频播放能力&#xff0c;为家庭理疗产品带来了更为智能、沉浸式的用户体验。 1. MP3高品质音频播放 WT2003Hx系列语音芯片支持高品质的MP3音…...

2023_Spark_实验二十七:Linux中Crontab(定时任务)命令详解及使用教程

Crontab介绍&#xff1a; Linux crontab是用来crontab命令常见于Unix和类Unix的操作系统之中&#xff0c;用于设置周期性被执行的指令。该命令从标准输入设备读取指令&#xff0c;并将其存放于“crontab”文件中&#xff0c;以供之后读取和执行。该词来源于希腊语 chronos(χρ…...

Java动态代理实现与原理详细分析

Java动态代理实现与原理详细分析 关于Java中的动态代理&#xff0c;我们首先需要了解的是一种常用的设计模式–代理模式&#xff0c;而对于代理&#xff0c;根据创建代理类的 时间点&#xff0c;又可以分为静态代理和动态代理。 1、代理模式 代理模式是常用的java设计模式&…...

[实践总结] 使用Apache HttpClient 4.x进行进行一次Http请求

使用Apache HttpClient 4.x进行进行一次Http请求 依赖 <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient --> <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactI…...

易宝OA 两处任意文件上传漏洞复现

0x01 产品简介 易宝OA系统是一种专门为企业和机构的日常办公工作提供服务的综合性软件平台,具有信息管理、 流程管理 、知识管理(档案和业务管理)、协同办公等多种功能。 0x02 漏洞概述 易宝OA系统UploadFile、BasicService.asmx等接口处存在文件上传漏洞,未授权的攻击者可…...

echart饼图高亮颜色设置,数据为0时候,labelLine不显示

鼠标移上去高亮&#xff0c;颜色变浅&#xff0c;希望不改变颜色 在series.data中为各项设置itemStyle&#xff0c;官方设置不生效&#xff0c;不知原因&#xff0c;可能版本问题 itemStyle: {normal: { color: #DFEAFF, },emphasis: { color: #DFEAFF }},数据为0时候显示饼图…...

Kafka 的消息格式:了解消息结构与序列化

Kafka 作为一款高性能的消息中间件系统&#xff0c;其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式&#xff0c;包括消息的结构、序列化与反序列化&#xff0c;以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析&…...

装箱 Box 数据类型

装箱是最简单直接的一种智能指针&#xff0c;它的类型是Box<T>。装箱使我们可以把数据存储到堆上&#xff0c;并在栈上保留一个指向堆数据的指针。装箱操作常常被用于下面的场景&#xff1a; 当你拥有一个无法在编译时确定大小的类型&#xff0c;但又想使用这个类型的值…...

多传感器融合SLAM在自动驾驶方向的初步探索的记录

1. VIO的不可观问题 现有的VIO都是解决的六自由度的问题, 但是对于行驶在路面上的车来说, 通常情况下不会有roll与z方向的自由度, 而且车体模型限制了不可能有纯yaw的变换. 同时由于IMU在Z轴上与roll, pitch上激励不足, 会导致IMU在初始化过程中尺度不准以及重力方向估计错误,…...

ffmpeg与opencv-python处理视频

安装 opencv pip install opencv-pythonFFmpeg 1.下载 FFmpeg 访问FFmpeg官方网站。选择 “Windows builds from gyan.dev” 链接&#xff0c;这会带您到一个包含最新版本 FFmpeg Windows 构建的页面。选择一个适合您系统的版本&#xff08;例如&#xff0c;32位或64位&…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...