当前位置: 首页 > news >正文

0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()

需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
这一步只能创建表和连接器,具体执行还要执行下一步。

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

完整代码如下

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')# define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()# execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",

相关文章:

0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 select word, count(1) as count from source group by word; ------------------------------------------------------ |…...

会声会影2024旗舰版详细功能介绍

随着网络视频的蓬勃发展,越来越多的人开始涉足视频剪辑领域,毕竟技多不压身嘛。在众多剪辑软件中,剪映和会声会影是备受新手青睐的两种。那么,会声会影和剪映哪个好呢?在它们之间,哪一个更适合初学者呢接&a…...

QtCreator 查看类帮助文档,快捷键操作:按两次F1 全屏帮助,Esc取消全屏

如何查看类帮助文档 选择类&#xff0c;按F1查看类帮助文档。 示例&#xff1a; #include <QLabel> // 将光标放在QLabel上&#xff0c;按F1右侧弹出的类帮助手册可视宽度很小&#xff0c;如果按两次 F1 键&#xff0c;帮助文档将会以全屏模式显示&#xff0c;以便更清…...

C语言文件操作(1)

C语言文件操作&#xff08;1&#xff09; 文章目录 C语言文件操作&#xff08;1&#xff09;一、理解文件1.概述2.分类①.正常角度②.文本文件和二进制文件 二、文件的打开和关闭1.流和标准流2.文件类型指针3.文件的打开和关闭以及使用类型 三、文件缓冲区 一、理解文件 1.概述…...

adb 操作命令(adb调试QT项目使用到的命令)

1.adb连接串口 获取root权限 adb root && adb remount && adb shell2.测试串口命令 stty -F /dev/ttyS4 cs8 -parenb -cstopb -echoecho "12345\n" > /dev/ttyS8cat /dev/ttyS4 &3.软件在安卓系统上的名字已经活动名称&#xff08;下面是示…...

mysql下载和安装,使用

先下载安装 官方下载 已下载备份软件 安装&#xff0c;一路下一步设置环境变量 4. 打开一个cmd&#xff0c;输入mysql -u root -p...

Redis底层核心数据结构详解

文章目录 一、深入String&#xff08;SDS&#xff09;1. 字符串简介2. SDS存在的意义3. SDS结构设计4. SDS与C字符串的区别4.1 常数复杂度获取字符串长度4.2 杜绝缓冲区溢出4.3 二进制安全4.4 SDS API 5 小结 二、深入List (QuickList)1. 链表节点结构设计2. Redis的链表实现的…...

如何学会从产品经理角度去思考问题?

如何学会从产品经理角度去思考问题&#xff1f; 从产品经理的角度思考问题意味着你需要关注产品从构思到上市全过程中的各个方面&#xff0c;包括用户需求、市场趋势、设计、开发、测试、上市后的用户反馈等。以下是一些策略和方法&#xff0c;帮助你培养从产品经理角度思考问…...

正则表达式的神奇世界:表达、匹配和提取

正则表达式的神奇世界&#xff1a;表达、匹配和提取 前言第一&#xff1a; 什么是正则表达式&#xff1f;第二&#xff1a; 字符匹配和量词&#xff1a;1. 字符匹配&#xff1a;2. 量词&#xff1a;3. 贪婪和非贪婪匹配&#xff1a; 第三&#xff1a;字符类和元字符1. 字符类&a…...

密码登录虽安全,但有时很麻烦!如何禁用或删除Windows 11中的密码登录

如果你想在Windows 11上自动登录,在本指南中,我们将向你展示如何删除你的帐户密码。 在Windows 11上,你可以至少通过三种方式从帐户中删除登录密码。在你的帐户上使用密码有助于保护你的计算机和文件免受来自internet或本地的未经授权的访问。然而,在某些情况下,密码可能…...

Python实现的快速排序代码

Python实现的快速排序代码 def bubble_sort(arr): n len(arr) for i in range(n): for j in range(0, n-i-1): if arr[j] > arr[j1]: arr[j], arr[j1] arr[j1], arr[j] return arr 冒泡排序是一种简单的排序算法&#xff0c;它重复地遍历要排序的数列&#xff0c;…...

【USRP】通信基带物理层历史

无线通信的基带物理层开发历史涵盖了从早期无线技术到当前复杂的移动通信标准的各种进步。以下是关于无线通信基带物理层开发的简要历史概述&#xff1a; 无线电初期&#xff1a;20世纪初&#xff0c;Guglielmo Marconi等人通过无线电进行了早期的无线通信尝试。这些早期的尝试…...

依靠继承与聚合,实现maven搭建分布式项目

简介聚合 对于复杂的Maven项目&#xff0c;一般建议采用多模块的方式来设计开发&#xff0c;便于后期维护管理。但是构建项目时&#xff0c;如果每次都需要按模块一个一个进行构建会十分麻烦&#xff0c;而Maven的聚合功能就可以很好的解决这个问题&#xff0c;当用户对聚合模…...

