当前位置: 首页 > news >正文

flink Data Source数据源

flink

Data Source数据源

Source

  • 并行度

    • 非并行:并行度只能为1

    • 并行

  • 基于集合的Source

    • fromElements

      • package com.pxj.sx.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FromElementDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> daat = env.fromElements("flink", "spark", "hive");daat.print();Thread.sleep(2000000);}
}
	- fromElements(T ...) 方法是一个非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。-  fromCollection- fromCollection可以从一个结合读取数据,返回DataStream,该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。- package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.List;public class FromCollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<String> wordList = Arrays.asList("flink", "spark", "hadoop", "flink");DataStreamSource<String> source = env.fromCollection(wordList);source.print();env.execute("pxj");}
}
- fromParallelCollection- fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。- package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.LongValueSequenceIterator;public class FromParallelCollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//Source是多个并行的DataStreamSource<LongValue> dataSource = env.fromParallelCollection(new LongValueSequenceIterator(1, 10), LongValue.class);dataSource.print();env.execute("pxj");}
}
- generateSequence- generateSequence(long from, long to) 方法是一个并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)该方法需要传入两个long类型的参数,第一个是起始值,第二个是结束值,返回一个DataStreamSource。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。
  • 基于Socket网络端口

    • socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是"\n",最大重新连接次数为0。

      • package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SocktDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("pxj62", 8889);source.print();env.execute("pxj");}
}
  • 基于文件

    • readFile

      • package com.pxj.sx.flink;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;public class ReadFlie {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.readFile(new TextInputFormat(null), "data/a.txt",FileProcessingMode.PROCESS_CONTINUOUSLY, 2000).print();env.execute("pxj");}
}
- readTextFile- package com.pxj.sx.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class ReadFlieDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.readTextFile("data/a.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> datas = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(",");for (String s : strings) {out.collect(Tuple2.of(s, 1));}}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = datas.keyBy(0).sum(1);summed.print();env.execute("pxj");}
}
  • 自定义Source

    • 单并行度

      • 可以实现 SourceFunction 或者 RichSourceFunction , 这两者都是非并行的source算子

        • package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class MySource2{public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource3());source.print();env.execute("pxj");}
}
class  MySource3 extends RichSourceFunction<String> {private int i=0; //定义一个int类型的变量,从1开始private boolean flag=true;  //定义一个flag标标志//run方法就是用来读取外部的数据或产生数据的逻辑@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (i<=100 && flag){Thread.sleep(1000); //为避免太快,睡眠1秒ctx.collect("data:"+i++);}}@Overridepublic void cancel() {flag=false;}
}
- 多并行度-    也可继承   ParallelSourceFunction  或者 RichParallelSourceFunction , 这两者都是可并行的source算子- 带 Rich的,都拥有 open() ,close() ,getRuntimeContext() 方法

带 Parallel的,都可多实例并行执行source

				- package com.pxj.sx.flink;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class MySource1{public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource());source.print();env.execute("pxj");}
}
class  MySource extends RichParallelSourceFunction<String> {private int i=0; //定义一个int类型的变量,从1开始private boolean flag=true;  //定义一个flag标标志//run方法就是用来读取外部的数据或产生数据的逻辑@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (i<=100 && flag){Thread.sleep(1000); //为避免太快,睡眠1秒ctx.collect("data:"+i++);}}@Overridepublic void cancel() {flag=false;}
}

整理人:pxj_sx(潘陈)
日 期:2024-05-26 11:47:24

相关文章:

flink Data Source数据源

flink Data Source数据源 Source 并行度 非并行&#xff1a;并行度只能为1 并行 基于集合的Source fromElements package com.pxj.sx.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.ap…...

网络七层模型与云计算中的网络服务

网络七层模型&#xff0c;也称为OSI&#xff08;Open System Interconnection&#xff09;模型&#xff0c;是由国际标准化组织&#xff08;ISO&#xff09;制定的一个概念性框架&#xff0c;用于描述网络通信过程中信息是如何被封装、传输和解封装的。这一模型将复杂的网络通信…...

