当前位置: 首页 > news >正文

SpringBoot Kafka发送消息与接收消息实例

前言 

Kafka的基本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。 

 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>SpringBootKafka</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootKafka</name><description>SpringBootKafka</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- pom.xml --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency></dependencies><repositories><repository><id>central</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public/</url><layout>default</layout><!-- 是否开启发布版构件下载 --><releases><enabled>true</enabled></releases><!-- 是否开启快照版构件下载 --><snapshots><enabled>false</enabled></snapshots></repository></repositories><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 

spring:kafka:bootstrap-servers: 192.168.110.105:9092#streams:#application-id: my-streams-appconsumer:group-id: myGroupIdauto-offset-reset: latestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 5

3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果

package com.example.springbootkafka.service;import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;@Slf4j
@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;private final ObjectMapper objectMapper;@Autowiredpublic KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {this.kafkaTemplate = kafkaTemplate;this.objectMapper = objectMapper;}public void sendMessage(String message) {log.info("KafkaProducer message:{}", message);//kafkaTemplate.send("test", message).addCallback();Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});// 使用whenComplete方法completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});/*future.whenComplete((result, ex) -> {if (ex == null) {// 成功发送RecordMetadata metadata = result.getRecordMetadata();System.out.println("Message sent successfully with offset: " + metadata.offset());} else {// 发送失败System.err.println("Failed to send message due to: " + ex.getMessage());}});*/}public void sendUser(User user) throws JsonProcessingException {//final ProducerRecord<String, String> record = createRecord(data);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);String userJson = objectMapper.writeValueAsString(user);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);/*future.addCallback(success -> System.out.println("Message sent successfully: " + userJson),failure -> System.err.println("Failed to send message: " + failure.getMessage()));*/CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});}
}

 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息

package com.example.springbootkafka.service;import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class KafkaConsumer {@KafkaListener(topics = "test", groupId = "myGroupId")public void consume(String message) {System.out.println("Received message: " + message);log.info("KafkaConsumer message:{}", message);}
}

5.测试消息的生成与发送

package com.example.springbootkafka.controller;import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class MessageController {private final KafkaProducer producer;@Autowiredpublic MessageController(KafkaProducer producer) {this.producer = producer;}@GetMapping("/send-message")public String sendMessage() {log.info("MessageController sendMessage start!");producer.sendMessage("hello, Kafka!");log.info("MessageController sendMessage end!");return "Message sent successfully.";}@GetMapping("/send")public String sendMessage1() {log.info("MessageController sendMessage1 start!");User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();try {producer.sendUser(user);} catch (JsonProcessingException e) {throw new RuntimeException(e);}log.info("MessageController sendMessage1 end!");return "Message sendMessage1 successfully.";}
}

 6.查看结果:

 

详细代码见https://gitee.com/dylan_2017/springboot-kafka.git

相关文章:

SpringBoot Kafka发送消息与接收消息实例

前言 Kafka的基本工作原理 我们将消息的发布&#xff08;publish&#xff09;称作 producer(生产者)&#xff0c;将消息的订阅&#xff08;subscribe&#xff09;表述为 consumer&#xff08;消费者&#xff09;&#xff0c;将中间的存储阵列称作 broker(代理)&#xff0c;这…...

【资料分析】刷题日记2

第一套 √ 2013-2016一共有13&#xff0c;14&#xff0c;15&#xff0c;16四年&#xff0c;亦即16 - 13 1 4年 √ 是多少倍 ③vs④&#xff1a;都是只给出了年均增速&#xff0c;③求的是其中一年的&#xff0c;无法确定&#xff1b;④求的是这个时段总共的&#xff0c;可…...

Aigtek功率放大器怎么选择型号

功率放大器在各个领域中都扮演着重要的角色&#xff0c;用于增强信号的幅度&#xff0c;以满足特定的需求。在选择功率放大器型号时&#xff0c;需要综合考虑多个因素&#xff0c;如应用领域、功率要求、频率范围、输入输出特性等。下面安泰电子官网将从这些方面详细介绍功率放…...

