kafka复习:(3)自定义序列化器和反序列化器
一、实体类定义:
public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "Company{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}public Company(String name, String address) {this.name = name;this.address = address;}public Company() {}
}
二、自定义序列化器和反序列化器
import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}//进行字节数组序列化@Overridepublic byte[] serialize(String topic, Company data) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else {name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else{address = new byte[0];}ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);byteBuffer.putInt(name.length);byteBuffer.put(name);byteBuffer.putInt(address.length);byteBuffer.put(address);return byteBuffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanyDeserializer implements Deserializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic Company deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException ex) {throw new SerializationException("Error:"+ex.getMessage());}return new Company(name,address);}@Overridepublic void close() {}
}
三、定义生产者和消费者
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CompanyProducer {public static void main(String[] args) throws Exception{Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);Company company = new Company();company.setAddress("Beijing");company.setName("Connection");ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);producer.send(record).get();}
}
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CompanyConsumer {public static void main(String[] args) {Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));while(true){ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){System.out.println(consumerRecord.value());}}}
}相关文章:
kafka复习:(3)自定义序列化器和反序列化器
一、实体类定义: public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name name;}public String getAddress() {return address;}public void setAddress(String a…...
Unity 图片资源的适配
前言 最近小编做Unity项目时,发现在资源处理这方面和Android有所不同;例如:Android的资源文件夹res下会有着mipmap-mdpi,mipmap-hdpi,mipmap-xhdpi,mipmap-xxhdpi,mipmap-xxxhdpi这五个文件夹&a…...
【Axure高保真原型】通过输入框动态控制折线图
今天和大家分享通过输入框动态控制折线图的原型模板,在输入框里维护项目数据,可以自动生成对应的折线图,鼠标移入对应折点,可以查看对应数据。使用也非常方便,只需要修改输入框里的数据,或者复制粘贴文本&a…...
【Java】树结构数据的搜索
这里写自定义目录标题 需要实现的效果前端需要的json格式:一定是一个完整的树结构错误错误的返回格式错误的返回格式实现的效果 正确正确的返回格式正确的展示画面 后端逻辑分析代码总览 数据库表结构 需要实现的效果 前端需要的json格式:一定是一个完整…...
ElementUI中的日历组件加载无效的问题
在ElementUI中提供了一个日历组件。在某些场景下还是比较有用的。只是在使用的时候会有些下坑,大家要注意下。 官网提供的信息比较简介。我们在引入到项目中使用的时候可以能会出现下面的错误提示。 Unknown custom element: <el-calendar> - did you …...
Git版本管理(03)stash临时操作和.gitignore配置
1 git stash操作(临时存储) 1.1 git stash常见流程 当你修改了某一个分支,但此时要切换分支时如果直接切换会因为一些修改冲突而checkout失败,那么此时就可以使用git stash命令来解决该问题。一般流程为: $git pull# 将当前未提交的修改…...
【ThingJS | 3D可视化】开发框架,一站式数字孪生
博主:_LJaXi Or 東方幻想郷 专栏: 数字孪生 | 3D可视化框架 开发工具:ThingJS在线开发工具 ThingJs 低代码开发 ThingJs 低代码开发注意点场景效果配置层级层级常用API实例化 Thing,加载场景load 加载函数ThingJs 层级关系图查找层…...
SpringBoot返回响应排除为 null 的字段
SpringBoot返回响应排除为 null 的字段 可以通过全局配置,使返回响应中为null的字段,不在出现在返回结果中。 注意:这样配置,使得返回响应包含的字段随请求结果变化,响应到底包含哪些字段不直观;除非业务…...
华为数通方向HCIP-DataCom H12-821题库(单选题:41-60)
第41题 以下关于IS-IS协议说法错误的是? A、IS-IS协议支持CLNP网络 B、IS-IS 协议支持IP 网络 C、IS-IS 协议的报文直接由数据链路层封装 D、IS-IS协议是运行在AS之间的链路状态协议 答案:D 解析: 关于IS-IS协议的说法错误是D. IS-IS协议是运行在A…...
OpenAI推出GPT-3.5Turbo微调功能并更新API;Midjourney更新局部绘制功能
🦉 AI新闻 🚀 OpenAI推出GPT-3.5Turbo微调功能并更新API,将提供GPT-4微调功能 摘要:OpenAI宣布推出GPT-3.5Turbo微调功能,并更新API,使企业和开发者能够定制ChatGPT,达到或超过GPT-4的能力。通…...
相机成像之3A算法的综述
3A算法是摄像机成像控制技术中的三大自动控制算法。随着计算机视觉的迅速发展,该算法在摄像器材领域具有广泛的应用和前景。 那么3A控制算法又是指什么呢? (1)AE (Auto Exposure)自动曝光控制 (2)AF (Auto Focus)自动聚焦控制 (3)AWB (Auto White Balance)自动白平衡控…...
最新AI系统ChatGPT程序源码/微信公众号/H5端+搭建部署教程+完整知识库
一、前言 SparkAi系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。 那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧!…...
OpenCV实例(九)基于深度学习的运动目标检测(二)YOLOv2概述
基于深度学习的运动目标检测(二)YOLOv2&YOLOv3概述 1.YOLOv2概述2.YOLOv3概述2.1 新的基础网络结构:2.2 采用多尺度预测机制。2.3 使用简单的逻辑回归进行分类 1.YOLOv2概述 对YOLO存在的不足,业界又推出了YOLOv2。YOLOv2主要…...
【Docker】已经创建好的Docker怎么设置开机自启
已经创建好的Docker怎么设置开机自启 1.使用命令Docker update来完成2.查看是否开启3.验证是否开启 1.使用命令Docker update来完成 操作步骤: docker update --restartalways 容器ID2.查看是否开启 docker inspect 容器Id看到这里RestartPolicy设置为如图&#…...
E - Excellent Views
Problem - E - Codeforces 问题描述:数组H大小都不相同。从i到j是可行的,当且仅当 不存在 k ,使 ∣ i − k ∣ ≤ ∣ i − j ∣ , H k > H j 不存在k,使 \\ |i - k| \leq |i - j|, \quad H_k > H_j 不存在k,使…...
WiFi天线和NB-IoT天线不通用
表面看起来完全一样。但是把WiFi天线插到NB-IoT设备后,信号弱了很多。还导致设备反复重启...
IoT DC3 是一个基于 Spring Cloud 的开源的、分布式的物联网(IoT)平台本地部署步骤
dc3 windows 本地搭建步骤: 必要软件环境 进入原网页# 务必保证至少需要给 docker 分配:1 核 CPU 以及 4G 以上的运行内存! JDK : 推荐使用 Oracle JDK 1.8 或者 OpenJDK8,理论来说其他版本也行; Maven : 推荐…...
VBA Excel自定义函数的使用 简单的语法
一个简单的教程,实现VBA自定义函数。 新建模块 复制后面的代码放进来 函数的入口参数不定义,则认为是一块区域; 反之,如FindChar1 As String,则认为是输入的单值。 循环和分支如下例子,VB比较接近自然语…...
字节跳动 从需求到上线全流程 软件工程流程 需求评估 MVP
走进后端开发流程 整个课程会带大家先从理论出发,思考为什么有流程 大家以后工作的团队可能不一样,那么不同的团队也会有不同的流程,这背后的逻辑是什么 然后会带大家按照走一遍从需求到上线的全流程,告诉大家在流程的每个阶段&am…...
线性代数-矩阵的本质
线性代数-矩阵的本质 线性代数-矩阵的本质...
YOLO26改进 | MSHC多尺度异构卷积:用方形核与条带核捕获复杂空间纹理,以清晰动机打造超强创新!
# YOLO26改进最新创新改进系列 | MSHC多尺度异构卷积:用方形核与条带核捕获复杂空间纹理,以清晰动机打造超强创新! 购买相关资料后畅享一对一答疑! 畅享超多免费持续更新且可大幅度提升文章档次的纯干货工具! 这篇采用…...
Windows安卓子系统终极指南:从基础配置到专业开发全流程
Windows安卓子系统终极指南:从基础配置到专业开发全流程 【免费下载链接】WSA Developer-related issues and feature requests for Windows Subsystem for Android 项目地址: https://gitcode.com/gh_mirrors/ws/WSA 想要在Windows 11上无缝运行安卓应用吗&…...
深耕区域数字生态,智森传媒赋能本地中小企业破局增长
在本地生活流量红利消退、行业内卷加剧的当下,中小企业数字化转型已不是选择题,而是生存题。十堰智森网络传媒立足本土市场,以技术研发为根基,以区域获客为核心,以数字人直播为抓手,为中小企业搭建全链路数…...
PCI总线‘对话’的艺术:主从设备如何通过FRAME#、STOP#信号优雅地‘开始’与‘结束’传输
PCI总线‘对话’的艺术:主从设备如何通过FRAME#、STOP#信号优雅地‘开始’与‘结束’传输 在计算机系统的内部世界里,总线的数据传输就像一场精心编排的舞会。PCI总线作为这场舞会的舞台,主从设备之间的每一次交互都遵循着严格的礼仪规则。这…...
教培机构管理越忙越乱?用对工具,比多雇两个人更高效
不少培训机构校长都有同样的感受:明明团队很拼,每天从早忙到晚,可机构依旧问题不断。招生线索散落在微信、表格、登记本里,跟进不及时就白白流失;排课全靠人工核对,老师冲突、教室撞期、调课通知不到位是常…...
揭秘AI教材生成秘诀!AI教材写作工具助力,低查重完成20万字教材!
教材编写难题与AI工具解决方案 在编写教材时,如何才能精准满足不同的需求呢?不同学段的学生在认知能力上存在显著差异,内容过于复杂或简单都不合适;而在课堂教学和自主学习等不同场景下,对教材的要求又各不相同&#…...
别再想当然!用AD628/INA等差分放大器做单端采集,必须搞懂的共模电压计算(附Excel工具)
差分放大器单端采集实战指南:共模电压计算与设计避坑 在工业传感器接口和医疗设备信号链设计中,差分放大器常被用于单端信号采集的场景。许多工程师习惯性地认为,只要将差分放大器的负输入端接地,就能轻松实现单端转差分功能。但实…...
开放-构建-创新-连接:AMD AI开发者日即将登陆上海
近日,AMD宣布其面向AI 开发者的年度技术盛会2026年AMD AI 开发者日 (AMD AI DevDay 2026) 将于 5 月 19 日在上海前滩香格里拉酒店举行,AMD 董事会主席兼首席执行官 Lisa Su 博士也将出席并发表演讲。 本着“开放-构建-创新-连接”的理念,本…...
3分钟快速上手:91160-cli医疗预约自动化助手完整指南
3分钟快速上手:91160-cli医疗预约自动化助手完整指南 【免费下载链接】91160-cli 健康160全自动挂号脚本,捡漏神器 项目地址: https://gitcode.com/gh_mirrors/91/91160-cli 还在为医院挂号难而烦恼吗?91160-cli是一款专为医疗预约设计…...
量子计算威胁下的密码安全:从后量子密码到密码敏捷性实战解析
1. 量子计算:从实验室概念到国家安全的“灰犀牛”最近几年,每当我和业内的同行、安全专家,甚至是投资圈的朋友聊起前沿技术风险,话题总会在某个时刻滑向量子计算。这感觉很像十几年前大家第一次严肃讨论“云计算安全”时一样——一…...
