当前位置: 首页 > news >正文

Spring Boot教程之五十六:用 Apache Kafka 消费 JSON 消息

Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息

Apache Kafka 是一个流处理系统,可让您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。

为了了解如何创建 Spring Boot 项目,请参阅本文。

工作步骤 

步骤 1:

转到Spring 初始化程序并创建具有以下依赖项的启动项目: 
Spring for Apache Kafka

步骤 2:

在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并重写toString方法以查看 JSON 格式的消息。以下是学生类的实现:

  • 学生模型

// Java program to implement a

// student class

 

// Creating a student class

public class Student {

 

    // Data members of the class

    int id;

    String firstName;

    String lastName;

 

    // Constructor of the student

    // Class

    public Student()

    {

    }

 

    // Parameterized constructor of

    // the student class

    public Student(int id, String firstName,

                   String lastName)

    {

        this.id = id;

        this.firstName = firstName;

        this.lastName = lastName;

    }

 

    @Override

    public String toString()

    {

        return "Student{"

            + "id = " + id

            + ", firstName = '" + firstName + "'"

            + ", lastName = '" + lastName + "'"

            + "}";

    }

}

步骤 3:

创建一个新的类Config并添加注释@Configuration@EnableKafka。现在使用 Student 类对象创建 Bean ConsumerFactoryConcurrentKafkaListenerContainerFactory 。

  • 配置类

@EnableKafka

@Configuration

public class Config {

 

    // Function to establish a connection

    // between Spring application

    // and Kafka server

    @Bean

    public ConsumerFactory<String, Student>

    studentConsumer()

    {

 

        // HashMap to store the configurations

        Map<String, Object> map

            = new HashMap<>();

 

        // put the host IP in the map

        map.put(ConsumerConfig

                    .BOOTSTRAP_SERVERS_CONFIG,

                "127.0.0.1:9092");

 

        // put the group ID of consumer in the map

        map.put(ConsumerConfig

                    .GROUP_ID_CONFIG,

                "id");

        map.put(ConsumerConfig

                    .KEY_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        map.put(ConsumerConfig

                    .VALUE_DESERIALIZER_CLASS_CONFIG,

                JsonDeserializer.class);

 

        // return message in JSON formate

        return new DefaultKafkaConsumerFactory<>(

            map, new StringDeserializer(),

            new JsonDeserializer<>(Student.class));

    }

 

    @Bean

    public ConcurrentKafkaListenerContainerFactory<String,

                                                   Student>

    studentListner()

    {

        ConcurrentKafkaListenerContainerFactory<String,

                                                Student>

            factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(studentConsumer());

        return factory;

    }

}

步骤 4:

创建一个带有@Service注释的KafkaService类。此类将包含用于在控制台上发布消息的侦听器方法。 

  • KafkaService 类

@Service

public class KafkaService {

 

    // Annotation required to listen

    // the message from Kafka server

    @KafkaListener(topics = "JsonTopic",

                   groupId = "id", containerFactory

                                   = "studentListner")

    public void

    publish(Student student)

