当前位置: 首页 > news >正文

【极数系列】Flink集成DataSource读取集合数据(07)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于集合读取数据
    • 3.1 集合创建数据流
    • 3.2 迭代器创建数据流
    • 3.3 给定对象创建数据流
    • 3.4 迭代并行器创建数据流
    • 3.5 基于时间间隔创建数据流
    • 3.6 自定义数据流
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建集合数据流作业
    • 4.3 运行结果日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkListSourceJob(集合)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于集合读取数据

3.1 集合创建数据流

fromCollection(Collection)函数
从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型

3.2 迭代器创建数据流

fromCollection(Iterator, Class) 
从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

3.3 给定对象创建数据流

fromElements(T ...)
从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

3.4 迭代并行器创建数据流

注意!使用迭代器的时候对象必须是实现持久化的,否则报错,详情可以看我的另外一篇文章、

错误:org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable

fromParallelCollection(SplittableIterator, Class) 
从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型

3.5 基于时间间隔创建数据流

generateSequence 
基于给定间隔内的数字序列并行生成数据流。

3.6 自定义数据流

addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

4.2 创建集合数据流作业

注意:Flink根据集群撇嘴可能会启动多个并行度运行,可能导致数据重复处理,可以通过.setParallelism(1)设置为一个平行度运行即可

