当前位置: 首页 > news >正文

0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()

需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
这一步只能创建表和连接器,具体执行还要执行下一步。

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

完整代码如下

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')# define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()# execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",

相关文章:

0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 select word, count(1) as count from source group by word; ------------------------------------------------------ |…...

会声会影2024旗舰版详细功能介绍

随着网络视频的蓬勃发展,越来越多的人开始涉足视频剪辑领域,毕竟技多不压身嘛。在众多剪辑软件中,剪映和会声会影是备受新手青睐的两种。那么,会声会影和剪映哪个好呢?在它们之间,哪一个更适合初学者呢接&a…...

QtCreator 查看类帮助文档,快捷键操作:按两次F1 全屏帮助,Esc取消全屏

如何查看类帮助文档 选择类&#xff0c;按F1查看类帮助文档。 示例&#xff1a; #include <QLabel> // 将光标放在QLabel上&#xff0c;按F1右侧弹出的类帮助手册可视宽度很小&#xff0c;如果按两次 F1 键&#xff0c;帮助文档将会以全屏模式显示&#xff0c;以便更清…...

C语言文件操作(1)

C语言文件操作&#xff08;1&#xff09; 文章目录 C语言文件操作&#xff08;1&#xff09;一、理解文件1.概述2.分类①.正常角度②.文本文件和二进制文件 二、文件的打开和关闭1.流和标准流2.文件类型指针3.文件的打开和关闭以及使用类型 三、文件缓冲区 一、理解文件 1.概述…...

adb 操作命令(adb调试QT项目使用到的命令)

1.adb连接串口 获取root权限 adb root && adb remount && adb shell2.测试串口命令 stty -F /dev/ttyS4 cs8 -parenb -cstopb -echoecho "12345\n" > /dev/ttyS8cat /dev/ttyS4 &3.软件在安卓系统上的名字已经活动名称&#xff08;下面是示…...

mysql下载和安装,使用

先下载安装 官方下载 已下载备份软件 安装&#xff0c;一路下一步设置环境变量 4. 打开一个cmd&#xff0c;输入mysql -u root -p...

Redis底层核心数据结构详解

文章目录 一、深入String&#xff08;SDS&#xff09;1. 字符串简介2. SDS存在的意义3. SDS结构设计4. SDS与C字符串的区别4.1 常数复杂度获取字符串长度4.2 杜绝缓冲区溢出4.3 二进制安全4.4 SDS API 5 小结 二、深入List (QuickList)1. 链表节点结构设计2. Redis的链表实现的…...

如何学会从产品经理角度去思考问题?

如何学会从产品经理角度去思考问题&#xff1f; 从产品经理的角度思考问题意味着你需要关注产品从构思到上市全过程中的各个方面&#xff0c;包括用户需求、市场趋势、设计、开发、测试、上市后的用户反馈等。以下是一些策略和方法&#xff0c;帮助你培养从产品经理角度思考问…...

正则表达式的神奇世界:表达、匹配和提取

正则表达式的神奇世界&#xff1a;表达、匹配和提取 前言第一&#xff1a; 什么是正则表达式&#xff1f;第二&#xff1a; 字符匹配和量词&#xff1a;1. 字符匹配&#xff1a;2. 量词&#xff1a;3. 贪婪和非贪婪匹配&#xff1a; 第三&#xff1a;字符类和元字符1. 字符类&a…...

密码登录虽安全,但有时很麻烦!如何禁用或删除Windows 11中的密码登录

如果你想在Windows 11上自动登录,在本指南中,我们将向你展示如何删除你的帐户密码。 在Windows 11上,你可以至少通过三种方式从帐户中删除登录密码。在你的帐户上使用密码有助于保护你的计算机和文件免受来自internet或本地的未经授权的访问。然而,在某些情况下,密码可能…...

Python实现的快速排序代码

Python实现的快速排序代码 def bubble_sort(arr): n len(arr) for i in range(n): for j in range(0, n-i-1): if arr[j] > arr[j1]: arr[j], arr[j1] arr[j1], arr[j] return arr 冒泡排序是一种简单的排序算法&#xff0c;它重复地遍历要排序的数列&#xff0c;…...

【USRP】通信基带物理层历史

