当前位置: 首页 > news >正文

Flink Python作业快速入门

Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心


import argparse
# 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数
import logging
import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)# WatermarkStrategy: 用于生成水印(watermarks),水印是用于处理事件时间(event time)的数据流中的延迟数据的一种机制。
# Encoder: 用于定义如何将数据编码为字节序列,通常用于数据的序列化和反序列化。
# Types: 包含了 Flink 中各种数据类型的定义,用于指定数据流中数据的类型。
# StreamExecutionEnvironment: 是所有 Flink 流处理程序的入口点,用于配置和启动流处理任务。
# RuntimeExecutionMode: 定义了流处理任务的执行模式,例如批处理模式或流处理模式。
# FileSource: 用于从文件系统中读取数据源。
# StreamFormat: 定义了数据的格式,例如 CSV、JSON 等。
# FileSink: 用于将数据写入文件系统。
# OutputFileConfig: 配置输出文件的相关设置,如前缀和后缀。
# RollingPolicy: 定义了文件滚动策略,即何时创建新的输出文件。word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):"""计算文本文件中单词的频率,并将结果输出到指定路径。该函数从指定的输入路径读取文本数据,进行单词频率统计,并将结果写入指定的输出路径。如果没有提供输入路径或输出路径,则使用默认数据或直接打印结果。参数:- input_path: 输入文本文件的路径。如果为None,则使用默认数据。- output_path: 输出结果的路径。如果为None,则直接打印结果。"""# 获取流处理环境并设置为流处理模式,设置并行度为1env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)env.set_parallelism(1)# 定义数据源if input_path is not None:# 从文件系统中读取数据ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:# 使用默认数据ds = env.from_collection(word_count_data)# 定义分割函数,将每行文本分割成单词def split(line):yield from line.split()# 计算单词频率ds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 定义数据汇if output_path is not None:# 将结果写入文件系统ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:# 直接打印结果ds.print()# 提交作业以执行env.execute()if __name__ == '__main__':# 配置日志输出到标准输出,设置日志级别为INFO,并格式化日志消息以仅显示消息内容logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")# 创建一个ArgumentParser对象以处理命令行参数parser = argparse.ArgumentParser()# 添加可选的命令行参数,用于指定输入和输出文件parser.add_argument('--input',dest='input',required=False,help='要处理的输入文件。')parser.add_argument('--output',dest='output',required=False,help='要写入结果的输出文件。')# 获取命令行参数,排除脚本名称argv = sys.argv[1:]print("Command line arguments: ", argv)# 解析已知的命令行参数,并忽略未知参数known_args, _ = parser.parse_known_args(argv)print("known_args: ", known_args)# 调用word_count函数,传入从解析参数中获取的输入和输出文件路径word_count(known_args.input, known_args.output)

相关文章:

Flink Python作业快速入门

Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心 import argparse # 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数 import logging import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.data…...

自定义函数库

