当前位置: 首页 > news >正文

kafka+spring cloud stream 发送接收消息

方案 1:使用旧版 @StreamListener(适用于 Spring Cloud Stream <= 2.x)

1. 添加依赖(pom.xml

<!-- Spring Cloud Stream + Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. 定义消息通道接口

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannels {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel input();
}

3. 使用 @StreamListener 监听

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MyChannels.class) // 绑定消息通道
public class KafkaStreamListener {

    @StreamListener(MyChannels.INPUT)
    public void handleMessage(String message) {
        System.out.println("Received via @StreamListener: " + message);
    }
}

4. 配置 application.properties

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定输入通道到 Kafka Topic
spring.cloud.stream.bindings.myInput.destination=my-topic
spring.cloud.stream.bindings.myInput.group=my-group
spring.cloud.stream.bindings.myInput.content-type=text/plain

方案 2:新版函数式编程模型(推荐,Spring Cloud Stream >= 3.x)

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class KafkaStreamListener {

    @Bean
    public Consumer<String> myInput() {
        return message -> {
            System.out.println("Received via Function: " + message);
        };
    }
}

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定函数到 Kafka Topic
spring.cloud.stream.bindings.myInput-in-0.destination=my-topic
spring.cloud.stream.bindings.myInput-in-0.group=my-group
spring.cloud.stream.bindings.myInput-in-0.content-type=text/plain

生产者代码示例(发送消息)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendMessage(String topic, String message) {
        streamBridge.send(topic, message);
    }
}

测试步骤

  1. 启动 Kafka:确保 Kafka 和 Zookeeper 服务运行。

  2. 创建 Topic

    kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

  3. 发送消息

    kafkaProducer.sendMessage("my-topic", "Hello Kafka Stream!");

  4. 查看消费者日志

    Received via @StreamListener: Hello Kafka Stream! // 或 Received via Function: Hello Kafka Stream!


常见问题

  1. 版本兼容性

    • Spring Cloud Stream 3.x 后需使用函数式编程。

    • 检查 Spring Boot 版本与 Spring Cloud Stream 的匹配关系(如 Spring Boot 2.6.x + Spring Cloud 2021.x)。

  2. 绑定配置

    • 函数式模型中,绑定名称格式为 <functionName>-in-<index>(如 myInput-in-0)。

  3. 序列化配置

    • 若传递 JSON 对象,需配置 content-type=application/json 并添加 Jackson 依赖。


总结

  • 旧版:使用 @StreamListener + 通道接口(适合遗留代码升级)。

  • 新版:推荐函数式编程模型(更简洁,符合现代 Spring 设计)。

  • 根据实际 Spring Cloud Stream 版本选择方案!

相关文章:

kafka+spring cloud stream 发送接收消息

方案 1&#xff1a;使用旧版 StreamListener&#xff08;适用于 Spring Cloud Stream < 2.x&#xff09; 1. 添加依赖&#xff08;pom.xml&#xff09; <!-- Spring Cloud Stream Kafka Binder --> <dependency> <groupId>org.springframework.clo…...

使用ArcGIS Pro自动矢量化水系

在地理信息系统&#xff08;GIS&#xff09;领域&#xff0c;自动矢量化是一项至关重要的技术&#xff0c;它能够将栅格图像中的要素转换为矢量数据&#xff0c;从而方便后续的分析和处理。本文将详细介绍如何使用ArcGIS Pro自动矢量化水系&#xff0c;适用于那些颜色相对统一、…...

在PyCharm中运行Jupyter Notebook的.ipynb文件及其pycharm软件的基础使用

&#xff08;注意需使用PyCharm专业版&#xff0c;学生、教师可以申请免费使用&#xff1a;https://www.jetbrains.com/shop/eform/students&#xff09; 1. pycharm2024版汉化 https://blog.csdn.net/m0_74103046/article/details/144560999 2. pycharm中的python控制台和J…...

