Flink Python作业快速入门
Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心

import argparse
# 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数
import logging
import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)# WatermarkStrategy: 用于生成水印(watermarks),水印是用于处理事件时间(event time)的数据流中的延迟数据的一种机制。
# Encoder: 用于定义如何将数据编码为字节序列,通常用于数据的序列化和反序列化。
# Types: 包含了 Flink 中各种数据类型的定义,用于指定数据流中数据的类型。
# StreamExecutionEnvironment: 是所有 Flink 流处理程序的入口点,用于配置和启动流处理任务。
# RuntimeExecutionMode: 定义了流处理任务的执行模式,例如批处理模式或流处理模式。
# FileSource: 用于从文件系统中读取数据源。
# StreamFormat: 定义了数据的格式,例如 CSV、JSON 等。
# FileSink: 用于将数据写入文件系统。
# OutputFileConfig: 配置输出文件的相关设置,如前缀和后缀。
# RollingPolicy: 定义了文件滚动策略,即何时创建新的输出文件。word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):"""计算文本文件中单词的频率,并将结果输出到指定路径。该函数从指定的输入路径读取文本数据,进行单词频率统计,并将结果写入指定的输出路径。如果没有提供输入路径或输出路径,则使用默认数据或直接打印结果。参数:- input_path: 输入文本文件的路径。如果为None,则使用默认数据。- output_path: 输出结果的路径。如果为None,则直接打印结果。"""# 获取流处理环境并设置为流处理模式,设置并行度为1env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)env.set_parallelism(1)# 定义数据源if input_path is not None:# 从文件系统中读取数据ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:# 使用默认数据ds = env.from_collection(word_count_data)# 定义分割函数,将每行文本分割成单词def split(line):yield from line.split()# 计算单词频率ds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 定义数据汇if output_path is not None:# 将结果写入文件系统ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:# 直接打印结果ds.print()# 提交作业以执行env.execute()if __name__ == '__main__':# 配置日志输出到标准输出,设置日志级别为INFO,并格式化日志消息以仅显示消息内容logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")# 创建一个ArgumentParser对象以处理命令行参数parser = argparse.ArgumentParser()# 添加可选的命令行参数,用于指定输入和输出文件parser.add_argument('--input',dest='input',required=False,help='要处理的输入文件。')parser.add_argument('--output',dest='output',required=False,help='要写入结果的输出文件。')# 获取命令行参数,排除脚本名称argv = sys.argv[1:]print("Command line arguments: ", argv)# 解析已知的命令行参数,并忽略未知参数known_args, _ = parser.parse_known_args(argv)print("known_args: ", known_args)# 调用word_count函数,传入从解析参数中获取的输入和输出文件路径word_count(known_args.input, known_args.output)
相关文章:
Flink Python作业快速入门
Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心 import argparse # 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数 import logging import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.data…...
自定义函数库
求两点距离 double dis(double x1, double y1, double x2, double y2){return sqrt(pow(x2-x1, 2)pow(y2-y1, 2)); }判断闰年 bool isLeapYear(int year){return year%40 && year%100!0 || year%4000; }判断素数 bool isPrime(int num){if(num<2) return false;f…...
FreeRTOS例程2-任务挂起恢复与使用中断遇到的坑!
任务挂起简单点理解就是现在不需要执行这个任务,让它先暂停,就是挂起。恢复就是从刚才挂起的状态下继续运行。 API函数 任务挂起vTaskSuspend() 函数原型(tasks.c中): void vTaskSuspend( TaskHandle_t xTaskToSuspend ) 1. 参数: xTaskTo…...
L23.【LeetCode笔记】验证回文串(剖析几种解法)
目录 1.题目 2.自解 提交结果 反思 大小写之间的位运算 提交结果 3.代码优化 提交结果 编辑 4.LeetCode网友提供的解法 1.题目 https://leetcode.cn/problems/XltzEq/description/ 给定一个字符串 s ,验证 s 是否是 回文串 ,只考虑字母和数…...
FPGA 17 ,FPGA 与 SR-IOV虚拟化技术,高性能计算与虚拟化技术的结合(FPGA 与 SR-IOV 和 PCI,高性能计算与虚拟化的完美融合)
目录 前言 一. SR-IOV 的起源与发展 1. SR-IOV 的起源与时间线 2. SR-IOV 的诞生原因 3. SR-IOV 的详细介绍 二. SR-IOV 和 PCI 之间的关系 三. PCI 的起源与演进 1. PCI 的起源与时间线 2. PCI 的关键特性 四. FPGA 的独特魅力 1. FPGA 的定义与特性 2. FPGA 的内…...
解决navicat 导出excel数字为科学计数法问题
一、原因分析 用程序导出的csv文件,当字段中有比较长的数字字段存在时,在用excel软件查看csv文件时就会变成科学技术法的表现形式。 其实这个问题跟用什么语言导出csv文件没有关系。Excel显示数字时,如果数字大于12位,它会自动转化…...
[Unity] AppLovin Max接入Native 广告 Android篇
把下载下来的maxnativelibrary-release-文件放在Plugins/Android下 将这一行加入到mainTemplate.gradle文件中 implementation androidx.constraintlayout:constraintlayout:2.1.4添加下面的两个脚本 using System; using System.Collections; using System.Collections.Gener…...
Source Insight 4.0的安装
一、安装与破解 1、下载Source Insight 4.0安装包 https://pan.baidu.com/s/1t0u1RM19am0lyzhlNTqK9Q?pwdnvmk 2、下载程序破解补丁包 https://pan.baidu.com/s/1irvH-Kfwjf4zCCtWJByqJQ 其中包含文件si4.pediy.lic 和 sourceinsight4.exe。 3、安装下载的Source Insight …...
远程调试软件对比与使用推荐
远程调试软件对比与使用推荐 远程调试是现代软件开发中不可或缺的一部分,尤其是在处理分布式系统、云端服务或远程服务器上的问题时。以下是对几种常见远程调试工具的详细对比和推荐使用场景。 1. GDB (GNU Debugger) 特点 开源:完全免费且开源&…...
鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现
鸿蒙项目云捐助第二讲鸿蒙图文互动基本程序实现 结合第一讲建立的“Hello World”程序,得到如下图所示的界面。 这里的“Hello World”是通过“Priview”显示出来的。在这个界面中进行开发的前奏曲,可以通过点击更换图片的案例来体会一下鸿蒙Next的开发…...
求解球面的一组正交标架
目录 求解球面的一组正交标架 求解球面的一组正交标架 球面 r ( u , v ) ( a cos u cos v , a cos u sin v , a sin u ) \mathbf{r}(u,v)\left(a\cos u\cos v,a\cos u\sin v,a\sin u\right) r(u,v)(acosucosv,acosusinv,asinu), 求得 r u ( − a sin u c…...
php.ini 文件上传/执行时间/部分配置新手教程
1、上传文件大小配置 一般需要同时配置“upload_max_filesize”、“post_max_size”,配置格式如下: file_uploads On ;是否允许HTTP文件上传 upload_max_filesize 2M ;设置单个文件上传的最大尺寸 post_max_size 8M ;设置 POST 请求体的最大尺寸&am…...
【Leetcode Top 100】102. 二叉树的层序遍历
问题背景 给你二叉树的根节点 r o o t root root,返回其节点值的 层序遍历 。 (即逐层地,从左到右访问所有节点)。 数据约束 树中节点数目在范围 [ 0 , 2000 ] [0, 2000] [0,2000] 内 − 1000 ≤ N o d e . v a l ≤ 1000 -1…...
【C++笔记】AVL树
前言 各位读者朋友们大家好,上期我们讲解了map和set这两大容器的使用,这一期我们讲解最早的平衡二叉搜索树——AVL树。 目录 前言一. AVL树的概念二. AVL树的实现2.1 AVL树的结构2.2 AVL树的插入2.2.1 AVL树插入一个值的大致过程2.2.2 平衡因子的更新2…...
【竞技宝】LOL:JDG官宣yagao离队
北京时间2024年12月13日,在英雄联盟S14全球总决赛结束之后,各大赛区都已经进入了休赛期,目前休赛期也快进入尾声,LPL大部分队伍都开始陆续官宣转会期的动向,其中JDG就在近期正式官宣中单选手yagao离队,而后者大概率将直接选择退役。 近日,JDG战队在官方微博上连续发布阵容变动消…...
双目摄像头标定方法
打开matlab 找到这个标定 将双目左右目拍的图像上传(左右目最好不少于20张) 等待即可 此时已经完成标定,左下角为反投影误差,右边为外参可视化 把这些误差大的删除即可。 点击导出 此时回到主页面,即可看到成功导出 Ca…...
相差不超过k的最多数,最长公共子序列(一),排序子序列,体操队形,青蛙过河
相差不超过k的最多数 链接:相差不超过k的最多数 来源:牛客网 题目描述: 给定一个数组,选择一些数,要求选择的数中任意两数差的绝对值不超过 𝑘 。问最多能选择多少个数? 输入描述: 第一行输入两个正整…...
【自然语言处理与大模型】使用llama.cpp将HF格式大模型转换为GGUF格式
llama.cpp的主要目标是在本地和云端的各种硬件上以最小的设置和最先进的性能实现LLM推理。是一个专为大型语言模型(LLM)设计的高性能推理框架,完全使用C和C编写,没有外部依赖,这使得它可以很容易地被移植到不同的操作系…...
MongoDB存储照片和文件存储照片的区别在那里?
一、维度对比 比较维度MongoDB存储照片文件系统存储照片数据模型使用文档存储数据,可以存储不同结构的照片。以文件的形式存储照片,每个文件独立存在。性能高效的数据检索,适用于大规模应用程序中的高效检索和访问。但在处理大量高分辨率图片…...
协变量的概念
协变量的概念 协变量的概念 协变量(Covariate)是在统计分析和研究中,与因变量(被研究的主要变量)相关,并且可能对因变量产生影响的其他变量。它不是研究的主要关注对象,但需要在分析过程中被考虑进去,因为它可能会混淆或改变自变量与因变量之间的关系。举例说明 教育研…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列,以便知晓哪些列包含有价值的数据,…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合
在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...
使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...