【RabbitMQ】重试机制、TTL

重试机制 在消息从Broker到消费者的传递过程中&#xff0c;可能会遇到各种问题&#xff0c;如网络故障、服务不可用、资源不足等&#xff0c;这些问题都可能导致消息处理失败。为了解决这些问题&#xff0c;RabbitMQ提供了重试机制&#xff0c;允许消息在处理失败之后重新发送…...

Linux用户及用户组操作命令笔记

1.用户概念及作用 用户&#xff1a;指的是Linux操作系统中用于管理系统或者服务的人 Linux下一切皆文件&#xff0c;所以用户管理的是相应的文件 基本上分为两种&#xff1a; 基本管理&#xff1a;文件的创建、删除、复制、查找、打包压缩等&#xff1b;文件的权限增加、减…...

threejs加载高度图渲染点云,不支持tiff

问题点 使用的point来渲染高度图点云&#xff0c;大数据图片无效渲染点多&#xff08;可以通过八叉树过滤掉无效点增加效率&#xff0c;这个太复杂&#xff09;&#xff0c;但是胜在简单能用 效果图 code 代码可运行&#xff0c;无需npm <!DOCTYPE html> <html la…...

MySQL面试题——第二篇

1. MySQL的优化手段有哪些&#xff1f; MySQL的常见的优化手段有以下五种 1. 查询优化 避免select * ,只查询需要的字段。小表驱动大表&#xff0c;即小的数据集驱动大的数据集&#xff0c;比如当B表的数据集小于A表时&#xff0c;用in优化exist。两表执行顺序是先查B表&#x…...

Unity Transform 组件

在 Unity 中&#xff0c;Transform 是一个非常重要的组件&#xff0c;它定义了物体的位置、旋转和缩放&#xff0c;几乎每个 GameObject 都包含一个 Transform 组件。Transform 组件的主要属性如下&#xff1a; 1. position 表示物体在世界空间中的位置。可以通过 transf…...

LeeCode 3. 无重复字符的最长子串

经典方法滑动窗口:(两个指针) 针对这个题我们首先假定两个指针 left 和 right 分别指在数组最左端. 然后两个变量记录长度length和maxlength. 并且因为不能有重复的字符,我们使用HashSet结构来当收集结果的表. 随着右指针不断往右移,左指针和右指针之间的就为截取的字符,而这…...

使用canal.deployer-1.1.7和canal.adapter-1.1.7实现mysql数据同步

1、下载地址 --查看是否开启bin_log日志&#xff0c;value on表示开启 SHOW VARIABLES LIKE log_bin; -- 查看bin_log日志文件 SHOW BINARY LOGS; --查看bin_log写入状态 SHOW MASTER STATUS; --查看bin_log存储格式 row SHOW VARIABLES LIKE binlog_format; --查看数据库服…...

VMware Workstation Pro 17下载及安装教程

下载 好消息&#xff01;从VMware Workstation Pro 17开始&#xff0c;个人可以免费使用了&#xff0c;再也不需要找破解激活码啥的了。 但是坏处却不小&#xff1a;其下载变得异常复杂。首先需要注册账号&#xff0c;外网非常慢很可能注册不上&#xff1b;其次根本找不到下载…...

集采良药:从“天价神药”到低价良药,伊马替尼的真实世界研究!

在医疗科技日新月异的今天&#xff0c;有一种药物以其卓越的疗效和深远的影响力&#xff0c;成为了众多患者心中的“精准武器”——伊马替尼。这款药物不仅在慢性髓细胞白血病&#xff08;CML&#xff09;的治疗上屡创佳绩&#xff0c;更是胃肠道间质瘤&#xff08;GIST&#x…...

00898 互联网软件应用与开发自考复习题