实验 Figma MCP + Cursor 联合工作流

开源项目 Figma-Context-MCP 介绍 使用此 Model Context Protocol 服务器授予 Cursor 对 Figma 文件的访问权限。 当 Cursor 可以访问 Figma 设计数据时&#xff0c;它比粘贴屏幕截图等其他方法更能准确地进行代码转化。 开源仓库&#xff1a; GLips/Figma-Context-MCP 具体…...

移植live555 上的 rtsp

一、V4L2视频采集模块&#xff08;完整示例&#xff09; #include <linux/videodev2.h> #include <sys/ioctl.h> #include <fcntl.h>// 初始化V4L2摄像头 int init_v4l2_camera(const char* dev_path, int width, int height) {int fd open(dev_path, O_RD…...

Web Worker终极优化指南:4秒卡顿→0延迟的实战蜕变

&#x1f4a1; 导读&#xff1a;从4秒卡顿到丝滑响应 真实痛点场景&#xff1a;当斐波那契数列计算量达10亿次时&#xff0c;页面完全冻结4.2秒&#xff01;通过Web Worker优化后&#xff0c;UI响应时间降至16ms以内。本文手把手带您实现性能蜕变&#xff01; 一、Web Worker核…...

redis中的Lua脚本,redis的事务机制

lua脚本的特点 lua脚本可以操作redis数据库&#xff0c;并且脚本中的代码满足原子性&#xff0c;要么全部被执行&#xff0c;要么全部不执行 lua脚本的语法 脚本示例 lua脚本的草稿&#xff1a; 最终的lua脚本 lua脚本在java里调用的方法 RedisTemplete类里有一个方法&…...

CPU多级缓存与缓存一致性协议

CPU多级缓存与缓存一致性协议 CPU多级缓存和缓存一致性协议是计算机体系结构中优化性能与保证数据正确性的核心机制。以下从缓存层级设计、工作原理、一致性协议&#xff08;如MESI&#xff09;及其实现细节展开说明。 一、为什么需要多级缓存&#xff1f; CPU的计算速度远高…...

Apifox 增强 AI 接口调试功能:自动合并 SSE 响应、展示DeepSeek思考过程

在现代的API接口调试中&#xff0c;效率和精确性对于开发者和测试人员来说至关重要。Apifox&#xff0c;作为一款功能强大的API管理和调试工具&#xff0c;近年来不断提升其用户体验和智能化功能。最近&#xff0c;Apifox 推出了增强版的AI接口调试功能&#xff0c;其中包括自动…...

【电机控制】42步进电机+arduino:WHEELTEC_MS42DDC

轮趣科技 42步进电机arduino:WHEELTEC_MS42DDC 接线方式&#xff1a; WHEELTEC_MS42DDC有两个接口&#xff0c; 一端接口连接配套的DC电源&#xff0c;另外一端只需要用三根线&#xff0c;一根负极连接ardino 的GND&#xff0c;然后把该端口的tx和rx连接到arduino的rx和tx,下…...

使用LangChain构建第一个ReAct Agent

使用LangChain构建第一个ReAct Agent 准备环境 使用Anaconda 安装python 3.10 安装langchain、langchain_openai、langchain_community &#xff08;安装命令 pip install XXX&#xff09; 申请DeepSeek API&#xff1a;https://platform.deepseek.com/api_keys&#xff08;也…...

萝卜头笔作文赏析

在遥远的无寻王国&#xff0c;有这么一支小小的笔诞生了&#xff0c;人们见它又短又小&#xff0c;于是就给它取名叫萝卜头笔。萝卜头笔渐渐长大了&#xff0c;除了身子变粗些&#xff0c;其他什么都没变。一天&#xff0c;萝卜头笔来到了深山老林&#xff0c;那里枝叶繁茂&…...

RT-Thread+STM32L475VET6——USB鼠标模拟

