当前位置: 首页 > news >正文

kafka复习:(3)自定义序列化器和反序列化器

一、实体类定义:


public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "Company{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}public Company(String name, String address) {this.name = name;this.address = address;}public Company() {}
}

二、自定义序列化器和反序列化器


import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}//进行字节数组序列化@Overridepublic byte[] serialize(String topic, Company data) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else {name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else{address = new byte[0];}ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);byteBuffer.putInt(name.length);byteBuffer.put(name);byteBuffer.putInt(address.length);byteBuffer.put(address);return byteBuffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanyDeserializer implements Deserializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic Company deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException ex) {throw new SerializationException("Error:"+ex.getMessage());}return new Company(name,address);}@Overridepublic void close() {}
}

三、定义生产者和消费者

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CompanyProducer {public static void main(String[] args) throws Exception{Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);Company company = new Company();company.setAddress("Beijing");company.setName("Connection");ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);producer.send(record).get();}
}
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CompanyConsumer {public static void main(String[] args) {Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));while(true){ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){System.out.println(consumerRecord.value());}}}
}

相关文章:

kafka复习:(3)自定义序列化器和反序列化器

一、实体类定义&#xff1a; public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name name;}public String getAddress() {return address;}public void setAddress(String a…...

Unity 图片资源的适配

前言 最近小编做Unity项目时&#xff0c;发现在资源处理这方面和Android有所不同&#xff1b;例如&#xff1a;Android的资源文件夹res下会有着mipmap-mdpi&#xff0c;mipmap-hdpi&#xff0c;mipmap-xhdpi&#xff0c;mipmap-xxhdpi&#xff0c;mipmap-xxxhdpi这五个文件夹&a…...

【Axure高保真原型】通过输入框动态控制折线图

今天和大家分享通过输入框动态控制折线图的原型模板&#xff0c;在输入框里维护项目数据&#xff0c;可以自动生成对应的折线图&#xff0c;鼠标移入对应折点&#xff0c;可以查看对应数据。使用也非常方便&#xff0c;只需要修改输入框里的数据&#xff0c;或者复制粘贴文本&a…...

【Java】树结构数据的搜索

这里写自定义目录标题 需要实现的效果前端需要的json格式&#xff1a;一定是一个完整的树结构错误错误的返回格式错误的返回格式实现的效果 正确正确的返回格式正确的展示画面 后端逻辑分析代码总览 数据库表结构 需要实现的效果 前端需要的json格式&#xff1a;一定是一个完整…...

ElementUI中的日历组件加载无效的问题

在ElementUI中提供了一个日历组件。在某些场景下还是比较有用的。只是在使用的时候会有些下坑&#xff0c;大家要注意下。   官网提供的信息比较简介。我们在引入到项目中使用的时候可以能会出现下面的错误提示。 Unknown custom element: <el-calendar> - did you …...

Git版本管理(03)stash临时操作和.gitignore配置

1 git stash操作(临时存储) 1.1 git stash常见流程 当你修改了某一个分支&#xff0c;但此时要切换分支时如果直接切换会因为一些修改冲突而checkout失败&#xff0c;那么此时就可以使用git stash命令来解决该问题。一般流程为&#xff1a; $git pull# 将当前未提交的修改…...

【ThingJS | 3D可视化】开发框架,一站式数字孪生

博主&#xff1a;_LJaXi Or 東方幻想郷 专栏&#xff1a; 数字孪生 | 3D可视化框架 开发工具&#xff1a;ThingJS在线开发工具 ThingJs 低代码开发 ThingJs 低代码开发注意点场景效果配置层级层级常用API实例化 Thing&#xff0c;加载场景load 加载函数ThingJs 层级关系图查找层…...

SpringBoot返回响应排除为 null 的字段

SpringBoot返回响应排除为 null 的字段 可以通过全局配置&#xff0c;使返回响应中为null的字段&#xff0c;不在出现在返回结果中。 注意&#xff1a;这样配置&#xff0c;使得返回响应包含的字段随请求结果变化&#xff0c;响应到底包含哪些字段不直观&#xff1b;除非业务…...

华为数通方向HCIP-DataCom H12-821题库(单选题:41-60)

第41题 以下关于IS-IS协议说法错误的是? A、IS-IS协议支持CLNP网络 B、IS-IS 协议支持IP 网络 C、IS-IS 协议的报文直接由数据链路层封装 D、IS-IS协议是运行在AS之间的链路状态协议 答案&#xff1a;D 解析&#xff1a; 关于IS-IS协议的说法错误是D. IS-IS协议是运行在A…...

OpenAI推出GPT-3.5Turbo微调功能并更新API;Midjourney更新局部绘制功能

&#x1f989; AI新闻 &#x1f680; OpenAI推出GPT-3.5Turbo微调功能并更新API&#xff0c;将提供GPT-4微调功能 摘要&#xff1a;OpenAI宣布推出GPT-3.5Turbo微调功能&#xff0c;并更新API&#xff0c;使企业和开发者能够定制ChatGPT&#xff0c;达到或超过GPT-4的能力。通…...

相机成像之3A算法的综述

3A算法是摄像机成像控制技术中的三大自动控制算法。随着计算机视觉的迅速发展,该算法在摄像器材领域具有广泛的应用和前景。 那么3A控制算法又是指什么呢? (1)AE (Auto Exposure)自动曝光控制 (2)AF (Auto Focus)自动聚焦控制 (3)AWB (Auto White Balance)自动白平衡控…...

最新AI系统ChatGPT程序源码/微信公众号/H5端+搭建部署教程+完整知识库

一、前言 SparkAi系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。 那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧&#xff01…...

