当前位置: 首页 > news >正文

kafka+spring cloud stream 发送接收消息

方案 1:使用旧版 @StreamListener(适用于 Spring Cloud Stream <= 2.x)

1. 添加依赖(pom.xml

<!-- Spring Cloud Stream + Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. 定义消息通道接口

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannels {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel input();
}

3. 使用 @StreamListener 监听

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MyChannels.class) // 绑定消息通道
public class KafkaStreamListener {

    @StreamListener(MyChannels.INPUT)
    public void handleMessage(String message) {
        System.out.println("Received via @StreamListener: " + message);
    }
}

4. 配置 application.properties

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定输入通道到 Kafka Topic
spring.cloud.stream.bindings.myInput.destination=my-topic
spring.cloud.stream.bindings.myInput.group=my-group
spring.cloud.stream.bindings.myInput.content-type=text/plain

方案 2:新版函数式编程模型(推荐,Spring Cloud Stream >= 3.x)

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class KafkaStreamListener {

    @Bean
    public Consumer<String> myInput() {
        return message -> {
            System.out.println("Received via Function: " + message);
        };
    }
}

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定函数到 Kafka Topic
spring.cloud.stream.bindings.myInput-in-0.destination=my-topic
spring.cloud.stream.bindings.myInput-in-0.group=my-group
spring.cloud.stream.bindings.myInput-in-0.content-type=text/plain

生产者代码示例(发送消息)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendMessage(String topic, String message) {
        streamBridge.send(topic, message);
    }
}

测试步骤

  1. 启动 Kafka:确保 Kafka 和 Zookeeper 服务运行。

  2. 创建 Topic

    kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

  3. 发送消息

    kafkaProducer.sendMessage("my-topic", "Hello Kafka Stream!");

  4. 查看消费者日志

    Received via @StreamListener: Hello Kafka Stream! // 或 Received via Function: Hello Kafka Stream!


常见问题

  1. 版本兼容性

    • Spring Cloud Stream 3.x 后需使用函数式编程。

    • 检查 Spring Boot 版本与 Spring Cloud Stream 的匹配关系(如 Spring Boot 2.6.x + Spring Cloud 2021.x)。

  2. 绑定配置

    • 函数式模型中,绑定名称格式为 <functionName>-in-<index>(如 myInput-in-0)。

  3. 序列化配置

    • 若传递 JSON 对象,需配置 content-type=application/json 并添加 Jackson 依赖。


总结

  • 旧版:使用 @StreamListener + 通道接口(适合遗留代码升级)。

  • 新版:推荐函数式编程模型(更简洁,符合现代 Spring 设计)。

  • 根据实际 Spring Cloud Stream 版本选择方案!

相关文章:

kafka+spring cloud stream 发送接收消息

方案 1&#xff1a;使用旧版 StreamListener&#xff08;适用于 Spring Cloud Stream < 2.x&#xff09; 1. 添加依赖&#xff08;pom.xml&#xff09; <!-- Spring Cloud Stream Kafka Binder --> <dependency> <groupId>org.springframework.clo…...

使用ArcGIS Pro自动矢量化水系

在地理信息系统&#xff08;GIS&#xff09;领域&#xff0c;自动矢量化是一项至关重要的技术&#xff0c;它能够将栅格图像中的要素转换为矢量数据&#xff0c;从而方便后续的分析和处理。本文将详细介绍如何使用ArcGIS Pro自动矢量化水系&#xff0c;适用于那些颜色相对统一、…...

在PyCharm中运行Jupyter Notebook的.ipynb文件及其pycharm软件的基础使用

&#xff08;注意需使用PyCharm专业版&#xff0c;学生、教师可以申请免费使用&#xff1a;https://www.jetbrains.com/shop/eform/students&#xff09; 1. pycharm2024版汉化 https://blog.csdn.net/m0_74103046/article/details/144560999 2. pycharm中的python控制台和J…...

实验 Figma MCP + Cursor 联合工作流

开源项目 Figma-Context-MCP 介绍 使用此 Model Context Protocol 服务器授予 Cursor 对 Figma 文件的访问权限。 当 Cursor 可以访问 Figma 设计数据时&#xff0c;它比粘贴屏幕截图等其他方法更能准确地进行代码转化。 开源仓库&#xff1a; GLips/Figma-Context-MCP 具体…...

移植live555 上的 rtsp

