当前位置: 首页 > news >正文

0基础学习PyFlink——使用PyFlink的SQL进行字数统计

在《0基础学习PyFlink——Map和Reduce函数处理单词统计》和《0基础学习PyFlink——模拟Hadoop流程》这两篇文章中,我们使用了Python基础函数实现了字(符)统计的功能。这篇我们将切入PyFlink,使用这个框架实现字数统计功能。

PyFlink安装

安装Python

sudo apt install python3.10
sudo ln -s /usr/bin/python3.10 /usr/bin/python

安装虚拟环境

sudo apt install python3.10-venv

创建工程所在文件夹,并创建虚拟环境

mkdir pyflink-test
cd pyflink-test
python -m venv .env

进入虚拟环境,并安装PyFlink

source .env/bin/activate
pip3.10 install apache-flink

统计代码

Flink为开发者提供了如下不同层级的抽象。本篇我们将尽量使用SQL来实现功能。
在这里插入图片描述

创建环境

执行环境用于设置任务的属性(batch还是stream),以及一些运行时参数(parallelism.default等)。
和Hadoop不同的是,Flink是流批一体(既可以处理流,也可以处理批处理)的引擎,而前者是批处理引擎。
批处理很好理解,即给一批数据,我们一次性、成批处理完成。
而流处理则是指,数据源源不断进入引擎,没有尽头。
本文不对此做过多展开,只要记得本例使用的是批处理模式(in_batch_mode)即可。

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)

Source

在前两篇文章中,我们使用内存中的常规结构体,如dict等来保存Map过后的数据。而本文介绍的SQL方式,则是通过Table(表)的形式来存储,即输入的数据会Map到一张表中

    # define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')

这张表只有一个字段——String类型的word。它用于记录被切分后的一个个字符串。
这儿有个关键字with。它可以用于描述数据读写相关信息,即完成数据读写相关的设置。
connector用于指定连接方式,比如filesystem是指文件系统,即数据读写目标是一个文件;jdbc则是指一个数据库,比如mysql;kafka则是指一个Kafka服务。
format用于指定如何把二进制数据映射到表的列上。比如CSV,则是用“,”进行列的切割。

Execute

    # execute insertmy_select_ddl = """select word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

上述SQL我们按source表中的word字段聚类,统计每个字符出现的个数。
完整输出如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
OK
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+
5 rows in set

完整代码

# sql_print.py
import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')my_select_ddl = """select word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).print()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

测试的输入文件

“A”,
“B”,
“C”,
“D”,
“A”,
“E”,
“C”,
“D”,
“A”,

运行的指令是

python sql_print.py --input input1.csv

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/

相关文章:

0基础学习PyFlink——使用PyFlink的SQL进行字数统计

在《0基础学习PyFlink——Map和Reduce函数处理单词统计》和《0基础学习PyFlink——模拟Hadoop流程》这两篇文章中,我们使用了Python基础函数实现了字(符)统计的功能。这篇我们将切入PyFlink,使用这个框架实现字数统计功能。 PyFl…...

【Java系列】ArrayList

ArrayList 添加元素访问元素修改元素删除元素计算大小迭代数组列表其他的引用类型ArrayList 排序Java ArrayList 方法系列文章系列文章版本记录 引言 ArrayList 类是一个可以动态修改的数组,与普通数组的区别就是它是没有固定大小的限制,我们可以添加或删…...

sqlalchemy 使用

Python中强大的通用ORM框架:SQLAlchemy - 知乎...

Python深度学习实战-基于class类搭建BP神经网络实现分类任务(附源码和实现效果)

实现功能 上篇文章介绍了用Squential搭建BP神经网络,Squential可以搭建出上层输出就是下层输入的顺序神经网络结构,无法搭出一些带有跳连的非顺序网络结构,这个时候我们可以选择类class搭建封装神经网络结构。 第一步:import ten…...

GIS 数据结构整理:网格索引

1 一维网格索引 把整个数据库数值空间划分成n*n的正方形网格,建立另一个倒排文件——栅格索引每一个网格在栅格索引中有一个索引条目(记录),在这个记录中登记所有位于或穿过该网格的物体的关键字 1.1 变长指针法 在这个网格的物体,按照序号…...

【打靶】vulhub打靶系列(一)—小白视野的渗透测试

主机探测 arpscan arp-scan -l 另一种方法arping for i in $(seq 1 200); do sudo arping -c 1 192.168.56.$i; done 注意这个必须是root权限 ​ 端口探测 nmap nmap -p- -sV -T4 192.168.56.104 发现8080端口 web测试 访问下web页面 1、通过逻辑点绕过 发送到xia_sq…...