word一按空格就换行怎么办?word文本之间添加空格就换行怎么办?

如上图&#xff0c;无法在Connection和con之间添加空格&#xff0c;一按空格就会自动换行。 第一步&#xff1a;选中文本&#xff0c;打开段落。 第二步&#xff1a;点击中文版式&#xff0c;勾选允许西文在单词中间换行。 确定之后就解决一按空格就自动换行啦&#xff01;...

Python 遍历字典的方法,你都掌握了吗

Python中的字典是一种非常灵活的数据结构&#xff0c;它允许通过键来存储和访问值。在处理字典时&#xff0c;经常需要遍历字典中的元素&#xff0c;以下是几种常见的遍历字典的方法。 1. 使用 for 循环直接遍历字典的键 字典的键是唯一的&#xff0c;可以直接通过 for 循环来…...

MySQL 8.4.0 LTS 变更解析:I_S 表、权限、关键字和客户端

↑ 关注“少安事务所”公众号&#xff0c;欢迎⭐收藏&#xff0c;不错过精彩内容~ MySQL 8.4.0 LTS 已经发布 &#xff0c;作为发版模型变更后的第一个长期支持版本&#xff0c;注定要承担未来生产环境的重任&#xff0c;那么这个版本都有哪些新特性、变更&#xff0c;接下来少…...

LeetCode 124 —— 二叉树中的最大路径和

阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 二叉树的问题首先我们要想想是否能用递归来解决&#xff0c;本题也不例外&#xff0c;而递归的关键是找到子问题。 我们首先来看看一棵最简单的树&#xff0c;也就是示例 1。这样的一棵树总共有六条路径&#xf…...

美甲店会员预约系统管理小程序的作用是什么

女性爱美体现在方方面面&#xff0c;美丽好看的指甲也不能少&#xff0c;市场中美甲店、小摊不少&#xff0c;也跑出了不少连锁品牌&#xff0c;70后到00后&#xff0c;每个层级都有不少潜在客户&#xff0c;商家需要获取和完善转化路径&#xff0c;不断提高品牌影响力与自身内…...

..堆..

堆 堆是完全二叉树&#xff0c;即除了最后一列之外&#xff0c;上面的每一层都是满的&#xff08;左右严格对称且每个节点都满子节点&#xff09; 最后一列从左向右排序。 默认大根堆&#xff1a;每一个节点都大于其左右儿子&#xff0c;根节点就是整个数据结构的最大值 pr…...

【LLM多模态】综述Visual Instruction Tuning towards General-Purpose Multimodal Model

note 文章目录 note论文1. 论文试图解决什么问题2. 这是否是一个新的问题3. 这篇文章要验证一个什么科学假设4. 有哪些相关研究&#xff1f;如何归类&#xff1f;谁是这一课题在领域内值得关注的研究员&#xff1f;5. 论文中提到的解决方案之关键是什么&#xff1f;6. 论文中的…...

探索Linux中的神奇工具:重定向符的妙用

探索Linux中的神奇工具&#xff1a;重定向符的妙用 在Linux系统中&#xff0c;重定向符是一个强大的工具&#xff0c;用于控制命令的输入和输出&#xff0c;实现数据流的定向。本文将详细介绍重定向符的基本用法和一些实用技巧&#xff0c;帮助读者更好地理解和运用这个功能。…...

Kubernetes 文档 / 概念 / 工作负载 / 工作负载管理 / Job

Kubernetes 文档 / 概念 / 工作负载 / 工作负载管理 / Job 此文档从 Kubernetes 官网摘录 中文地址 英文地址 Job 会创建一个或者多个 Pod&#xff0c;并将继续重试 Pod 的执行&#xff0c;直到指定数量的 Pod 成功终止。 随着 Pod 成功结束&#xff0c;Job 跟踪记录成功完成的…...

办公自动化-Python如何提取Word标题并保存到Excel中?