一、V4L2视频采集模块&#xff08;完整示例&#xff09; #include <linux/videodev2.h> #include <sys/ioctl.h> #include <fcntl.h>// 初始化V4L2摄像头 int init_v4l2_camera(const char* dev_path, int width, int height) {int fd open(dev_path, O_RD…...

Web Worker终极优化指南:4秒卡顿→0延迟的实战蜕变

&#x1f4a1; 导读&#xff1a;从4秒卡顿到丝滑响应 真实痛点场景&#xff1a;当斐波那契数列计算量达10亿次时&#xff0c;页面完全冻结4.2秒&#xff01;通过Web Worker优化后&#xff0c;UI响应时间降至16ms以内。本文手把手带您实现性能蜕变&#xff01; 一、Web Worker核…...

redis中的Lua脚本,redis的事务机制

lua脚本的特点 lua脚本可以操作redis数据库&#xff0c;并且脚本中的代码满足原子性&#xff0c;要么全部被执行&#xff0c;要么全部不执行 lua脚本的语法 脚本示例 lua脚本的草稿&#xff1a; 最终的lua脚本 lua脚本在java里调用的方法 RedisTemplete类里有一个方法&…...

CPU多级缓存与缓存一致性协议

CPU多级缓存与缓存一致性协议 CPU多级缓存和缓存一致性协议是计算机体系结构中优化性能与保证数据正确性的核心机制。以下从缓存层级设计、工作原理、一致性协议&#xff08;如MESI&#xff09;及其实现细节展开说明。 一、为什么需要多级缓存&#xff1f; CPU的计算速度远高…...

Apifox 增强 AI 接口调试功能:自动合并 SSE 响应、展示DeepSeek思考过程

在现代的API接口调试中&#xff0c;效率和精确性对于开发者和测试人员来说至关重要。Apifox&#xff0c;作为一款功能强大的API管理和调试工具&#xff0c;近年来不断提升其用户体验和智能化功能。最近&#xff0c;Apifox 推出了增强版的AI接口调试功能&#xff0c;其中包括自动…...

【电机控制】42步进电机+arduino:WHEELTEC_MS42DDC

轮趣科技 42步进电机arduino:WHEELTEC_MS42DDC 接线方式&#xff1a; WHEELTEC_MS42DDC有两个接口&#xff0c; 一端接口连接配套的DC电源&#xff0c;另外一端只需要用三根线&#xff0c;一根负极连接ardino 的GND&#xff0c;然后把该端口的tx和rx连接到arduino的rx和tx,下…...

使用LangChain构建第一个ReAct Agent

使用LangChain构建第一个ReAct Agent 准备环境 使用Anaconda 安装python 3.10 安装langchain、langchain_openai、langchain_community &#xff08;安装命令 pip install XXX&#xff09; 申请DeepSeek API&#xff1a;https://platform.deepseek.com/api_keys&#xff08;也…...

萝卜头笔作文赏析

在遥远的无寻王国&#xff0c;有这么一支小小的笔诞生了&#xff0c;人们见它又短又小&#xff0c;于是就给它取名叫萝卜头笔。萝卜头笔渐渐长大了&#xff0c;除了身子变粗些&#xff0c;其他什么都没变。一天&#xff0c;萝卜头笔来到了深山老林&#xff0c;那里枝叶繁茂&…...

RT-Thread+STM32L475VET6——USB鼠标模拟

文章目录 前言一、板载资源二、具体步骤1.配置icm20608传感器2.打开CubeMX进行USB配置3. 配置USB3.1 打开USB驱动3.2 声明USB3.3 剪切stm32xxxx_hal_msp.c中的void HAL_PCD_MspInit(PCD_HandleTypeDef* hpcd)和void HAL_PCD_MspDeInit(PCD_HandleTypeDef* hpcd)函数至board.c3.…...

rust 安全性

Rust 是 静态类型&#xff08;statically typed&#xff09; 语言&#xff0c; 也就是说在编译时就必须知道所有变量的类型&#xff0c; 这一点将贯穿整个章节。 C/C的安全问题 内存的不正确访问引发的内存安全问题 由于多个变量指向同一块内存区域导致的数据一致性问题 由于…...

大模型驱动的围术期质控系统全面解析与应用探索