无线通信的基带物理层开发历史涵盖了从早期无线技术到当前复杂的移动通信标准的各种进步。以下是关于无线通信基带物理层开发的简要历史概述&#xff1a; 无线电初期&#xff1a;20世纪初&#xff0c;Guglielmo Marconi等人通过无线电进行了早期的无线通信尝试。这些早期的尝试…...

依靠继承与聚合,实现maven搭建分布式项目

简介聚合 对于复杂的Maven项目&#xff0c;一般建议采用多模块的方式来设计开发&#xff0c;便于后期维护管理。但是构建项目时&#xff0c;如果每次都需要按模块一个一个进行构建会十分麻烦&#xff0c;而Maven的聚合功能就可以很好的解决这个问题&#xff0c;当用户对聚合模…...

华为OD 叠积木(100分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应20022部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为O…...

vue重修之自定义项目、ESLint和代码规范修复

文章目录 VueCli 自定义创建项目ESlint代码规范及手动修复代码规范错误 VueCli 自定义创建项目 安装脚手架 (已安装) npm i vue/cli -g创建项目 vue create xxx选项 Vue CLI v5.0.8 ? Please pick a preset:Default ([Vue 3] babel, eslint)Default ([Vue 2] babel, eslint) …...

华为OD 完全二叉树非叶子部分后序遍历(200分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应往年部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为OD…...

AI是未来?——神经网络篇

AI是未来&#xff1f;——神经网络篇 文章目录 AI是未来&#xff1f;——神经网络篇1. 神经网络小记问题记录&#xff1a; 1. 神经网络小记 疑问&#xff1a;假如让神经网络识别一张猫的图片&#xff0c;他经过了n个神经元节点最终识别为了狗。那么此时观察产生反应的这些神经…...

c语言练习94:分割链表

分割链表 给你一个链表的头节点 head 和一个特定值 x &#xff0c;请你对链表进行分隔&#xff0c;使得所有 小于 x 的节点都出现在 大于或等于 x 的节点之前。 你不需要 保留 每个分区中各节点的初始相对位置。 示例 1&#xff1a; 输入&#xff1a;head [1,4,3,2,5,2], x…...

华为OD 数组二叉树(200分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应往年部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为OD…...

Upload-labs(1-20关保姆级教程)

靶场下载链接 https://github.com/c0ny1/upload-labs 话不多说&#xff0c;直接喂饭 lab-1 上传php木马&#xff0c;发现弹出提示框&#xff0c;查看源码可知是前端过滤 bp抓包&#xff0c;先上传一张正常的jpg图片 修改文件内容和后缀&#xff0c;大概就是想怎么改就怎么…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者

抖音增长新引擎&#xff1a;品融电商&#xff0c;一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中&#xff0c;品牌如何破浪前行&#xff1f;自建团队成本高、效果难控&#xff1b;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

DingDing机器人群消息推送

文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人&#xff0c;点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置&#xff0c;详见说明文档 成功后&#xff0c;记录Webhook 2 API文档说明 点击设置说明 查看自…...

多模态图像修复系统:基于深度学习的图片修复实现

多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...

为什么要创建 Vue 实例

核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)

一、OpenBCI_GUI 项目概述 &#xff08;一&#xff09;项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台&#xff0c;其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言&#xff0c;首次接触 OpenBCI 设备时&#xff0c;往…...

从面试角度回答Android中ContentProvider启动原理

Android中ContentProvider原理的面试角度解析&#xff0c;分为​​已启动​​和​​未启动​​两种场景&#xff1a; 一、ContentProvider已启动的情况 1. ​​核心流程​​ ​​触发条件​​&#xff1a;当其他组件&#xff08;如Activity、Service&#xff09;通过ContentR…...

用鸿蒙HarmonyOS5实现中国象棋小游戏的过程

下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...

Java数组Arrays操作全攻略

Arrays类的概述 Java中的Arrays类位于java.util包中&#xff0c;提供了一系列静态方法用于操作数组&#xff08;如排序、搜索、填充、比较等&#xff09;。这些方法适用于基本类型数组和对象数组。 常用成员方法及代码示例 排序&#xff08;sort&#xff09; 对数组进行升序…...