当前位置: 首页 > news >正文

【Flink实战】玩转Flink里面核心的Source Operator实战

🚀 作者 :“大数据小禅”

🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


目录导航

      • Flink 的API层级介绍Source Operator速览
      • Flink 预定义的Source 数据源 案例实战
      • Flink自定义的Source 数据源案例-订单来源实战

Flink 的API层级介绍Source Operator速览

  • Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象

    • 第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理

    • 第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发

      • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
    • 第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差

      • 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
      • 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
    • 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式

      • SQL 抽象与 Table API 抽象之间的关联是非常紧密的
    • 注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
      在这里插入图片描述

  • Flink编程模型

在这里插入图片描述

  • Source来源

    • 元素集合

      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
    • 文件/文件系统

      • env.readTextFile(本地文件);
      • env.readTextFile(HDFS文件);
    • 基于Socket

      • env.socketTextStream(“ip”, 8888)
    • 自定义Source,实现接口自定义数据源,rich相关的api更丰富

      • 并行度为1

        • SourceFunction
        • RichSourceFunction
      • 并行度大于1

        • ParallelSourceFunction
        • RichParallelSourceFunction
  • Connectors与第三方系统进行对接(用于source或者sink都可以)

    • Flink本身提供Connector例如kafka、RabbitMQ、ES等
    • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
  • Apache Bahir连接器

    • 里面也有kafka、RabbitMQ、ES的连接器更多
  • 总结 和外部系统进行读取写入的

    • 第一种 Flink 里面预定义的 source 和 sink。
    • 第二种 Flink 内部也提供部分 Boundled connectors。
    • 第三种是第三方 Apache Bahir 项目中的连接器。
    • 第四种是通过异步 IO 方式
      • 异步I/O是Flink提供的非常底层的与外部系统交互

Flink 预定义的Source 数据源 案例实战

  • Source来源
    • 元素集合
      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
 public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//相同类型元素的数据流 sourceDataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");stringDS1.print("stringDS1");DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));stringDS2.print("stringDS2");DataStreamSource<Long> longDS3 = env.fromSequence(0,10);longDS3.print("longDS3");//DataStream需要调用execute,可以取个名称env.execute("xdclass job");}
  • 文件/文件系统
    • env.readTextFile(本地文件);
    • env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");//DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");textDS.print();env.execute("xdclass job");
}
  • 基于Socket
    • env.socketTextStream(“ip”, 8888)
   public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);stringDataStream.print();env.execute(" job");
}

Flink自定义的Source 数据源案例-订单来源实战

  • 自定义Source,实现接口自定义数据源

    • 并行度为1

      • SourceFunction
      • RichSourceFunction
    • 并行度大于1

      • ParallelSourceFunction
      • RichParallelSourceFunction
    • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等

  • 创建接口

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {private String tradeNo;private String title;private int money;private int userId;private Date createTime;}public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {private volatile Boolean flag = true;private  Random random = new Random();private static List<String> list = new ArrayList<>();static {list.add("spring boot2.x课程");list.add("微服务SpringCloud课程");list.add("RabbitMQ消息队列");list.add("Kafka课程");list.add("Flink流式技术课程");list.add("工业级微服务项目大课训练营");list.add("Linux课程");}@Overridepublic void run(SourceContext<VideoOrder> ctx) throws Exception {while (flag){Thread.sleep(1000);String id = UUID.randomUUID().toString();int userId = random.nextInt(10);int money = random.nextInt(100);int videoNum = random.nextInt(list.size());String title = list.get(videoNum);ctx.collect(new VideoOrder(id,title,money,userId,new Date()));}}/*** 取消任务*/@Overridepublic void cancel() {flag = false;}
}
  • 案例
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());videoOrderDataStream.print();//DataStream需要调用execute,可以取个名称env.execute("custom source job");}

不断产生很多订单

在这里插入图片描述

相关文章:

【Flink实战】玩转Flink里面核心的Source Operator实战

&#x1f680; 作者 &#xff1a;“大数据小禅” &#x1f680; 文章简介 &#xff1a;【Flink实战】玩转Flink里面核心的Source Operator实战 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; 目录导航 Flink 的API层级介绍Source Operator速览Flin…...

[2023-09-12]Oracle备库查询报ORA-01187