目录 一、引言 1.1 研究背景与意义 1.2 研究目的与方法 1.3 研究创新点 二、大模型技术与围术期管理概述 2.1 大模型技术原理与发展现状 2.2 围术期管理流程与挑战 三、大模型在术前的应用 3.1 病历内涵质控 3.2 智能医学问答与知识查询 3.3 疾病风险预测与评估 3.…...

中兴B863AV3.2-T/B863AV3.1-T2/B863AV3.1-T2K_电信高安_S905L3A-B_安卓9.0_线刷固件包

中兴B863AV3.2-T&#xff0f;B863AV3.1-T2&#xff0f;B863AV3.1-T2K_电信高安_S905L3A-B_安卓9.0_线刷固件包 B863AV3.2-T B863AV3.1-T2 已知可通刷贵州、江苏、贵州、北京、河南、陕西等省份。 线刷方法&#xff1a;&#xff08;新手参考借鉴一下&#xff09; 1、准备好一…...

Android Binder机制

Binder是IPC&#xff08;进程间通信&#xff09;的一种机制&#xff0c;它允许不同的应用或系统服务在不同的进程中安全地交换数据。Binder的核心原理是基于客户端-服务器模型&#xff08;C/S架构)。 一、Binder的定义 1. Binder是Android中的一个类&#xff0c;它继承了IBind…...

【算法】初等数论

初等数论 模 取余&#xff0c;遵循尽可能让商向0靠近的原则&#xff0c;结果的正负和左操作数相同 取模&#xff0c;遵循尽可能让商向负无穷靠近的原则&#xff0c;结果的正负和右操作数相同 7/&#xff08;-3&#xff09;-2.3&#xff0c;产生了两个商-2和-3&#xff0c;取…...

Spring Boot3+Vue2极速整合:10分钟搭建DeepSeek AI对话系统

前言 在生成式AI技术蓬勃发展的今天&#xff0c;大语言模型已成为企业智能化转型和个人效率提升的核心驱动力。作为国产大模型的优秀代表&#xff0c;DeepSeek凭借其卓越的中文语义理解能力和开发者友好的API生态&#xff0c;正在成为构建本土化AI应用的首选平台。 本文将以S…...

Spring事务原理 二

在上一篇博文《Spring事务原理 一》中&#xff0c;我们熟悉了Spring声明式事务的AOP原理&#xff0c;以及事务执行的大体流程。 本文中&#xff0c;介绍了Spring事务的核心组件、传播行为的源码实现。下一篇中&#xff0c;我们将结合案例&#xff0c;来讲解实战中有关事务的易…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit&#xff08;传感器服务&#xff09;# 前言 在运动类应用中&#xff0c;运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据&#xff0c;如配速、距离、卡路里消耗等&#xff0c;用户可以更清晰…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

工厂方法模式和抽象工厂方法模式的battle

1.案例直接上手 在这个案例里面&#xff0c;我们会实现这个普通的工厂方法&#xff0c;并且对比这个普通工厂方法和我们直接创建对象的差别在哪里&#xff0c;为什么需要一个工厂&#xff1a; 下面的这个是我们的这个案例里面涉及到的接口和对应的实现类&#xff1a; 两个发…...

【笔记】AI Agent 项目 SUNA 部署 之 Docker 构建记录

#工作记录 构建过程记录 Microsoft Windows [Version 10.0.27871.1000] (c) Microsoft Corporation. All rights reserved.(suna-py3.12) F:\PythonProjects\suna>python setup.py --admin███████╗██╗ ██╗███╗ ██╗ █████╗ ██╔════╝…...

[QMT量化交易小白入门]-六十二、ETF轮动中简单的评分算法如何获取历史年化收益32.7%

本专栏主要是介绍QMT的基础用法,常见函数,写策略的方法,也会分享一些量化交易的思路,大概会写100篇左右。 QMT的相关资料较少,在使用过程中不断的摸索,遇到了一些问题,记录下来和大家一起沟通,共同进步。 文章目录 相关阅读1. 策略概述2. 趋势评分模块3 代码解析4 木头…...

从0开始学习R语言--Day17--Cox回归

Cox回归 在用医疗数据作分析时&#xff0c;最常见的是去预测某类病的患者的死亡率或预测他们的结局。但是我们得到的病人数据&#xff0c;往往会有很多的协变量&#xff0c;即使我们通过计算来减少指标对结果的影响&#xff0c;我们的数据中依然会有很多的协变量&#xff0c;且…...