4.1、Flink任务怎样读取集合中的数据
1、API说明
非并行数据源:
def fromElements[T: TypeInformation](data: T*): DataStream[T]
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T]
并行数据源:
def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T])
使用场景:
常用来调试代码使用
2、这是一个完整的入门案例
开发语言:Java1.8
Flink版本:flink1.17.0
package com.baidu.datastream.source;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;import java.util.Arrays;
import java.util.List;// --------------------------------------------------------------------------------------------
// TODO 从集合中读取数据
// --------------------------------------------------------------------------------------------/** TODO 通过`读取Java集合中数据`来创建 DataStreamSource** 方法1:fromCollection* Collection、Iterator -> DataStreamSource* 方法2:fromElements* OUT... data -> DataStreamSource* 方法3:fromParallelCollection* SplittableIterator -> DataStreamSource* 重要提示:* fromCollection、fromElements 创建的是非并行source算子(并行度只能为1)* fromParallelCollection 创建的是并行算子(并行度>=1)* */public class ReadCollection {public static void main(String[] args) throws Exception {fromCollection();//fromElements();//fromParallelCollection();}public static void fromCollection() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取Java集合数据List<String> list = Arrays.asList("刘备", "张飞", "关羽", "赵云", "马超", "黄忠");env.fromCollection(list).print();// 3.触发程序执行env.execute();}public static void fromElements() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取给定的对象序列env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠").print();// 3.触发程序执行env.execute();}public static void fromParallelCollection() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取给定的对象序列NumberSequenceIterator numberSequenceIterator = new NumberSequenceIterator(1, 10);env.fromParallelCollection(numberSequenceIterator, Long.class).print();/** 注意: fromParallelCollection生成的source为并行算子* 集合中的数据会被平均分配到并行子任务中去* */// 3.触发程序执行env.execute();}
}
相关文章:
4.1、Flink任务怎样读取集合中的数据
1、API说明 非并行数据源: def fromElements[T: TypeInformation](data: T*): DataStream[T] def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] def fromCollection[T: TypeInformation] (data: Iterator[T]): Data…...
JD商品详情页面+关键词搜索商品列表API接口数据,详情页面数据返回值说明
作为一个工作助手,我无法直接为您提供实时的京东商品详情。您可以通过访问京东官方网站或使用API接口来查看商品的详细信息。每个商品的详情页面通常会包括以下内容: 1. 商品标题:商品的名称和简要描述。 2. 价格信息:包括原价、…...
Electron v26.0.0-beta.11 发布,跨平台桌面应用开发工具
Electron是一款跨平台桌面应用开发工具,它基于Web技术,使用HTML、CSS和JavaScript来构建应用程序。它能够让开发者使用熟悉的Web开发技术来创建强大的桌面应用,并且可以运行在Windows、macOS和Linux等多个操作系统上。 一、Electron的特点 跨平台:Electron可以将Web技术转…...
提高办案效率:公检系统引入自动校对技术
引入自动校对技术到公检系统中可以有效提高办案效率。自动校对技术结合公检系统的特点,可以在以下方面提高办案效率: 1.节省时间:自动校对技术可以快速检测和修正法律文书中的语法、拼写和标点符号等错误。与手动校对相比,自动校对…...
iptables 清空
清空 iptables中所有策略。 -Z: 重置计数器 -F:删除所有规则(flush) -X:删除所有自定义的链 规则清理和重建如下: iptables -Z iptables -t nat -F iptables -t nat -X iptables -t nat -P PREROUTING A…...
网络安全(黑客)零基础入门
导语 什么是 Web 安全?我又该如何入门学习它呢?学习过程中又应注意哪些问题呢?... 或许你的心中有着这样的疑问、不过别着急,本文会为你一一解答这些问题。 正文 定义 Web 安全,顾名思义便是由保障 Web 应用能够持续…...
Al Go: 蒙特卡洛树搜索(MCTS)简介
目录 1. 前言 1.1 Minimax 1.2 剪枝 1.3 蒙特卡洛树搜索 1.4 为什么随机走子会可行呢? 2. vanilla Monte Carlo tree search 3. UCT-based trade-off between exploitation and exploration 4. MCTS基本算法流程 5. Efficiency Through Expert Policies 6…...
Client-go操作Deployment
在工作中需要对kubernetes进行自定义资源的开发,操作K8s的资源肯定是必不可少的。K8s原生语言是用Go编写的,所以在CRD中使用client-go来操作资源。本次介绍一下使用client-go来操作Deployment。 1. 创建main函数 func main() {homePath : homedir.Home…...
设计模式——单例模式(懒汉和饿汉)
单例模式 一、概念 单例模式是一种对象创建型模式,使用单例模式,可以保证为一个类只生成唯一的实例对象。也就是说,在整个程序空间中,该类只存在一个实例对象。一个类只能有一个实例在生活中是很常见的,比如打印机程…...
详解——Vue3递归函数功能
在 Vue 3 中,递归函数是一种在组件中调用自身的技术。递归函数在解决树状数据结构、无限级分类、嵌套组件等情况下非常有用。以下是一个示例,展示如何在 Vue 3 中实现递归函数的功能、代码和原理: 1. 创建递归组件: 首先&#x…...
【VSCode】查看二进制文件
1.安装插件Hex Editor 2.打开二进制文件 3.执行Hex Editor命令...
C#设计模式之观察者模式
题目:假设你正在开发一个简单的新闻发布系统,该系统允许用户订阅不同的新闻频道,并在有新闻发布时向订阅者发送通知。使用观察者模式设计和实现该系统。观察者模式的相关概念和定义: 观察者模式是一种行为设计模式,它定…...
小红书攻略:爆款引流,如何在激烈竞争中脱颖而出?
小红书(RED)作为国内最具影响力的社交电商平台之一,是很多品牌运营者的首选之一。然而,在小红书的激烈竞争中,如何快速引流、吸引关注,成为了品牌运营者面临的挑战。本篇文章一秒推小编将为您介绍小红书运营…...
Ubuntu中的安装卸载及删除方法
说明:由于图形化界面方法(如Add/Remove... 和Synaptic Package Manageer)比较简单,所以这里主要总结在终端通过命令行方式进行的软件包安装、卸载和删除的方法。 一、Ubuntu中软件安装方法 1、APT方式 (1࿰…...
获取历史dokcer镜像项目,并上传gitlab,再打包镜像管理
今天遇到一个问题: 发现一个部署在Jenkins的脚本用的docker镜像是:test_project:v20191108,即这个项目是19年的一个版本,由于代码不断更新,用现在的最新代码运行该脚本,可能不能运行了,必须用19…...
【Go语言】Golang保姆级入门教程 Go初学者chapter3
Go语言 第三章 运算符 下划线“_”本身在Go中一个特殊的标识符,成为空标识符。可以代表任何其他的标识符,但是他对应的值就会被忽略 仅仅被作为站维度使用, 不能作为标识符使用 因为Go语言中没有private public 所以标记变量首字母大写代表其…...
网络防御(4)
一、结合以下问题对当天内容进行总结 1. 什么是IDS? 2. IDS和防火墙有什么不同? 3. IDS工作原理? 4. IDS的主要检测方法有哪些详细说明? 5. IDS的部署方式有哪些? 6. IDS的签名是什么意思?签名过滤器有什么…...
conda错误处理:ResolvePackageNotFound
当运行conda env create -f environment.yaml时出现"ResolvePackageNotFound"错误,这可能是由于环境配置文件中指定的依赖项无法找到或不可用。 错误消息中列出的依赖项包括pip20.3、python3.8.5和cudatoolkit11.3。 尝试以下解决方案: 更新…...
linux初学者小命令
linux初学者小命令 一.在正式学习linux命令之前需要先认识一下linux环境中命令是如何被执行的shell是一个属于linux内核的软件,在系统启动后加载进RAM(内存)内,每个用户通过终端登录系统后,就会运行。负责不间断的接收用户的输入,…...
宝尊电商短期前景堪忧,宝尊国际能否取得成功还有待验证
来源:猛兽财经 作者:猛兽财经 核心业务面临短期逆风 在2023年第一季度财报中,宝尊电商(BZUN)表示其电商业务(简称BEC)主要包括:品牌的门店运营、客户服务以及物流和供应链管理、IT和数字营销等增值服务”。…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
椭圆曲线密码学(ECC)
一、ECC算法概述 椭圆曲线密码学(Elliptic Curve Cryptography)是基于椭圆曲线数学理论的公钥密码系统,由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA,ECC在相同安全强度下密钥更短(256位ECC ≈ 3072位RSA…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
Webpack性能优化:构建速度与体积优化策略
一、构建速度优化 1、升级Webpack和Node.js 优化效果:Webpack 4比Webpack 3构建时间降低60%-98%。原因: V8引擎优化(for of替代forEach、Map/Set替代Object)。默认使用更快的md4哈希算法。AST直接从Loa…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...
