当前位置: 首页 > news >正文

使用微服务Spring Cloud集成Kafka实现异步通信(消费者)

1、本文架构

本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上,微服务部署在Windows 11系统上,Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之间通过Kafka Server实现异步通信。

出于便于测试的目的,我通过浏览器触发Kafka Producer发送消息,观察Kafka Consumer的后台是否打印出接收到的消息内容。

Ubuntu 上部署Kafka Server,详见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的搭建过程和完整代码,详见博文:微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

Kafka Producer微服务的完整代码,详见博文:使用微服务Spring Cloud集成Kafka实现异步通信-CSDN博客

本文的重点是实现下图中的深蓝色部分:Kafka Consumer微服务。

2、创建Spring boot项目(Kafka Consumer微服务项目):

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka-consumer -DarchetypeArtifactId=maven-archetype-quickstart

项目代码的完整目录如下图所示:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>microservice-kafka-consumer</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>microservice-kafka-consumer</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>         <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR4</version><type>pom</type><scope>import</scope></dependency>               </dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

编辑application.yml,配置kafka消费者:

consumer:
      #消费的主题
      topic: test-topic
      #消费者组id
      group-id: test-group
      #是否自动提交偏移量
      enable-auto-commit: true
      #提交偏移量的间隔-毫秒
      auto-commit-ms: 1000
      #客户端消费的会话超时时间-毫秒
      session-timeout-ms: 10000
      #实现DeSerializer接口的反序列化类键
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #实现DeSerializer接口的反序列化类值
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:port: 8030
spring:application:name: microservice-kafka-consumerkafka:bootstrap-servers: 192.168.23.131:9092consumer:group-id: test-groupenable-auto-commit: trueauto-commit-ms: 1000session-timeout-ms: 10000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializereureka:client:serviceUrl:defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: true            

App.java的完整代码如下:

package com.test;import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.kafka.annotation.KafkaListener;@SpringBootApplication
@EnableDiscoveryClient
public class App 
{@KafkaListener(topics = "mydemo1")public void listen(String msg) throws Exception {System.out.println( "-----> Recv a msg: " + msg );}public static void main( String[] args ){System.out.println( "Hello World!" );SpringApplication.run(App.class, args);}
}

3、测试

在浏览器输入,触发Kafka Producer向Kafka Server发送消息:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

在Kafka Consumer的后台打印出收到的消息:

相关文章:

使用微服务Spring Cloud集成Kafka实现异步通信(消费者)

1、本文架构 本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上&#xff0c;微服务部署在Windows 11系统上&#xff0c;Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之…...

docker pull 超时Timeout失败的解决办法

当国内开发者docker pull遇到如下提示时&#xff0c;不要惊讶 [rootvm /]# docker pull postgres Using default tag: latest Error response from daemon: Get "https://registry-1.docker.io/v2/": dial tcp 128.121.146.235:443: i/o timeout [rootvm /]# 自2024…...

YOLOv7改进之主干DAMOYOLO结构,结合 CReToNeXt 结构,打造高性能检测器

一、DAMOYOLO理论部分 论文地址:2211.15444 (arxiv.org) 在本报告中,我们提出了一种快速准确的对象检测方法,称为 DAMO-YOLO,它实现了比最先进的 YOLO 系列更高的性能。DAMO-YOLO 是从 YOLO 扩展而来的,具有一些新技术,包括神经架构搜索 (NAS)、高效的重新参数化广义 …...

进度条(倒计时)Linux

\r回车(回到当前行开头) \n换行 行缓冲区概念 什么现象&#xff1f; 什么现象&#xff1f;&#xff1f; 什么现象&#xff1f;&#xff1f;&#xff1f; 自己总结&#xff1a; #pragma once 防止头文件被重复包含 倒计时 在main.c中&#xff0c;windows.h是不可以用的&…...

[每周一更]-(第117期):硬盘分区表类型:MBR和GPT区别

文章目录 1. **支持的磁盘容量**2. **分区数量**3. **引导方式**4. **冗余和数据恢复**5. **兼容性**6. **安全性**7. **操作系统支持**8. 对比 国庆假期前补一篇 在一次扫描机械硬盘故障的问题&#xff0c;发现我本机SSD和机械硬盘的分类型不一样&#xff0c;分别是GPT和MBR&a…...

河南移动:核心营业系统稳定运行超300天,数据库分布式升级实践|OceanBase案例

河南移动&#xff0c;作为电信全业务运营企业&#xff0c;不仅拥有庞大的客户群体和业务规模&#xff0c;还引领着业务产品与服务体系的创新发展。河南移动的原有核心营业系统承载着超过6000万的庞大用户量&#xff0c;管理着超过80TB的海量数据&#xff0c;因此也面临着数据规…...

22.1 k8s不同role级别的服务发现

本节重点介绍 : 服务发现的应用3种采集的k8s服务发现role 容器基础资源指标 role :nodek8s服务组件指标 role :endpoint部署在pod中业务埋点指标 role :pod 服务发现的应用 所有组件将自身指标暴露在各自的服务端口上&#xff0c;prometheus通过pull过来拉取指标但是promet…...

OpenCV计算机视觉库

计算机视觉和图像处理 Tensorflow入门深度神经网络图像分类目标检测图像分割OpenCVPytorchNLP自然语言处理 OpenCV 一、OpenCV简介1.1 简介1.2 OpenCV部署1.3 OpenCV模块 二、OpenCV基本操作2.1 图像的基本操作2.1.1 图像的IO操作2.1.2 绘制几何图像2.1.3 获取并修改图像的像素…...

CentOS 系统中的文件挂载 U 盘