华为OD 叠积木(100分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应20022部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为O…...

vue重修之自定义项目、ESLint和代码规范修复

文章目录 VueCli 自定义创建项目ESlint代码规范及手动修复代码规范错误 VueCli 自定义创建项目 安装脚手架 (已安装) npm i vue/cli -g创建项目 vue create xxx选项 Vue CLI v5.0.8 ? Please pick a preset:Default ([Vue 3] babel, eslint)Default ([Vue 2] babel, eslint) …...

华为OD 完全二叉树非叶子部分后序遍历(200分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应往年部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为OD…...

AI是未来?——神经网络篇

AI是未来&#xff1f;——神经网络篇 文章目录 AI是未来&#xff1f;——神经网络篇1. 神经网络小记问题记录&#xff1a; 1. 神经网络小记 疑问&#xff1a;假如让神经网络识别一张猫的图片&#xff0c;他经过了n个神经元节点最终识别为了狗。那么此时观察产生反应的这些神经…...

c语言练习94:分割链表

分割链表 给你一个链表的头节点 head 和一个特定值 x &#xff0c;请你对链表进行分隔&#xff0c;使得所有 小于 x 的节点都出现在 大于或等于 x 的节点之前。 你不需要 保留 每个分区中各节点的初始相对位置。 示例 1&#xff1a; 输入&#xff1a;head [1,4,3,2,5,2], x…...

华为OD 数组二叉树(200分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应往年部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为OD…...

Upload-labs(1-20关保姆级教程)

靶场下载链接 https://github.com/c0ny1/upload-labs 话不多说&#xff0c;直接喂饭 lab-1 上传php木马&#xff0c;发现弹出提示框&#xff0c;查看源码可知是前端过滤 bp抓包&#xff0c;先上传一张正常的jpg图片 修改文件内容和后缀&#xff0c;大概就是想怎么改就怎么…...

Overleaf上LaTeX Beamer字体自定义实战:手把手教你用fontspec包搞定中文和英文字体

Overleaf平台LaTeX Beamer字体定制全攻略&#xff1a;从基础配置到高级技巧 在学术报告和教学演示领域&#xff0c;LaTeX Beamer因其专业的排版质量和稳定的输出效果而备受青睐。然而&#xff0c;当涉及到中英混排场景时&#xff0c;许多用户都会遇到字体配置的挑战——如何让中…...

SSM框架在零售业数字化转型中的实践:超市管理系统全流程解析

1. 为什么零售业需要数字化转型&#xff1f; 最近几年我走访了不少中小型超市&#xff0c;发现一个共同痛点&#xff1a;很多老板还在用纸质小本本记录进货和销售数据&#xff0c;月底对账时经常出现"货卖完了但钱对不上"的情况。有个开社区超市的张老板跟我吐槽&am…...

Huntarr实战案例:如何从零搭建完整的媒体自动化系统

Huntarr实战案例&#xff1a;如何从零搭建完整的媒体自动化系统 【免费下载链接】Sonarr-Hunter Assists Sonarr to check for missing TV Shows 项目地址: https://gitcode.com/gh_mirrors/so/Sonarr-Hunter Huntarr是一款强大的媒体自动化工具&#xff0c;能够帮助用户…...

在PyTorch 2.8 环境中运行MATLAB引擎:混合编程实现算法验证

在PyTorch 2.8环境中运行MATLAB引擎&#xff1a;混合编程实现算法验证 1. 引言&#xff1a;当深度学习遇上工程计算 想象一下这个场景&#xff1a;你正在用PyTorch开发一个深度学习模型&#xff0c;需要对输入信号进行复杂的滤波处理&#xff0c;或者要对模型输出进行精细的控…...

YimMenu终极指南:5步掌握GTA5最强免费防崩溃辅助工具

YimMenu终极指南&#xff1a;5步掌握GTA5最强免费防崩溃辅助工具 【免费下载链接】YimMenu YimMenu, a GTA V menu protecting against a wide ranges of the public crashes and improving the overall experience. 项目地址: https://gitcode.com/GitHub_Trending/yi/YimMe…...

SD-PPP:Photoshop与AI绘图工作流的革命性融合

SD-PPP&#xff1a;Photoshop与AI绘图工作流的革命性融合 【免费下载链接】sd-ppp A Photoshop AI plugin 项目地址: https://gitcode.com/gh_mirrors/sd/sd-ppp 在创意设计领域&#xff0c;传统工作流程中设计师需要在多个软件间频繁切换&#xff0c;这种割裂的操作模式…...

OpenClaw本地部署指南|nanobot镜像预置GPU监控Dashboard(Grafana+Prometheus模板)

OpenClaw本地部署指南&#xff5c;nanobot镜像预置GPU监控Dashboard&#xff08;GrafanaPrometheus模板&#xff09; 1. 项目简介 nanobot是一款受OpenClaw启发的超轻量级个人人工智能助手&#xff0c;仅需约4000行代码就能提供核心代理功能&#xff0c;比传统方案的代码量减…...

Star CCM+ 实战:旋风分离器(cyclone separator)体网格生成与优化策略

1. 旋风分离器网格生成前的准备工作 在开始使用Star CCM生成旋风分离器体网格之前&#xff0c;我们需要做好充分的准备工作。旋风分离器作为一种常见的气固分离设备&#xff0c;其内部流动特性复杂&#xff0c;包含强烈的旋转流场和湍流现象。这就对网格质量提出了更高要求&am…...

Qwen2.5-VL-7B-Instruct入门指南:多模态指令微调数据格式解析

Qwen2.5-VL-7B-Instruct入门指南&#xff1a;多模态指令微调数据格式解析 1. 项目概述 Qwen2.5-VL-7B-Instruct是一款强大的多模态视觉-语言模型&#xff0c;能够同时处理图像和文本输入&#xff0c;理解复杂的跨模态指令。这个7B参数的模型特别适合需要视觉理解和语言生成相…...

HTML5中Mediastream实现摄像头画面实时捕获

HTML5通过MediaStream API可直接调用摄像头&#xff1a;先用navigator.mediaDevices.getUserMedia({video:true})获取流并赋给video.srcObject&#xff0c;再用canvas逐帧绘制处理&#xff1b;需处理权限异常、复用流、设置约束参数&#xff0c;并注意HTTPS和移动端autoplay/mu…...