    {

        System.out.println("New Entry: "

                           + student);

    }

}

步骤 5:

启动 zookeeper 和 Kafka 服务器。现在我们需要创建一个名为JsonTopic的新主题。为此,打开一个新的命令提示符窗口并将目录更改为 Kafka 目录。

步骤6:

现在使用下面给出的命令创建一个新主题: 

bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 mac 和 linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 windows 

步骤 7:

现在运行 Kafka 生产者控制台,使用以下命令: 

bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // 适用于 mac 和 linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // 适用于 windows 

步骤 8:

运行应用程序并在 Kafka 生产器上输入消息并按回车键。

相关文章:

Spring Boot教程之五十六:用 Apache Kafka 消费 JSON 消息

Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息 Apache Kafka 是一个流处理系统&#xff0c;可让您在进程、应用程序和服务器之间发送消息。在本文中&#xff0c;我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。 为了了解如何创建 …...

Elasticsearch ES|QL 地理空间索引加入纽约犯罪地图

可以根据地理空间数据连接两个索引。在本教程中&#xff0c;我将向你展示如何通过混合邻里多边形和 GPS 犯罪事件坐标来创建纽约市的犯罪地图。 安装 如果你还没有安装好自己的 Elasticsearch 及 Kibana 的话&#xff0c;请参考如下的链接来进行安装。 如何在 Linux&#xff0…...

csp-j知识点:联合(Union)的基本概念

一、联合&#xff08;Union&#xff09;的基本概念 联合是C/C语言中一种特殊的数据结构&#xff0c;它的主要特点是所有成员共享同一块内存空间。这意味着在任何给定时刻&#xff0c;联合中只有一个成员是有效的&#xff0c;因为它们都占用相同的物理内存位置。联合的大小取决…...

docker-compose 方式安装部署confluence

一、confluence简介 Confluence是一款由澳大利亚软件公司Atlassian开发的企业协作工具。它是一个基于web的团队协作平台&#xff0c;用于帮助团队成员共享和协同工作的知识、文档、想法和项目。 Confluence提供了一个集中管理和共享文档、知识库和项目信息的平台。团队成员可…...

深入理解计算机系统阅读笔记-第十二章

第12章 网络编程 12.1 客户端-服务器编程模型 每个网络应用都是基于客户端-服务器模型的。根据这个模型&#xff0c;一个应用时由一个服务器进程和一个或者多个客户端进程组成。服务器管理某种资源&#xff0c;并且通过操作这种资源来为它的客户端提供某种服务。例如&#xf…...

网络原理(九):数据链路层 - 以太网协议 应用层 - DNS 协议

目录 1. 数据链路层 1.1 以太网协议 1.1.1 以太网帧格式 1.2 mac 地址 1.2.1 IP 地址和 mac 地址的区别 1.3 帧中的类型字段 1.3.1 MTU - 最长载荷长度 1.3.2 ARP 协议 2. DNS 协议 1. 数据链路层 数据链路层, 是一个底层的层次, 主要用于交换机开发, 对于 Java 开发…...

rtthread学习笔记系列(4/5/6/7/15/16)

文章目录 4. 杂项4.1 检查是否否是2的幂 5. 预编译命令void类型和rt_noreturn类型的区别 6.map文件分析7.汇编.s文件7.1 汇编指令7.1.1 BX7.1.2 LR链接寄存器7.1.4 []的作用7.1.4 简单的指令 7.2 MSR7.3 PRIMASK寄存器7.4.中断启用禁用7.3 HardFault_Handler 15 ARM指针寄存器1…...

【拒绝算法PUA】3065. 超过阈值的最少操作数 I

系列文章目录 【拒绝算法PUA】0x00-位运算 【拒绝算法PUA】0x01- 区间比较技巧 【拒绝算法PUA】0x02- 区间合并技巧 【拒绝算法PUA】0x03 - LeetCode 排序类型刷题 【拒绝算法PUA】LeetCode每日一题系列刷题汇总-2025年持续刷新中 C刷题技巧总结&#xff1a; [温习C/C]0x04 刷…...

今日总结 2025-01-14

学习目标 掌握运用 VSCode 开发 uni - app 的配置流程。学会将配置完善的项目作为模板上传至 Git&#xff0c;实现复用。项目启动 创建项目&#xff1a;借助 Vue - Cli 方式创建项目&#xff0c;推荐从国内地址 https://gitee.com/dcloud/uni - preset - vue/repository/archiv…...

关于扫描模型 拓扑 和 传递贴图工作流笔记

关于MAYA拓扑和传递贴图的操作笔记 一、拓扑低模: 1、拓扑工作区位置: 1、准备出 目标 高模。 (高模的状态如上 ↑ )。 2、打开顶点吸附,和建模工具区,选择四边形绘制. 2、拓扑快捷键使…...

C#知识|泛型Generic概念与方法

哈喽&#xff0c;你好啊&#xff0c;我是雷工&#xff01; 关于泛型在前面学习记录过 《泛型集合List相关方法》、《Dictionary泛型集合的使用总结》&#xff1b; 其中泛型集合 List<T>、Dictionary<k,v>所在的命名空间为&#xff1a;System.Collection.Generic…...

centos 8 中安装Docker

注&#xff1a;本次样式安装使用的是centos8 操作系统。 1、镜像下载 具体的镜像下载地址各位可以去官网下载&#xff0c;选择适合你们的下载即可&#xff01; 1、CentOS官方下载地址&#xff1a;https://vault.centos.org/ 2、阿里云开源镜像站下载&#xff1a;centos安装包…...

vscode vue 自动格式化

vscode vue 自动格式化 安装Prettier和Vetur插件 选择设置&#xff0c;并且转到编辑文件。增加如下内容。 {"editor.formatOnSave": true,"editor.defaultFormatter": "esbenp.prettier-vscode","[vue]": {"editor.defaultFor…...

Webpack 5 混淆插件terser-webpack-plugin生命周期作用时机和使用注意事项

参考案例代码 海南酷森科技有限公司/webpack-simple-demo Terser&#xff08;简要的/简短的&#xff09; 混淆依据 混淆是发生在代码已经 bundle 之后的事情 变量或者函数在被引用或赋值时才能被混淆 孤立的函数或者变量可能会被移除&#xff0c;但不会被混淆&#xff0c;要…...

MQTT(Message Queuing Telemetry Transport)协议

文章目录 一、MQTT 的原理1. 通信模型2. 核心概念3. 工作流程 二、MQTT 的优势1. 轻量级2. 异步通信3. 可靠性4. 实时性5. 支持断线重连6. 跨平台支持7. 安全性 三、MQTT 的典型应用场景四、与其他协议的对比 MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;…...

【MySQL学习笔记】MySQL存储过程

存储过程 1、基础语法2、变量2.1 系统变量2.2 用户自定义变量2.3 局部变量 3、if 流程控制4、参数5、case 流程控制6、循环结构6.1 while 循环6.2 repeat 循环6.3 loop 循环 7、游标 存储过程是事先经过编译并存储在数据库中的一段 SQL 语句的集合&#xff0c;调用存储过程可以…...

Vue2+OpenLayers实现折线绘制、起始点标记和轨迹打点的完整功能(提供Gitee源码)

目录 一、案例截图 二、安装OpenLayers库 三、代码实现 3.1、HTML页面 3.2、初始化变量 3.3、创建起始点位 3.4、遍历轨迹点 3.5、画折线 3.6、初始化弹窗信息 3.7、初始化地图上标点的点击事件 3.8、完整代码 四、Gitee源码 一、案例截图 二、安装OpenLayers库 n…...

基于Spring Boot的城市垃圾分类管理系统设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…...

linux: 文本编辑器vim

文本编辑器 vi的工作模式 (vim和vi一致) 进入vim的方法 方法一:输入 vim 文件名 此时左下角有 "文件名" 文件行数,字符数量 方法一: 输入 vim 新文件名 此时新建了一个文件并进入vim,左下角有 "文件名"[New File] 灰色的长方形就是光标,输入文字,左下…...

Eclipse Debug 调试

关于Eclipse的Debug调试功能&#xff0c;有几点重要的信息可以分享。 Debug的启动方式&#xff1a;Eclipse提供了多种启动程序调试的方式&#xff0c;包括通过菜单(Run –> Debug)、点击“绿色臭虫”图标、右键选择Debug As以及使用快捷键(F11)【0†source】。 调试中最常用…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

在软件开发中正确使用MySQL日期时间类型的深度解析

在日常软件开发场景中&#xff0c;时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志&#xff0c;到供应链系统的物流节点时间戳&#xff0c;时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库&#xff0c;其日期时间类型的…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...