一个多表关联的语句在备库执行查询时提示ORA-01187: cannot read from file because it failed verification tests&#xff0c;单独对某一个表查询则正常返回&#xff08;因为不需要排序等&#xff0c;没有用到临时表空间&#xff09;。 查看报错信息发现是提示的临时数据文件…...

leetcode 16.最接近的三数之和

给你一个长度为 n 的整数数组 nums 和 一个目标值 target。请你从 nums 中选出三个整数&#xff0c;使它们的和与 target 最接近。 返回这三个数的和。 假定每组输入只存在恰好一个解。 示例 1&#xff1a; 输入&#xff1a;nums [-1,2,1,-4], target 1 输出&#xff1a;…...

antd table 自定义排序图标

要在Ant Design的Table组件中自定义排序图标&#xff0c;可以使用sorter和sortDirections属性来实现自定义排序逻辑和图标。以下是一个示例&#xff0c;演示如何在Ant Design的Table中自定义排序图标&#xff1a; import React, { useState } from react; import { Table, Spa…...

第十九章、【Linux】开机流程、模块管理与Loader

19.1.1 开机流程一览 以个人计算机架设的 Linux 主机为例&#xff0c;当你按下电源按键后计算机硬件会主动的读取 BIOS 或 UEFI BIOS 来载入硬件信息及进行硬件系统的自我测试&#xff0c; 之后系统会主动的去读取第一个可开机的设备 &#xff08;由 BIOS 设置的&#xff09; …...

GMAC PHY介绍

1.1PHY接口发展 &#xff08;1&#xff09;MII支持10M/100Mbps&#xff0c;一个接口由14根线组成&#xff0c;它的支持还是比较灵活的&#xff0c;但是有一个缺点是因为它一个端口用的信号线太多。参考芯片&#xff1a;DP83848 、DM900A&#xff08;该芯片内部集成了MAC和PHY接…...

华为OD机考算法题:最远足迹

目录 题目部分 解读与分析 代码实现 题目部分 题目最远足迹难度易题目说明某探险队负责对地下洞穴进行探险。 探险队成员在进行探险任务时&#xff0c;随身携带的记录器会不定期地记录自身的坐标&#xff0c;但在记录的间隙中也会记录其他数据。探索工作结束后&#xff0c;…...

QScrollBar滚动条、QSlider滑块、 QDial表盘

QAbstractSlider 类、 QSCrollBar 类、 QSlider 类 一、 基本原理 1、 QAbstractSlider 继承自 QWidget&#xff0c;该类主要用于提供一个范围内的整数值&#xff0c; 2、 QAbstractSlider 类是 QScrollBar 类(滚动条)、 QSlider 类(滑块)、 QDial 类(表盘)的父类&#xff0c;因…...

Prometheus+Grafana可视化监控【MySQL状态】

文章目录 一、安装Docker二、安装MySQL数据库(Docker容器方式)三、安装Prometheus四、安装Grafana五、Pronetheus和Grafana相关联六、安装mysqld_exporter七、Grafana添加MySQL监控模板 一、安装Docker 注意&#xff1a;我这里使用之前写好脚本进行安装Docker&#xff0c;如果…...

五,编译定制rom并刷机实现硬改(二)

系列文章目录 第一章 安卓aosp源码编译环境搭建 第二章 手机硬件参数介绍和校验算法 第三章 修改安卓aosp代码更改硬件参数 第四章 编译定制rom并刷机实现硬改(一) 第五章 编译定制rom并刷机实现硬改(二) 第六章 不root不magisk不xposed lsposed frida原生修改定位 第七章 安卓…...

Modbus协议详解3:数据帧格式 - RTU帧 ASCII帧的区别

Modbus既然是一种通信协议&#xff0c;那它就应该有规定的通信格式用于在设备之间的指令接收与识别。 本文就着重讲讲Modbus协议的RTU帧和ASCII帧。 Modbus帧在串行链路上的格式如下&#xff1a; 在上图的格式中&#xff1a; 1&#xff09;地址域&#xff1a;指代的是子节点地址…...

认识数据分析

文章目录 1. 认识数据分析1.1 数据自身的三大属性1.2 建数仓 数据分析的工程技术1.3 数据分析解决问题的原理1.4 数据分析的具体流程1.5 数据的中心化和智能化1.6 数据分析的四种类型和六个方向 1. 认识数据分析 1.1 数据自身的三大属性 客观&#xff1a;用数字衡量和表现一件…...

Learn Prompt-ChatGPT 精选案例:写作博客