办公自动化-Python如何提取Word标题并保存到Excel中&#xff1f; 应用场景需求分析实现思路实现过程安装依赖库打开需求文件获取word中所有标题去除不需要的标题创建工作簿和工作表分割标题功能名称存入测试对象GN-TC需求标识符存入测试项标识存入需求标识符 完整源码实现效果学…...

基于Java、SpringBoot和uniapp在线考试系统安卓APP和微信小程序

摘要 基于Java、SpringBoot和uniapp的在线考试系统安卓APP微信小程序是一种结合了现代Web开发技术和移动应用技术的解决方案&#xff0c;旨在为教育机构提供一个方便、高效和灵活的在线考试平台。该系统采用Java语言进行后端开发&#xff0c;使用SpringBoot框架简化企业级应用…...

抖音a-bogus加密解析(三)

要补的环境我给提示&#xff0c;大家自行操作&#xff0c;出了问题就是因为缺环境&#xff0c;没补好 window global; // reading _u未定义 window.requestAnimationFrame function () {} // XMLHttpRequest 未定义 window.XMLHttpRequest function () {} window.onwheelx …...

IS-IS DIS

原理概述 OSPF 协议支持4种网络类型&#xff0c; IS-IS 协议只支持两种网络类型&#xff0c;即广播网络和点到点网络。与 OSPF 协议相同&#xff0c; IS-IS 协议在广播网络中会将网络视为一个伪节点( Pseudonode &#xff0c;简称 PSN )&#xff0c;并选举出一台 DIS ( Designa…...

random和range

含义&#xff1a; random(1&#xff0c;10) 不包含10&#xff0c;用于生成随机数。它可以生成浮点数或整数&#xff0c;取决于具体的使用方式。 range(0&#xff0c;1) 不包含1&#xff0c;用于生成一个整数序列。它可以生成一个指定范围内的连续整数序列。 区别在于&#x…...

研二学妹面试字节,竟倒在了ThreadLocal上,这是不要应届生还是不要女生啊?

一、写在开头 今天和一个之前研二的学妹聊天&#xff0c;聊及她上周面试字节的情况&#xff0c;着实感受到了Java后端现在找工作的压力啊&#xff0c;记得在18&#xff0c;19年的时候&#xff0c;研究生计算机专业的学生&#xff0c;背背八股文找个Java开发工作毫无问题&#x…...

Golang:gammazero/deque是一个快速环形缓冲区deque(双端队列)实现

gammazero/deque是一个快速环形缓冲区deque&#xff08;双端队列&#xff09;实现。 文档 https://github.com/gammazero/deque 安装 go get github.com/gammazero/deque代码示例 先入先出队列 package mainimport ("fmt""github.com/gammazero/deque&quo…...

C++ 时间处理-统计函数运行时间

1. 关键词2. 问题3. 解决思路4. 代码实现 4.1. timecount.h4.2. timecount.cpp 5. 测试代码6. 运行结果7. 源码地址 1. 关键词 C 时间处理 统计函数运行时间 跨平台 2. 问题 C如何简单便捷地实现“函数运行时间的统计”功能&#xff1f; 3. 解决思路 类的构造函数&#x…...

JAVA面试题大全(十五)

1、Zookeeper 是什么&#xff1f; zookper是一个分布式的&#xff0c;开放源码的分布式应用程序协调服务。是 google chubby 的开源实现&#xff0c;是 hadoop 和 hbase 的重要组件。它是一个为分布式应用提供一致性服务的软件&#xff0c;提供的功能包括&#xff1a;配置维护…...

email2phonenumber与Phonerator对比分析:选择最适合你的OSINT工具

email2phonenumber与Phonerator对比分析&#xff1a;选择最适合你的OSINT工具 【免费下载链接】email2phonenumber A OSINT tool to obtain a targets phone number just by having his email address 项目地址: https://gitcode.com/gh_mirrors/em/email2phonenumber e…...

基于PID的四旋翼无人机轨迹跟踪控制 0. 直接运行simulink仿真文件.slx 1