package com.aurora.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.sql.DataSource;
import java.util.*;/*** @description flink的list集合source应用* @author 浅夏的猫* @datetime 23:03 2024/1/28
*/
public class FlinkListSourceJob {private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);public static void main(String[] args) throws Exception {//1.创建Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.设置Flink运行模式://STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);List<String> list = Arrays.asList("测试", "开发", "运维");// 01 从集合创建数据流DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);// 02 从迭代器创建数据流,这里直接使用list的迭代器会报错,因为没有ArrayList没有进行持久化,需要深入了解的,可以看我的另外一篇文章
//        DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(),String.class);// 03 从给定的对象序列中创建数据流DataStreamSource<String> dataStreamSource_03 = env.fromElements("测试", "开发", "运维");// 04 从迭代器并行创建数据流NumberSequenceIterator splittableIterator = new NumberSequenceIterator(1,10);DataStreamSource dataStreamSource_04=env.fromParallelCollection(splittableIterator,Long.TYPE);// 05 基于给定间隔内的数字序列并行生成数据流DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);//自定义数据流DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {//自定义你自己的数据来源for (int i = 0; i < 10; i++) {sourceContext.collect("测试数据" + i);}}@Overridepublic void cancel() {}});//5.输出打印dataStreamSource_01.print();
//        dataStreamSource_02.print();dataStreamSource_03.print();dataStreamSource_04.print();dataStreamSource_05.print();dataStreamSource_06.print();//6.启动运行env.execute();}}

4.3 运行结果日志

在这里插入图片描述

相关文章:

【极数系列】Flink集成DataSource读取集合数据(07)

文章目录 01 引言02 简介概述03 基于集合读取数据3.1 集合创建数据流3.2 迭代器创建数据流3.3 给定对象创建数据流3.4 迭代并行器创建数据流3.5 基于时间间隔创建数据流3.6 自定义数据流 04 源码实战demo4.1 pom.xml依赖4.2 创建集合数据流作业4.3 运行结果日志 01 引言 源码地…...

React hooks子组件暴露方法示例

说明 通常情况下&#xff0c;React 子组件使用父组件的方法或值通过props传递&#xff0c;反过来&#xff0c;父组件如果需要子组件的方法就需要子组件将自己的方法暴露出去。以下是一个实例&#xff1a; User.tsx import React, { FC, useEffect, useState, useRef } from …...

数据结构:大顶堆、小顶堆

堆是其中一种非常重要且实用的数据结构。堆可以用于实现优先队列&#xff0c;进行堆排序&#xff0c;以及解决各种与查找和排序相关的问题。本文将深入探讨两种常见的堆结构&#xff1a;大顶堆和小顶堆&#xff0c;并通过 C 语言展示如何实现和使用它们。 一、定义 堆是一种完…...

电加热热水器上架亚马逊美国站需要的UL174报告

电加热热水器上架亚马逊美国站需要的UL174报告 家用热水器出口美国需要办理UL174测试报告。 热水器就是指通过各种物理原理&#xff0c;在一定时间内使冷水温度升高变成热水的一种装置。分为制造冷气部分和制造热水部分。其实这两个部分又是紧密地联系在一起&#xff0c;密不可…...

使用visual studio写一个简单的c语言程序

官网下载visual studio&#xff0c;社区版免费的 https://visualstudio.microsoft.com/zh-hans/ 下载好以后选择自己的需求进行安装&#xff0c;我选择了两个&#xff0c;剩下的是默认。 创建文件&#xff1a;...

怎么创建facebook广告

创建Facebook广告的文章应由本人根据自身实际情况书写&#xff0c;以下仅供参考&#xff0c;请您根据自身实际情况撰写。 创建Facebook广告的步骤&#xff1a; 确定目标受众和广告主题&#xff1a;首先需要明确你的目标受众是谁&#xff0c;他们有什么特点&#xff0c;以及你想…...

pdf怎么转成高清图?pdf在线转换器推荐分享

在日常的工作或者学习中&#xff0c;有时候会需要将编辑好的pdf转高清图片&#xff0c;这样更方便我们后续使用&#xff0c;那么怎么将pdf转图片&#xff08;https://www.yasuotu.com/pdftopic&#xff09;还能保持清晰呢&#xff1f;下面介绍一款pdf转换工具&#xff0c;支持p…...

postgresql 查询缓慢原因分析

pg_stat_activity 最近发现系统运行缓慢&#xff0c;查询数据老是超时&#xff0c;于是排查下pg_stat_activity 系统表&#xff0c;看看有没有耗时的查询sql SELECT pid, state, query, query_start, backend_type FROM pg_stat_activity WHERE state active AND query LIK…...

N65总账凭证管理凭证查询(sql)

--核算账簿 select code , name , pk_setofbook from org_setofbook where ( pk_setofbook in ( select pk_setofbook from org_accountingbook where 1 1 and ( pk_group N0001A11000000000037X ) and ( accountenablestate 2 ) ) ) order by code;--核算账簿 select code …...

投资1300万欧元!芬兰正式启动量子旗舰项目

​内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 编辑丨慕一 编译/排版丨卉可 琳梦 深度好文&#xff1a;800字丨8分钟阅读 近日&#xff0c;芬兰研究委员会向新启动的芬兰量子旗舰&#xff08;FQF&#xff09;项目拨款1300万欧元&#xf…...

【3分钟开服】幻兽帕鲁服务器一键部署保姆教程

在帕鲁的世界&#xff0c;你可以选择与神奇的生物「帕鲁」一同享受悠闲的生活&#xff0c;也可以投身于与偷猎者进行生死搏斗的冒险。帕鲁可以进行战斗、繁殖、协助你做农活&#xff0c;也可以为你在工厂工作。你也可以将它们进行售卖&#xff0c;或肢解后食用。 引用自&#x…...

PandaWallet :Web3.0世界的入口

如果说互联网的普及和发展造就了移动支付&#xff0c;那么Web3的到来则书写了加密支付的新篇章&#xff0c;并将加密钱包的发展推向新高潮。 传统电子钱包的功能是储存资产与移动支付。加密钱包在储存资产与移动支付的基础上&#xff0c;增加了身份标识的功能。这也是Web3中用户…...

微软Azure-openAI 测试调用及说明

本文是公司在调研如何集成Azure-openAI时&#xff0c;调试测试用例得出的原文&#xff0c;原文主要基于官方说明文档简要整理实现 本文已假定阅读者申请部署了模型&#xff0c;已获取到所需的密钥和终结点 变量名称值ENDPOINT从 Azure 门户检查资源时&#xff0c;可在“密钥和…...

java 图书管理系统 spring boot项目

java 图书管理系统ssm框架 spring boot项目 功能有管理员模块&#xff1a;图书管理&#xff0c;读者管理&#xff0c;借阅管理&#xff0c;登录&#xff0c;修改密码 读者端&#xff1a;可查看图书信息&#xff0c;借阅记录&#xff0c;登录&#xff0c;修改密码 技术&#…...

Ubuntu系统安装 Redis

环境准备 Ubuntu 系统版本&#xff1a;22.04.3Redis 版本&#xff1a;6.2.12 检查本地 make 环境 make -version若没有安装&#xff0c;则需要安装 sudo apt install make检查本地 gcc 环境 gcc -version若没有安装&#xff0c;则需要安装 sudo apt install gcc。 sudo a…...

简单记录一下如何安装python以及pycharm(图文教程)(可供福建专升本理工类同学使用)

本教程主要给不懂计算机的或者刚刚开始学习python的同学&#xff08;福建专升本理工类&#xff09;&网友学习使用&#xff0c;基础操作&#xff0c;比较详细&#xff0c;其他问题等待补充&#xff01; 安装Python 1.进入python官网&#xff08;https://www.python.org/&a…...

浏览器内存泄漏排查指南

1、setTimeout执行原理 使用setInterval/setTimeOut遇到的坑 - 掘金 2、Chrome自带的Performance工具 当我们怀疑页面发生了内存泄漏的时候&#xff0c;可以先用Performance录制一段时间内页面的内存变化。 点击开始录制执行可能引起内存泄漏的操作点击停止录制 如果录制结束…...

ClickHouse(22)ClickHouse集成HDFS表引擎详细解析

文章目录 HDFS用法实施细节配置可选配置选项及其默认值的列表libhdfs3 支持的ClickHouse 额外的配置限制 Kerberos 支持虚拟列 资料分享系列文章clickhouse系列文章知乎系列文章 HDFS 这个引擎提供了与Apache Hadoop生态系统的集成&#xff0c;允许通过ClickHouse管理HDFS上的…...

idea报错 :(java: 找不到符号)

java: 找不到符号 符号: 变量 adminService 位置: 类 com.example.controller.WebController 查到网上一个办法&#xff1a;因为项目是maven&#xff1a;先点clean在点package...

设计软件最重要的目标是可理解性?

当您设计一款软件时&#xff0c;设计时最重要的一点就是可理解性。安全性、性能和正确性都很重要&#xff0c;但它们次优于可理解性。 被误解的软件会产生Bug缺陷 如果软件的实施者和维护者对软件存在误解&#xff0c;那么软件最终就会出现缺陷。主要缺陷。这些缺陷有多种形式…...

基于算法竞赛的c++编程(28)结构体的进阶应用

结构体的嵌套与复杂数据组织 在C中&#xff0c;结构体可以嵌套使用&#xff0c;形成更复杂的数据结构。例如&#xff0c;可以通过嵌套结构体描述多层级数据关系&#xff1a; struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...