Flink 定时加载数据源
一、简介
flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单
如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处理,如下简单实现
二、pom.xml 文件
注意 flink 好多包从 1.15.0 开始不需要指定 Scala 版本,内部自带

下面 pom 文件有 flink 两个版本 1.16.0 和 1.12.7(Scala:2.12)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.ye</groupId><artifactId>flink-study</artifactId><version>0.1</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.16.0</flink.version><!--<flink.version>1.12.7</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
<!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>--><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.ye.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>
三、自定义数据源
使用 Timer 定时任务(当然也可以使用线程池 Executors)自定义数据源,每过五秒随机生成一串字符串
public class TimerSinkRich extends RichSourceFunction<String> {private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();private boolean flag = true;private Timer timer;private TimerTask timerTask;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);timerTask = new TimerTask() {@Overridepublic void run() {// 可以在这块获取 MySQL、redis 等连接并查询数据Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();// 延时和执行周期参数可以通过构造方法传递timer.schedule(timerTask,1000,5000);}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (flag){if(queue.size()>0){ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if(null!=timer) timer.cancel();if(null!=timerTask) timerTask.cancel();// 撤销任务时,flink 默认 30 s(不同 flink 版本可能不同)尝试关闭数据源,关闭失败 TaskManager 不能释放 slot,最终导致失败if(queue.size()<=0) flag = false;}
}
四、flink 加载数据源并启动
public class TimerSinkStreamJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = executionEnvironment.addSource(new TimerSinkRich());streamSource.print();executionEnvironment.execute("TimerSinkStreamJob 定时任务打印数据");}
}
本地测试成功

五、上传 flink 集群
1、flink 1.16.0
启动成功

撤销任务成功

solt 也成功释放

2、flink 1.12.7
启动成功

撤销任务当然也没问题,同样能正常释放 slot

