Python 编写 Flink 应用程序经验记录(Flink1.17.1)
目录
官方API文档
提交作业到集群运行
官方示例
环境
编写一个 Flink Python Table API 程序
执行一个 Flink Python Table API 程序
实例处理Kafka后入库到Mysql
下载依赖
flink-kafka jar
读取kafka数据
写入mysql数据
flink-mysql jar
官方API文档
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/
提交作业到集群运行
#! /usr/bin/env python
# -*- coding: utf-8 -*-# /opt/test_flink.py
if __name__ == "__main__":print("这是一个简单的测试用例")
flink 安装目录下的 examples 目录里面已经提供了一些测试案例,我们也可以直接拿它来做实验。
提交至集群
./bin/flink run -py 代码文件
通过 flink run 即可运行应用程序,由于 flink 既可运行 Java 程序、也可以运行 Python 程序,所以这里我们需要指定 -py 参数,表示运行的是 py 文件。但默认情况下解释器使用的 python2,当然如果你终端输入 python 进入的就是 python3 的话则当我没说,要是我们想指定 flink 使用 python3 解释器的话,则需要配置一个环境变量。
export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3
下面来测试一下:
./bin/flink run -py /opt/test_flink.py

很明显结果是成功的,当然这里面没有涉及到任何与 Flink 有关的内容,只是演示如何提交一个 Python 应用程序。当然 flink run 是同时支持 Java、Python 等语言的。
不管使用哪种 API 进行编程,最终客户端都会生成 JobGraph 提交到 JM 上。但毕竟 Flink 的内核是采用 Java 语言编写的,如果 Python 应用程序变成 JobGraph 对象被提交到 Flink 集群上运行的话,那么 Python 虚拟机和 Java 虚拟机之间一定有某种方式,使得 Python 可以直接动态访问 Java 中的对象、Java 也可以回调 Python 中的对象。没错,实现这一点的便是 py4j。
提交单个 py 文件知道怎么做了,但如果该文件还导入了其它文件该怎么办呢?一个项目中还会涉及到包的存在。其实不管项目里的文件有多少,启动文件只有一个,只需要把这个启动文件提交上去即可。举例说明,当然这里仍不涉及具体和 Flink 相关的内容,先把如何提交程序这一步给走通。因为不管编写的程序多复杂,提交这一步骤是不会变的。
先来看看编写的程序:

flink_test 就是主目录,里面有一个 apps 子目录和一个 main.py 文件,apps 目录里面有三个 py 文件,对应的内容分别如图所示。然后将其提交到 Flink Standalone 集群上运行,命令和提交单个文件是一样的:

