当前位置: 首页 > news >正文

Java版Flink使用指南——定制RabbitMQ数据源的序列化器

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

新增Json库依赖

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

Java版Flink使用指南——定制RabbitMQ数据源的序列化器

大纲 新建工程新增依赖数据对象序列化器接入数据源 测试修改Slot个数打包、提交、运行 工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中&#xff0c;我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象&#x…...

CV每日论文--2024.7.8

1、DisCo-Diff: Enhancing Continuous Diffusion Models with Discrete Latents 中文标题&#xff1a;DisCo-Diff&#xff1a;利用离散潜伏增强连续扩散模型 简介&#xff1a;这篇文章提出了一种新型的离散-连续潜变量扩散模型(DisCo-Diff),旨在改善传统扩散模型(DMs)存在的问…...

【AI大模型】赋能儿童安全:楼层与室内定位实践与未来发展

文章目录 引言第一章&#xff1a;AI与室内定位技术1.1 AI技术概述1.2 室内定位技术概述1.3 楼层定位的挑战与解决方案 第二章&#xff1a;儿童定位与安全监控的需求2.1 儿童安全问题的现状2.2 智能穿戴设备的兴起 第三章&#xff1a;技术实现细节3.1 硬件设计与选择传感器选择与…...

云服务器linux系统安装配置docker

在我们拿到一个纯净的linux系统时&#xff0c;我需要进行一些基础环境的配置 &#xff08;如果是云服务器可以用XShell远程连接&#xff0c;如果连接不上可能是服务器没开放22端口&#xff09; 下面是配置环境的步骤 sudo -s进入root权限&#xff1a;退出使用exit sudo -i进入…...

泰勒雷达图2

matplotlib绘制泰勒雷达图 import matplotlib.pyplot as plt import numpy as np from numpy.core.fromnumeric import shape import pandas as pd import dask.dataframe as dd from matplotlib.projections import PolarAxes import mpl_toolkits.axisartist.floating_axes a…...

数据库容灾 | MySQL MGR与阿里云PolarDB-X Paxos的深度对比

开源生态 众所周知&#xff0c;MySQL主备库&#xff08;两节点&#xff09;一般通过异步复制、半同步复制&#xff08;Semi-Sync&#xff09;来实现数据高可用&#xff0c;但主备架构在机房网络故障、主机hang住等异常场景下&#xff0c;HA切换后大概率就会出现数据不一致的问…...

react根据后端返回数据动态添加路由

以下代码都为部分核心代码 一.根据不同的登录用户&#xff0c;返回不同的权限列表 &#xff0c;以下是三种不同用户限权列表 const pression { //超级管理员BigAdmin: [{key: "screen",icon: "FileOutlined",label: "数据图表",},{key: "…...

机器学习中的可解释性

「AI秘籍」系列课程&#xff1a; 人工智能应用数学基础 人工智能Python基础 人工智能基础核心知识 人工智能BI核心知识 人工智能CV核心知识 为什么我们需要了解模型如何进行预测 我们是否应该始终信任表现良好的模型&#xff1f;模型可能会拒绝你的抵押贷款申请或诊断你患…...

上海慕尼黑电子展开展,启明智显携物联网前沿方案亮相

随着科技创新的浪潮不断涌来&#xff0c;上海慕尼黑电子展在万众瞩目中盛大开幕。本次展会汇聚了全球顶尖的电子产品与技术解决方案&#xff0c;成为业界瞩目的焦点。启明智显作为物联网彩屏显示领域的佼佼者携产品亮相展会&#xff0c;为参展者带来了RTOS、LINUX全系列方案及A…...

Centos7离线安装ElasticSearch7.4.2

一、官网下载相关的安装包 ElasticSearch7.4.2&#xff1a; elasticsearch-7.4.2-linux-x86_64.tar.gz 下载中文分词器&#xff1a; elasticsearch-analysis-ik-7.4.2.zip 二、上传解压文件到服务器 上传到目录&#xff1a;/home/data/elasticsearch 解压文件&#xff1…...

深入理解sklearn中的模型参数优化技术

参数优化是机器学习中的关键步骤&#xff0c;它直接影响模型的性能和泛化能力。在sklearn中&#xff0c;参数优化可以通过多种方式实现&#xff0c;包括网格搜索&#xff08;GridSearchCV&#xff09;、随机搜索&#xff08;RandomizedSearchCV&#xff09;和贝叶斯优化等。本文…...

【Elasticsearch】开源搜索技术的演进与选择:Elasticsearch 与 OpenSearch

开源搜索技术的演进与选择&#xff1a;Elasticsearch 与 OpenSearch 1.历史发展2.OpenSearch 与 Elasticsearch 相同点3.OpenSearch 与 Elasticsearch 不同点3.1 版本大不同3.2 许可证不同3.3 社区不同3.4 功能不同3.5 安全性不同3.6 性能不同3.7 价格不同3.8 两者可相互导入 4…...

欧拉openEuler 22.03 LTS-部署k8sv1.03.1