当然你也可以不要 open() 方法
public class DiySinkRich extends RichSourceFunction<String> {private TimerTask timerTask;private Timer timer;private boolean flag = true;private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();@Overridepublic void run(SourceFunction.SourceContext<String> ctx) throws Exception {timerTask = new TimerTask() {@Overridepublic void run() {Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();timer.schedule(timerTask, 1000, 5000);while (flag) {if (queue.size() > 0) {ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if (timer != null) timer.cancel();if (timerTask != null) timerTask.cancel();if (queue.size() == 0) flag = false;}
}
以上就是 flink 定时加载数据源的简单实例
相关文章:
Flink 定时加载数据源
一、简介 flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单 如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处…...
ChatGPT、人工智能、人类和一些酒桌闲聊
© 2023 Conmajia Initiated 10th March, 2023 昨天跟某化学家喝酒,期间提到了 ChatGPT。他的评价是:这鬼东西大量输出毫无意义、错漏百出甚至是虚假的信息,“in a confident accent”。例如某次 GPT 针对“描述某某记者”这一问题&#…...
WebRTC开源库内部调用abort函数引发程序发生闪退问题的排查
目录 1、初始问题描述 2、使用Process Explorer工具查看到处理音视频业务的rtcmpdll.dll模块没有加载起来 3、使用Dependency Walker工具查看到rtcmpdll.dll依赖的库有问题 4、更新库之后Debug程序启动时就发生异常,程序闪退 5、VS调试时看不到有效的函数调用堆…...
Golang并发编程
Golang并发编程 文章目录Golang并发编程1. 协程2. channel2.1 channel的创建2.2 使用waitGroup实现同步3. 并发编程3.1 并发编程之runtime包3.2 mutex互斥锁3.3 channel遍历3.3.1 for if遍历3.3.2 for range3.4 select switch3.5 Timer3.5.1 time.NewTimer()3.5.2 Stop、reset…...
windows+Anaconda环境下安装BERT成功安装方法及问题汇总
前言 在WindowsAnaconda环境下安装BERT,遇到各种问题,几经磨难,最终成功。接下来,先介绍成功的安装方法,再附上遇到的问题汇总 成功的安装方法 1、创建虚拟环境 注意:必须加上python3.7.12以创建环境&a…...
git - 简易指南
git - 简易指南 创建新仓库 创建新文件夹,打开,然后执行 git init 以创建新的 git 仓库。 检出仓库 执行如下命令以创建一个本地仓库的克隆版本: git clone /path/to/repository 如果是远端服务器上的仓库,你的命令会是这个样…...
[论文笔记]Transformer-XL: Attentive Language Models Beyond a Fixed-Length Context
引言 我们知道Transformer很好用,但它设定的最长长度是512。像一篇文章超过512个token是很容易的,那么我们在处理这种长文本的情况下也想利用Transformer的强大表达能力需要怎么做呢? 本文就带来一种处理长文本的Transformer变种——Transf…...
华为OD机试题 - 找目标字符串(JavaScript)| 机考必刷
更多题库,搜索引擎搜 梦想橡皮擦华为OD 👑👑👑 更多华为OD题库,搜 梦想橡皮擦 华为OD 👑👑👑 更多华为机考题库,搜 梦想橡皮擦华为OD 👑👑👑 华为OD机试题 最近更新的博客使用说明本篇题解:找目标字符串题目输入输出示例一输入输出说明Code解题思路版权说…...
C++面向对象编程之六:重载操作符(<<,>>,+,+=,==,!=,=)
重载操作符C允许我们重新定义操作符(例如:,-,*,/)等,使其对于我们自定义的类类型对象,也能像内置数据类型(例如:int,float,double&…...
JS_wangEditor富文本编辑器
官网:https://www.wangeditor.com/ 引入 CSS 定义样式 <link href"https://unpkg.com/wangeditor/editorlatest/dist/css/style.css" rel"stylesheet"> <style>#editor—wrapper {border: 1px solid #ccc;z-index: 100; /* 按需定…...
Django实践-06导出excel/pdf/echarts
文章目录Django实践-06导出excel/pdf/echartsDjango实践-06导出excel/pdf/echarts导出excel安装依赖库修改views.py添加excel导出函数修改urls.py添加excel/运行测试导出pdf安装依赖库修改views.py添加pdf导出函数修改urls.py添加pdf/生成前端统计图表修改views.py添加get_teac…...
java并发入门(一)共享模型—Synchronized、Wait/Notify、pack/unpack
一、共享模型—管程 1、共享存在的问题 1.1 共享变量案例 package com.yyds.juc.monitor;import lombok.extern.slf4j.Slf4j;Slf4j(topic "c.MTest1") public class MTest1 {static int counter 0;public static void main(String[] args) throws InterruptedEx…...
Ast2500增加用户自定义功能
备注:这里使用的AMI的开发环境MegaRAC进行AST2500软件开发,并非openlinux版本。1、添加上电后自动执行的任务在PDKAccess.c中列出了系统启动过程中的所有任务,若需要添加功能,在相应的任务中添加自定义线程。一般在两个任务里面添…...
用Python暴力求解德·梅齐里亚克的砝码问题
文章目录固定个数的砝码可称量重量砝码的组合方法40镑砝码的组合问 一个商人有一个40磅的砝码,由于跌落在地而碎成4块。后来,称得每块碎片的重量都是整磅数,而且可以用这4 块来称从1 至40 磅之间的任意整数磅的重物。问这4 块砝码片各重多少&…...
离散Hopfield神经网络的分类——高校科研能力评价
离散Hopfield网络离散Hopfield网络是一种经典的神经网络模型,它的基本原理是利用离散化的神经元和离散化的权值矩阵来实现模式识别和模式恢复的功能。它最初由美国物理学家John Hopfield在1982年提出,是一种单层的全连接神经网络,被广泛应用于…...
Retrofit核心源码分析(三)- Call逻辑分析和扩展机制
在前面的两篇文章中,我们已经对 Retrofit 的注解解析、动态代理、网络请求和响应处理机制有了一定的了解。在这篇文章中,我们将深入分析 Retrofit 的 Call 逻辑,并介绍 Retrofit 的扩展机制。 一、Call 逻辑分析 Call 是 Retrofit 中最基本…...
源码分析spring如和对@Component注解进行BeanDefinition注册的
Spring ioc主要职责为依赖进行处理(依赖注入、依赖查找)、容器以及托管的(java bean、资源配置、事件)资源声明周期管理;在ioc容器启动对元信息进行读取(比如xml bean注解等)、事件管理、国际化等处理;首先…...
C语言--字符串函数1
目录前言strlenstrlen的模拟实现strcpystrcatstrcat的模拟实现strcmpstrcmp的模拟实现strncpystrncatstrncmpstrstrstrchr和strrchrstrstr的模拟实现前言 本章我们将重点介绍处理字符和字符串的库函数的使用和注意事项。 strlen 我们先来看一个我们最熟悉的求字符串长度的库…...
Webstorm使用、nginx启动、FinalShell使用
文章目录 主题设置FinalShellFinalShell nginx 启动历史命令Nginx页面发布配置Webstorm的一些常用快捷键代码生成字体大小修改Webstorm - gitCode 代码拉取webstorm 汉化webstorm导致CPU占用率高方法一 【忽略node_modules】方法二 【设置 - 代码编辑 - 快速预览文档 - 关闭】主…...
源码分析Spring @Configuration注解如何巧夺天空,偷梁换柱。
前言 回想起五年前的一次面试,面试官问Configuration注解和Component注解有什么区别?记得当时的回答是: 相同点:Configuration注解继承于Component注解,都可以用来通过ClassPathBeanDefinitionScanner装载Spring bean…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
UE5 学习系列(三)创建和移动物体
这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...
《Docker》架构
文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...
绕过 Xcode?使用 Appuploader和主流工具实现 iOS 上架自动化
iOS 应用的发布流程一直是开发链路中最“苹果味”的环节:强依赖 Xcode、必须使用 macOS、各种证书和描述文件配置……对很多跨平台开发者来说,这一套流程并不友好。 特别是当你的项目主要在 Windows 或 Linux 下开发(例如 Flutter、React Na…...
二维FDTD算法仿真
二维FDTD算法仿真,并带完全匹配层,输入波形为高斯波、平面波 FDTD_二维/FDTD.zip , 6075 FDTD_二维/FDTD_31.m , 1029 FDTD_二维/FDTD_32.m , 2806 FDTD_二维/FDTD_33.m , 3782 FDTD_二维/FDTD_34.m , 4182 FDTD_二维/FDTD_35.m , 4793...
高分辨率图像合成归一化流扩展
大家读完觉得有帮助记得关注和点赞!!! 1 摘要 我们提出了STARFlow,一种基于归一化流的可扩展生成模型,它在高分辨率图像合成方面取得了强大的性能。STARFlow的主要构建块是Transformer自回归流(TARFlow&am…...
Django RBAC项目后端实战 - 03 DRF权限控制实现
项目背景 在上一篇文章中,我们完成了JWT认证系统的集成。本篇文章将实现基于Redis的RBAC权限控制系统,为系统提供细粒度的权限控制。 开发目标 实现基于Redis的权限缓存机制开发DRF权限控制类实现权限管理API配置权限白名单 前置配置 在开始开发权限…...
