当前位置: 首页 > news >正文

0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()

需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
这一步只能创建表和连接器,具体执行还要执行下一步。

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

完整代码如下

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')# define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()# execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",

相关文章:

0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 select word, count(1) as count from source group by word; ------------------------------------------------------ |…...

会声会影2024旗舰版详细功能介绍

随着网络视频的蓬勃发展,越来越多的人开始涉足视频剪辑领域,毕竟技多不压身嘛。在众多剪辑软件中,剪映和会声会影是备受新手青睐的两种。那么,会声会影和剪映哪个好呢?在它们之间,哪一个更适合初学者呢接&a…...

QtCreator 查看类帮助文档,快捷键操作:按两次F1 全屏帮助,Esc取消全屏

如何查看类帮助文档 选择类&#xff0c;按F1查看类帮助文档。 示例&#xff1a; #include <QLabel> // 将光标放在QLabel上&#xff0c;按F1右侧弹出的类帮助手册可视宽度很小&#xff0c;如果按两次 F1 键&#xff0c;帮助文档将会以全屏模式显示&#xff0c;以便更清…...

C语言文件操作(1)

C语言文件操作&#xff08;1&#xff09; 文章目录 C语言文件操作&#xff08;1&#xff09;一、理解文件1.概述2.分类①.正常角度②.文本文件和二进制文件 二、文件的打开和关闭1.流和标准流2.文件类型指针3.文件的打开和关闭以及使用类型 三、文件缓冲区 一、理解文件 1.概述…...

adb 操作命令(adb调试QT项目使用到的命令)

1.adb连接串口 获取root权限 adb root && adb remount && adb shell2.测试串口命令 stty -F /dev/ttyS4 cs8 -parenb -cstopb -echoecho "12345\n" > /dev/ttyS8cat /dev/ttyS4 &3.软件在安卓系统上的名字已经活动名称&#xff08;下面是示…...

mysql下载和安装,使用

先下载安装 官方下载 已下载备份软件 安装&#xff0c;一路下一步设置环境变量 4. 打开一个cmd&#xff0c;输入mysql -u root -p...

Redis底层核心数据结构详解

文章目录 一、深入String&#xff08;SDS&#xff09;1. 字符串简介2. SDS存在的意义3. SDS结构设计4. SDS与C字符串的区别4.1 常数复杂度获取字符串长度4.2 杜绝缓冲区溢出4.3 二进制安全4.4 SDS API 5 小结 二、深入List (QuickList)1. 链表节点结构设计2. Redis的链表实现的…...

如何学会从产品经理角度去思考问题?

如何学会从产品经理角度去思考问题&#xff1f; 从产品经理的角度思考问题意味着你需要关注产品从构思到上市全过程中的各个方面&#xff0c;包括用户需求、市场趋势、设计、开发、测试、上市后的用户反馈等。以下是一些策略和方法&#xff0c;帮助你培养从产品经理角度思考问…...

正则表达式的神奇世界:表达、匹配和提取

正则表达式的神奇世界&#xff1a;表达、匹配和提取 前言第一&#xff1a; 什么是正则表达式&#xff1f;第二&#xff1a; 字符匹配和量词&#xff1a;1. 字符匹配&#xff1a;2. 量词&#xff1a;3. 贪婪和非贪婪匹配&#xff1a; 第三&#xff1a;字符类和元字符1. 字符类&a…...

密码登录虽安全,但有时很麻烦!如何禁用或删除Windows 11中的密码登录

如果你想在Windows 11上自动登录,在本指南中,我们将向你展示如何删除你的帐户密码。 在Windows 11上,你可以至少通过三种方式从帐户中删除登录密码。在你的帐户上使用密码有助于保护你的计算机和文件免受来自internet或本地的未经授权的访问。然而,在某些情况下,密码可能…...

Python实现的快速排序代码

Python实现的快速排序代码 def bubble_sort(arr): n len(arr) for i in range(n): for j in range(0, n-i-1): if arr[j] > arr[j1]: arr[j], arr[j1] arr[j1], arr[j] return arr 冒泡排序是一种简单的排序算法&#xff0c;它重复地遍历要排序的数列&#xff0c;…...

【USRP】通信基带物理层历史

无线通信的基带物理层开发历史涵盖了从早期无线技术到当前复杂的移动通信标准的各种进步。以下是关于无线通信基带物理层开发的简要历史概述&#xff1a; 无线电初期&#xff1a;20世纪初&#xff0c;Guglielmo Marconi等人通过无线电进行了早期的无线通信尝试。这些早期的尝试…...

依靠继承与聚合,实现maven搭建分布式项目

简介聚合 对于复杂的Maven项目&#xff0c;一般建议采用多模块的方式来设计开发&#xff0c;便于后期维护管理。但是构建项目时&#xff0c;如果每次都需要按模块一个一个进行构建会十分麻烦&#xff0c;而Maven的聚合功能就可以很好的解决这个问题&#xff0c;当用户对聚合模…...

华为OD 叠积木(100分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应20022部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为O…...

vue重修之自定义项目、ESLint和代码规范修复

文章目录 VueCli 自定义创建项目ESlint代码规范及手动修复代码规范错误 VueCli 自定义创建项目 安装脚手架 (已安装) npm i vue/cli -g创建项目 vue create xxx选项 Vue CLI v5.0.8 ? Please pick a preset:Default ([Vue 3] babel, eslint)Default ([Vue 2] babel, eslint) …...

华为OD 完全二叉树非叶子部分后序遍历(200分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应往年部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为OD…...

AI是未来?——神经网络篇

AI是未来&#xff1f;——神经网络篇 文章目录 AI是未来&#xff1f;——神经网络篇1. 神经网络小记问题记录&#xff1a; 1. 神经网络小记 疑问&#xff1a;假如让神经网络识别一张猫的图片&#xff0c;他经过了n个神经元节点最终识别为了狗。那么此时观察产生反应的这些神经…...

c语言练习94:分割链表

分割链表 给你一个链表的头节点 head 和一个特定值 x &#xff0c;请你对链表进行分隔&#xff0c;使得所有 小于 x 的节点都出现在 大于或等于 x 的节点之前。 你不需要 保留 每个分区中各节点的初始相对位置。 示例 1&#xff1a; 输入&#xff1a;head [1,4,3,2,5,2], x…...

华为OD 数组二叉树(200分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应往年部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为OD…...

Upload-labs(1-20关保姆级教程)

靶场下载链接 https://github.com/c0ny1/upload-labs 话不多说&#xff0c;直接喂饭 lab-1 上传php木马&#xff0c;发现弹出提示框&#xff0c;查看源码可知是前端过滤 bp抓包&#xff0c;先上传一张正常的jpg图片 修改文件内容和后缀&#xff0c;大概就是想怎么改就怎么…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

Opencv中的addweighted函数

一.addweighted函数作用 addweighted&#xff08;&#xff09;是OpenCV库中用于图像处理的函数&#xff0c;主要功能是将两个输入图像&#xff08;尺寸和类型相同&#xff09;按照指定的权重进行加权叠加&#xff08;图像融合&#xff09;&#xff0c;并添加一个标量值&#x…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)

推荐 github 项目:GeminiImageApp(图片生成方向&#xff0c;可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的

修改bug思路&#xff1a; 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑&#xff1a;async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...