【Flink实战】玩转Flink里面核心的Source Operator实战
🚀 作者 :“大数据小禅”
🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战
🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬
目录导航
- Flink 的API层级介绍Source Operator速览
- Flink 预定义的Source 数据源 案例实战
- Flink自定义的Source 数据源案例-订单来源实战
Flink 的API层级介绍Source Operator速览
-
Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象
-
第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理
-
第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发
- 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
-
第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差
- 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
- 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
-
第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式
- SQL 抽象与 Table API 抽象之间的关联是非常紧密的
-
注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层

-
-
Flink编程模型

-
Source来源
-
元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
-
文件/文件系统
- env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
-
基于Socket
- env.socketTextStream(“ip”, 8888)
-
自定义Source,实现接口自定义数据源,rich相关的api更丰富
-
并行度为1
- SourceFunction
- RichSourceFunction
-
并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
-
-
-
Connectors与第三方系统进行对接(用于source或者sink都可以)
- Flink本身提供Connector例如kafka、RabbitMQ、ES等
- 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
-
Apache Bahir连接器
- 里面也有kafka、RabbitMQ、ES的连接器更多
-
总结 和外部系统进行读取写入的
- 第一种 Flink 里面预定义的 source 和 sink。
- 第二种 Flink 内部也提供部分 Boundled connectors。
- 第三种是第三方 Apache Bahir 项目中的连接器。
- 第四种是通过异步 IO 方式
- 异步I/O是Flink提供的非常底层的与外部系统交互
Flink 预定义的Source 数据源 案例实战
- Source来源
- 元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
- 元素集合
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//相同类型元素的数据流 sourceDataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");stringDS1.print("stringDS1");DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));stringDS2.print("stringDS2");DataStreamSource<Long> longDS3 = env.fromSequence(0,10);longDS3.print("longDS3");//DataStream需要调用execute,可以取个名称env.execute("xdclass job");}
- 文件/文件系统
- env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");//DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");textDS.print();env.execute("xdclass job");
}
- 基于Socket
- env.socketTextStream(“ip”, 8888)
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);stringDataStream.print();env.execute(" job");
}
Flink自定义的Source 数据源案例-订单来源实战
-
自定义Source,实现接口自定义数据源
-
并行度为1
- SourceFunction
- RichSourceFunction
-
并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
-
Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
-
-
创建接口
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {private String tradeNo;private String title;private int money;private int userId;private Date createTime;}public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {private volatile Boolean flag = true;private Random random = new Random();private static List<String> list = new ArrayList<>();static {list.add("spring boot2.x课程");list.add("微服务SpringCloud课程");list.add("RabbitMQ消息队列");list.add("Kafka课程");list.add("Flink流式技术课程");list.add("工业级微服务项目大课训练营");list.add("Linux课程");}@Overridepublic void run(SourceContext<VideoOrder> ctx) throws Exception {while (flag){Thread.sleep(1000);String id = UUID.randomUUID().toString();int userId = random.nextInt(10);int money = random.nextInt(100);int videoNum = random.nextInt(list.size());String title = list.get(videoNum);ctx.collect(new VideoOrder(id,title,money,userId,new Date()));}}/*** 取消任务*/@Overridepublic void cancel() {flag = false;}
}
- 案例
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());videoOrderDataStream.print();//DataStream需要调用execute,可以取个名称env.execute("custom source job");}
不断产生很多订单

