当前位置: 首页 > news >正文

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>

2、配置application.yml

加入Kafka的配置

springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {private int id;private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {private int id;private String description;private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "task-topic";ObjectMapper objectMapper = new ObjectMapper();for (int i = 1; i <= 5; i++) {// 定义一个对象实例User user = User.builder().id(1).name("Alice").build();Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();//JAVA对象转化为JSON字符串String message =  objectMapper.writeValueAsString(task);kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {String message = record.value();ObjectMapper objectMapper = new ObjectMapper();Task task = objectMapper.readValue(message,Task.class);// 取出System.out.println("User - Received: " + task.getAssignedUser());// 手动提交offsetacknowledgment.acknowledge();}
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!

相关文章:

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency> 2、配置application.yml 加入Kafk…...

深度学习(1)

一、torch的安装 基于直接设备情况&#xff0c;选择合适的torch版本&#xff0c;有显卡的建议安装GPU版本&#xff0c;可以通过nvidia-smi命令来查看显卡驱动的版本&#xff0c;在官网中根据cuda版本&#xff0c;选择合适的版本号&#xff0c;下面是安装示例代码 GPU&#xff…...

golang 嵌入式armv7l压缩编译打包

编译 Go 应用程序 go build -ldflags"-s -w" -o myapp.exe . 使用 UPX 压缩可执行文件&#xff08;window下载并设置环境变量&#xff09; upx --best --lzma myapp.exe 可从10M压缩到1M 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 …...

Makefile 之 join

join $(join <list1>,<list2> ) 名称&#xff1a;连接函数——join。 功能&#xff1a;把<list2>中的单词对应地加到<list1>的单词后面。如果<list1>的单词个数要比<list2>的多&#xff0c; 那么&#xff0c;<list1>中的多出…...

集合卡尔曼滤波(Ensemble Kalman Filter),用于二维滤波(模拟平面上的目标跟踪),MATLAB代码

集合卡尔曼滤波&#xff08;Ensemble Kalman Filter&#xff09; 文章目录 引言理论基础卡尔曼滤波集合卡尔曼滤波初始化预测步骤更新步骤卡尔曼增益更新集合 MATLAB 实现运行结果3. 应用领域结论 引言 集合卡尔曼滤波&#xff08;Ensemble Kalman Filter, EnKF&#xff09;是…...

北京申请中级职称流程(2024年)

想找个完整详细点的申请流程资料真不容易&#xff0c;做个分享送给需要的人吧。 不清楚为什么说文章过度宣传&#xff0c;把链接和页面去掉了&#xff0c;网上自己找一下。 最好用windows自带的EDGE浏览器打开申请网站&#xff0c;只有在开始申请的时间内才可以进行网上申报&…...

ubuntu.24安装cuda

1.下载CUDA Toolkit https://developer.nvidia.com/cuda-toolkit-archive 2.按照命令下载&#xff0c;安装 sudo sh cuda_12.2.2_535.104.05_linux.run 3.环境变量 sudo vi /etc/profile 最后面添加 export PATH“/usr/local/cuda-12.2/bin: P A T H " e x p o r t L D L…...

unity li2cpp逆向原理是什么?

主要涉及将Unity游戏引擎中的C#代码转换为C代码&#xff0c;并进一步编译为各平台的原生&#xff08;Native&#xff09;代码的过程&#xff0c;以及逆向工程工具如何利用这一过程中的特定文件来还原和分析原始代码。以下是对Unity IL2CPP逆向原理的详细解释&#xff1a; 对惹…...

Python网络爬虫实践案例:爬取猫眼电影Top100

以下是一个Python网络爬虫的实践案例&#xff0c;该案例将演示如何使用Python爬取猫眼电影Top100的电影名称、主演和上映时间等信息&#xff0c;并将这些信息保存到TXT文件中。此案例使用了requests库来发送HTTP请求&#xff0c;使用re库进行正则表达式匹配&#xff0c;并包含详…...

卷积神经网络(CNN)中的权重(weights)和偏置项(bias)

在卷积神经网络&#xff08;CNN&#xff09;中&#xff0c;权重&#xff08;weights&#xff09;和偏置项&#xff08;bias&#xff09;是两个至关重要的参数&#xff0c;它们在网络的学习和推断过程中起着关键作用。 一、权重&#xff08;Weights&#xff09; 1. 定义&#xf…...

华为FusionCube 500-8.2.0SPC100 实施部署文档

环境&#xff1a; 产品&#xff1a;FusionCube 500版本&#xff1a;8.2.0.SPC100场景&#xff1a;虚拟化基础设施平台&#xff1a;FusionCompute两节点 MCNA * 2硬件部署&#xff08;塔式交付场景&#xff09;免交换组网&#xff08;配置AR卡&#xff09; 前置准备 组网规划 节…...

Android 网络请求(二)OKHttp网络通信

学习笔记 OkHttp 是一个非常强大且流行的 HTTP 客户端库&#xff0c;广泛用于 Android 开发中进行网络请求。与 HttpURLConnection 相比&#xff0c;OkHttp 提供了更简单、更高效的 API&#xff0c;特别是在处理复杂的 HTTP 请求时。 如何使用 OkHttp 进行网络请求 以下是使…...

npm上传自己封装的插件(vue+vite)

一、npm账号及发包删包等命令 若没有账号&#xff0c;可在npm官网&#xff1a;https://www.npmjs.com/login 进行注册。 在当前项目根目录下打开终端命令窗口&#xff0c;常见命令如下&#xff1a; 1、登录命令&#xff1a;npm login&#xff08;不用每次都重新登录&#xff0…...

如何在Word文件中设置水印以及如何禁止修改水印

在日常办公和学习中&#xff0c;我们经常需要在Word文档中设置水印&#xff0c;以保护文件的版权或标明文件的机密性。水印可以是文字形式&#xff0c;也可以是图片形式&#xff0c;能够灵活地适应不同的需求。但仅仅设置水印是不够的&#xff0c;有时我们还需要确保水印不被随…...

.NET桌面应用架构Demo与实战|WPF+MVVM+EFCore+IOC+DI+Code First+AutoMapper

目录 .NET桌面应用架构Demo与实战|WPFMVVMEFCoreIOCDICode FirstAutoPapper技术栈简述项目地址&#xff1a;功能展示项目结构项目引用1. 新建模型2. Data层&#xff0c;依赖EF Core&#xff0c;实现数据库增删改查3. Bussiness层&#xff0c;实现具体的业务逻辑4. Service层&am…...

el-table根据指定字段合并行和列+根据屏幕高度实时设置el-table的高度

文章目录 html代码script代码arraySpanMethod.js代码 html代码 <template><div class"rightBar"><cl-table ref"tableData"border :span-method"arraySpanMethod" :data"tableData" :columns"columns":max-…...

图像处理 之 凸包和最小外围轮廓生成

“ 最小包围轮廓之美” 一起来欣赏图形之美~ 1.原始图片 男人牵着机器狗 2.轮廓提取 轮廓提取 3.最小包围轮廓 最小包围轮廓 4.凸包 凸包 5.凸包和最小包围轮廓的合照 凸包和最小包围轮廓的合照 上述图片中凸包、最小外围轮廓效果为作者实现算法生成。 图形几何之美系列&#…...

萤石设备视频接入平台EasyCVR私有化视频平台视频监控系统的需求及不同场景摄像机的选择

在现代社会&#xff0c;随着安全意识的提高和技术的进步&#xff0c;安防监控视频系统已成为保障人们生活和财产安全的重要工具。EasyCVR安防监控视频系统&#xff0c;以其先进的网络传输技术和强大的功能&#xff0c;为各种规模的项目提供了一个高效、可靠的监控解决方案。以下…...

网络安全之接入控制

身份鉴别 ​ 定义:验证主题真实身份与其所声称的身份是否符合的过程&#xff0c;主体可以是用户、进程、主机。同时也可实现防重放&#xff0c;防假冒。 ​ 分类:单向鉴别、双向鉴别、三向鉴别。 ​ 主题身份标识信息:密钥、用户名和口令、证书和私钥 Internet接入控制过程 …...

Sqlite: Java使用、sqlite-devel

这里写目录标题 一、简介二、使用1. Java项目中&#xff08;1&#xff09;引入驱动&#xff08;2&#xff09;工具类&#xff08;3&#xff09;调用举例 2. sqlite-devel in linuxsqlite-devel使用 三、更多应用1. 数据类型2. 如何存储日期和时间3. 备份 一、简介 非常轻量级&…...

基于RK3568嵌入式主板的智能炒菜机方案:从硬件选型到系统集成实战

1. 项目概述&#xff1a;当嵌入式主板“掌勺”智能厨房最近几年&#xff0c;智能厨电赛道卷得厉害&#xff0c;从智能电饭煲到自动炒菜机&#xff0c;大家都在琢磨怎么让做饭这件事变得更“傻瓜”。我接触过不少这类项目&#xff0c;发现一个核心痛点&#xff1a;很多所谓的“智…...

Vue/React/Svelte通用Lovable实践框架(内部首发):1套配置+4个插件=自动注入用户喜爱度

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;Vue/React/Svelte通用Lovable实践框架&#xff08;内部首发&#xff09;&#xff1a;1套配置4个插件自动注入用户喜爱度 Lovable 是一套面向用户体验&#xff08;UX&#xff09;可量化提升的前端工程化实践框架…...

2026年,写给所有还在迷茫的技术人:你的坚持终将闪耀

站在2026年的节点回望&#xff0c;整个互联网行业的寒潮似乎还没完全退去&#xff0c;AI大模型重构业务逻辑的浪潮又拍在了每个技术人的岸边。尤其是对千万软件测试从业者来说&#xff0c;这种迷茫感来得更加具体&#xff1a;手工测试岗位不断被自动化脚本挤压&#xff0c;纯功…...

自动化测试的最佳实践:这6个原则让你的测试脚本更稳定

在当前互联网行业快速迭代的开发模式下&#xff0c;自动化测试已经成为保障软件交付质量、提升测试效率的核心手段。据行业调研数据显示&#xff0c;成熟的互联网测试团队中&#xff0c;核心回归测试场景的自动化覆盖率已经超过80%&#xff0c;自动化测试承担了绝大部分重复性测…...

Solaar 4.0:解锁罗技设备的完整Linux管理体验

Solaar 4.0&#xff1a;解锁罗技设备的完整Linux管理体验 【免费下载链接】Solaar Linux device manager for Logitech devices 项目地址: https://gitcode.com/gh_mirrors/so/Solaar 你是否曾为管理多款罗技无线设备而烦恼&#xff1f;不同设备需要不同的配置工具&…...

如何用bsf创建第一个3D场景:从零开始的完整教程

如何用bsf创建第一个3D场景&#xff1a;从零开始的完整教程 【免费下载链接】B3DFramework Modern C library for the development of real-time graphical applications 项目地址: https://gitcode.com/gh_mirrors/bs/B3DFramework bsf&#xff08;B3DFramework&#x…...

机器学习博士生存指南:问题定义、三维技术栈与认知带宽管理

1. 这不是“读博指南”&#xff0c;而是一份机器学习方向博士生的生存手记 我带过7届硕士、指导过4位博士&#xff0c;自己也从MIT CSAIL实验室的PhD candidate一路走到现在&#xff0c;在工业界和学术界都完整跑过ML方向的闭环——从ICML投稿被拒5次到最终以共同作者身份参与N…...

【Elasticsearch从入门到精通】第08篇:Elasticsearch集群扩展与运维——水平扩展与节点管理

上一篇【第07篇】Elasticsearch集群安全配置——TLS/SSL与密钥库管理 下一篇【第09篇】Elasticsearch API规范详解——多索引、日期数学与通用选项 摘要 Elasticsearch天生为分布式设计&#xff0c;其高扩展性和高可用性是核心优势。但在实际生产中&#xff0c;如何合理规划节…...

Windows curl证书错误SEC_E_UNTRUSTED_ROOT解决方案

1. 这个错误不是curl的问题&#xff0c;而是Windows在替你“把关” 你在Windows命令行里敲下 curl https://api.example.com &#xff0c;结果弹出一串红色报错&#xff1a; curl: (35) schannel: next InitializeSecurityContext failed: Unknown error (0x80092012) - T…...

平均 CPU 利用率指标为何该摒弃?多个案例揭示真相!

1. 作者信息与文章背景Jeremy Theocharis 是《平凡即卓越》作者、UMH 联合创始人兼首席技术官。文章基于其在 2026 年 4 月云原生亚琛聚会上的演讲&#xff0c;探讨为何应摒弃平均 CPU 利用率指标。2. 应用程序问题引出我们应用程序中的一个 Go 函数在生产环境总是被取消执行。…...