Flink Python作业快速入门
Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心
import argparse
# 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数
import logging
import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)# WatermarkStrategy: 用于生成水印(watermarks),水印是用于处理事件时间(event time)的数据流中的延迟数据的一种机制。
# Encoder: 用于定义如何将数据编码为字节序列,通常用于数据的序列化和反序列化。
# Types: 包含了 Flink 中各种数据类型的定义,用于指定数据流中数据的类型。
# StreamExecutionEnvironment: 是所有 Flink 流处理程序的入口点,用于配置和启动流处理任务。
# RuntimeExecutionMode: 定义了流处理任务的执行模式,例如批处理模式或流处理模式。
# FileSource: 用于从文件系统中读取数据源。
# StreamFormat: 定义了数据的格式,例如 CSV、JSON 等。
# FileSink: 用于将数据写入文件系统。
# OutputFileConfig: 配置输出文件的相关设置,如前缀和后缀。
# RollingPolicy: 定义了文件滚动策略,即何时创建新的输出文件。word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):"""计算文本文件中单词的频率,并将结果输出到指定路径。该函数从指定的输入路径读取文本数据,进行单词频率统计,并将结果写入指定的输出路径。如果没有提供输入路径或输出路径,则使用默认数据或直接打印结果。参数:- input_path: 输入文本文件的路径。如果为None,则使用默认数据。- output_path: 输出结果的路径。如果为None,则直接打印结果。"""# 获取流处理环境并设置为流处理模式,设置并行度为1env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)env.set_parallelism(1)# 定义数据源if input_path is not None:# 从文件系统中读取数据ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:# 使用默认数据ds = env.from_collection(word_count_data)# 定义分割函数,将每行文本分割成单词def split(line):yield from line.split()# 计算单词频率ds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 定义数据汇if output_path is not None:# 将结果写入文件系统ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:# 直接打印结果ds.print()# 提交作业以执行env.execute()if __name__ == '__main__':# 配置日志输出到标准输出,设置日志级别为INFO,并格式化日志消息以仅显示消息内容logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")# 创建一个ArgumentParser对象以处理命令行参数parser = argparse.ArgumentParser()# 添加可选的命令行参数,用于指定输入和输出文件parser.add_argument('--input',dest='input',required=False,help='要处理的输入文件。')parser.add_argument('--output',dest='output',required=False,help='要写入结果的输出文件。')# 获取命令行参数,排除脚本名称argv = sys.argv[1:]print("Command line arguments: ", argv)# 解析已知的命令行参数,并忽略未知参数known_args, _ = parser.parse_known_args(argv)print("known_args: ", known_args)# 调用word_count函数,传入从解析参数中获取的输入和输出文件路径word_count(known_args.input, known_args.output)
相关文章:

Flink Python作业快速入门
Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心 import argparse # 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数 import logging import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.data…...
自定义函数库
求两点距离 double dis(double x1, double y1, double x2, double y2){return sqrt(pow(x2-x1, 2)pow(y2-y1, 2)); }判断闰年 bool isLeapYear(int year){return year%40 && year%100!0 || year%4000; }判断素数 bool isPrime(int num){if(num<2) return false;f…...
FreeRTOS例程2-任务挂起恢复与使用中断遇到的坑!
任务挂起简单点理解就是现在不需要执行这个任务,让它先暂停,就是挂起。恢复就是从刚才挂起的状态下继续运行。 API函数 任务挂起vTaskSuspend() 函数原型(tasks.c中): void vTaskSuspend( TaskHandle_t xTaskToSuspend ) 1. 参数: xTaskTo…...

L23.【LeetCode笔记】验证回文串(剖析几种解法)
目录 1.题目 2.自解 提交结果 反思 大小写之间的位运算 提交结果 3.代码优化 提交结果 编辑 4.LeetCode网友提供的解法 1.题目 https://leetcode.cn/problems/XltzEq/description/ 给定一个字符串 s ,验证 s 是否是 回文串 ,只考虑字母和数…...

FPGA 17 ,FPGA 与 SR-IOV虚拟化技术,高性能计算与虚拟化技术的结合(FPGA 与 SR-IOV 和 PCI,高性能计算与虚拟化的完美融合)
目录 前言 一. SR-IOV 的起源与发展 1. SR-IOV 的起源与时间线 2. SR-IOV 的诞生原因 3. SR-IOV 的详细介绍 二. SR-IOV 和 PCI 之间的关系 三. PCI 的起源与演进 1. PCI 的起源与时间线 2. PCI 的关键特性 四. FPGA 的独特魅力 1. FPGA 的定义与特性 2. FPGA 的内…...

解决navicat 导出excel数字为科学计数法问题
一、原因分析 用程序导出的csv文件,当字段中有比较长的数字字段存在时,在用excel软件查看csv文件时就会变成科学技术法的表现形式。 其实这个问题跟用什么语言导出csv文件没有关系。Excel显示数字时,如果数字大于12位,它会自动转化…...
[Unity] AppLovin Max接入Native 广告 Android篇
把下载下来的maxnativelibrary-release-文件放在Plugins/Android下 将这一行加入到mainTemplate.gradle文件中 implementation androidx.constraintlayout:constraintlayout:2.1.4添加下面的两个脚本 using System; using System.Collections; using System.Collections.Gener…...

Source Insight 4.0的安装
一、安装与破解 1、下载Source Insight 4.0安装包 https://pan.baidu.com/s/1t0u1RM19am0lyzhlNTqK9Q?pwdnvmk 2、下载程序破解补丁包 https://pan.baidu.com/s/1irvH-Kfwjf4zCCtWJByqJQ 其中包含文件si4.pediy.lic 和 sourceinsight4.exe。 3、安装下载的Source Insight …...

远程调试软件对比与使用推荐
远程调试软件对比与使用推荐 远程调试是现代软件开发中不可或缺的一部分,尤其是在处理分布式系统、云端服务或远程服务器上的问题时。以下是对几种常见远程调试工具的详细对比和推荐使用场景。 1. GDB (GNU Debugger) 特点 开源:完全免费且开源&…...

鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现
鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现 结合第一讲建立的“Hello World”程序,得到如下图所示的界面。 这里的“Hello World”是通过“Priview”显示出来的。在这个界面中进行开发的前奏曲,可以通过点击更换图片的案例来体会一下鸿蒙Next的开发…...
求解球面的一组正交标架
目录 求解球面的一组正交标架 求解球面的一组正交标架 球面 r ( u , v ) ( a cos u cos v , a cos u sin v , a sin u ) \mathbf{r}(u,v)\left(a\cos u\cos v,a\cos u\sin v,a\sin u\right) r(u,v)(acosucosv,acosusinv,asinu), 求得 r u ( − a sin u c…...
php.ini 文件上传/执行时间/部分配置新手教程
1、上传文件大小配置 一般需要同时配置“upload_max_filesize”、“post_max_size”,配置格式如下: file_uploads On ;是否允许HTTP文件上传 upload_max_filesize 2M ;设置单个文件上传的最大尺寸 post_max_size 8M ;设置 POST 请求体的最大尺寸&am…...
【Leetcode Top 100】102. 二叉树的层序遍历
问题背景 给你二叉树的根节点 r o o t root root,返回其节点值的 层序遍历 。 (即逐层地,从左到右访问所有节点)。 数据约束 树中节点数目在范围 [ 0 , 2000 ] [0, 2000] [0,2000] 内 − 1000 ≤ N o d e . v a l ≤ 1000 -1…...

【C++笔记】AVL树
前言 各位读者朋友们大家好,上期我们讲解了map和set这两大容器的使用,这一期我们讲解最早的平衡二叉搜索树——AVL树。 目录 前言一. AVL树的概念二. AVL树的实现2.1 AVL树的结构2.2 AVL树的插入2.2.1 AVL树插入一个值的大致过程2.2.2 平衡因子的更新2…...

【竞技宝】LOL:JDG官宣yagao离队
北京时间2024年12月13日,在英雄联盟S14全球总决赛结束之后,各大赛区都已经进入了休赛期,目前休赛期也快进入尾声,LPL大部分队伍都开始陆续官宣转会期的动向,其中JDG就在近期正式官宣中单选手yagao离队,而后者大概率将直接选择退役。 近日,JDG战队在官方微博上连续发布阵容变动消…...

双目摄像头标定方法
打开matlab 找到这个标定 将双目左右目拍的图像上传(左右目最好不少于20张) 等待即可 此时已经完成标定,左下角为反投影误差,右边为外参可视化 把这些误差大的删除即可。 点击导出 此时回到主页面,即可看到成功导出 Ca…...

相差不超过k的最多数,最长公共子序列(一),排序子序列,体操队形,青蛙过河
相差不超过k的最多数 链接:相差不超过k的最多数 来源:牛客网 题目描述: 给定一个数组,选择一些数,要求选择的数中任意两数差的绝对值不超过 𝑘 。问最多能选择多少个数? 输入描述: 第一行输入两个正整…...

【自然语言处理与大模型】使用llama.cpp将HF格式大模型转换为GGUF格式
llama.cpp的主要目标是在本地和云端的各种硬件上以最小的设置和最先进的性能实现LLM推理。是一个专为大型语言模型(LLM)设计的高性能推理框架,完全使用C和C编写,没有外部依赖,这使得它可以很容易地被移植到不同的操作系…...

MongoDB存储照片和文件存储照片的区别在那里?
一、维度对比 比较维度MongoDB存储照片文件系统存储照片数据模型使用文档存储数据,可以存储不同结构的照片。以文件的形式存储照片,每个文件独立存在。性能高效的数据检索,适用于大规模应用程序中的高效检索和访问。但在处理大量高分辨率图片…...
协变量的概念
协变量的概念 协变量的概念 协变量(Covariate)是在统计分析和研究中,与因变量(被研究的主要变量)相关,并且可能对因变量产生影响的其他变量。它不是研究的主要关注对象,但需要在分析过程中被考虑进去,因为它可能会混淆或改变自变量与因变量之间的关系。举例说明 教育研…...

深度学习在微纳光子学中的应用
深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向: 逆向设计 通过神经网络快速预测微纳结构的光学响应,替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...

初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

10-Oracle 23 ai Vector Search 概述和参数
一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...

用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...