求两点距离 double dis(double x1, double y1, double x2, double y2){return sqrt(pow(x2-x1, 2)pow(y2-y1, 2)); }判断闰年 bool isLeapYear(int year){return year%40 && year%100!0 || year%4000; }判断素数 bool isPrime(int num){if(num<2) return false;f…...

FreeRTOS例程2-任务挂起恢复与使用中断遇到的坑!

任务挂起简单点理解就是现在不需要执行这个任务&#xff0c;让它先暂停&#xff0c;就是挂起。恢复就是从刚才挂起的状态下继续运行。 API函数 任务挂起vTaskSuspend() 函数原型(tasks.c中): void vTaskSuspend( TaskHandle_t xTaskToSuspend ) 1. 参数&#xff1a; xTaskTo…...

L23.【LeetCode笔记】验证回文串(剖析几种解法)

目录 1.题目 2.自解 提交结果 反思 大小写之间的位运算 提交结果 3.代码优化 提交结果 ​编辑 4.LeetCode网友提供的解法 1.题目 https://leetcode.cn/problems/XltzEq/description/ 给定一个字符串 s &#xff0c;验证 s 是否是 回文串 &#xff0c;只考虑字母和数…...

FPGA 17 ,FPGA 与 SR-IOV虚拟化技术,高性能计算与虚拟化技术的结合(FPGA 与 SR-IOV 和 PCI,高性能计算与虚拟化的完美融合)

目录 前言 一. SR-IOV 的起源与发展 1. SR-IOV 的起源与时间线 2. SR-IOV 的诞生原因 3. SR-IOV 的详细介绍 二. SR-IOV 和 PCI 之间的关系 三. PCI 的起源与演进 1. PCI 的起源与时间线 2. PCI 的关键特性 四. FPGA 的独特魅力 1. FPGA 的定义与特性 2. FPGA 的内…...

解决navicat 导出excel数字为科学计数法问题

一、原因分析 用程序导出的csv文件&#xff0c;当字段中有比较长的数字字段存在时&#xff0c;在用excel软件查看csv文件时就会变成科学技术法的表现形式。 其实这个问题跟用什么语言导出csv文件没有关系。Excel显示数字时&#xff0c;如果数字大于12位&#xff0c;它会自动转化…...

[Unity] AppLovin Max接入Native 广告 Android篇

把下载下来的maxnativelibrary-release-文件放在Plugins/Android下 将这一行加入到mainTemplate.gradle文件中 implementation androidx.constraintlayout:constraintlayout:2.1.4添加下面的两个脚本 using System; using System.Collections; using System.Collections.Gener…...

Source Insight 4.0的安装

一、安装与破解 1、下载Source Insight 4.0安装包 https://pan.baidu.com/s/1t0u1RM19am0lyzhlNTqK9Q?pwdnvmk 2、下载程序破解补丁包 https://pan.baidu.com/s/1irvH-Kfwjf4zCCtWJByqJQ 其中包含文件si4.pediy.lic 和 sourceinsight4.exe。 3、安装下载的Source Insight …...

远程调试软件对比与使用推荐

远程调试软件对比与使用推荐 远程调试是现代软件开发中不可或缺的一部分&#xff0c;尤其是在处理分布式系统、云端服务或远程服务器上的问题时。以下是对几种常见远程调试工具的详细对比和推荐使用场景。 1. GDB (GNU Debugger) 特点 开源&#xff1a;完全免费且开源&…...

鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现

鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现 结合第一讲建立的“Hello World”程序&#xff0c;得到如下图所示的界面。 这里的“Hello World”是通过“Priview”显示出来的。在这个界面中进行开发的前奏曲&#xff0c;可以通过点击更换图片的案例来体会一下鸿蒙Next的开发…...

求解球面的一组正交标架

目录 求解球面的一组正交标架 求解球面的一组正交标架 球面 r ( u , v ) ( a cos ⁡ u cos ⁡ v , a cos ⁡ u sin ⁡ v , a sin ⁡ u ) \mathbf{r}(u,v)\left(a\cos u\cos v,a\cos u\sin v,a\sin u\right) r(u,v)(acosucosv,acosusinv,asinu), 求得 r u ( − a sin ⁡ u c…...

php.ini 文件上传/执行时间/部分配置新手教程

1、上传文件大小配置 一般需要同时配置“upload_max_filesize”、“post_max_size”&#xff0c;配置格式如下&#xff1a; file_uploads On ;是否允许HTTP文件上传 upload_max_filesize 2M ;设置单个文件上传的最大尺寸 post_max_size 8M ;设置 POST 请求体的最大尺寸&am…...

【Leetcode Top 100】102. 二叉树的层序遍历

问题背景 给你二叉树的根节点 r o o t root root&#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 数据约束 树中节点数目在范围 [ 0 , 2000 ] [0, 2000] [0,2000] 内 − 1000 ≤ N o d e . v a l ≤ 1000 -1…...

【C++笔记】AVL树

前言 各位读者朋友们大家好&#xff0c;上期我们讲解了map和set这两大容器的使用&#xff0c;这一期我们讲解最早的平衡二叉搜索树——AVL树。 目录 前言一. AVL树的概念二. AVL树的实现2.1 AVL树的结构2.2 AVL树的插入2.2.1 AVL树插入一个值的大致过程2.2.2 平衡因子的更新2…...

【竞技宝】LOL:JDG官宣yagao离队

北京时间2024年12月13日,在英雄联盟S14全球总决赛结束之后,各大赛区都已经进入了休赛期,目前休赛期也快进入尾声,LPL大部分队伍都开始陆续官宣转会期的动向,其中JDG就在近期正式官宣中单选手yagao离队,而后者大概率将直接选择退役。 近日,JDG战队在官方微博上连续发布阵容变动消…...

双目摄像头标定方法

打开matlab 找到这个标定 将双目左右目拍的图像上传&#xff08;左右目最好不少于20张&#xff09; 等待即可 此时已经完成标定&#xff0c;左下角为反投影误差&#xff0c;右边为外参可视化 把这些误差大的删除即可。 点击导出 此时回到主页面&#xff0c;即可看到成功导出 Ca…...

相差不超过k的最多数,最长公共子序列(一),排序子序列,体操队形,青蛙过河

相差不超过k的最多数 链接:相差不超过k的最多数 来源&#xff1a;牛客网 题目描述&#xff1a; 给定一个数组&#xff0c;选择一些数&#xff0c;要求选择的数中任意两数差的绝对值不超过 &#x1d458; 。问最多能选择多少个数&#xff1f; 输入描述: 第一行输入两个正整…...

【自然语言处理与大模型】使用llama.cpp将HF格式大模型转换为GGUF格式

llama.cpp的主要目标是在本地和云端的各种硬件上以最小的设置和最先进的性能实现LLM推理。是一个专为大型语言模型&#xff08;LLM&#xff09;设计的高性能推理框架&#xff0c;完全使用C和C编写&#xff0c;没有外部依赖&#xff0c;这使得它可以很容易地被移植到不同的操作系…...

MongoDB存储照片和文件存储照片的区别在那里?

一、维度对比 比较维度MongoDB存储照片文件系统存储照片数据模型使用文档存储数据&#xff0c;可以存储不同结构的照片。以文件的形式存储照片&#xff0c;每个文件独立存在。性能高效的数据检索&#xff0c;适用于大规模应用程序中的高效检索和访问。但在处理大量高分辨率图片…...

协变量的概念

协变量的概念 协变量的概念 协变量(Covariate)是在统计分析和研究中,与因变量(被研究的主要变量)相关,并且可能对因变量产生影响的其他变量。它不是研究的主要关注对象,但需要在分析过程中被考虑进去,因为它可能会混淆或改变自变量与因变量之间的关系。举例说明 教育研…...

【[LeetCode每日一题】Leetcode 1768.交替合并字符串

Leetcode 1768.交替合并字符串 题目描述&#xff1a; 给定两个字符串 word1 和 word2&#xff0c;以交替的方式将它们合并成一个新的字符串。即&#xff0c;第一个字符来自 word1&#xff0c;第二个字符来自 word2&#xff0c;第三个字符来自 word1&#xff0c;依此类推。如果…...

SRT协议学习

SRT(Secure Reliable Transport)协议是一种开源的视频传输协议&#xff0c;旨在提供安全&#xff0c;可靠&#xff0c;低延迟的视频流传输。以下是SRT协议的一些关键的工作原理。 1 安全传输&#xff0c;SRT通过使用AES加密和数据完整性验证来确保数据的安全传输。它可以在不信…...

南昌大学《2024年837自动控制原理真题》 (完整版)

本文内容&#xff0c;全部选自自动化考研联盟的&#xff1a;《南昌大学873自控考研资料》的真题篇。后续会持续更新更多学校&#xff0c;更多年份的真题&#xff0c;记得关注哦~ 目录 2024年真题 Part1&#xff1a;2024年完整版真题 2024年真题...

ASP.NET Core 应用程序的启动与配置:Program.cs 文件的全面解析

ASP.NET Core 应用程序的启动与配置&#xff1a;Program.cs 文件的全面解析 Program.cs 是 ASP.NET Core 应用程序的入口点&#xff0c;负责应用程序的启动和配置。以下是 Program.cs 文件中完成的主要工作&#xff0c;按逻辑步骤进行总结&#xff1a; 1. 创建和配置主机环境…...

2020-12-02 数字过滤

缘由 C语言 数组&#xff1a;数字过滤-CSDN问答 void chuli(int n15236) {int aa[47]{0},j0,m0;while(n)aa[j]n%10,n/10;while(j)if(aa[--j]%2)m*10,maa[j];cout << m << ends; ​​​​​​​} void 数字过滤(int n 15236) {int aa[47]{0}, j 0, m 0;while (…...

长短期记忆神经网络(LSTM)介绍

1、应用现状 长短期记忆神经网络&#xff08;LSTM&#xff09;是一种特殊的循环神经网络(RNN)。原始的RNN在训练中&#xff0c;随着训练时间的加长以及网络层数的增多&#xff0c;很容易出现梯度爆炸或者梯度消失的问题&#xff0c;导致无法处理较长序列数据&#xff0c;从而无…...

数据结构 ——二叉树转广义表

数据结构 ——二叉树转广义表 1、树转广义表 如下一棵树&#xff0c;转换为广义表 root(c(a()(b()()))(e(d()())(f()(j(h()())())))) (根&#xff08;左子树&#xff09;&#xff08;右子树&#xff09;) 代码实现 #include<stdio.h> #include<stdlib.h>//保存…...

chattts生成的音频与字幕修改完善,每段字幕对应不同颜色的视频,准备下一步插入视频。

上一节中&#xff0c;实现了先生成一个固定背景的与音频长度一致的视频&#xff0c;然后插入字幕。再合并成一个视频的方法。 但是&#xff1a;这样有点单了&#xff0c;所以&#xff1a; 1.根据字幕的长度先生成视频片断 2.在片段上加上字幕。 3.合并所有片断&#xff0c;…...

数据结构开始——时间复杂度和空间复杂度知识点笔记总结

好了&#xff0c;经过了漫长的时间学习c语言语法知识&#xff0c;现在我们到了数据结构的学习。 首先&#xff0c;我们得思考一下 什么是数据结构&#xff1f; 数据结构(Data Structure)是计算机存储、组织数据的方式&#xff0c;指相互之间存在一种或多种特定关系的数据元素…...

路由策略与策略路由

路由策略 常用有Router-Policy&#xff0c;Filter-Policy等 控制路由是否可达&#xff0c;通过修改路由条目相关参数影响流量的转发 基于控制平面&#xff0c;会影响路由表表项&#xff0c;但只能基于目地址进行策略判定&#xff0c;于路由协议相结合使用 Router-Policy …...