当前位置: 首页 > news >正文

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>

2、配置application.yml

加入Kafka的配置

springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {private int id;private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {private int id;private String description;private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "task-topic";ObjectMapper objectMapper = new ObjectMapper();for (int i = 1; i <= 5; i++) {// 定义一个对象实例User user = User.builder().id(1).name("Alice").build();Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();//JAVA对象转化为JSON字符串String message =  objectMapper.writeValueAsString(task);kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {String message = record.value();ObjectMapper objectMapper = new ObjectMapper();Task task = objectMapper.readValue(message,Task.class);// 取出System.out.println("User - Received: " + task.getAssignedUser());// 手动提交offsetacknowledgment.acknowledge();}
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!

相关文章:

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency> 2、配置application.yml 加入Kafk…...

深度学习(1)

一、torch的安装 基于直接设备情况&#xff0c;选择合适的torch版本&#xff0c;有显卡的建议安装GPU版本&#xff0c;可以通过nvidia-smi命令来查看显卡驱动的版本&#xff0c;在官网中根据cuda版本&#xff0c;选择合适的版本号&#xff0c;下面是安装示例代码 GPU&#xff…...

golang 嵌入式armv7l压缩编译打包

编译 Go 应用程序 go build -ldflags"-s -w" -o myapp.exe . 使用 UPX 压缩可执行文件&#xff08;window下载并设置环境变量&#xff09; upx --best --lzma myapp.exe 可从10M压缩到1M 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 …...

Makefile 之 join

join $(join <list1>,<list2> ) 名称&#xff1a;连接函数——join。 功能&#xff1a;把<list2>中的单词对应地加到<list1>的单词后面。如果<list1>的单词个数要比<list2>的多&#xff0c; 那么&#xff0c;<list1>中的多出…...

集合卡尔曼滤波(Ensemble Kalman Filter),用于二维滤波(模拟平面上的目标跟踪),MATLAB代码

集合卡尔曼滤波&#xff08;Ensemble Kalman Filter&#xff09; 文章目录 引言理论基础卡尔曼滤波集合卡尔曼滤波初始化预测步骤更新步骤卡尔曼增益更新集合 MATLAB 实现运行结果3. 应用领域结论 引言 集合卡尔曼滤波&#xff08;Ensemble Kalman Filter, EnKF&#xff09;是…...

北京申请中级职称流程(2024年)

想找个完整详细点的申请流程资料真不容易&#xff0c;做个分享送给需要的人吧。 不清楚为什么说文章过度宣传&#xff0c;把链接和页面去掉了&#xff0c;网上自己找一下。 最好用windows自带的EDGE浏览器打开申请网站&#xff0c;只有在开始申请的时间内才可以进行网上申报&…...

ubuntu.24安装cuda

1.下载CUDA Toolkit https://developer.nvidia.com/cuda-toolkit-archive 2.按照命令下载&#xff0c;安装 sudo sh cuda_12.2.2_535.104.05_linux.run 3.环境变量 sudo vi /etc/profile 最后面添加 export PATH“/usr/local/cuda-12.2/bin: P A T H " e x p o r t L D L…...

unity li2cpp逆向原理是什么?

主要涉及将Unity游戏引擎中的C#代码转换为C代码&#xff0c;并进一步编译为各平台的原生&#xff08;Native&#xff09;代码的过程&#xff0c;以及逆向工程工具如何利用这一过程中的特定文件来还原和分析原始代码。以下是对Unity IL2CPP逆向原理的详细解释&#xff1a; 对惹…...

Python网络爬虫实践案例:爬取猫眼电影Top100

以下是一个Python网络爬虫的实践案例&#xff0c;该案例将演示如何使用Python爬取猫眼电影Top100的电影名称、主演和上映时间等信息&#xff0c;并将这些信息保存到TXT文件中。此案例使用了requests库来发送HTTP请求&#xff0c;使用re库进行正则表达式匹配&#xff0c;并包含详…...

卷积神经网络(CNN)中的权重(weights)和偏置项(bias)

在卷积神经网络&#xff08;CNN&#xff09;中&#xff0c;权重&#xff08;weights&#xff09;和偏置项&#xff08;bias&#xff09;是两个至关重要的参数&#xff0c;它们在网络的学习和推断过程中起着关键作用。 一、权重&#xff08;Weights&#xff09; 1. 定义&#xf…...

华为FusionCube 500-8.2.0SPC100 实施部署文档

环境&#xff1a; 产品&#xff1a;FusionCube 500版本&#xff1a;8.2.0.SPC100场景&#xff1a;虚拟化基础设施平台&#xff1a;FusionCompute两节点 MCNA * 2硬件部署&#xff08;塔式交付场景&#xff09;免交换组网&#xff08;配置AR卡&#xff09; 前置准备 组网规划 节…...

Android 网络请求(二)OKHttp网络通信

学习笔记 OkHttp 是一个非常强大且流行的 HTTP 客户端库&#xff0c;广泛用于 Android 开发中进行网络请求。与 HttpURLConnection 相比&#xff0c;OkHttp 提供了更简单、更高效的 API&#xff0c;特别是在处理复杂的 HTTP 请求时。 如何使用 OkHttp 进行网络请求 以下是使…...

npm上传自己封装的插件(vue+vite)

一、npm账号及发包删包等命令 若没有账号&#xff0c;可在npm官网&#xff1a;https://www.npmjs.com/login 进行注册。 在当前项目根目录下打开终端命令窗口&#xff0c;常见命令如下&#xff1a; 1、登录命令&#xff1a;npm login&#xff08;不用每次都重新登录&#xff0…...

如何在Word文件中设置水印以及如何禁止修改水印

在日常办公和学习中&#xff0c;我们经常需要在Word文档中设置水印&#xff0c;以保护文件的版权或标明文件的机密性。水印可以是文字形式&#xff0c;也可以是图片形式&#xff0c;能够灵活地适应不同的需求。但仅仅设置水印是不够的&#xff0c;有时我们还需要确保水印不被随…...

.NET桌面应用架构Demo与实战|WPF+MVVM+EFCore+IOC+DI+Code First+AutoMapper

目录 .NET桌面应用架构Demo与实战|WPFMVVMEFCoreIOCDICode FirstAutoPapper技术栈简述项目地址&#xff1a;功能展示项目结构项目引用1. 新建模型2. Data层&#xff0c;依赖EF Core&#xff0c;实现数据库增删改查3. Bussiness层&#xff0c;实现具体的业务逻辑4. Service层&am…...

el-table根据指定字段合并行和列+根据屏幕高度实时设置el-table的高度

文章目录 html代码script代码arraySpanMethod.js代码 html代码 <template><div class"rightBar"><cl-table ref"tableData"border :span-method"arraySpanMethod" :data"tableData" :columns"columns":max-…...

图像处理 之 凸包和最小外围轮廓生成

“ 最小包围轮廓之美” 一起来欣赏图形之美~ 1.原始图片 男人牵着机器狗 2.轮廓提取 轮廓提取 3.最小包围轮廓 最小包围轮廓 4.凸包 凸包 5.凸包和最小包围轮廓的合照 凸包和最小包围轮廓的合照 上述图片中凸包、最小外围轮廓效果为作者实现算法生成。 图形几何之美系列&#…...

萤石设备视频接入平台EasyCVR私有化视频平台视频监控系统的需求及不同场景摄像机的选择

在现代社会&#xff0c;随着安全意识的提高和技术的进步&#xff0c;安防监控视频系统已成为保障人们生活和财产安全的重要工具。EasyCVR安防监控视频系统&#xff0c;以其先进的网络传输技术和强大的功能&#xff0c;为各种规模的项目提供了一个高效、可靠的监控解决方案。以下…...

网络安全之接入控制

身份鉴别 ​ 定义:验证主题真实身份与其所声称的身份是否符合的过程&#xff0c;主体可以是用户、进程、主机。同时也可实现防重放&#xff0c;防假冒。 ​ 分类:单向鉴别、双向鉴别、三向鉴别。 ​ 主题身份标识信息:密钥、用户名和口令、证书和私钥 Internet接入控制过程 …...

Sqlite: Java使用、sqlite-devel

这里写目录标题 一、简介二、使用1. Java项目中&#xff08;1&#xff09;引入驱动&#xff08;2&#xff09;工具类&#xff08;3&#xff09;调用举例 2. sqlite-devel in linuxsqlite-devel使用 三、更多应用1. 数据类型2. 如何存储日期和时间3. 备份 一、简介 非常轻量级&…...

创新音乐体验:foobox-cn全攻略

创新音乐体验&#xff1a;foobox-cn全攻略 【免费下载链接】foobox-cn DUI 配置 for foobar2000 项目地址: https://gitcode.com/GitHub_Trending/fo/foobox-cn 在数字音乐时代&#xff0c;如何将本地播放器与网络电台无缝融合&#xff0c;打造个性化的音乐中心&#xf…...

LangChain工具绑定避坑指南:为什么你的bind_tools不工作?

LangChain工具绑定深度解析&#xff1a;从原理到实战的避坑指南 当你第一次尝试在LangChain中绑定自定义工具时&#xff0c;可能会遇到各种令人困惑的问题——工具明明定义了却无法调用&#xff0c;参数传递总是出错&#xff0c;或者LLM完全无视你的工具指令。这些问题往往不是…...

pmap命令隐藏玩法:用-XX参数挖出Linux进程的所有内存秘密

pmap命令隐藏玩法&#xff1a;用-XX参数挖出Linux进程的所有内存秘密 当系统性能出现瓶颈时&#xff0c;开发者和运维工程师往往需要深入分析进程的内存使用情况。虽然常见的pmap -x命令能提供基本的内存映射信息&#xff0c;但真正的高手都知道&#xff0c;-XX选项才是揭开内…...

万字详解:现象级OpenClaw(俗称“龙虾”)能做什么-周红伟

OpenClaw是一款开源的AI智能体框架&#xff0c;它不是“聊天机器人”&#xff0c;而是“AI执行引擎”——连接大模型的思考能力与电脑的真实操作权限&#xff0c;让AI从“只说不做”变成“说到做到”&#xff0c;可自动完成文件管理、跨应用协同、浏览器操作、代码生成等复杂任…...

打破邮件营销壁垒:免费响应式HTML模板的实战指南

打破邮件营销壁垒&#xff1a;免费响应式HTML模板的实战指南 【免费下载链接】email-templates Free HTML email templates for Mailchimp and other emails services 项目地址: https://gitcode.com/gh_mirrors/ema/email-templates 一、邮件营销的隐形痛点与解决方案 …...

Comsol 锂枝晶模型 “五合一”:探索枝晶生长的多元奥秘

comsol 锂枝晶模型 五合一 单枝晶定向生长、多枝晶定向生长、多枝晶 随机生长只 无序生长随机形核以及雪花枝晶&#xff0c;包含相场、浓度场和电场三种物理场在锂电领域&#xff0c;锂枝晶的生长一直是研究的重点&#xff0c;因为它严重影响电池的安全性与性能。今天咱就来唠唠…...

AutoGLM-Phone-9B快速上手:图文语音全能AI,小白也能轻松部署

AutoGLM-Phone-9B快速上手&#xff1a;图文语音全能AI&#xff0c;小白也能轻松部署 1. AutoGLM-Phone-9B简介 1.1 什么是AutoGLM-Phone-9B AutoGLM-Phone-9B是一款专为移动设备优化的多模态AI模型&#xff0c;它能同时理解文字、图片和语音信息。简单来说&#xff0c;就像给…...

【技术选型指南】Avalonia、MAUI、Uno Platform、Flutter、Electron、Qt与Tauri:从场景到决策的深度剖析

1. 跨平台框架选型的核心考量因素 当你准备启动一个新项目或重构现有技术栈时&#xff0c;面对琳琅满目的跨平台框架&#xff0c;选择困难症很容易发作。我经历过多次这样的技术决策过程&#xff0c;发现关键在于先明确项目的核心需求。就像装修房子前要先确定是想要北欧简约风…...

【跟韩工学Ubuntu第9课】第9章 系统备份、恢复与迁移-005篇

文章目录 第9章 系统备份、恢复与迁移 Ubuntu Server 生产级系统管理(企业级完整版) 9.1 备份策略基础(企业级理论精讲) 9.1.1 企业备份核心价值观 9.1.2 企业级3-2-1备份黄金法则 9.1.3 全量备份(Full Backup) 定义 企业级优点 企业级缺点 企业适用场景 9.1.4 增量备份(…...

OpCore Simplify革新:4步实现OpenCore EFI配置的极简实践

OpCore Simplify革新&#xff1a;4步实现OpenCore EFI配置的极简实践 【免费下载链接】OpCore-Simplify A tool designed to simplify the creation of OpenCore EFI 项目地址: https://gitcode.com/GitHub_Trending/op/OpCore-Simplify 你是否曾在普通PC上安装macOS时&…...