基于PID的四旋翼无人机轨迹跟踪控制0. 直接运行simulink仿真文件.slx 1. 如果出现文件或变量不能识别的警告或错误&#xff0c;建议将文件夹添加到matlab搜索路径以检索到所需文件&#xff0c;或者进入到最里层文件夹运行程序。 2. 如果想去掉simulink模块的封面图&#xff08;…...

OctoPrintAPI嵌入式库:Arduino/ESP32轻量级REST客户端

1. 项目概述OctoPrintAPI 是一个专为 Arduino 兼容微控制器设计的轻量级 C 库&#xff0c;其核心目标是为嵌入式设备提供稳定、可移植、低侵入性的 OctoPrint REST API 访问能力。该库并非独立服务&#xff0c;而是作为“网络客户端适配层”存在——它不实现 HTTP 协议栈&#…...

AntiDupl.NET:彻底清理重复图片的终极免费解决方案

AntiDupl.NET&#xff1a;彻底清理重复图片的终极免费解决方案 【免费下载链接】AntiDupl A program to search similar and defect pictures on the disk 项目地址: https://gitcode.com/gh_mirrors/an/AntiDupl 你是否曾因电脑中堆积如山的重复照片而烦恼&#xff1f;…...

简单几步:REX-UniNLU快速部署,打造个人中文文本分析工具

简单几步&#xff1a;REX-UniNLU快速部署&#xff0c;打造个人中文文本分析工具 想快速搭建一个能理解中文、分析情感、识别实体的智能系统吗&#xff1f;REX-UniNLU是一个基于ModelScope DeBERTa的高精度中文自然语言处理系统&#xff0c;通过简洁的Web界面&#xff0c;让你无…...

K8s RBAC实战:一个实验搞定权限控制

RBAC 详解(基于角色的访问控制) 一个实验搞定RBAC 在Kubernetes中&#xff0c;授权有ABAC&#xff08;基于属性的访问控制&#xff09;、RBAC&#xff08;基于角色的访问控制&#xff09;、Webhook、Node、AlwaysDeny&#xff08;一直拒绝&#xff09;和AlwaysAllow&#xff08…...

基于高斯牛顿法的PnP优化:从重投影误差到相机位姿估计

1. 从投影误差到相机位姿&#xff1a;PnP问题的本质 想象你站在一个陌生城市&#xff0c;手里只有几张随手拍的照片。如何通过这些二维图像推断出自己当时拍摄的位置和角度&#xff1f;这正是Perspective-n-Point&#xff08;PnP&#xff09;问题要解决的核心场景。在实际的视觉…...

电子书怎么转TXT?这4个电子书转TXT工具亲测有效,小白也能秒会!

随着数字阅读的普及&#xff0c;EPUB格式电子书因排版精美深受用户喜爱&#xff0c;但在编辑、手机阅读或跨设备分享时&#xff0c;TXT格式的兼容性和灵活性更具优势。本文针对不同使用场景&#xff0c;整理了4种亲测有效的EPUB转TXT方法&#xff0c;每个方法均拆解详细步骤&am…...

软件实施交付转运维学习第五天:用户管理和权限管理

今天是软件实施交付转运维学习的第五天。前面四天我们分别了解了运维的基本概念、Linux常用命令。今天&#xff0c;我们进入一个既基础又极其重要的模块——用户管理和权限管理。无论是操作系统层面&#xff0c;还是应用系统层面&#xff0c;用户和权限都是安全的基石。谁可以访…...

Qwen3-TTS声音克隆实战:用3秒音频生成你的专属语音助手

Qwen3-TTS声音克隆实战&#xff1a;用3秒音频生成你的专属语音助手 1. 声音克隆技术带来的变革 想象一下&#xff0c;只需要录制3秒钟的语音&#xff0c;就能让AI完全模仿你的声音&#xff0c;用你的语调朗读任何文字内容。这不是科幻电影里的场景&#xff0c;而是Qwen3-TTS-…...