资料来自互联网软件应用与开发大纲 南京航空航天大学 高纲4295和JSP 应用与开发技术(第 3 版) 马建红、李学相 清华大学出版社2019年 第一章 一、选择题 通过Internet发送请求消息和响应消息使用&#xff08;&#xff09;网络协议。 FTP B. TCP/IP C. HTTP D. DNS Web应…...

linux 进程间通信之pthread(条件变量共享和互斥锁共享)

0,互斥锁共享 初始化和销毁mutex互斥锁 int pthread_mutexattr_init(pthread_mutexattr_t *attr); int pthread_mutexattr_destroy(pthread_mutexattr_t *attr); 进程共享属性有两种值: 1、PTHREAD_PROCESS_PRIVATE,这个是默认值(1),同一个进程中的多个线程访问同一个…...

数据结构-2.7.单链表的查找与长度计算

注&#xff1a;本文只探讨"带头结点"的情况(查找思路类似循环找到第i-1 个结点的代码) 一.按位查找&#xff1a; 1.代码演示&#xff1a; 版本一&#xff1a; #include<stdio.h> #include<stdlib.h> ​ ​ //定义单链表结点类型 typedef struct LNo…...

iotop 命令:磁盘IO监控和诊断

一、命令简介 ​iotop​命令用于监视磁盘I/O&#xff0c;实时显示每个进程或线程的读写速率等信息。非常适合用于诊断系统中的I/O瓶颈。 ‍ ​​ ‍ 安装 iotop 在大多数Linux发行版中&#xff0c;iotop​可能不是预装的。可以使用包管理器来安装它。 例如&#xff0c;在…...

解锁编程新境界:GitHub Copilot 让效率翻倍

Number.1&#xff1a;工具介绍 功能特点&#xff1a; 智能代码生成与补全&#xff1a;通过学习大量代码库和开发者的编码风格&#xff0c;能根据上下文自动推断可能的代码补全选项&#xff0c;甚至可以自动完成函数定义、循环结构等复杂代码片段。例如&#xff0c;当编写一个算…...

爱普生相机SD卡格式化后数据恢复指南

我借了朋友的‌爱普生相机&#xff0c;想查看一下内存&#xff0c;哎呀&#xff0c;一不小心按错了&#xff0c;竟然执行了格式化操作&#xff0c;这可真是太让人郁闷了&#xff0c;这还有机会挽救数据吗&#xff1f;心塞&#xff0c;求帮助&#xff01; 随着数码摄影的普及&am…...

【数据结构】排序算法---基数排序

文章目录 1. 定义2. 算法步骤2.1 MSD基数排序2.2 LSD基数排序 3. LSD 基数排序动图演示4. 性质5. 算法分析6. 代码实现C语言PythonJavaCGo 结语 ⚠本节要介绍的不是计数排序 1. 定义 基数排序&#xff08;英语&#xff1a;Radix sort&#xff09;是一种非比较型的排序算法&…...

二叉树(下)

目录 判断树是否相同 判断树是不是另一棵树的子树 二叉树翻转 判断平衡二叉树 二叉树层序遍历 这篇主要提供一些关于二叉树例题的讲解&#xff0c;如果对二叉树及其基本操作有疑问的可以转至&#xff1a; 二叉树&#xff08;上&#xff09;-CSDN博客二叉树&#xff08;中&…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

Nuxt.js 中的路由配置详解

Nuxt.js 通过其内置的路由系统简化了应用的路由配置&#xff0c;使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案&#xff0c;如果正确地操作&#xff0c;重启Eureka集群中的节点&#xff0c;对已经注册的服务影响非常小&#xff0c;甚至可以做到无感知。 但如果操作不当&#xff0c;可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

Java毕业设计:WML信息查询与后端信息发布系统开发

JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发&#xff0c;实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构&#xff0c;服务器端使用Java Servlet处理请求&#xff0c;数据库采用MySQL存储信息&#xff0…...