当前位置: 首页 > news >正文

0基础学习PyFlink——使用PyFlink的SQL进行字数统计

在《0基础学习PyFlink——Map和Reduce函数处理单词统计》和《0基础学习PyFlink——模拟Hadoop流程》这两篇文章中,我们使用了Python基础函数实现了字(符)统计的功能。这篇我们将切入PyFlink,使用这个框架实现字数统计功能。

PyFlink安装

安装Python

sudo apt install python3.10
sudo ln -s /usr/bin/python3.10 /usr/bin/python

安装虚拟环境

sudo apt install python3.10-venv

创建工程所在文件夹,并创建虚拟环境

mkdir pyflink-test
cd pyflink-test
python -m venv .env

进入虚拟环境,并安装PyFlink

source .env/bin/activate
pip3.10 install apache-flink

统计代码

Flink为开发者提供了如下不同层级的抽象。本篇我们将尽量使用SQL来实现功能。
在这里插入图片描述

创建环境

执行环境用于设置任务的属性(batch还是stream),以及一些运行时参数(parallelism.default等)。
和Hadoop不同的是,Flink是流批一体(既可以处理流,也可以处理批处理)的引擎,而前者是批处理引擎。
批处理很好理解,即给一批数据,我们一次性、成批处理完成。
而流处理则是指,数据源源不断进入引擎,没有尽头。
本文不对此做过多展开,只要记得本例使用的是批处理模式(in_batch_mode)即可。

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)

Source

在前两篇文章中,我们使用内存中的常规结构体,如dict等来保存Map过后的数据。而本文介绍的SQL方式,则是通过Table(表)的形式来存储,即输入的数据会Map到一张表中

    # define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')

这张表只有一个字段——String类型的word。它用于记录被切分后的一个个字符串。
这儿有个关键字with。它可以用于描述数据读写相关信息,即完成数据读写相关的设置。
connector用于指定连接方式,比如filesystem是指文件系统,即数据读写目标是一个文件;jdbc则是指一个数据库,比如mysql;kafka则是指一个Kafka服务。
format用于指定如何把二进制数据映射到表的列上。比如CSV,则是用“,”进行列的切割。

Execute

    # execute insertmy_select_ddl = """select word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

上述SQL我们按source表中的word字段聚类,统计每个字符出现的个数。
完整输出如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
OK
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+
5 rows in set

完整代码

# sql_print.py
import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')my_select_ddl = """select word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).print()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

测试的输入文件

“A”,
“B”,
“C”,
“D”,
“A”,
“E”,
“C”,
“D”,
“A”,

运行的指令是

python sql_print.py --input input1.csv

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/

相关文章:

0基础学习PyFlink——使用PyFlink的SQL进行字数统计

在《0基础学习PyFlink——Map和Reduce函数处理单词统计》和《0基础学习PyFlink——模拟Hadoop流程》这两篇文章中,我们使用了Python基础函数实现了字(符)统计的功能。这篇我们将切入PyFlink,使用这个框架实现字数统计功能。 PyFl…...

【Java系列】ArrayList

ArrayList 添加元素访问元素修改元素删除元素计算大小迭代数组列表其他的引用类型ArrayList 排序Java ArrayList 方法系列文章系列文章版本记录 引言 ArrayList 类是一个可以动态修改的数组,与普通数组的区别就是它是没有固定大小的限制,我们可以添加或删…...

sqlalchemy 使用

Python中强大的通用ORM框架:SQLAlchemy - 知乎...

Python深度学习实战-基于class类搭建BP神经网络实现分类任务(附源码和实现效果)

实现功能 上篇文章介绍了用Squential搭建BP神经网络,Squential可以搭建出上层输出就是下层输入的顺序神经网络结构,无法搭出一些带有跳连的非顺序网络结构,这个时候我们可以选择类class搭建封装神经网络结构。 第一步:import ten…...

GIS 数据结构整理:网格索引

1 一维网格索引 把整个数据库数值空间划分成n*n的正方形网格,建立另一个倒排文件——栅格索引每一个网格在栅格索引中有一个索引条目(记录),在这个记录中登记所有位于或穿过该网格的物体的关键字 1.1 变长指针法 在这个网格的物体,按照序号…...

【打靶】vulhub打靶系列(一)—小白视野的渗透测试

主机探测 arpscan arp-scan -l 另一种方法arping for i in $(seq 1 200); do sudo arping -c 1 192.168.56.$i; done 注意这个必须是root权限 ​ 端口探测 nmap nmap -p- -sV -T4 192.168.56.104 发现8080端口 web测试 访问下web页面 1、通过逻辑点绕过 发送到xia_sq…...

kafka3.X集群安装(不使用zookeeper)

参考: 【kafka专栏】不用zookeeper怎么安装kafka集群-最新kafka3.0版本 一、kafka集群实例角色规划 在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。 上图中黑色代表broker(消息代理服务)&…...

2023 年 的 DBA 有哪些变化?

作者:Craig S. Mullins 数据库专家,IBM 优化冠军,DB2 金牌顾问以及 IDUG 名人堂成员,数据库类畅销书作者,著有《DB2 Developers Guide》、《Database Administration: The Complete Guide to DBA Practices & Pro…...