kafka3.X集群安装(不使用zookeeper)

参考: 【kafka专栏】不用zookeeper怎么安装kafka集群-最新kafka3.0版本 一、kafka集群实例角色规划 在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。 上图中黑色代表broker(消息代理服务)&…...

2023 年 的 DBA 有哪些变化?

作者:Craig S. Mullins 数据库专家,IBM 优化冠军,DB2 金牌顾问以及 IDUG 名人堂成员,数据库类畅销书作者,著有《DB2 Developers Guide》、《Database Administration: The Complete Guide to DBA Practices & Pro…...

vs2022 使用git同步报错以及解决每次推送要输入密码问题

1.使用 git GUI工具,例如:TortoiseGit ,把全局配置文件这样设置一下 设置全局.config ,这样即可。 [credential] helper store 2.如果推送代码或拉取代码一直失败,在当前的仓库下面,使用以下命令来重置一下密码 git …...

有哪些适用于 Windows 的PDF 阅读器?免费 PDF 阅读器清单

探索适用于 Windows 10 和 11 的最佳 PDF 阅读器 适用于 Windows 10 和 Windows 11 的最佳 PDF 阅读器让您可以在台式计算机上查看和共享文档。 最好的PDF 编辑器和免费的 PDF 编辑器配备了先进的工具,可以跨不同的操作系统工作。但是,当您只需要查看和…...

避雷!新增2本期刊被标记为「On Hold」,1区TOP刊仍在调查中!

近期小编在Master Journal List上查询期刊时偶然发现,又有2本期刊被科睿唯安标记为「On Hold」! 这2本期刊分别为MIGRATION LETTERS和REVISTA DE GESTAO E SECRETARIADO-GESEC. 此外还有6本期刊被标记为「On Hold」,目前共计8本期刊被「On …...

iOS 配置通用链接(Universal Link)服务端和开发者后台都配置好了,还是跳转不到App