文章目录 前言一、板载资源二、具体步骤1.配置icm20608传感器2.打开CubeMX进行USB配置3. 配置USB3.1 打开USB驱动3.2 声明USB3.3 剪切stm32xxxx_hal_msp.c中的void HAL_PCD_MspInit(PCD_HandleTypeDef* hpcd)和void HAL_PCD_MspDeInit(PCD_HandleTypeDef* hpcd)函数至board.c3.…...

rust 安全性

Rust 是 静态类型&#xff08;statically typed&#xff09; 语言&#xff0c; 也就是说在编译时就必须知道所有变量的类型&#xff0c; 这一点将贯穿整个章节。 C/C的安全问题 内存的不正确访问引发的内存安全问题 由于多个变量指向同一块内存区域导致的数据一致性问题 由于…...

大模型驱动的围术期质控系统全面解析与应用探索

目录 一、引言 1.1 研究背景与意义 1.2 研究目的与方法 1.3 研究创新点 二、大模型技术与围术期管理概述 2.1 大模型技术原理与发展现状 2.2 围术期管理流程与挑战 三、大模型在术前的应用 3.1 病历内涵质控 3.2 智能医学问答与知识查询 3.3 疾病风险预测与评估 3.…...

中兴B863AV3.2-T/B863AV3.1-T2/B863AV3.1-T2K_电信高安_S905L3A-B_安卓9.0_线刷固件包

中兴B863AV3.2-T&#xff0f;B863AV3.1-T2&#xff0f;B863AV3.1-T2K_电信高安_S905L3A-B_安卓9.0_线刷固件包 B863AV3.2-T B863AV3.1-T2 已知可通刷贵州、江苏、贵州、北京、河南、陕西等省份。 线刷方法&#xff1a;&#xff08;新手参考借鉴一下&#xff09; 1、准备好一…...

Android Binder机制

Binder是IPC&#xff08;进程间通信&#xff09;的一种机制&#xff0c;它允许不同的应用或系统服务在不同的进程中安全地交换数据。Binder的核心原理是基于客户端-服务器模型&#xff08;C/S架构)。 一、Binder的定义 1. Binder是Android中的一个类&#xff0c;它继承了IBind…...

【算法】初等数论

初等数论 模 取余&#xff0c;遵循尽可能让商向0靠近的原则&#xff0c;结果的正负和左操作数相同 取模&#xff0c;遵循尽可能让商向负无穷靠近的原则&#xff0c;结果的正负和右操作数相同 7/&#xff08;-3&#xff09;-2.3&#xff0c;产生了两个商-2和-3&#xff0c;取…...

Spring Boot3+Vue2极速整合:10分钟搭建DeepSeek AI对话系统

前言 在生成式AI技术蓬勃发展的今天&#xff0c;大语言模型已成为企业智能化转型和个人效率提升的核心驱动力。作为国产大模型的优秀代表&#xff0c;DeepSeek凭借其卓越的中文语义理解能力和开发者友好的API生态&#xff0c;正在成为构建本土化AI应用的首选平台。 本文将以S…...

Spring事务原理 二

在上一篇博文《Spring事务原理 一》中&#xff0c;我们熟悉了Spring声明式事务的AOP原理&#xff0c;以及事务执行的大体流程。 本文中&#xff0c;介绍了Spring事务的核心组件、传播行为的源码实现。下一篇中&#xff0c;我们将结合案例&#xff0c;来讲解实战中有关事务的易…...

VisualCppRedist AIO:Windows运行库一站式终极解决方案

VisualCppRedist AIO&#xff1a;Windows运行库一站式终极解决方案 【免费下载链接】vcredist AIO Repack for latest Microsoft Visual C Redistributable Runtimes 项目地址: https://gitcode.com/gh_mirrors/vc/vcredist 还在为打开软件时遇到"缺少MSVCP140.dll…...

终极Windows音频路由指南:用Audio Router实现多设备音频分发