要将 CentOS 系统中的文件保存到 U 盘&#xff0c;可以按照以下步骤进行操作&#xff1a; 一、插入 U 盘并确定设备名称 将 U 盘插入 CentOS 系统的 USB 接口。使用 fdisk -l 命令查看系统中的磁盘和分区情况&#xff0c;确定 U 盘的设备名称。通常 U 盘会显示为类似于 /dev/…...

Lumerical脚本语言-变量操作(Manipulating variables)

下面的命令用来创建和存取变量。 命令描述= 赋值操作符 :数组操作符 []创建矩阵 % 创建包含空格的变量名称 linspace 创建线性空间数组 matrix 创建一个全为 0 的矩阵 randmatrix 创建一个所有元素为 0~1 之间的一个随机数的矩阵 randnmatrix 创建一个所有元素为平均值为 0…...

一个基本的包括爬虫、数据存储和前端展示框架0

创建一个完整的网络爬虫和前端展示页面是一个涉及多个步骤和技术的任务。下面我将为你提供一个基本的框架,包括爬虫代码(使用Python和Scrapy框架)和前端HTML页面(伏羲.html)。 爬虫代码 (使用Scrapy) 首先,你需要安装Scrapy库:bash pip install scrapy 然后,创建一个新…...

简历制作面试篇

一.面试技巧分析 模板: 推荐使用简洁一点的模板,不要太花哨,能够让HR和面试官清楚,快速知道信息就可以,太花哨容易分散别人的注意力。 格式: 一般选用PDF,不要用WORD。 照片: 技术岗一般不用贴照片,推进写上自己的联系方式或者微信。 专业技能: 描述专业技能…...

智能制造--EAP设备自动化程序

EAP是设备自动化程序&#xff08;Equipment Automation Program&#xff09;的缩写&#xff0c;他是一种用于控制制造设备进行自动化生产的系统。EAP系统与MES系统整合&#xff0c;校验产品信息&#xff0c;自动做账&#xff0c;同时收集产品生产过程中的制程数据和设备参数数据…...

LabVIEW混合控制器质量检测

随着工业自动化水平的提高&#xff0c;对控制器的精度、稳定性、可靠性要求也在不断上升。特别是在工程机械、自动化生产、风力发电等领域&#xff0c;传统的质量检测方法已无法满足现代工业的高要求。因此&#xff0c;开发一套自动化、精确、可扩展的混合控制器质量检测平台成…...

新技术浪潮下的等保测评:云计算、物联网与大数据的挑战与机遇

随着信息技术的飞速发展&#xff0c;云计算、物联网&#xff08;IoT&#xff09;和大数据等新兴技术正以前所未有的速度改变着我们的生活和工作方式。这些技术的广泛应用不仅为信息系统带来了前所未有的性能提升&#xff0c;同时也对等保测评&#xff08;信息安全等级保护测评&…...

微信小程序技术框架选型

“近期在对团队的微信小程序进行技术框架选型&#xff0c;故对目前主流的微信小程序技术框架进行了一些分析和比较&#xff0c;包括各框架的维护团队、社区链接、GitHub star数、优缺点对比等方面&#xff0c;为团队提供技术框架选型参考” 一、引言 随着移动互联网的快速发展…...

SQL学习3

24.10.3学习目录 一.c语言操作数据库 一.c语言操作数据库 &#xff08;1&#xff09;打开、关闭数据库函数 //打开数据库 int sqlite3_open(char *db_name,sqlite3 **db);db_name&#xff1a;数据库文件名&#xff0c;若文件名中有ASCLL码中以外的字符&#xff0c;其必须为UT…...

Linux:进程控制(一)

目录 一、写时拷贝 1.创建子进程 2.写时拷贝 二、进程终止 1.函数返回值 2.错误码 3.异常退出 4.exit 5._exit 一、写时拷贝 父子进程&#xff0c;代码共享&#xff0c;不作写入操作时&#xff0c;数据也是共享的&#xff0c;当任意一方试图写入&#xff0c;便通过写时拷…...

初识算法 · 双指针(3)

目录 前言&#xff1a; 和为s的两数之和 题目解析&#xff1a; ​编辑 算法原理&#xff1a; 算法编写&#xff1a; 三数之和 题目解析 算法原理 算法编写 前言&#xff1a; 本文通过介绍和为S的两数之和&#xff0c;以及三数之和&#xff0c;对双指针算法进行深一步…...

【AI知识点】近似最近邻搜索(ANN, Approximate Nearest Neighbor Search)

近似最近邻搜索&#xff08;ANN, Approximate Nearest Neighbor Search&#xff09; 是一种用于高维数据检索的技术&#xff0c;目标是在给定查询的情况下&#xff0c;快速找到距离查询点最近的数据点&#xff0c;尽管结果可能并不完全精确。这种方法特别适用于高维数据&#x…...

基于大模型的 UI 自动化系统

基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》

在注意力分散、内容高度同质化的时代&#xff0c;情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现&#xff0c;消费者对内容的“有感”程度&#xff0c;正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中&#xff0…...

工程地质软件市场:发展现状、趋势与策略建议

一、引言 在工程建设领域&#xff0c;准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具&#xff0c;正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念&#xff0c;其实 Fiori当中还有 V4&#xff0c;咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务)&#xff0c;代理中间件&#xff08;ui5-middleware-simpleproxy&#xff09;-CSDN博客…...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...

【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案

目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后&#xff0c;迭代器会失效&#xff0c;因为顺序迭代器在内存中是连续存储的&#xff0c;元素删除后&#xff0c;后续元素会前移。 但一些场景中&#xff0c;我们又需要在执行删除操作…...