即使是多文件,提交方式也是相似的,输出结果表明提交成功了。
官方示例
环境
- Java 11
- Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1
编写一个 Flink Python Table API 程序
编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
接下来,我们将介绍如何创建源表和结果表。
t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())
tab = t_env.from_path('source')t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())
你也可以使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表:
my_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')
""".format(input_path)my_sink_ddl = """create table sink (word STRING,`count` BIGINT) with ('connector' = 'filesystem','format' = 'canal-json','path' = '{}')
""".format(output_path)t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
上面的程序展示了如何创建及注册表名分别为 source 和 sink 的表。 其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词; 结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。
接下来,我们介绍如何创建一个作业:该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink。
最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 计算 word count
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()
该教程的完整代码如下:
import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().set("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)
执行一个 Flink Python Table API 程序
接下来,可以在命令行中运行作业(假设作业名为 word_count.py):
python word_count.py
上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。
最后,你可以得到如下运行结果:

实例处理Kafka后入库到Mysql
下载依赖
flink-kafka jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

读取kafka数据
#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import loggingfrom pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializerfrom pyflink.common import Row
from pyflink.datastream import FlatMapFunctiondef read_kafka():env = StreamExecutionEnvironment.get_execution_environment()env.add_jars("file:///D:/安技汇/运营平台/DataManage/flink-sql-connector-kafka-1.17.1.jar")source = KafkaSource.builder() \.set_bootstrap_servers("172.16.12.128:9092") \.set_topics("test") \.set_group_id("my-group") \.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \.set_value_only_deserializer(SimpleStringSchema()) \.build()# 从消费组提交的位点开始消费,不指定位点重置策略#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \# 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \# 从时间戳大于等于指定时间戳(毫秒)的数据开始消费#.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \# 从最早位点开始消费#.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \# 从最末尾位点开始消费#.set_starting_offsets(KafkaOffsetsInitializer.latest()) \#.set_property("partition.discovery.interval.ms", "10000") # 每 10 秒检查一次新分区#.set_property("security.protocol", "SASL_PLAINTEXT") \#.set_property("sasl.mechanism", "PLAIN") \#.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")kafka_stream.print()env.execute("Source")if __name__ == "__main__":logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")read_kafka()
写入mysql数据
flink-mysql jar
没通,待补充。。
相关文章:
Python 编写 Flink 应用程序经验记录(Flink1.17.1)
目录 官方API文档 提交作业到集群运行 官方示例 环境 编写一个 Flink Python Table API 程序 执行一个 Flink Python Table API 程序 实例处理Kafka后入库到Mysql 下载依赖 flink-kafka jar 读取kafka数据 写入mysql数据 flink-mysql jar 官方API文档 https://nigh…...
如何 通过使用优先级提示,来控制所有网页资源加载顺序
当你打开浏览器的网络标签时,你会看到大量的活动。资源正在下载,信息正在提交,事件正在记录,等等。 由于有太多的活动,有效地管理这些流量的优先级变得至关重要。带宽争用是真实存在的,当所有请求同时触发时…...
10月25日,每日信息差
今天是2023年10月26日,以下是为您准备的14条信息差 第一、百世集团牵头成立全国智慧物流与供应链行业产教融合共同体在杭州正式成立,该共同体由百世集团、浙江工商大学、浙江经济职业技术学院共同牵头 第二、问界M9预定量突破15000台 第三、前三季度我…...
泛微OA之获取每月固定日期
文章目录 1.需求及效果1.1需求1.2效果 2. 思路3. 实现 1.需求及效果 1.1需求 需要获取每个月的7号作为需发布日期,需要自动填充1.2效果 自动获取每个月的七号2. 思路 1.功能并不复杂,可以用泛微前端自带的插入代码块的功能来实现。 2.将这需要赋值的…...
Dataworks API:调取 MC 项目下所有表单
文章目录 前言Dataworks API 文档解读GetMetaDBTableList 接口文档 API 调试在线调试本地调试运行环境账密问题请求数据进一步处理 小结 前言 最近,我需要对公司的数据资产进行梳理,这其中便包括了Dataworks各个项目下的表单。这些表单,作为…...
Node编写更新用户头像接口
目录 定义路由和处理函数 验证表单数据 编辑 实现更新用户头像的功能 定义路由和处理函数 向外共享定义的更新用户头像处理函数 // 更新用户头像的处理函数 exports.updateAvatar (req, res) > {res.send(更新成功) } 定义更新用户头像路由 // 更新用户头像的路由…...
MySQL3:MySQL中一条更新SQL是如何执行的?
MySQL3:MySQL中一条更新SQL是如何执行的? MySQL中一条更新SQL是如何执行的?1.Buffer Pool缓冲池2.Redo logredo log作用Redo log文件位置redo log为什么是2个? 3.Undo log4.更新过程5.InnoDB官网架构InnoDB架构-内存结构①Buffer …...
p5.js map映射
本文简介 带尬猴,我嗨德育处主任 p5.js 为开发者提供了很多有用的方法,这些方法实现起来可能不难,但却非常实用,能大大减少我们的开发时间。 本文将通过举例说明的方式来讲解 映射 map() 方法。 什么是映射 从 p5.js 文档 中可…...
idea提交代码冲突后,代码意外消失解决办法
敲了大半天的代码,解决冲突后,直接消失了当时慌的一批CCCCC 右击项目Local History ----show History 找到最近提交的内容右击选择Revert,代码全回来了...
爬虫批量下载科研论文(SciHub)
系列文章目录 利用 eutils 实现自动下载序列文件 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 系列文章目录前言一、获取文献信息二、下载文献PDF文件参考 前言 大家好✨,这里是bio🦖。…...
explain查询sql执行计划返回的字段的详细说明
当使用EXPLAIN命令查看SQL语句的执行计划时,会返回一张表格,其中包含了该SQL语句的执行计划。下面是每个字段的详细分析: id:执行计划的唯一标识符。如果查询中有子查询,每个子查询都会有一个唯一的ID。在执行计划中&a…...
讯飞输入法13.0发布,推出行业首款生成式AI输入法
🦉 AI新闻 🚀 讯飞输入法13.0发布,推出行业首款生成式AI输入法 摘要:科大讯飞在2023年全球开发者节上发布了全新讯飞输入法13.0版本,其中最大的亮点是推出了行业首款生成式AI输入法。这次升级将生成式AI能力融入输入…...
35. 搜索插入位置、Leetcode的Python实现
博客主页:🏆看看是李XX还是李歘歘 🏆 🌺每天分享一些包括但不限于计算机基础、算法等相关的知识点🌺 💗点关注不迷路,总有一些📖知识点📖是你想要的💗 ⛽️今…...
使用 DDPO 在 TRL 中微调 Stable Diffusion 模型
引言 扩散模型 (如 DALL-E 2、Stable Diffusion) 是一类文生图模型,在生成图像 (尤其是有照片级真实感的图像) 方面取得了广泛成功。然而,这些模型生成的图像可能并不总是符合人类偏好或人类意图。因此出现了对齐问题,即如何确保模型的输出与…...
cocosCreator 之 crypto-es数据加密
版本: 3.8.0 语言: TypeScript 环境: Mac 简介 项目开发中,针对于一些明文数据,比如本地存储和Http数据请求等,进行加密保护,是有必要的。 关于加密手段主要有: 对称加密 使用相…...
Leetcode---368周赛
题目列表 2908. 元素和最小的山形三元组 I 2909. 元素和最小的山形三元组 II 2910. 合法分组的最少组数 2911. 得到 K 个半回文串的最少修改次数 一、元素和最小的山形三元组I 没什么好说的,不会其他方法就直接暴力,时间复杂度O(n^3),代…...
矢量图形编辑软件Illustrator 2023 mac中文版软件特点(ai2023) v27.9
illustrator 2023 mac是一款矢量图形编辑软件,用于创建和编辑排版、图标、标志、插图和其他类型的矢量图形。 illustrator 2023 mac软件特点 矢量图形:illustrator创建的图形是矢量图形,可以无限放大而不失真,这与像素图形编辑软…...
一、Docker Compose——什么是 Docker Compose
Docker Compose 是一个用来定义和运行多容器 Docker 应用程序的工具,他的方便之处就是可以使用 YAML 文件来配置将要运行的 Docker 容器,然后使用一条命令即可创建并启动配置好的 Docker 容器了;相比手动输入命令的繁琐,Docker Co…...
Java提升技术,进阶为高级开发和架构师的路线
原文网址:Java提升技术,进阶为高级开发和架构师的路线-CSDN博客 简介 Java怎样提升技术?怎样进阶为高级开发和架构师?本文介绍靠谱的成长路线。 首先点明,只写业务代码是无法成长技术的。提升技术的两个方法是&…...
记一次 .Net+SqlSugar 查询超时的问题排查过程
环境和版本:.Net 6 SqlSuger 5.1.4.* ,数据库是mysql 5.7 ,数据量在2000多条左右 业务是一个非常简单的查询,代码如下: var list _dbClient.Queryable<tb_name>().ToList(); tb_name 下配置了一对多的关系…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
Git常用命令完全指南:从入门到精通
Git常用命令完全指南:从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
华为OD机试-最短木板长度-二分法(A卷,100分)
此题是一个最大化最小值的典型例题, 因为搜索范围是有界的,上界最大木板长度补充的全部木料长度,下界最小木板长度; 即left0,right10^6; 我们可以设置一个候选值x(mid),将木板的长度全部都补充到x,如果成功…...
土建施工员考试:建筑施工技术重点知识有哪些?
《管理实务》是土建施工员考试中侧重实操应用与管理能力的科目,核心考查施工组织、质量安全、进度成本等现场管理要点。以下是结合考试大纲与高频考点整理的重点内容,附学习方向和应试技巧: 一、施工组织与进度管理 核心目标: 规…...
flow_controllers
关键点: 流控制器类型: 同步(Sync):发布操作会阻塞,直到数据被确认发送。异步(Async):发布操作非阻塞,数据发送由后台线程处理。纯同步(PureSync…...