OpenCV实例(九)基于深度学习的运动目标检测(二)YOLOv2概述

基于深度学习的运动目标检测&#xff08;二&#xff09;YOLOv2&YOLOv3概述 1.YOLOv2概述2.YOLOv3概述2.1 新的基础网络结构&#xff1a;2.2 采用多尺度预测机制。2.3 使用简单的逻辑回归进行分类 1.YOLOv2概述 对YOLO存在的不足&#xff0c;业界又推出了YOLOv2。YOLOv2主要…...

【Docker】已经创建好的Docker怎么设置开机自启

已经创建好的Docker怎么设置开机自启 1.使用命令Docker update来完成2.查看是否开启3.验证是否开启 1.使用命令Docker update来完成 操作步骤&#xff1a; docker update --restartalways 容器ID2.查看是否开启 docker inspect 容器Id看到这里RestartPolicy设置为如图&#…...

E - Excellent Views

Problem - E - Codeforces 问题描述&#xff1a;数组H大小都不相同。从i到j是可行的&#xff0c;当且仅当 不存在 k &#xff0c;使 ∣ i − k ∣ ≤ ∣ i − j ∣ , H k > H j 不存在k&#xff0c;使 \\ |i - k| \leq |i - j|, \quad H_k > H_j 不存在k&#xff0c;使…...

WiFi天线和NB-IoT天线不通用

表面看起来完全一样。但是把WiFi天线插到NB-IoT设备后&#xff0c;信号弱了很多。还导致设备反复重启...

IoT DC3 是一个基于 Spring Cloud 的开源的、分布式的物联网(IoT)平台本地部署步骤

dc3 windows 本地搭建步骤&#xff1a; ​​ 必要软件环境 进入原网页# 务必保证至少需要给 docker 分配&#xff1a;1 核 CPU 以及 4G 以上的运行内存&#xff01; JDK : 推荐使用 Oracle JDK 1.8 或者 OpenJDK8&#xff0c;理论来说其他版本也行&#xff1b; Maven : 推荐…...

VBA Excel自定义函数的使用 简单的语法

一个简单的教程&#xff0c;实现VBA自定义函数。 新建模块 复制后面的代码放进来 函数的入口参数不定义&#xff0c;则认为是一块区域&#xff1b; 反之&#xff0c;如FindChar1 As String&#xff0c;则认为是输入的单值。 循环和分支如下例子&#xff0c;VB比较接近自然语…...

字节跳动 从需求到上线全流程 软件工程流程 需求评估 MVP

走进后端开发流程 整个课程会带大家先从理论出发&#xff0c;思考为什么有流程 大家以后工作的团队可能不一样&#xff0c;那么不同的团队也会有不同的流程&#xff0c;这背后的逻辑是什么 然后会带大家按照走一遍从需求到上线的全流程&#xff0c;告诉大家在流程的每个阶段&am…...

线性代数-矩阵的本质

线性代数-矩阵的本质 线性代数-矩阵的本质...

R 语言科研绘图第 55 期 --- 网络图-聚类

在发表科研论文的过程中&#xff0c;科研绘图是必不可少的&#xff0c;一张好看的图形会是文章很大的加分项。 为了便于使用&#xff0c;本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中&#xff0c;获取方式&#xff1a; R 语言科研绘图模板 --- sciRplothttps://mp.…...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…...

windows系统MySQL安装文档

概览&#xff1a;本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容&#xff0c;为学习者提供全面的操作指导。关键要点包括&#xff1a; 解压 &#xff1a;下载完成后解压压缩包&#xff0c;得到MySQL 8.…...

jdbc查询mysql数据库时,出现id顺序错误的情况

我在repository中的查询语句如下所示&#xff0c;即传入一个List<intager>的数据&#xff0c;返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致&#xff0c;会导致返回的id是从小到大排列的&#xff0c;但我不希望这样。 Query("SELECT NEW com…...

CSS3相关知识点

CSS3相关知识点 CSS3私有前缀私有前缀私有前缀存在的意义常见浏览器的私有前缀 CSS3基本语法CSS3 新增长度单位CSS3 新增颜色设置方式CSS3 新增选择器CSS3 新增盒模型相关属性box-sizing 怪异盒模型resize调整盒子大小box-shadow 盒子阴影opacity 不透明度 CSS3 新增背景属性ba…...

负载均衡器》》LVS、Nginx、HAproxy 区别

虚拟主机 先4&#xff0c;后7...

高效的后台管理系统——可进行二次开发

随着互联网技术的迅猛发展&#xff0c;企业的数字化管理变得愈加重要。后台管理系统作为数据存储与业务管理的核心&#xff0c;成为了现代企业不可或缺的一部分。今天我们要介绍的是一款名为 若依后台管理框架 的系统&#xff0c;它不仅支持跨平台应用&#xff0c;还能提供丰富…...

MeshGPT 笔记

[2311.15475] MeshGPT: Generating Triangle Meshes with Decoder-Only Transformers https://library.scholarcy.com/try 真正意义上的AI生成三维模型MESHGPT来袭&#xff01;_哔哩哔哩_bilibili GitHub - lucidrains/meshgpt-pytorch: Implementation of MeshGPT, SOTA Me…...

STL 2迭代器

文章目录 1.迭代器2.输入迭代器3.输出迭代器1.插入迭代器 4.前向迭代器5.双向迭代器6.随机访问迭代器7.不同容器返回的迭代器类型1.输入 / 输出迭代器2.前向迭代器3.双向迭代器4.随机访问迭代器5.特殊迭代器适配器6.为什么 unordered_set 只提供前向迭代器&#xff1f; 1.迭代器…...