在 ChatGPT 的帮助下&#xff0c;文本内容的产出&#xff0c;尤其是撰写博客文章的过程得到了进一步的简化。你可以让 ChatGPT 激发你的灵感&#xff0c;也可以让它美化你的文章内容。 这里我们希望能通过prompt写出一篇以“ChatGPT对社会各行各业的影响”为主题的博客。 本页…...

《确保安全:PostgreSQL安全配置与最佳实践》

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f405;&#x1f43e;猫头虎建议程序员必备技术栈一览表&#x1f4d6;&#xff1a; &#x1f6e0;️ 全栈技术 Full Stack: &#x1f4da…...

Unity中Shader抓取屏幕并实现扭曲效果

文章目录 前言一、屏幕抓取&#xff0c;在上一篇文章已经写了二、实现抓取后的屏幕扭曲实现思路&#xff1a;1、屏幕扭曲要借助传入 UV 贴图进行扭曲2、传入贴图后在顶点着色器的输入参数处&#xff0c;传入一个 float2 uv : TEXCOORD&#xff0c;用于之后对扭曲贴图进行采样3、…...

深浅拷贝详解

深浅拷贝 经典真题 深拷贝和浅拷贝的区别&#xff1f;如何实现 深拷贝和浅拷贝概念 首先&#xff0c;我们需要明确深拷贝和浅拷贝的概念。 浅拷贝&#xff1a;只是拷贝了基本类型的数据&#xff0c;而引用类型数据&#xff0c;复制后也是会发生引用&#xff0c;我们把这种拷…...

@Scheduled 定时任务

Scheduled(cron"30 * * * * ?") 1.cron表达式格式&#xff1a; {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)} 2.cron表达式各占位符解释&#xff1a; {秒数}{分钟} > 允许值范围: 0~59 ,不允许为空值&#xff0c;若值不合法&#xff0c;调度器将…...

丙烯酸共聚聚氯乙烯树脂

声明 本文是学习GB-T 42790-2023 丙烯酸共聚聚氯乙烯树脂. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本文件规定了丙烯酸共聚聚氯乙烯树脂的外观、物化性能等技术要求&#xff0c;描述了相应的采样、试验方 法、检验规则、标志、包装、…...

Navicat导入Excel数据顺序变了

项目场景&#xff1a; Navicat导入Excel数据 问题描述 从Excel表格中导入数据到数据库中。但是&#xff0c;在导入的过程中&#xff0c;我们常会发现数据顺序出现了问题&#xff0c;导致数据错位&#xff0c;给数据的处理带来了极大的麻烦。 原因分析&#xff1a; 这个问题的…...

uni-app的生命周期

uni-app的生命周期包括应用生命周期和页面生命周期。 应用生命周期涵盖了整个uni-app应用的启动、运行和销毁过程&#xff0c;主要包括以下几个生命周期函数&#xff1a; onLaunch&#xff1a;应用初始化时触发&#xff0c;只触发一次。onShow&#xff1a;应用启动或从后台进…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

聊聊 Pulsar:Producer 源码解析

一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台&#xff0c;以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中&#xff0c;Producer&#xff08;生产者&#xff09; 是连接客户端应用与消息队列的第一步。生产者…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

免费PDF转图片工具

免费PDF转图片工具 一款简单易用的PDF转图片工具&#xff0c;可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件&#xff0c;也不需要在线上传文件&#xff0c;保护您的隐私。 工具截图 主要特点 &#x1f680; 快速转换&#xff1a;本地转换&#xff0c;无需等待上…...

逻辑回归暴力训练预测金融欺诈

简述 「使用逻辑回归暴力预测金融欺诈&#xff0c;并不断增加特征维度持续测试」的做法&#xff0c;体现了一种逐步建模与迭代验证的实验思路&#xff0c;在金融欺诈检测中非常有价值&#xff0c;本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能

1. 开发环境准备 ​​安装DevEco Studio 3.1​​&#xff1a; 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK ​​项目配置​​&#xff1a; // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...

Golang——7、包与接口详解

包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...

nnUNet V2修改网络——暴力替换网络为UNet++

更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...

Docker拉取MySQL后数据库连接失败的解决方案

在使用Docker部署MySQL时&#xff0c;拉取并启动容器后&#xff0c;有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致&#xff0c;包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因&#xff0c;并提供解决方案。 一、确认MySQL容器的运行状态 …...