相关文章:
【Flink实战】玩转Flink里面核心的Source Operator实战
🚀 作者 :“大数据小禅” 🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬 目录导航 Flink 的API层级介绍Source Operator速览Flin…...
[2023-09-12]Oracle备库查询报ORA-01187
一个多表关联的语句在备库执行查询时提示ORA-01187: cannot read from file because it failed verification tests,单独对某一个表查询则正常返回(因为不需要排序等,没有用到临时表空间)。 查看报错信息发现是提示的临时数据文件…...
leetcode 16.最接近的三数之和
给你一个长度为 n 的整数数组 nums 和 一个目标值 target。请你从 nums 中选出三个整数,使它们的和与 target 最接近。 返回这三个数的和。 假定每组输入只存在恰好一个解。 示例 1: 输入:nums [-1,2,1,-4], target 1 输出:…...
antd table 自定义排序图标
要在Ant Design的Table组件中自定义排序图标,可以使用sorter和sortDirections属性来实现自定义排序逻辑和图标。以下是一个示例,演示如何在Ant Design的Table中自定义排序图标: import React, { useState } from react; import { Table, Spa…...
第十九章、【Linux】开机流程、模块管理与Loader
19.1.1 开机流程一览 以个人计算机架设的 Linux 主机为例,当你按下电源按键后计算机硬件会主动的读取 BIOS 或 UEFI BIOS 来载入硬件信息及进行硬件系统的自我测试, 之后系统会主动的去读取第一个可开机的设备 (由 BIOS 设置的) …...
GMAC PHY介绍
1.1PHY接口发展 (1)MII支持10M/100Mbps,一个接口由14根线组成,它的支持还是比较灵活的,但是有一个缺点是因为它一个端口用的信号线太多。参考芯片:DP83848 、DM900A(该芯片内部集成了MAC和PHY接…...
华为OD机考算法题:最远足迹
目录 题目部分 解读与分析 代码实现 题目部分 题目最远足迹难度易题目说明某探险队负责对地下洞穴进行探险。 探险队成员在进行探险任务时,随身携带的记录器会不定期地记录自身的坐标,但在记录的间隙中也会记录其他数据。探索工作结束后,…...
QScrollBar滚动条、QSlider滑块、 QDial表盘
QAbstractSlider 类、 QSCrollBar 类、 QSlider 类 一、 基本原理 1、 QAbstractSlider 继承自 QWidget,该类主要用于提供一个范围内的整数值, 2、 QAbstractSlider 类是 QScrollBar 类(滚动条)、 QSlider 类(滑块)、 QDial 类(表盘)的父类,因…...
Prometheus+Grafana可视化监控【MySQL状态】
文章目录 一、安装Docker二、安装MySQL数据库(Docker容器方式)三、安装Prometheus四、安装Grafana五、Pronetheus和Grafana相关联六、安装mysqld_exporter七、Grafana添加MySQL监控模板 一、安装Docker 注意:我这里使用之前写好脚本进行安装Docker,如果…...
五,编译定制rom并刷机实现硬改(二)
系列文章目录 第一章 安卓aosp源码编译环境搭建 第二章 手机硬件参数介绍和校验算法 第三章 修改安卓aosp代码更改硬件参数 第四章 编译定制rom并刷机实现硬改(一) 第五章 编译定制rom并刷机实现硬改(二) 第六章 不root不magisk不xposed lsposed frida原生修改定位 第七章 安卓…...
Modbus协议详解3:数据帧格式 - RTU帧 ASCII帧的区别
Modbus既然是一种通信协议,那它就应该有规定的通信格式用于在设备之间的指令接收与识别。 本文就着重讲讲Modbus协议的RTU帧和ASCII帧。 Modbus帧在串行链路上的格式如下: 在上图的格式中: 1)地址域:指代的是子节点地址…...
认识数据分析
文章目录 1. 认识数据分析1.1 数据自身的三大属性1.2 建数仓 数据分析的工程技术1.3 数据分析解决问题的原理1.4 数据分析的具体流程1.5 数据的中心化和智能化1.6 数据分析的四种类型和六个方向 1. 认识数据分析 1.1 数据自身的三大属性 客观:用数字衡量和表现一件…...
Learn Prompt-ChatGPT 精选案例:写作博客
在 ChatGPT 的帮助下,文本内容的产出,尤其是撰写博客文章的过程得到了进一步的简化。你可以让 ChatGPT 激发你的灵感,也可以让它美化你的文章内容。 这里我们希望能通过prompt写出一篇以“ChatGPT对社会各行各业的影响”为主题的博客。 本页…...
《确保安全:PostgreSQL安全配置与最佳实践》
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🐅🐾猫头虎建议程序员必备技术栈一览表📖: 🛠️ 全栈技术 Full Stack: 📚…...
Unity中Shader抓取屏幕并实现扭曲效果
文章目录 前言一、屏幕抓取,在上一篇文章已经写了二、实现抓取后的屏幕扭曲实现思路:1、屏幕扭曲要借助传入 UV 贴图进行扭曲2、传入贴图后在顶点着色器的输入参数处,传入一个 float2 uv : TEXCOORD,用于之后对扭曲贴图进行采样3、…...
深浅拷贝详解
深浅拷贝 经典真题 深拷贝和浅拷贝的区别?如何实现 深拷贝和浅拷贝概念 首先,我们需要明确深拷贝和浅拷贝的概念。 浅拷贝:只是拷贝了基本类型的数据,而引用类型数据,复制后也是会发生引用,我们把这种拷…...
@Scheduled 定时任务
Scheduled(cron"30 * * * * ?") 1.cron表达式格式: {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)} 2.cron表达式各占位符解释: {秒数}{分钟} > 允许值范围: 0~59 ,不允许为空值,若值不合法,调度器将…...
丙烯酸共聚聚氯乙烯树脂
声明 本文是学习GB-T 42790-2023 丙烯酸共聚聚氯乙烯树脂. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本文件规定了丙烯酸共聚聚氯乙烯树脂的外观、物化性能等技术要求,描述了相应的采样、试验方 法、检验规则、标志、包装、…...
Navicat导入Excel数据顺序变了
项目场景: Navicat导入Excel数据 问题描述 从Excel表格中导入数据到数据库中。但是,在导入的过程中,我们常会发现数据顺序出现了问题,导致数据错位,给数据的处理带来了极大的麻烦。 原因分析: 这个问题的…...
uni-app的生命周期
uni-app的生命周期包括应用生命周期和页面生命周期。 应用生命周期涵盖了整个uni-app应用的启动、运行和销毁过程,主要包括以下几个生命周期函数: onLaunch:应用初始化时触发,只触发一次。onShow:应用启动或从后台进…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
