flink Data Source数据源
flink
Data Source数据源
Source
-
并行度
-
非并行:并行度只能为1
-
并行
-
-
基于集合的Source
-
fromElements
- package com.pxj.sx.flink;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FromElementDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> daat = env.fromElements("flink", "spark", "hive");daat.print();Thread.sleep(2000000);}
}
- fromElements(T ...) 方法是一个非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。- fromCollection- fromCollection可以从一个结合读取数据,返回DataStream,该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。- package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.List;public class FromCollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<String> wordList = Arrays.asList("flink", "spark", "hadoop", "flink");DataStreamSource<String> source = env.fromCollection(wordList);source.print();env.execute("pxj");}
}
- fromParallelCollection- fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。- package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.LongValueSequenceIterator;public class FromParallelCollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//Source是多个并行的DataStreamSource<LongValue> dataSource = env.fromParallelCollection(new LongValueSequenceIterator(1, 10), LongValue.class);dataSource.print();env.execute("pxj");}
}
- generateSequence- generateSequence(long from, long to) 方法是一个并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)该方法需要传入两个long类型的参数,第一个是起始值,第二个是结束值,返回一个DataStreamSource。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。
-
基于Socket网络端口
-
socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是"\n",最大重新连接次数为0。
- package com.pxj.sx.flink;
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SocktDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("pxj62", 8889);source.print();env.execute("pxj");}
}
-
基于文件
-
readFile
- package com.pxj.sx.flink;
-
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;public class ReadFlie {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.readFile(new TextInputFormat(null), "data/a.txt",FileProcessingMode.PROCESS_CONTINUOUSLY, 2000).print();env.execute("pxj");}
}
- readTextFile- package com.pxj.sx.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class ReadFlieDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.readTextFile("data/a.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> datas = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(",");for (String s : strings) {out.collect(Tuple2.of(s, 1));}}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = datas.keyBy(0).sum(1);summed.print();env.execute("pxj");}
}
-
自定义Source
-
单并行度
-
可以实现 SourceFunction 或者 RichSourceFunction , 这两者都是非并行的source算子
- package com.pxj.sx.flink;
-
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class MySource2{public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource3());source.print();env.execute("pxj");}
}
class MySource3 extends RichSourceFunction<String> {private int i=0; //定义一个int类型的变量,从1开始private boolean flag=true; //定义一个flag标标志//run方法就是用来读取外部的数据或产生数据的逻辑@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (i<=100 && flag){Thread.sleep(1000); //为避免太快,睡眠1秒ctx.collect("data:"+i++);}}@Overridepublic void cancel() {flag=false;}
}
- 多并行度- 也可继承 ParallelSourceFunction 或者 RichParallelSourceFunction , 这两者都是可并行的source算子- 带 Rich的,都拥有 open() ,close() ,getRuntimeContext() 方法
带 Parallel的,都可多实例并行执行source
- package com.pxj.sx.flink;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class MySource1{public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource());source.print();env.execute("pxj");}
}
class MySource extends RichParallelSourceFunction<String> {private int i=0; //定义一个int类型的变量,从1开始private boolean flag=true; //定义一个flag标标志//run方法就是用来读取外部的数据或产生数据的逻辑@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (i<=100 && flag){Thread.sleep(1000); //为避免太快,睡眠1秒ctx.collect("data:"+i++);}}@Overridepublic void cancel() {flag=false;}
}
整理人:pxj_sx(潘陈)
日 期:2024-05-26 11:47:24
相关文章:
flink Data Source数据源
flink Data Source数据源 Source 并行度 非并行:并行度只能为1 并行 基于集合的Source fromElements package com.pxj.sx.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.ap…...
网络七层模型与云计算中的网络服务
网络七层模型,也称为OSI(Open System Interconnection)模型,是由国际标准化组织(ISO)制定的一个概念性框架,用于描述网络通信过程中信息是如何被封装、传输和解封装的。这一模型将复杂的网络通信…...
word一按空格就换行怎么办?word文本之间添加空格就换行怎么办?
如上图,无法在Connection和con之间添加空格,一按空格就会自动换行。 第一步:选中文本,打开段落。 第二步:点击中文版式,勾选允许西文在单词中间换行。 确定之后就解决一按空格就自动换行啦!...
Python 遍历字典的方法,你都掌握了吗
Python中的字典是一种非常灵活的数据结构,它允许通过键来存储和访问值。在处理字典时,经常需要遍历字典中的元素,以下是几种常见的遍历字典的方法。 1. 使用 for 循环直接遍历字典的键 字典的键是唯一的,可以直接通过 for 循环来…...
MySQL 8.4.0 LTS 变更解析:I_S 表、权限、关键字和客户端
↑ 关注“少安事务所”公众号,欢迎⭐收藏,不错过精彩内容~ MySQL 8.4.0 LTS 已经发布 ,作为发版模型变更后的第一个长期支持版本,注定要承担未来生产环境的重任,那么这个版本都有哪些新特性、变更,接下来少…...
LeetCode 124 —— 二叉树中的最大路径和
阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 二叉树的问题首先我们要想想是否能用递归来解决,本题也不例外,而递归的关键是找到子问题。 我们首先来看看一棵最简单的树,也就是示例 1。这样的一棵树总共有六条路径…...
美甲店会员预约系统管理小程序的作用是什么
女性爱美体现在方方面面,美丽好看的指甲也不能少,市场中美甲店、小摊不少,也跑出了不少连锁品牌,70后到00后,每个层级都有不少潜在客户,商家需要获取和完善转化路径,不断提高品牌影响力与自身内…...
..堆..
堆 堆是完全二叉树,即除了最后一列之外,上面的每一层都是满的(左右严格对称且每个节点都满子节点) 最后一列从左向右排序。 默认大根堆:每一个节点都大于其左右儿子,根节点就是整个数据结构的最大值 pr…...
【LLM多模态】综述Visual Instruction Tuning towards General-Purpose Multimodal Model
note 文章目录 note论文1. 论文试图解决什么问题2. 这是否是一个新的问题3. 这篇文章要验证一个什么科学假设4. 有哪些相关研究?如何归类?谁是这一课题在领域内值得关注的研究员?5. 论文中提到的解决方案之关键是什么?6. 论文中的…...
探索Linux中的神奇工具:重定向符的妙用
探索Linux中的神奇工具:重定向符的妙用 在Linux系统中,重定向符是一个强大的工具,用于控制命令的输入和输出,实现数据流的定向。本文将详细介绍重定向符的基本用法和一些实用技巧,帮助读者更好地理解和运用这个功能。…...
Kubernetes 文档 / 概念 / 工作负载 / 工作负载管理 / Job
Kubernetes 文档 / 概念 / 工作负载 / 工作负载管理 / Job 此文档从 Kubernetes 官网摘录 中文地址 英文地址 Job 会创建一个或者多个 Pod,并将继续重试 Pod 的执行,直到指定数量的 Pod 成功终止。 随着 Pod 成功结束,Job 跟踪记录成功完成的…...
办公自动化-Python如何提取Word标题并保存到Excel中?
办公自动化-Python如何提取Word标题并保存到Excel中? 应用场景需求分析实现思路实现过程安装依赖库打开需求文件获取word中所有标题去除不需要的标题创建工作簿和工作表分割标题功能名称存入测试对象GN-TC需求标识符存入测试项标识存入需求标识符 完整源码实现效果学…...
基于Java、SpringBoot和uniapp在线考试系统安卓APP和微信小程序
摘要 基于Java、SpringBoot和uniapp的在线考试系统安卓APP微信小程序是一种结合了现代Web开发技术和移动应用技术的解决方案,旨在为教育机构提供一个方便、高效和灵活的在线考试平台。该系统采用Java语言进行后端开发,使用SpringBoot框架简化企业级应用…...
抖音a-bogus加密解析(三)
要补的环境我给提示,大家自行操作,出了问题就是因为缺环境,没补好 window global; // reading _u未定义 window.requestAnimationFrame function () {} // XMLHttpRequest 未定义 window.XMLHttpRequest function () {} window.onwheelx …...
IS-IS DIS
原理概述 OSPF 协议支持4种网络类型, IS-IS 协议只支持两种网络类型,即广播网络和点到点网络。与 OSPF 协议相同, IS-IS 协议在广播网络中会将网络视为一个伪节点( Pseudonode ,简称 PSN ),并选举出一台 DIS ( Designa…...
random和range
含义: random(1,10) 不包含10,用于生成随机数。它可以生成浮点数或整数,取决于具体的使用方式。 range(0,1) 不包含1,用于生成一个整数序列。它可以生成一个指定范围内的连续整数序列。 区别在于&#x…...
研二学妹面试字节,竟倒在了ThreadLocal上,这是不要应届生还是不要女生啊?
一、写在开头 今天和一个之前研二的学妹聊天,聊及她上周面试字节的情况,着实感受到了Java后端现在找工作的压力啊,记得在18,19年的时候,研究生计算机专业的学生,背背八股文找个Java开发工作毫无问题&#x…...
Golang:gammazero/deque是一个快速环形缓冲区deque(双端队列)实现
gammazero/deque是一个快速环形缓冲区deque(双端队列)实现。 文档 https://github.com/gammazero/deque 安装 go get github.com/gammazero/deque代码示例 先入先出队列 package mainimport ("fmt""github.com/gammazero/deque&quo…...
C++ 时间处理-统计函数运行时间
1. 关键词2. 问题3. 解决思路4. 代码实现 4.1. timecount.h4.2. timecount.cpp 5. 测试代码6. 运行结果7. 源码地址 1. 关键词 C 时间处理 统计函数运行时间 跨平台 2. 问题 C如何简单便捷地实现“函数运行时间的统计”功能? 3. 解决思路 类的构造函数&#x…...
JAVA面试题大全(十五)
1、Zookeeper 是什么? zookper是一个分布式的,开放源码的分布式应用程序协调服务。是 google chubby 的开源实现,是 hadoop 和 hbase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
