当前位置: 首页 > news >正文

Java版Flink使用指南——定制RabbitMQ数据源的序列化器

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

新增Json库依赖

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

Java版Flink使用指南——定制RabbitMQ数据源的序列化器

大纲 新建工程新增依赖数据对象序列化器接入数据源 测试修改Slot个数打包、提交、运行 工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中&#xff0c;我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象&#x…...

CV每日论文--2024.7.8

1、DisCo-Diff: Enhancing Continuous Diffusion Models with Discrete Latents 中文标题&#xff1a;DisCo-Diff&#xff1a;利用离散潜伏增强连续扩散模型 简介&#xff1a;这篇文章提出了一种新型的离散-连续潜变量扩散模型(DisCo-Diff),旨在改善传统扩散模型(DMs)存在的问…...

【AI大模型】赋能儿童安全:楼层与室内定位实践与未来发展

文章目录 引言第一章&#xff1a;AI与室内定位技术1.1 AI技术概述1.2 室内定位技术概述1.3 楼层定位的挑战与解决方案 第二章&#xff1a;儿童定位与安全监控的需求2.1 儿童安全问题的现状2.2 智能穿戴设备的兴起 第三章&#xff1a;技术实现细节3.1 硬件设计与选择传感器选择与…...

云服务器linux系统安装配置docker

在我们拿到一个纯净的linux系统时&#xff0c;我需要进行一些基础环境的配置 &#xff08;如果是云服务器可以用XShell远程连接&#xff0c;如果连接不上可能是服务器没开放22端口&#xff09; 下面是配置环境的步骤 sudo -s进入root权限&#xff1a;退出使用exit sudo -i进入…...

泰勒雷达图2

matplotlib绘制泰勒雷达图 import matplotlib.pyplot as plt import numpy as np from numpy.core.fromnumeric import shape import pandas as pd import dask.dataframe as dd from matplotlib.projections import PolarAxes import mpl_toolkits.axisartist.floating_axes a…...

数据库容灾 | MySQL MGR与阿里云PolarDB-X Paxos的深度对比

开源生态 众所周知&#xff0c;MySQL主备库&#xff08;两节点&#xff09;一般通过异步复制、半同步复制&#xff08;Semi-Sync&#xff09;来实现数据高可用&#xff0c;但主备架构在机房网络故障、主机hang住等异常场景下&#xff0c;HA切换后大概率就会出现数据不一致的问…...

react根据后端返回数据动态添加路由

以下代码都为部分核心代码 一.根据不同的登录用户&#xff0c;返回不同的权限列表 &#xff0c;以下是三种不同用户限权列表 const pression { //超级管理员BigAdmin: [{key: "screen",icon: "FileOutlined",label: "数据图表",},{key: "…...

机器学习中的可解释性

「AI秘籍」系列课程&#xff1a; 人工智能应用数学基础 人工智能Python基础 人工智能基础核心知识 人工智能BI核心知识 人工智能CV核心知识 为什么我们需要了解模型如何进行预测 我们是否应该始终信任表现良好的模型&#xff1f;模型可能会拒绝你的抵押贷款申请或诊断你患…...

上海慕尼黑电子展开展,启明智显携物联网前沿方案亮相

随着科技创新的浪潮不断涌来&#xff0c;上海慕尼黑电子展在万众瞩目中盛大开幕。本次展会汇聚了全球顶尖的电子产品与技术解决方案&#xff0c;成为业界瞩目的焦点。启明智显作为物联网彩屏显示领域的佼佼者携产品亮相展会&#xff0c;为参展者带来了RTOS、LINUX全系列方案及A…...

Centos7离线安装ElasticSearch7.4.2

一、官网下载相关的安装包 ElasticSearch7.4.2&#xff1a; elasticsearch-7.4.2-linux-x86_64.tar.gz 下载中文分词器&#xff1a; elasticsearch-analysis-ik-7.4.2.zip 二、上传解压文件到服务器 上传到目录&#xff1a;/home/data/elasticsearch 解压文件&#xff1…...

深入理解sklearn中的模型参数优化技术

参数优化是机器学习中的关键步骤&#xff0c;它直接影响模型的性能和泛化能力。在sklearn中&#xff0c;参数优化可以通过多种方式实现&#xff0c;包括网格搜索&#xff08;GridSearchCV&#xff09;、随机搜索&#xff08;RandomizedSearchCV&#xff09;和贝叶斯优化等。本文…...

【Elasticsearch】开源搜索技术的演进与选择:Elasticsearch 与 OpenSearch

