kafka复习:(3)自定义序列化器和反序列化器
一、实体类定义:
public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "Company{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}public Company(String name, String address) {this.name = name;this.address = address;}public Company() {}
}
二、自定义序列化器和反序列化器
import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}//进行字节数组序列化@Overridepublic byte[] serialize(String topic, Company data) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else {name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else{address = new byte[0];}ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);byteBuffer.putInt(name.length);byteBuffer.put(name);byteBuffer.putInt(address.length);byteBuffer.put(address);return byteBuffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanyDeserializer implements Deserializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic Company deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException ex) {throw new SerializationException("Error:"+ex.getMessage());}return new Company(name,address);}@Overridepublic void close() {}
}
三、定义生产者和消费者
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CompanyProducer {public static void main(String[] args) throws Exception{Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);Company company = new Company();company.setAddress("Beijing");company.setName("Connection");ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);producer.send(record).get();}
}
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CompanyConsumer {public static void main(String[] args) {Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));while(true){ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){System.out.println(consumerRecord.value());}}}
}
相关文章:
kafka复习:(3)自定义序列化器和反序列化器
一、实体类定义: public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name name;}public String getAddress() {return address;}public void setAddress(String a…...

Unity 图片资源的适配
前言 最近小编做Unity项目时,发现在资源处理这方面和Android有所不同;例如:Android的资源文件夹res下会有着mipmap-mdpi,mipmap-hdpi,mipmap-xhdpi,mipmap-xxhdpi,mipmap-xxxhdpi这五个文件夹&a…...

【Axure高保真原型】通过输入框动态控制折线图
今天和大家分享通过输入框动态控制折线图的原型模板,在输入框里维护项目数据,可以自动生成对应的折线图,鼠标移入对应折点,可以查看对应数据。使用也非常方便,只需要修改输入框里的数据,或者复制粘贴文本&a…...

【Java】树结构数据的搜索
这里写自定义目录标题 需要实现的效果前端需要的json格式:一定是一个完整的树结构错误错误的返回格式错误的返回格式实现的效果 正确正确的返回格式正确的展示画面 后端逻辑分析代码总览 数据库表结构 需要实现的效果 前端需要的json格式:一定是一个完整…...

ElementUI中的日历组件加载无效的问题
在ElementUI中提供了一个日历组件。在某些场景下还是比较有用的。只是在使用的时候会有些下坑,大家要注意下。 官网提供的信息比较简介。我们在引入到项目中使用的时候可以能会出现下面的错误提示。 Unknown custom element: <el-calendar> - did you …...
Git版本管理(03)stash临时操作和.gitignore配置
1 git stash操作(临时存储) 1.1 git stash常见流程 当你修改了某一个分支,但此时要切换分支时如果直接切换会因为一些修改冲突而checkout失败,那么此时就可以使用git stash命令来解决该问题。一般流程为: $git pull# 将当前未提交的修改…...

【ThingJS | 3D可视化】开发框架,一站式数字孪生
博主:_LJaXi Or 東方幻想郷 专栏: 数字孪生 | 3D可视化框架 开发工具:ThingJS在线开发工具 ThingJs 低代码开发 ThingJs 低代码开发注意点场景效果配置层级层级常用API实例化 Thing,加载场景load 加载函数ThingJs 层级关系图查找层…...

SpringBoot返回响应排除为 null 的字段
SpringBoot返回响应排除为 null 的字段 可以通过全局配置,使返回响应中为null的字段,不在出现在返回结果中。 注意:这样配置,使得返回响应包含的字段随请求结果变化,响应到底包含哪些字段不直观;除非业务…...

华为数通方向HCIP-DataCom H12-821题库(单选题:41-60)
第41题 以下关于IS-IS协议说法错误的是? A、IS-IS协议支持CLNP网络 B、IS-IS 协议支持IP 网络 C、IS-IS 协议的报文直接由数据链路层封装 D、IS-IS协议是运行在AS之间的链路状态协议 答案:D 解析: 关于IS-IS协议的说法错误是D. IS-IS协议是运行在A…...

OpenAI推出GPT-3.5Turbo微调功能并更新API;Midjourney更新局部绘制功能
🦉 AI新闻 🚀 OpenAI推出GPT-3.5Turbo微调功能并更新API,将提供GPT-4微调功能 摘要:OpenAI宣布推出GPT-3.5Turbo微调功能,并更新API,使企业和开发者能够定制ChatGPT,达到或超过GPT-4的能力。通…...

相机成像之3A算法的综述
3A算法是摄像机成像控制技术中的三大自动控制算法。随着计算机视觉的迅速发展,该算法在摄像器材领域具有广泛的应用和前景。 那么3A控制算法又是指什么呢? (1)AE (Auto Exposure)自动曝光控制 (2)AF (Auto Focus)自动聚焦控制 (3)AWB (Auto White Balance)自动白平衡控…...

最新AI系统ChatGPT程序源码/微信公众号/H5端+搭建部署教程+完整知识库
一、前言 SparkAi系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。 那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧!…...

OpenCV实例(九)基于深度学习的运动目标检测(二)YOLOv2概述
基于深度学习的运动目标检测(二)YOLOv2&YOLOv3概述 1.YOLOv2概述2.YOLOv3概述2.1 新的基础网络结构:2.2 采用多尺度预测机制。2.3 使用简单的逻辑回归进行分类 1.YOLOv2概述 对YOLO存在的不足,业界又推出了YOLOv2。YOLOv2主要…...

【Docker】已经创建好的Docker怎么设置开机自启
已经创建好的Docker怎么设置开机自启 1.使用命令Docker update来完成2.查看是否开启3.验证是否开启 1.使用命令Docker update来完成 操作步骤: docker update --restartalways 容器ID2.查看是否开启 docker inspect 容器Id看到这里RestartPolicy设置为如图&#…...
E - Excellent Views
Problem - E - Codeforces 问题描述:数组H大小都不相同。从i到j是可行的,当且仅当 不存在 k ,使 ∣ i − k ∣ ≤ ∣ i − j ∣ , H k > H j 不存在k,使 \\ |i - k| \leq |i - j|, \quad H_k > H_j 不存在k,使…...

WiFi天线和NB-IoT天线不通用
表面看起来完全一样。但是把WiFi天线插到NB-IoT设备后,信号弱了很多。还导致设备反复重启...

IoT DC3 是一个基于 Spring Cloud 的开源的、分布式的物联网(IoT)平台本地部署步骤
dc3 windows 本地搭建步骤: 必要软件环境 进入原网页# 务必保证至少需要给 docker 分配:1 核 CPU 以及 4G 以上的运行内存! JDK : 推荐使用 Oracle JDK 1.8 或者 OpenJDK8,理论来说其他版本也行; Maven : 推荐…...

VBA Excel自定义函数的使用 简单的语法
一个简单的教程,实现VBA自定义函数。 新建模块 复制后面的代码放进来 函数的入口参数不定义,则认为是一块区域; 反之,如FindChar1 As String,则认为是输入的单值。 循环和分支如下例子,VB比较接近自然语…...

字节跳动 从需求到上线全流程 软件工程流程 需求评估 MVP
走进后端开发流程 整个课程会带大家先从理论出发,思考为什么有流程 大家以后工作的团队可能不一样,那么不同的团队也会有不同的流程,这背后的逻辑是什么 然后会带大家按照走一遍从需求到上线的全流程,告诉大家在流程的每个阶段&am…...
线性代数-矩阵的本质
线性代数-矩阵的本质 线性代数-矩阵的本质...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

push [特殊字符] present
push 🆚 present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中,push 和 present 是两种不同的视图控制器切换方式,它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...

【Linux】Linux 系统默认的目录及作用说明
博主介绍:✌全网粉丝23W,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...
前端工具库lodash与lodash-es区别详解
lodash 和 lodash-es 是同一工具库的两个不同版本,核心功能完全一致,主要区别在于模块化格式和优化方式,适合不同的开发环境。以下是详细对比: 1. 模块化格式 lodash 使用 CommonJS 模块格式(require/module.exports&a…...

qt+vs Generated File下的moc_和ui_文件丢失导致 error LNK2001
qt 5.9.7 vs2013 qt add-in 2.3.2 起因是添加一个新的控件类,直接把源文件拖进VS的项目里,然后VS卡住十秒,然后编译就报一堆 error LNK2001 一看项目的Generated Files下的moc_和ui_文件丢失了一部分,导致编译的时候找不到了。因…...

以太网PHY布局布线指南
1. 简介 对于以太网布局布线遵循以下准则很重要,因为这将有助于减少信号发射,最大程度地减少噪声,确保器件作用,最大程度地减少泄漏并提高信号质量。 2. PHY设计准则 2.1 DRC错误检查 首先检查DRC规则是否设置正确,然…...
OCC笔记:TDF_Label中有多个相同类型属性
注:OCCT版本:7.9.1 TDF_Label中有多个相同类型的属性的方案 OCAF imposes the restriction that only one attribute type may be allocated to one label. It is necessary to take into account the design of the application data tree. For exampl…...