Flink流批一体计算(16):PyFlink DataStream API
目录
概述
Pipeline Dataflow
代码示例WorldCount.py
执行脚本WorldCount.py
概述
Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。
用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成。
Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。
当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。
FlinkKafkaConsumer是一个Source Operator,Map、KeyBy、TimeWindow、Apply是Transformation Operator,RollingSink是一个Sink Operator。
Pipeline Dataflow
在Flink中,程序是并行和分布式的方式运行。一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask。
Flink内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。
紧密度低的算子则不能进行优化,而是将每一个Operator Subtask放在不同的线程中独立执行。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度(分区总数)等于生成它的Operator的并行度。
紧密度高的算子可以进行优化,优化后可以将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。
图中上半部分表示的是将Source和map两个紧密度高的算子优化后串成一个Operator Chain,实际上一个Operator Chain就是一个大的Operator的概念。图中的Operator Chain表示一个Operator,keyBy表示一个Operator,Sink表示一个Operator,它们通过Stream连接,而每个Operator在运行时对应一个Task,也就是说图中的上半部分有3个Operator对应的是3个Task。
图中下半部分是上半部分的一个并行版本,对每一个Task都并行化为多个Subtask,这里只是演示了2个并行度,Sink算子是1个并行度。
代码示例WorldCount.py
在本章中,你将学习如何使用 PyFlink 和 DataStream API 构建一个简单的流式应用程序。
编写一个简单的 Python DataStream 作业。
该程序读取一个 csv 文件,计算词频,并将结果写到一个结果文件中。
import argparse
import logging
import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)# define the sourceif input_path is not None:ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")ds = env.from_collection(word_count_data)def split(line):yield from line.split()# compute word countds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkif output_path is not None:ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:print("Printing result to stdout. Use --output to specify output path.")ds.print()# submit for executionenv.execute()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)
执行脚本WorldCount.py
python word_count.py
相关文章:

Flink流批一体计算(16):PyFlink DataStream API
目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程…...
软考高级系统架构设计师系列论文九十三:论计算机网络的安全性设计
软考高级系统架构设计师系列论文九十三:论计算机网络的安全性设计 一、计算机网络安全性设计相关知识点二、摘要三、正文四、总结一、计算机网络安全性设计相关知识点 软考高级系统架构设计师:计算机网络...

山西电力市场日前价格预测【2023-08-29】
日前价格预测 预测明日(2023-08-29)山西电力市场全天平均日前电价为321.48元/MWh。其中,最高日前电价为372.80元/MWh,预计出现在19: 30。最低日前电价为272.85元/MWh,预计出现在12: 30。 价差方向预测 1: 实…...

计算机毕设 基于深度学习的人脸专注度检测计算系统 - opencv python cnn
文章目录 1 前言2 相关技术2.1CNN简介2.2 人脸识别算法2.3专注检测原理2.4 OpenCV 3 功能介绍3.1人脸录入功能3.2 人脸识别3.3 人脸专注度检测3.4 识别记录 4 最后 1 前言 🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新…...

ES 7.6 - APi基础操作篇
ES7.6-APi基础操作篇 前言相关知识索引相关创建索引查询索引查询所有索引删除索引关闭与打开索引关闭索引打开索引 冻结与解冻索引冻结索引解冻索引 映射相关创建映射查看映射新增字段映射 文档相关(CURD)新增文档根据ID查询修改文档全量覆盖根据ID选择性修改根据条件批量更新 …...
【Go 基础篇】Go语言循环结构:实现重复执行与迭代控制
介绍 循环结构是编程中的重要概念,它允许我们重复执行一段代码块,或者按照一定的条件进行迭代控制。Go语言提供了多种循环结构,包括for、while和do-while等,用于不同的场景下实现循环操作。本篇博客将深入探讨Go语言中的循环结构…...

RabbitMQ笔记-RabbitMQ基本术语
RabbitMQ基本术语 相关概念; 生产者(Producer):投递消息。消息:消息体(payload)标签(label);生产者把消息交给rabbitmq,rabbitmq会根据标签把消息发给感兴趣…...

