0基础学习PyFlink——使用DataStream进行字数统计
大纲
- source
- Map
- Splitting
- Mapping
- Reduce
- Keying
- Reducing
- 完整代码
- 结构
- 参考资料
在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。

本节介绍的DataStream API,则使用了类似的结构。
source
为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)
可以使用下面指令输出source内容
source.print()
A C B
A E B
E C D
Map
和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。
Splitting
def split(line):for s in line.split():yield ssplitted = source.flat_map(split)
上述splitted的结构输出是
A
C
B
A
E
B
E
C
D
Mapping
Mapping的操作就是将之前的数组结构转换成map结构
mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))
mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。
(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)
Reduce
Keying
这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。
keyed=mapped.key_by(lambda i: i[0])
可以看到keyed数据已经经过排序和聚合了。
(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)
Reducing
reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))
reduce的方法有如下注释
Applies a reduce transformation on the grouped data stream grouped on by the given
key position. TheReduceFunctionwill receive input values based on the key value.
Only input values with the same key will go to the same reducer.
特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。
(A,2)
(B,2)
(C,2)
(D,1)
(E,2)
完整代码
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted = source.flat_map(split) # splitted.print()mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyed=mapped.key_by(lambda i: i[0]) # keyed.print()reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()
结构

参考资料
- https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/
相关文章:
0基础学习PyFlink——使用DataStream进行字数统计
大纲 sourceMapSplittingMapping ReduceKeyingReducing 完整代码结构参考资料 在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API,则使用了类似的结构。 source 为了方便&…...
OpenCV官方教程中文版 —— 图像去噪
OpenCV官方教程中文版 —— 图像去噪 前言一、原理二、OpenCV 中的图像去噪1.cv2.fastNlMeansDenoisingColored()2.cv2.fastNlMeansDenoisingMulti() 前言 目标 • 学习使用非局部平均值去噪算法去除图像中的噪音 • 学习函数 cv2.fastNlMeansDenoising(),cv2.fa…...
AcWing 102. 最佳牛围栏(前缀和+二分+DP)
AcWing 102. 最佳牛围栏 1、问题 2、分析 (1)暴力做法 看到这道题以后,我们可以先想一个最暴力的做法,就是我们去枚举所有长度至少为 F F F的区间,然后求出这个区间的和,再求出这个区间的平均值。最后在…...
React-表单受控绑定和获取Dom元素
一、表单受控组件 1.声明一个react状态 说明:useState const [value,setValue]useState("") 2.核心绑定流程 2.1绑定react状态 <div><input value{value}type"text"></input> 2.2绑定onChange事件 说明:e.…...
python hashlib模块及实例
hashlib 模块密码加密密码撞库密码加盐 一,hashlib模块 hashlib模块是用来为字符串进行加密的模块,通过该作用就可以为用户的密码进行加密。 通过模块中的hash算法可以为任意长度的字符串加密成长度相同的一串hash值。该hash算法得到的hash值有一下几个…...
垃圾回收GC
为什么要有垃圾回收? JVM之所以要有垃圾回收,是因为它能够自动管理内存,避免内存泄漏和内存溢出的问题,垃圾回收机制会自动检测和清理不再使用的对象,释放内存空间,使得开发者不需要手动管理内存,降低了开发难度和错误风险,同时,垃圾回收还可以优化内存分配,提高程序性能和响…...
kubernetes-service微服务
目录 一、service微服务 二、Ipvs模式 三、ClusterIP 1.ClusterIP 2.headless 四、NodePort 1.NodePort 2.默认端口 五、LoadBalancer 1.LoadBalancer 2.metallb 六、ExternalName 一、service微服务 Kubernetes Service微服务是一种基于Kubernetes的微服务架构&…...
让你笑到不行的笑话短视频接口,快来试试!
11在当今这个快节奏的社会中,笑话成为了许多人调节情绪的有效方法。如今,短视频平台已经成为了最受欢迎的娱乐方式之一,因此,将笑话和短视频结合起来,成为了一种很有趣的方式来带给我们欢乐。今天我们要介绍的是挖数据…...
系列四十五、Spring的事务传播行为案例演示(五)#MANDATORY
一、演示Spring的传播行为(MANDATORY) 1.1、StockServiceImplMANDATORY /*** Author : 一叶浮萍归大海* Date: 2023/10/30 15:43* Description: 演示MANDAORY的传播行为* 外部不存在事务:抛出异常 No existing transaction found for…...
idea插件(二)-- String Manipulation(字符串处理工具)
目录 1. 安装 String Manipulation 2. 默认快捷键 3. 操作说明 3.1 变量名的形式处理 3.2 文本形式的转化...
HQChart实战教程67-worker批量计算股票指标
HQChart实战教程67-worker批量计算股票指标 什么是Worker批量指标计算示例地址步骤1. 创建一个后台工作线程类2. 发送指标计算任务3. 接收计算结果数据对接 完整源码demo_workerthread_sina.htmlhqchart_worker_sina.js HQChart插件源码地址 什么是Worker Worker 接口是 Web W…...
博客系统自动化测试项目实践
文章目录 一.测试需求分析1.功能分析2.非功能分析 二.制定测试方案(计划 策略)三.编写测试用例四.执行自动化测试用例五.编写测试报告六.项目总结 一.测试需求分析 1.功能分析 通过功能测试需求分析 2.非功能分析 非功能分析主要从:界面,性能,安全性,…...
软考高级之系统架构师系列之操作系统基础
概念 接口 操作系统为用户提供两类接口:操作一级的接口和程序控制一级的接口。操作一级的接口包括操作控制命令、菜单命令等;程序控制一级的接口包括系统调用。 UMA和NUMA UMA,统一内存访问,Uniform Memory Access,…...
制作一个可以arm架构下运行的docker镜像(for Python)
看完本篇文章,你将得到一个可以arm架构下运行的python 基础镜像。 题外话 这里直接说docker镜像有点儿草率,因为目前很多容器都是Podman了。 podman的介绍 arm和aarch傻傻分不清楚 现在这两个是一样的意思了。 arm64和aarch64之间的区别 开始制作镜…...
Goland连接服务器/虚拟机远程编译开发
创建SSH连接 SSH用于与远程服务器建立连接 Settings -> Tools -> SSH Configurations 添加新的ssh连接,Host为ip地址,Username为用户名,认证方式这里选择密码验证 全部填完后可以点击Test Connection测试连接是否成功 创建Deployment…...
大数据Doris(十四):Doris表中的数据基本概念
文章目录 Doris表中的数据基本概念 一、Row & Column...
【Linux】Linux环境配置以及部署项目后端
🥳🥳Welcome Huihuis Code World ! !🥳🥳 接下来看看由辉辉所写的关于Linux的相关操作吧 目录 🥳🥳Welcome Huihuis Code World ! !🥳🥳 一.Linux环境配置 1.JDK ①上传安装包到…...
RabbitMQ消费者的可靠性
目录 一、消费者确认 二、失败重试机制 2.1、失败处理策略 三、业务幂等性 3.1、唯一消息ID 3.2、业务判断 3.3、兜底方案 一、消费者确认 RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后&#x…...
云计算助力史上首届“云上亚运”圆满成功!
201金,魔幻的BGM,以及崛起的中国科技,让杭州亚运会成功出圈。 很多网友表示太震撼了!开幕式很漂亮,杭州为了奥运造新城真豪横,看完一整个文化自信住! 赛场内外除了无数个令人感动的瞬间&#…...
博彦科技:以金融为起点,凭借创新技术平台真打实干
【科技明说 | 重磅专题】 成立于1995年的博彦科技,已有28年左右的发展历程。 我没有想到,博彦科技也对AIGC领域情有独钟。博彦科技自研的数字人产品SaaS平台,可以接入包括百度文心一言、阿里通义千问等AI大模型产品。可见&#…...
飞跨电容三电平拓扑的实战解析:从数学原理到SiC MOSFET的高频设计
1. 飞跨电容三电平拓扑的数学起源 飞跨电容三电平(FCML)拓扑的命名并非随意,它实际上植根于18世纪的数学拓扑学。数学拓扑学研究的是几何图形在连续变形下保持不变的性质,这个概念最早由欧拉在1736年研究柯尼斯堡七桥问题时提出。…...
嵌入式EEPROM文件化存储库:轻量级持久化方案
1. 项目概述PersistentStorage 是一个面向嵌入式设备 EEPROM 的轻量级、文件语义化持久化存储库,专为资源受限的 MCU(如 ESP32、STM32F0/F1、nRF52 等)设计。其核心设计理念是在无文件系统(FS)的裸机或 RTOS 环境中&am…...
Python 批量导出数据库数据至 Excel 文件页
简介 langchain专门用于构建LLM大语言模型,其中提供了大量的prompt模板,和组件,通过chain(链)的方式将流程连接起来,操作简单,开发便捷。 环境配置 安装langchain框架 pip install langchain langchain-community 其中…...
FPGA JESD204B链路调试实战:从时钟配置到同步状态解析
1. JESD204B接口基础:关键参数解析 第一次接触JESD204B接口时,我被那一堆参数搞得晕头转向。M、N、N、F、K这些字母组合看起来像密码一样,但理解它们对后续调试至关重要。让我用最直白的语言帮你梳理清楚。 M代表转换器数量,这个最…...
3分钟搞定Goods查询页:Map传参+StringUtils分割符实战(附避坑指南)
3分钟搞定商品查询页:Map传参与字符串分割的高效实践 商品查询功能作为电商系统的核心模块,其性能与用户体验直接影响转化率。本文将聚焦查询页开发中的两个关键技术点:Map传参优化与StringUtils分割技巧,通过可落地的代码示例&a…...
【.NET 9低代码开发终极指南】:20年微软生态专家亲授——零前端经验如何3天交付生产级业务应用?
第一章:.NET 9低代码开发全景认知与核心价值定位.NET 9 将低代码能力深度融入平台原生架构,不再依赖第三方插件或独立运行时,而是通过统一的组件模型、声明式 UI 编程范式与智能元数据驱动机制,实现“写少做多”的开发体验。其核心…...
胡桃讲编程・蓝屏急救进阶:磁盘修复 + 引导修复 + 网络修复,排除法根治顽固蓝屏
(微星 GL62M 7REX 地下程序员实操版 | 上期指令无效必看)哈喽各位自学开发、被蓝屏折磨到头疼的小伙伴们,我是胡桃~上一期咱们讲了三种高频蓝屏触发场景,还有系统修复的基础命令,不少朋友留言说:…...
保姆级教程:用海康VM搞定机械臂90度旋转放置的坐标纠偏(附旋转计算模块配置)
工业视觉实战:海康VM在机械臂90度旋转放置中的坐标纠偏全解析 当机械臂在放置前需要旋转90度时,视觉引导系统输出的坐标往往会出现偏差。这个问题困扰着不少自动化工程师——明明标定做得一丝不苟,为什么实际放置时还是会出现偏移?…...
飞利浦PhilipsMP系列监护仪协议对接实战指南
1. 飞利浦PhilipsMP系列监护仪协议对接基础 第一次接触医疗设备协议对接的开发者,可能会觉得这是个神秘的黑盒子。其实飞利浦MP系列的协议对接并没有想象中那么复杂,关键是要理解它的通信逻辑。我最早接触MP20的时候也踩过不少坑,后来发现只要…...
【RocketMQ】消息重试机制深度解析:从异常处理到死信队列的最佳实践
1. RocketMQ消息重试机制全景解读 第一次接触RocketMQ的重试功能时,我踩过一个坑:线上系统突然出现大量消息堆积,排查后发现是消费者处理异常导致消息不断重试。这个经历让我深刻认识到,理解消息重试机制是保障分布式系统可靠性的…...
