Java版Flink使用指南——定制RabbitMQ数据源的序列化器
大纲
- 新建工程
- 新增依赖
- 数据对象
- 序列化器
- 接入数据源
- 测试
- 修改Slot个数
- 打包、提交、运行
- 工程代码
在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。
新建工程
我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1

新增依赖
在pom.xml中新增RabbitMQ连接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>
新增Json库依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>
新增lombok库,主要是为了使用它的一些注解
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>
数据对象
我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java
package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}
这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。
序列化器
我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。
src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java
package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}
接入数据源
我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。
String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);
测试
修改Slot个数
由于我们要运行两个流式计算任务,于是需要两个Slot。
vim conf/config.yaml
将numberOfTaskSlots的值改成2。
打包、提交、运行
我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行

然后在日志中可以看到“已婚”的数据都在输出
tail -f log/*

工程代码
https://github.com/f304646673/FlinkDemo
相关文章:
Java版Flink使用指南——定制RabbitMQ数据源的序列化器
大纲 新建工程新增依赖数据对象序列化器接入数据源 测试修改Slot个数打包、提交、运行 工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象&#x…...
CV每日论文--2024.7.8
1、DisCo-Diff: Enhancing Continuous Diffusion Models with Discrete Latents 中文标题:DisCo-Diff:利用离散潜伏增强连续扩散模型 简介:这篇文章提出了一种新型的离散-连续潜变量扩散模型(DisCo-Diff),旨在改善传统扩散模型(DMs)存在的问…...
【AI大模型】赋能儿童安全:楼层与室内定位实践与未来发展
文章目录 引言第一章:AI与室内定位技术1.1 AI技术概述1.2 室内定位技术概述1.3 楼层定位的挑战与解决方案 第二章:儿童定位与安全监控的需求2.1 儿童安全问题的现状2.2 智能穿戴设备的兴起 第三章:技术实现细节3.1 硬件设计与选择传感器选择与…...
云服务器linux系统安装配置docker
在我们拿到一个纯净的linux系统时,我需要进行一些基础环境的配置 (如果是云服务器可以用XShell远程连接,如果连接不上可能是服务器没开放22端口) 下面是配置环境的步骤 sudo -s进入root权限:退出使用exit sudo -i进入…...
泰勒雷达图2
matplotlib绘制泰勒雷达图 import matplotlib.pyplot as plt import numpy as np from numpy.core.fromnumeric import shape import pandas as pd import dask.dataframe as dd from matplotlib.projections import PolarAxes import mpl_toolkits.axisartist.floating_axes a…...
数据库容灾 | MySQL MGR与阿里云PolarDB-X Paxos的深度对比
开源生态 众所周知,MySQL主备库(两节点)一般通过异步复制、半同步复制(Semi-Sync)来实现数据高可用,但主备架构在机房网络故障、主机hang住等异常场景下,HA切换后大概率就会出现数据不一致的问…...
react根据后端返回数据动态添加路由
以下代码都为部分核心代码 一.根据不同的登录用户,返回不同的权限列表 ,以下是三种不同用户限权列表 const pression { //超级管理员BigAdmin: [{key: "screen",icon: "FileOutlined",label: "数据图表",},{key: "…...
机器学习中的可解释性
「AI秘籍」系列课程: 人工智能应用数学基础 人工智能Python基础 人工智能基础核心知识 人工智能BI核心知识 人工智能CV核心知识 为什么我们需要了解模型如何进行预测 我们是否应该始终信任表现良好的模型?模型可能会拒绝你的抵押贷款申请或诊断你患…...
上海慕尼黑电子展开展,启明智显携物联网前沿方案亮相
随着科技创新的浪潮不断涌来,上海慕尼黑电子展在万众瞩目中盛大开幕。本次展会汇聚了全球顶尖的电子产品与技术解决方案,成为业界瞩目的焦点。启明智显作为物联网彩屏显示领域的佼佼者携产品亮相展会,为参展者带来了RTOS、LINUX全系列方案及A…...
Centos7离线安装ElasticSearch7.4.2
一、官网下载相关的安装包 ElasticSearch7.4.2: elasticsearch-7.4.2-linux-x86_64.tar.gz 下载中文分词器: elasticsearch-analysis-ik-7.4.2.zip 二、上传解压文件到服务器 上传到目录:/home/data/elasticsearch 解压文件࿱…...
深入理解sklearn中的模型参数优化技术
参数优化是机器学习中的关键步骤,它直接影响模型的性能和泛化能力。在sklearn中,参数优化可以通过多种方式实现,包括网格搜索(GridSearchCV)、随机搜索(RandomizedSearchCV)和贝叶斯优化等。本文…...
【Elasticsearch】开源搜索技术的演进与选择:Elasticsearch 与 OpenSearch
开源搜索技术的演进与选择:Elasticsearch 与 OpenSearch 1.历史发展2.OpenSearch 与 Elasticsearch 相同点3.OpenSearch 与 Elasticsearch 不同点3.1 版本大不同3.2 许可证不同3.3 社区不同3.4 功能不同3.5 安全性不同3.6 性能不同3.7 价格不同3.8 两者可相互导入 4…...
欧拉openEuler 22.03 LTS-部署k8sv1.03.1
1.设置ip # vi /etc/sysconfig/network-scripts/ifcfg-ens32 TYPEEthernet PROXY_METHODnone BROWSER_ONLYno BOOTPROTOstatic DEFROUTEyes IPV4_FAILURE_FATALno #IPV6INITyes #IPV6_AUTOCONFyes #IPV6_DEFROUTEyes #IPV6_FAILURE_FATALno #IPV6_ADDR_GEN_MODEeui64 NAMEens1…...
老年生活照护实训室:为养老服务业输送专业人才
本文探讨了老年生活照护实训室在养老服务业专业人才培养中的关键作用。通过详细阐述实训室的功能、教学实践、对学生能力的培养以及面临的挑战和解决方案,强调了其在提升人才素质、满足行业需求方面的重要性,旨在为养老服务业的可持续发展提供有力的人才…...
go语言中使用WaitGroup和channel实现处理多线程问题
WaitGroup 背景 如果将一个任务分为任意个小任务,并且不关心小任务的执行顺序,并且希望等待全部的小任务执行完成后再去操作后面的逻辑,那我推荐你用sync.WaitGRoup 使用方法 比如,有一个任务需要执行 3 个子任务,…...
Open3D 计算点云的平均密度
目录 一、概述 1.1基于领域密度计算原理 1.2应用 二、代码实现 三、实现效果 2.1点云显示 2.2密度计算结果 一、概述 在点云处理中,点的密度通常表示为某个点周围一定区域内的点的数量。高密度区域表示点云较密集,低密度区域表示点云较稀疏。计算…...
C语言之数据在内存中的存储(1),整形与大小端字节序
目录 前言 一、整形数据在内存中的存储 二、大小端字节序 三、大小端字节序的判断 四、字符型数据在内存中的存储 总结 前言 本文主要讲述整型包括字符型是如何在内存中存储的,涉及到大小端字节序这一概念,还有如何判断大小端,希望对大…...
B端全局导航:左侧还是顶部?不是随随便便,有依据在。
一、什么是全局导航 B端系统的全局导航是指在B端系统中的主要导航菜单,它通常位于系统的顶部或左侧,提供了系统中各个模块和功能的入口。全局导航菜单可以帮助用户快速找到和访问系统中的各个功能模块,提高系统的可用性和用户体验。 全局导航…...
什么是海外仓管理自动化?策略及落地实施步骤指南
作为海外仓的管理者,你每天都面临提高海外仓运营效率、降低成本和满足客户需求的问题。海外仓自动化管理技术为这些问题提供了不错的解决思路,不过和任何新技术一样,从策略到落地实施,都有一个对基础逻辑的认识过程。 今天我们整…...
自定义控件三部曲之绘图篇(六)Paint之函数大汇总、ColorMatrix与滤镜效果、setColorFilter
在自定义控件的绘图篇中,Paint 类是核心的组成部分之一,它控制了在 Canvas 上绘制的内容的各种属性,包括颜色、风格、抗锯齿、透明度等等。下面将详细介绍 Paint 的主要功能以及如何使用 ColorMatrix 和 setColorFilter 来实现滤镜效果。 Pa…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
tomcat指定使用的jdk版本
说明 有时候需要对tomcat配置指定的jdk版本号,此时,我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...
离线语音识别方案分析
随着人工智能技术的不断发展,语音识别技术也得到了广泛的应用,从智能家居到车载系统,语音识别正在改变我们与设备的交互方式。尤其是离线语音识别,由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力,广…...
一些实用的chrome扩展0x01
简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序,无论是测试应用程序、搜寻漏洞还是收集情报,它们都能提升工作流程。 FoxyProxy 代理管理工具,此扩展简化了使用代理(如 Burp…...
Selenium 查找页面元素的方式
Selenium 查找页面元素的方式 Selenium 提供了多种方法来查找网页中的元素,以下是主要的定位方式: 基本定位方式 通过ID定位 driver.find_element(By.ID, "element_id")通过Name定位 driver.find_element(By.NAME, "element_name"…...
英国云服务器上安装宝塔面板(BT Panel)
在英国云服务器上安装宝塔面板(BT Panel) 是完全可行的,尤其适合需要远程管理Linux服务器、快速部署网站、数据库、FTP、SSL证书等服务的用户。宝塔面板以其可视化操作界面和强大的功能广受国内用户欢迎,虽然官方主要面向中国大陆…...