开源搜索技术的演进与选择&#xff1a;Elasticsearch 与 OpenSearch 1.历史发展2.OpenSearch 与 Elasticsearch 相同点3.OpenSearch 与 Elasticsearch 不同点3.1 版本大不同3.2 许可证不同3.3 社区不同3.4 功能不同3.5 安全性不同3.6 性能不同3.7 价格不同3.8 两者可相互导入 4…...

欧拉openEuler 22.03 LTS-部署k8sv1.03.1

1.设置ip # vi /etc/sysconfig/network-scripts/ifcfg-ens32 TYPEEthernet PROXY_METHODnone BROWSER_ONLYno BOOTPROTOstatic DEFROUTEyes IPV4_FAILURE_FATALno #IPV6INITyes #IPV6_AUTOCONFyes #IPV6_DEFROUTEyes #IPV6_FAILURE_FATALno #IPV6_ADDR_GEN_MODEeui64 NAMEens1…...

老年生活照护实训室:为养老服务业输送专业人才

本文探讨了老年生活照护实训室在养老服务业专业人才培养中的关键作用。通过详细阐述实训室的功能、教学实践、对学生能力的培养以及面临的挑战和解决方案&#xff0c;强调了其在提升人才素质、满足行业需求方面的重要性&#xff0c;旨在为养老服务业的可持续发展提供有力的人才…...

go语言中使用WaitGroup和channel实现处理多线程问题

WaitGroup 背景 如果将一个任务分为任意个小任务&#xff0c;并且不关心小任务的执行顺序&#xff0c;并且希望等待全部的小任务执行完成后再去操作后面的逻辑&#xff0c;那我推荐你用sync.WaitGRoup 使用方法 比如&#xff0c;有一个任务需要执行 3 个子任务&#xff0c;…...

Open3D 计算点云的平均密度

目录 一、概述 1.1基于领域密度计算原理 1.2应用 二、代码实现 三、实现效果 2.1点云显示 2.2密度计算结果 一、概述 在点云处理中&#xff0c;点的密度通常表示为某个点周围一定区域内的点的数量。高密度区域表示点云较密集&#xff0c;低密度区域表示点云较稀疏。计算…...

C语言之数据在内存中的存储(1),整形与大小端字节序

目录 前言 一、整形数据在内存中的存储 二、大小端字节序 三、大小端字节序的判断 四、字符型数据在内存中的存储 总结 前言 本文主要讲述整型包括字符型是如何在内存中存储的&#xff0c;涉及到大小端字节序这一概念&#xff0c;还有如何判断大小端&#xff0c;希望对大…...

B端全局导航:左侧还是顶部?不是随随便便,有依据在。

一、什么是全局导航 B端系统的全局导航是指在B端系统中的主要导航菜单&#xff0c;它通常位于系统的顶部或左侧&#xff0c;提供了系统中各个模块和功能的入口。全局导航菜单可以帮助用户快速找到和访问系统中的各个功能模块&#xff0c;提高系统的可用性和用户体验。 全局导航…...

什么是海外仓管理自动化?策略及落地实施步骤指南

作为海外仓的管理者&#xff0c;你每天都面临提高海外仓运营效率、降低成本和满足客户需求的问题。海外仓自动化管理技术为这些问题提供了不错的解决思路&#xff0c;不过和任何新技术一样&#xff0c;从策略到落地实施&#xff0c;都有一个对基础逻辑的认识过程。 今天我们整…...

自定义控件三部曲之绘图篇(六)Paint之函数大汇总、ColorMatrix与滤镜效果、setColorFilter

在自定义控件的绘图篇中&#xff0c;Paint 类是核心的组成部分之一&#xff0c;它控制了在 Canvas 上绘制的内容的各种属性&#xff0c;包括颜色、风格、抗锯齿、透明度等等。下面将详细介绍 Paint 的主要功能以及如何使用 ColorMatrix 和 setColorFilter 来实现滤镜效果。 Pa…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

Spring AI与Spring Modulith核心技术解析

Spring AI核心架构解析 Spring AI&#xff08;https://spring.io/projects/spring-ai&#xff09;作为Spring生态中的AI集成框架&#xff0c;其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似&#xff0c;但特别为多语…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案&#xff0c;如果正确地操作&#xff0c;重启Eureka集群中的节点&#xff0c;对已经注册的服务影响非常小&#xff0c;甚至可以做到无感知。 但如果操作不当&#xff0c;可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...