终极Windows音频路由指南&#xff1a;用Audio Router实现多设备音频分发 【免费下载链接】audio-router Routes audio from programs to different audio devices. 项目地址: https://gitcode.com/gh_mirrors/au/audio-router 你是否曾为Windows系统的音频管理而烦恼&am…...

别再对着手册发愁了!手把手教你用STM32 HAL库搞定TDC-GP22的SPI通信(附完整代码)

STM32 HAL库驱动TDC-GP22激光测距模块实战指南 第一次拿到TDC-GP22模块时&#xff0c;我盯着那堆SPI时序图和寄存器配置说明发呆了半小时——文档里每个字都认识&#xff0c;但连起来就是不知道从哪下手。如果你也正在经历这种痛苦&#xff0c;别担心&#xff0c;这篇指南会带你…...

别再只盯着电磁力了:从模态匹配角度,聊聊电机NVH设计的极槽配合选择

电机NVH设计的极槽配合选择&#xff1a;模态匹配视角下的实战指南 当一台电机在实验室里发出刺耳的啸叫声时&#xff0c;工程师们的第一反应往往是检查电磁力参数。但鲜为人知的是&#xff0c;真正决定NVH&#xff08;噪声、振动与声振粗糙度&#xff09;性能的关键&#xff0c…...

告别云端:在树莓派4B上搭建你的私有AI聊天机器人(基于llama.cpp)

在树莓派4B上构建私有AI聊天机器人的完整实践指南 从零开始的边缘智能革命 当ChatGPT掀起全球AI浪潮时&#xff0c;大多数用户只能通过云端服务体验大语言模型的魅力。但有一群技术极客正在探索另一种可能——如何将这些强大的AI能力装进口袋大小的设备里。树莓派4B作为最受欢迎…...

Python高级应用系列(十三)Python C扩展与性能加速:Cython、ctypes、cffi

前言 Python以开发效率和可读性著称,但「性能」始终是其软肋。在CPU密集型场景下,纯Python代码的执行速度可能比C/C++慢数十甚至上百倍。 然而Python生态提供了多种性能加速方案,从调用C库到将Python代码编译为C,层次丰富、适用场景各异: 方案 定位 适用场景 ctypes 调用…...

逆向糖豆视频:从动态加载到防盗链破解的实战解析

1. 糖豆视频逆向分析的核心挑战 第一次尝试爬取糖豆视频时&#xff0c;我遇到了几个让人头疼的问题。最明显的就是视频只能播放5秒就中断&#xff0c;这其实是典型的防盗链机制在起作用。糖豆视频采用了动态加载技术&#xff0c;真实视频地址隐藏在层层接口之后&#xff0c;需要…...

Python Counter实战:5个数据分析场景让你秒懂这个统计神器

Python Counter实战&#xff1a;5个数据分析场景让你秒懂这个统计神器 在数据分析的日常工作中&#xff0c;统计元素出现频率是最基础却最频繁的需求之一。想象一下这样的场景&#xff1a;你需要分析电商平台上哪些商品被用户频繁浏览&#xff0c;或者统计社交媒体上热门话题的…...

GAN潜在空间探索与可控人脸生成实战

1. GAN潜在空间探索&#xff1a;从随机噪声到可控人脸生成生成对抗网络&#xff08;GAN&#xff09;最迷人的特性之一就是其潜在空间&#xff08;latent space&#xff09;的结构化特性。这个看似随机的多维空间&#xff0c;经过训练后实际上蕴含着丰富的语义信息。想象一下&am…...

“农机云”平台Docker安全加固白皮书:通过CIS Docker Benchmark 1.4.0认证的11项强制配置(附自动化check脚本)

第一章&#xff1a;农机云平台Docker安全加固白皮书概述农机云平台作为面向农业智能化的核心基础设施&#xff0c;其容器化部署广泛依赖 Docker 引擎承载边缘计算节点、农机调度服务、遥感数据处理微服务等关键组件。本白皮书聚焦于生产环境中 Docker 运行时与镜像生命周期的安…...