目录 一、什么是 Universal Link? 1.背景介绍 2.特点 3.运行机制原理&流程图 二、配置教程 1.第一步:开启 Associated Domains 服务 1.1 开通 Associated Domains 2.第二步:服务器配置 apple-app-site-association(AAS…...

【环境】Linux下Anaconda/ Miniconda安装+百度Paddle环境搭建+Cudnn(3090显卡+CUDA11.8+cudnn8.6.0)

清华源帮助链接:https://mirror.tuna.tsinghua.edu.cn/help/anaconda/ 下载链接:https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/ 其他深度学习环境相关博文:【stable-diffusion】4090显卡下dreambooth、lora、sd模型微调的GUI环境…...

【Python机器学习】零基础掌握AdaBoostRegressor集成学习

有没有经历过这样的状况:需要预测未来房价走势,但传统的预测方法并不总是准确? 房价预测一直是人们关注的热点话题,无论是房产商、购房者,还是政府,都需要准确地知道未来房价的走势。那么,有没有一种更加精准、稳定的预测方法呢?答案是有的——AdaBoost Regressor算法…...

各种添加路由的方法

Linux 篇: ipv4: #添加到主机的路由 # route add –host 192.168.168.110 dev eth0 # route add –host 192.168.168.119 gw 192.168.168.1 #添加到网络的路由 # route add –net IP netmask MASK eth0 # route add –net IP netmask MASK gw IP # route add –n…...

MySQL外键

目录 一.外键 1.表与表之间建立关系 2.什么是外键 3.一对多关系 4.多对多关系 (1)建表会遇到的问题 (2)解决循环建表的问题 5.一对一关系、 6.小结 二.多表查询 1.数据准备 2.多表查询案例 (1)…...

自制数据库迁移工具-C版-02-HappySunshineV1.1-(支持Gbase8a)

目录 一、环境信息 二、简述 三、升级点 四、支持功能 五、安装包下载地址 六、配置参数介绍 七、安装步骤 1、配置环境变量 2、生效环境变量 3、检验动态链接是否正常 4、修改配置文件MigrationConfig.txt 八、运行效果 一、环境信息 名称值CPUIntel(R) Core(TM) i…...

k8s创建pod-affinity亲和性时报错解决办法

1.如下报错 Error from server (BadRequest): error when creating “pod-required-affinity-demo-2.yaml”: Pod in version “v1” cannot be handled as a Pod: json: cannot unmarshal string into Go struct field LabelSelectorRequirement.spec.affinity.podAffinity.re…...

基于边缘智能网关的储能系统安全监测管理方案

“储能系统充电”是配套新能源汽车产业发展的重要应用之一。得益于电池技术的发展,新能源汽车正逐步迈入快充时代,由于在使用快速充电桩时,可能导致用电峰值负荷超过电网的承载能力,对于电网的稳定性和持续性会有较大影响&#xf…...

大数据Flink(一百零一):SQL 表值函数(Table Function)

文章目录 SQL 表值函数(Table Function) SQL 表值函数(Table Function) Python UDTF,即 Python TableFunction,针对每一条输入数据,Python UDTF 可以产生 0 条、1 条或者多条输出数据,此外,一条输出数据可以包含多个列。比如以下示例,定义了一个名字为 split 的Pyt…...

exp1_code

#include <iostream> using namespace std; // 链栈节点结构 struct StackNode { int data; StackNode* next; StackNode(int val) : data(val), next(nullptr) {} }; // 顺序栈实现 class SeqStack { private: int* data; int top; int capac…...

Unity的日志管理类

脚本功能&#xff1a; 1&#xff0c;打印日志到控制台 2&#xff0c;显示日志到UI Text 3&#xff0c;将日志写入本地文件 这对unity开发安卓平台来说很有用 using System; using System.IO; using System.Text; using UnityEngine; using UnityEngine.UI;public class FileLo…...

STM32什么是寄存器

提示&#xff1a;文章 文章目录 前言一、背景二、2.12.2 三、3.1 总结 前言 前期疑问&#xff1a; 1、什么是寄存器&#xff1f; 答&#xff1a;在4GB的地址空间中&#xff0c;512MB的block2上&#xff0c;每4个字节组成32位&#xff0c;这个32位为一个单元&#xff0c;控制&a…...

MySQL Binlog 数据恢复全指南

MySQL Binlog 数据恢复全指南 一、Binlog 核心概念 1. 什么是 Binlog&#xff1f; Binlog&#xff08;二进制日志&#xff09;是 MySQL 记录所有修改数据的 SQL 语句的日志文件&#xff0c;采用二进制格式存储。它是 MySQL 最重要的日志之一&#xff0c;具有三大核心功能&am…...

Ubuntu18.6 学习QT问题记录以及虚拟机安装Ubuntu后的设置

Ubuntu安装 1、VM 安装 Ubuntu后窗口界面太小 Vmware Tools 工具安装的有问题 处理办法&#xff1a; 1、重新挂载E:\VMwareWorkstation\linux.iso文件&#xff0c;该文件在VMware安装目录下 2、Ubuntu桌面出现vmtools共享文件夹&#xff0c;将gz文件拷贝至本地&#xff0c;解…...

Web前端基础:JavaScript

1.JS核心语法 1.1 JS引入方式 第一种方式&#xff1a;内部脚本&#xff0c;将JS代码定义在HTML页面中 JavaScript代码必须位于<script></script>标签之间在HTML文档中&#xff0c;可以在任意地方&#xff0c;放置任意数量的<script></script>一般会把…...

SAP 在 AI 与数据统一平台上的战略转向

在 2025 年 SAP Sapphire 大会上&#xff0c;SAP 展示了其最新的产品战略和技术整合方向&#xff0c;与以往不同的是&#xff0c;今年的讨论更加务实、聚焦客户实际需求。SAP 强调&#xff0c;ERP 的转型不再是“一刀切”或破坏性的&#xff0c;而是可以根据客户现状&#xff0…...

React Navive初识

文章目录 搭建开发环境安装 Node、homebrew、Watchman安装 Node安装 homebrew安装 watchman 安装 React Native 的命令行工具&#xff08;react-native-cli&#xff09;创建新项目编译并运行 React Native 应用在 ios 模拟器上运行 调试访问 App 内的开发菜单 搭建开发环境 在…...

用HTML5 Canvas打造交互式心形粒子动画:从基础到优化实战

用HTML5 Canvas打造交互式心形粒子动画&#xff1a;从基础到优化实战 引言 在Web交互设计中&#xff0c;粒子动画因其动态美感和视觉吸引力被广泛应用于节日特效、情感化界面等场景。本文将通过实战案例&#xff0c;详细讲解如何使用HTML5 Canvas和JavaScript实现一个「心之律…...

探索C++标准模板库(STL):String接口的底层实现(下篇)

前引&#xff1a;在C的面向对象编程中&#xff0c;对象模型是理解语言行为的核心。无论是类的成员函数如何访问数据&#xff0c;还是资源管理如何自动化&#xff0c;其底层机制均围绕两个关键概念展开&#xff1a;this指针与六大默认成员函数。它们如同对象的“隐形守护者”&am…...