1.设置ip # vi /etc/sysconfig/network-scripts/ifcfg-ens32 TYPEEthernet PROXY_METHODnone BROWSER_ONLYno BOOTPROTOstatic DEFROUTEyes IPV4_FAILURE_FATALno #IPV6INITyes #IPV6_AUTOCONFyes #IPV6_DEFROUTEyes #IPV6_FAILURE_FATALno #IPV6_ADDR_GEN_MODEeui64 NAMEens1…...

老年生活照护实训室:为养老服务业输送专业人才

本文探讨了老年生活照护实训室在养老服务业专业人才培养中的关键作用。通过详细阐述实训室的功能、教学实践、对学生能力的培养以及面临的挑战和解决方案&#xff0c;强调了其在提升人才素质、满足行业需求方面的重要性&#xff0c;旨在为养老服务业的可持续发展提供有力的人才…...

go语言中使用WaitGroup和channel实现处理多线程问题

WaitGroup 背景 如果将一个任务分为任意个小任务&#xff0c;并且不关心小任务的执行顺序&#xff0c;并且希望等待全部的小任务执行完成后再去操作后面的逻辑&#xff0c;那我推荐你用sync.WaitGRoup 使用方法 比如&#xff0c;有一个任务需要执行 3 个子任务&#xff0c;…...

Open3D 计算点云的平均密度

目录 一、概述 1.1基于领域密度计算原理 1.2应用 二、代码实现 三、实现效果 2.1点云显示 2.2密度计算结果 一、概述 在点云处理中&#xff0c;点的密度通常表示为某个点周围一定区域内的点的数量。高密度区域表示点云较密集&#xff0c;低密度区域表示点云较稀疏。计算…...

C语言之数据在内存中的存储(1),整形与大小端字节序

目录 前言 一、整形数据在内存中的存储 二、大小端字节序 三、大小端字节序的判断 四、字符型数据在内存中的存储 总结 前言 本文主要讲述整型包括字符型是如何在内存中存储的&#xff0c;涉及到大小端字节序这一概念&#xff0c;还有如何判断大小端&#xff0c;希望对大…...

B端全局导航:左侧还是顶部?不是随随便便,有依据在。

一、什么是全局导航 B端系统的全局导航是指在B端系统中的主要导航菜单&#xff0c;它通常位于系统的顶部或左侧&#xff0c;提供了系统中各个模块和功能的入口。全局导航菜单可以帮助用户快速找到和访问系统中的各个功能模块&#xff0c;提高系统的可用性和用户体验。 全局导航…...

什么是海外仓管理自动化?策略及落地实施步骤指南

作为海外仓的管理者&#xff0c;你每天都面临提高海外仓运营效率、降低成本和满足客户需求的问题。海外仓自动化管理技术为这些问题提供了不错的解决思路&#xff0c;不过和任何新技术一样&#xff0c;从策略到落地实施&#xff0c;都有一个对基础逻辑的认识过程。 今天我们整…...

自定义控件三部曲之绘图篇(六)Paint之函数大汇总、ColorMatrix与滤镜效果、setColorFilter

在自定义控件的绘图篇中&#xff0c;Paint 类是核心的组成部分之一&#xff0c;它控制了在 Canvas 上绘制的内容的各种属性&#xff0c;包括颜色、风格、抗锯齿、透明度等等。下面将详细介绍 Paint 的主要功能以及如何使用 ColorMatrix 和 setColorFilter 来实现滤镜效果。 Pa…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

服务器硬防的应用场景都有哪些?

服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式&#xff0c;避免服务器受到各种恶意攻击和网络威胁&#xff0c;那么&#xff0c;服务器硬防通常都会应用在哪些场景当中呢&#xff1f; 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础

第三周 Day 3 &#x1f3af; 今日目标 理解类&#xff08;class&#xff09;和对象&#xff08;object&#xff09;的关系学会定义类的属性、方法和构造函数&#xff08;init&#xff09;掌握对象的创建与使用初识封装、继承和多态的基本概念&#xff08;预告&#xff09; &a…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...

Windows电脑能装鸿蒙吗_Windows电脑体验鸿蒙电脑操作系统教程

鸿蒙电脑版操作系统来了&#xff0c;很多小伙伴想体验鸿蒙电脑版操作系统&#xff0c;可惜&#xff0c;鸿蒙系统并不支持你正在使用的传统的电脑来安装。不过可以通过可以使用华为官方提供的虚拟机&#xff0c;来体验大家心心念念的鸿蒙系统啦&#xff01;注意&#xff1a;虚拟…...

WebRTC调研

WebRTC是什么&#xff0c;为什么&#xff0c;如何使用 WebRTC有什么优势 WebRTC Architecture Amazon KVS WebRTC 其它厂商WebRTC 海康门禁WebRTC 海康门禁其他界面整理 威视通WebRTC 局域网 Google浏览器 Microsoft Edge 公网 RTSP RTMP NVR ONVIF SIP SRT WebRTC协…...