vs2022 使用git同步报错以及解决每次推送要输入密码问题

1.使用 git GUI工具,例如:TortoiseGit ,把全局配置文件这样设置一下 设置全局.config ,这样即可。 [credential] helper store 2.如果推送代码或拉取代码一直失败,在当前的仓库下面,使用以下命令来重置一下密码 git …...

有哪些适用于 Windows 的PDF 阅读器?免费 PDF 阅读器清单

探索适用于 Windows 10 和 11 的最佳 PDF 阅读器 适用于 Windows 10 和 Windows 11 的最佳 PDF 阅读器让您可以在台式计算机上查看和共享文档。 最好的PDF 编辑器和免费的 PDF 编辑器配备了先进的工具,可以跨不同的操作系统工作。但是,当您只需要查看和…...

避雷!新增2本期刊被标记为「On Hold」,1区TOP刊仍在调查中!

近期小编在Master Journal List上查询期刊时偶然发现,又有2本期刊被科睿唯安标记为「On Hold」! 这2本期刊分别为MIGRATION LETTERS和REVISTA DE GESTAO E SECRETARIADO-GESEC. 此外还有6本期刊被标记为「On Hold」,目前共计8本期刊被「On …...

iOS 配置通用链接(Universal Link)服务端和开发者后台都配置好了,还是跳转不到App

目录 一、什么是 Universal Link? 1.背景介绍 2.特点 3.运行机制原理&流程图 二、配置教程 1.第一步:开启 Associated Domains 服务 1.1 开通 Associated Domains 2.第二步:服务器配置 apple-app-site-association(AAS…...

【环境】Linux下Anaconda/ Miniconda安装+百度Paddle环境搭建+Cudnn(3090显卡+CUDA11.8+cudnn8.6.0)

清华源帮助链接:https://mirror.tuna.tsinghua.edu.cn/help/anaconda/ 下载链接:https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/ 其他深度学习环境相关博文:【stable-diffusion】4090显卡下dreambooth、lora、sd模型微调的GUI环境…...

【Python机器学习】零基础掌握AdaBoostRegressor集成学习

有没有经历过这样的状况:需要预测未来房价走势,但传统的预测方法并不总是准确? 房价预测一直是人们关注的热点话题,无论是房产商、购房者,还是政府,都需要准确地知道未来房价的走势。那么,有没有一种更加精准、稳定的预测方法呢?答案是有的——AdaBoost Regressor算法…...

各种添加路由的方法

Linux 篇: ipv4: #添加到主机的路由 # route add –host 192.168.168.110 dev eth0 # route add –host 192.168.168.119 gw 192.168.168.1 #添加到网络的路由 # route add –net IP netmask MASK eth0 # route add –net IP netmask MASK gw IP # route add –n…...

MySQL外键

目录 一.外键 1.表与表之间建立关系 2.什么是外键 3.一对多关系 4.多对多关系 (1)建表会遇到的问题 (2)解决循环建表的问题 5.一对一关系、 6.小结 二.多表查询 1.数据准备 2.多表查询案例 (1)…...

自制数据库迁移工具-C版-02-HappySunshineV1.1-(支持Gbase8a)

目录 一、环境信息 二、简述 三、升级点 四、支持功能 五、安装包下载地址 六、配置参数介绍 七、安装步骤 1、配置环境变量 2、生效环境变量 3、检验动态链接是否正常 4、修改配置文件MigrationConfig.txt 八、运行效果 一、环境信息 名称值CPUIntel(R) Core(TM) i…...

k8s创建pod-affinity亲和性时报错解决办法

1.如下报错 Error from server (BadRequest): error when creating “pod-required-affinity-demo-2.yaml”: Pod in version “v1” cannot be handled as a Pod: json: cannot unmarshal string into Go struct field LabelSelectorRequirement.spec.affinity.podAffinity.re…...

基于边缘智能网关的储能系统安全监测管理方案

“储能系统充电”是配套新能源汽车产业发展的重要应用之一。得益于电池技术的发展,新能源汽车正逐步迈入快充时代,由于在使用快速充电桩时,可能导致用电峰值负荷超过电网的承载能力,对于电网的稳定性和持续性会有较大影响&#xf…...

大数据Flink(一百零一):SQL 表值函数(Table Function)

文章目录 SQL 表值函数(Table Function) SQL 表值函数(Table Function) Python UDTF,即 Python TableFunction,针对每一条输入数据,Python UDTF 可以产生 0 条、1 条或者多条输出数据,此外,一条输出数据可以包含多个列。比如以下示例,定义了一个名字为 split 的Pyt…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...

关于uniapp展示PDF的解决方案

在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项&#xff1a; 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库&#xff1a; npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent

安全大模型训练计划&#xff1a;基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标&#xff1a;为安全大模型创建高质量、去偏、符合伦理的训练数据集&#xff0c;涵盖安全相关任务&#xff08;如有害内容检测、隐私保护、道德推理等&#xff09;。 1.1 数据收集 描…...

springboot 日志类切面,接口成功记录日志,失败不记录

springboot 日志类切面&#xff0c;接口成功记录日志&#xff0c;失败不记录 自定义一个注解方法 import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;/***…...