Flink Python作业快速入门
Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心

import argparse
# 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数
import logging
import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)# WatermarkStrategy: 用于生成水印(watermarks),水印是用于处理事件时间(event time)的数据流中的延迟数据的一种机制。
# Encoder: 用于定义如何将数据编码为字节序列,通常用于数据的序列化和反序列化。
# Types: 包含了 Flink 中各种数据类型的定义,用于指定数据流中数据的类型。
# StreamExecutionEnvironment: 是所有 Flink 流处理程序的入口点,用于配置和启动流处理任务。
# RuntimeExecutionMode: 定义了流处理任务的执行模式,例如批处理模式或流处理模式。
# FileSource: 用于从文件系统中读取数据源。
# StreamFormat: 定义了数据的格式,例如 CSV、JSON 等。
# FileSink: 用于将数据写入文件系统。
# OutputFileConfig: 配置输出文件的相关设置,如前缀和后缀。
# RollingPolicy: 定义了文件滚动策略,即何时创建新的输出文件。word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):"""计算文本文件中单词的频率,并将结果输出到指定路径。该函数从指定的输入路径读取文本数据,进行单词频率统计,并将结果写入指定的输出路径。如果没有提供输入路径或输出路径,则使用默认数据或直接打印结果。参数:- input_path: 输入文本文件的路径。如果为None,则使用默认数据。- output_path: 输出结果的路径。如果为None,则直接打印结果。"""# 获取流处理环境并设置为流处理模式,设置并行度为1env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)env.set_parallelism(1)# 定义数据源if input_path is not None:# 从文件系统中读取数据ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:# 使用默认数据ds = env.from_collection(word_count_data)# 定义分割函数,将每行文本分割成单词def split(line):yield from line.split()# 计算单词频率ds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 定义数据汇if output_path is not None:# 将结果写入文件系统ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:# 直接打印结果ds.print()# 提交作业以执行env.execute()if __name__ == '__main__':# 配置日志输出到标准输出,设置日志级别为INFO,并格式化日志消息以仅显示消息内容logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")# 创建一个ArgumentParser对象以处理命令行参数parser = argparse.ArgumentParser()# 添加可选的命令行参数,用于指定输入和输出文件parser.add_argument('--input',dest='input',required=False,help='要处理的输入文件。')parser.add_argument('--output',dest='output',required=False,help='要写入结果的输出文件。')# 获取命令行参数,排除脚本名称argv = sys.argv[1:]print("Command line arguments: ", argv)# 解析已知的命令行参数,并忽略未知参数known_args, _ = parser.parse_known_args(argv)print("known_args: ", known_args)# 调用word_count函数,传入从解析参数中获取的输入和输出文件路径word_count(known_args.input, known_args.output)
相关文章:
Flink Python作业快速入门
Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心 import argparse # 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数 import logging import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.data…...
自定义函数库
求两点距离 double dis(double x1, double y1, double x2, double y2){return sqrt(pow(x2-x1, 2)pow(y2-y1, 2)); }判断闰年 bool isLeapYear(int year){return year%40 && year%100!0 || year%4000; }判断素数 bool isPrime(int num){if(num<2) return false;f…...
FreeRTOS例程2-任务挂起恢复与使用中断遇到的坑!
任务挂起简单点理解就是现在不需要执行这个任务,让它先暂停,就是挂起。恢复就是从刚才挂起的状态下继续运行。 API函数 任务挂起vTaskSuspend() 函数原型(tasks.c中): void vTaskSuspend( TaskHandle_t xTaskToSuspend ) 1. 参数: xTaskTo…...
L23.【LeetCode笔记】验证回文串(剖析几种解法)
目录 1.题目 2.自解 提交结果 反思 大小写之间的位运算 提交结果 3.代码优化 提交结果 编辑 4.LeetCode网友提供的解法 1.题目 https://leetcode.cn/problems/XltzEq/description/ 给定一个字符串 s ,验证 s 是否是 回文串 ,只考虑字母和数…...
FPGA 17 ,FPGA 与 SR-IOV虚拟化技术,高性能计算与虚拟化技术的结合(FPGA 与 SR-IOV 和 PCI,高性能计算与虚拟化的完美融合)
目录 前言 一. SR-IOV 的起源与发展 1. SR-IOV 的起源与时间线 2. SR-IOV 的诞生原因 3. SR-IOV 的详细介绍 二. SR-IOV 和 PCI 之间的关系 三. PCI 的起源与演进 1. PCI 的起源与时间线 2. PCI 的关键特性 四. FPGA 的独特魅力 1. FPGA 的定义与特性 2. FPGA 的内…...
解决navicat 导出excel数字为科学计数法问题
一、原因分析 用程序导出的csv文件,当字段中有比较长的数字字段存在时,在用excel软件查看csv文件时就会变成科学技术法的表现形式。 其实这个问题跟用什么语言导出csv文件没有关系。Excel显示数字时,如果数字大于12位,它会自动转化…...
[Unity] AppLovin Max接入Native 广告 Android篇
把下载下来的maxnativelibrary-release-文件放在Plugins/Android下 将这一行加入到mainTemplate.gradle文件中 implementation androidx.constraintlayout:constraintlayout:2.1.4添加下面的两个脚本 using System; using System.Collections; using System.Collections.Gener…...
Source Insight 4.0的安装
一、安装与破解 1、下载Source Insight 4.0安装包 https://pan.baidu.com/s/1t0u1RM19am0lyzhlNTqK9Q?pwdnvmk 2、下载程序破解补丁包 https://pan.baidu.com/s/1irvH-Kfwjf4zCCtWJByqJQ 其中包含文件si4.pediy.lic 和 sourceinsight4.exe。 3、安装下载的Source Insight …...
远程调试软件对比与使用推荐
远程调试软件对比与使用推荐 远程调试是现代软件开发中不可或缺的一部分,尤其是在处理分布式系统、云端服务或远程服务器上的问题时。以下是对几种常见远程调试工具的详细对比和推荐使用场景。 1. GDB (GNU Debugger) 特点 开源:完全免费且开源&…...
鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现
鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现 结合第一讲建立的“Hello World”程序,得到如下图所示的界面。 这里的“Hello World”是通过“Priview”显示出来的。在这个界面中进行开发的前奏曲,可以通过点击更换图片的案例来体会一下鸿蒙Next的开发…...
求解球面的一组正交标架
目录 求解球面的一组正交标架 求解球面的一组正交标架 球面 r ( u , v ) ( a cos u cos v , a cos u sin v , a sin u ) \mathbf{r}(u,v)\left(a\cos u\cos v,a\cos u\sin v,a\sin u\right) r(u,v)(acosucosv,acosusinv,asinu), 求得 r u ( − a sin u c…...
php.ini 文件上传/执行时间/部分配置新手教程
1、上传文件大小配置 一般需要同时配置“upload_max_filesize”、“post_max_size”,配置格式如下: file_uploads On ;是否允许HTTP文件上传 upload_max_filesize 2M ;设置单个文件上传的最大尺寸 post_max_size 8M ;设置 POST 请求体的最大尺寸&am…...
【Leetcode Top 100】102. 二叉树的层序遍历
问题背景 给你二叉树的根节点 r o o t root root,返回其节点值的 层序遍历 。 (即逐层地,从左到右访问所有节点)。 数据约束 树中节点数目在范围 [ 0 , 2000 ] [0, 2000] [0,2000] 内 − 1000 ≤ N o d e . v a l ≤ 1000 -1…...
【C++笔记】AVL树
前言 各位读者朋友们大家好,上期我们讲解了map和set这两大容器的使用,这一期我们讲解最早的平衡二叉搜索树——AVL树。 目录 前言一. AVL树的概念二. AVL树的实现2.1 AVL树的结构2.2 AVL树的插入2.2.1 AVL树插入一个值的大致过程2.2.2 平衡因子的更新2…...
【竞技宝】LOL:JDG官宣yagao离队
北京时间2024年12月13日,在英雄联盟S14全球总决赛结束之后,各大赛区都已经进入了休赛期,目前休赛期也快进入尾声,LPL大部分队伍都开始陆续官宣转会期的动向,其中JDG就在近期正式官宣中单选手yagao离队,而后者大概率将直接选择退役。 近日,JDG战队在官方微博上连续发布阵容变动消…...
双目摄像头标定方法
打开matlab 找到这个标定 将双目左右目拍的图像上传(左右目最好不少于20张) 等待即可 此时已经完成标定,左下角为反投影误差,右边为外参可视化 把这些误差大的删除即可。 点击导出 此时回到主页面,即可看到成功导出 Ca…...
相差不超过k的最多数,最长公共子序列(一),排序子序列,体操队形,青蛙过河
相差不超过k的最多数 链接:相差不超过k的最多数 来源:牛客网 题目描述: 给定一个数组,选择一些数,要求选择的数中任意两数差的绝对值不超过 𝑘 。问最多能选择多少个数? 输入描述: 第一行输入两个正整…...
【自然语言处理与大模型】使用llama.cpp将HF格式大模型转换为GGUF格式
llama.cpp的主要目标是在本地和云端的各种硬件上以最小的设置和最先进的性能实现LLM推理。是一个专为大型语言模型(LLM)设计的高性能推理框架,完全使用C和C编写,没有外部依赖,这使得它可以很容易地被移植到不同的操作系…...
MongoDB存储照片和文件存储照片的区别在那里?
一、维度对比 比较维度MongoDB存储照片文件系统存储照片数据模型使用文档存储数据,可以存储不同结构的照片。以文件的形式存储照片,每个文件独立存在。性能高效的数据检索,适用于大规模应用程序中的高效检索和访问。但在处理大量高分辨率图片…...
协变量的概念
协变量的概念 协变量的概念 协变量(Covariate)是在统计分析和研究中,与因变量(被研究的主要变量)相关,并且可能对因变量产生影响的其他变量。它不是研究的主要关注对象,但需要在分析过程中被考虑进去,因为它可能会混淆或改变自变量与因变量之间的关系。举例说明 教育研…...
效率直接起飞!2026年最值得信赖的专业AI论文软件
2026年AI论文写作工具已从“内容生成”升级为智能学术辅助系统,核心评价维度包括文献真实性、格式合规性、长文本逻辑、查重降重、AIGC合规与多语言支持。本次测评覆盖6款主流工具,测试场景涵盖中英文论文、全流程与专项功能、免费与付费版本,…...
从零打造 AI 小说创作平台(四):项目与章节管理
从零打造 AI 小说创作平台(四):项目与章节管理 系列:从零打造 AI 小说创作平台 NovelForge 篇章:第 4 篇 / 共 10 篇 关键词:CRUD、自动保存、软删除、章节排序、字数统计 前言 项目管理是连接用户认证和 AI 创作流水线的桥梁。这个模块看似简单(就是 CRUD),但有几个…...
对比直连与通过Taotoken调用大模型API的延迟体感差异
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 对比直连与通过Taotoken调用大模型API的延迟体感差异 在集成大模型API到应用时,开发者通常会关注请求的响应速度&#…...
REFramework技术深度解析:企业级游戏引擎扩展框架的架构演进与设计哲学
REFramework技术深度解析:企业级游戏引擎扩展框架的架构演进与设计哲学 【免费下载链接】REFramework Mod loader, scripting platform, and VR support for all RE Engine games 项目地址: https://gitcode.com/GitHub_Trending/re/REFramework 在游戏开发领…...
使用Node.js和Taotoken快速构建一个智能客服聊天接口
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 使用Node.js和Taotoken快速构建一个智能客服聊天接口 本教程面向具备Node.js基础的后端开发者,旨在指导你如何使用Open…...
华南x79-8d 支持 E5-2680 V3 或者 E5-2680 V4吗
不支持。 华南金牌 X79-8D 主板仅支持 E5-2600系列V1和V2版本的处理器,无法兼容您提到的 E5-2680 V3 或 V4。以下是关于该主板CPU支持情况的详细说明:💡 为什么不支持 V3/V4?根本原因在于CPU的接口和主板芯片组不匹配:…...
Arm开发中DSTREAM调试探针无法识别的排查指南
1. DSTREAM调试探针在Arm开发环境中不可选的排查指南当使用Arm Development Studio(Arm DS)进行嵌入式开发时,DSTREAM系列调试探针(包括DSTREAM-ST、DSTREAM-PT、DSTREAM-HT和DSTREAM-XT)偶尔会出现无法在开发环境中被…...
AI医疗落地实操指南:临床决策支持与人机协同诊疗
1. 这不是科幻片,是每天在三甲医院晨交班时发生的事 “AI把医生取代了?”——这是我过去三年被问得最多的问题,通常来自刚轮转到信息科的住院医,或是陪孩子看病时刷到短视频的家长。但真实情况比这复杂得多:上周五我蹲…...
别再只盯着人脸了!手把手教你用Python复现2023年最新的多模态情绪识别模型COGMEN
别再只盯着人脸了!手把手教你用Python复现2023年最新的多模态情绪识别模型COGMEN 情绪识别技术正在经历从单一模态到多模态融合的范式转变。传统基于面部表情的分析方法往往受限于光照条件、遮挡问题以及文化差异带来的表达偏差。2023年发布的COGMEN模型通过引入图…...
HTTPS抓包失败根因分析:证书信任链与全平台配置实战
1. 为什么HTTPS抓包不是“装个插件就完事”——从浏览器报错红锁说起你刚在Burp Suite里点开Proxy → Options → Import Burps CA Certificate,双击安装完证书,兴冲冲打开Chrome访问https://example.com,结果地址栏赫然挂着一把刺眼的红色锁…...