Git向远程仓库与推送以及拉取远程仓库
理解分布式版本控制系统 1.中央服务器 我们⽬前所说的所有内容(⼯作区,暂存区,版本库等等),都是在本地也就是在你的笔记本或者计算机上。⽽我们的 Git 其实是分布式版本控制系统!什么意思呢? 那我们多人…...
PostgreSQL+SSL链路测试
SSL一个各种证书在此就不详细介绍了,PostgreSQL要支持SSL的前提需要打开openssl选项,包括客户端和服务器端。 测试过程。 1. 生成私钥 root用户: mkdir -p /opt/ssl/private mkdir -p /opt/ssl/share/ca-certificateschmod 755 -R /opt/ss…...

服务器(容器)开发指南——code-server
文章目录 code-server简介code-server的安装与使用code-server的安装code-server的启动code-server的简单启动指定配置启动code-server code-server环境变量配置 code-server端口转发自动端口转发手动添加转发端口 nginx反向代理code-servercode-server打包开发版镜像 GitHub官…...

C++贪吃蛇(控制台版)
C自学精简实践教程 目录(必读) 目录 主要考察 需求 输入文件 运行效果 实现思路 枚举类型 enum class 启动代码 输入文件data.txt 的内容 参考答案 学生实现的效果 主要考察 模块划分 文本文件读取 UI与业务分离 控制台交互 数据抽象 需求 用户输入字母表示方…...
Java之字符串实践
功能概述 字符串是Java编程中常用的数据类型,本文对String部分常见功能做了对应实践以及分析。 功能实践 场景1:字符串比较 用例代码 Test public void test_string_compare() {String s1 "abc";String s2 s1;String s5 "abc&quo…...

BM20 数组中的逆序对
描述 解题思路:归并排序 分治:分治即“分而治之”,“分”指的是将一个大而复杂的问题划分成多个性质相同但是规模更小的子问题,子问题继续按照这样划分,直到问题可以被轻易解决;“治”指的是将子问题单独进…...
高德猎鹰轨迹查询相关接口
高德猎鹰轨迹官网:服务管理-API文档-开发指南-猎鹰轨迹服务 | 高德地图API 轨迹查询 httpclient的post // post方法请求 创建轨迹 private static void createTrace() {String key "高德注册的key";String sid "服务id"; // 服务idString…...

整理总结新手开始抖音小店经营:常见问题及解决办法
抖音小店作为一种新兴的电商模式,在短时间内获得了广泛的关注和使用。然而,对于新手来说,抖音小店经营可能会遇到一些问题。下面是四川不若与众总结的一些常见的问题以及相应的解决办法。 问题一:产品选择困难 对于新手来说&#…...

4-1-netty
非阻塞io 服务端就一个线程,可以处理无数个连接 收到所有的连接都放到集合channelList里面 selector是有事件集合的 对server来说优先关注连接事件 遍历连接事件...
hive 动态分区-动态分区数量太多也会导致效率下降只设置非严格模式也能执行动态分区
hive 动态分区-动态分区数量太多也会导致效率下降&只设置非严格模式也能执行动态分区 结论 在非严格模式下不开启动态分区的功能的参数(配置如下),同样也能进行动态分区数据写入,目测原因是不严格检查SQL中是否指定分区或者…...

java八股文面试[JVM]——JVM调优
知识来源: 【2023年面试】JVM性能调优实战_哔哩哔哩_bilibili...
FairyGUI-Unity 异形屏适配
本文中会修改到FairyGUI源代码,涉及两个文件Stage和StageCamera,需要对Unity的屏幕类了解。 在网上查找有很多的异形屏适配操作,但对于FairyGUI相关的描述操作很少,这里我贴出一下自己在实际应用中的异形屏UI适配操作。 原理 获…...

Oracle监听器启动出错:本地计算机上的OracleOraDb11g_home1TNSListener服务启动后又停止了解决方案
在启动oracle的服务OracleOraDb11g_home1TNSListener时,提示服务启动后又停止了。 解决方法: 修改oracle安装目录下的两个配置文件: 以上两个文件,对应的HOST的值,都改为127.0.0.1 然后再启动服务,启动成…...

linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...

如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...

蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

【Linux】Linux 系统默认的目录及作用说明
博主介绍:✌全网粉丝23W,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...

windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...