0基础学习PyFlink——使用DataStream进行字数统计
大纲
- source
- Map
- Splitting
- Mapping
- Reduce
- Keying
- Reducing
- 完整代码
- 结构
- 参考资料
在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。

本节介绍的DataStream API,则使用了类似的结构。
source
为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)
可以使用下面指令输出source内容
source.print()
A C B
A E B
E C D
Map
和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。
Splitting
def split(line):for s in line.split():yield ssplitted = source.flat_map(split)
上述splitted的结构输出是
A
C
B
A
E
B
E
C
D
Mapping
Mapping的操作就是将之前的数组结构转换成map结构
mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))
mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。
(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)
Reduce
Keying
这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。
keyed=mapped.key_by(lambda i: i[0])
可以看到keyed数据已经经过排序和聚合了。
(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)
Reducing
reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))
reduce的方法有如下注释
Applies a reduce transformation on the grouped data stream grouped on by the given
key position. TheReduceFunctionwill receive input values based on the key value.
Only input values with the same key will go to the same reducer.
特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。
(A,2)
(B,2)
(C,2)
(D,1)
(E,2)
完整代码
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted = source.flat_map(split) # splitted.print()mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyed=mapped.key_by(lambda i: i[0]) # keyed.print()reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()
结构

参考资料
- https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/
相关文章:
0基础学习PyFlink——使用DataStream进行字数统计
大纲 sourceMapSplittingMapping ReduceKeyingReducing 完整代码结构参考资料 在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API,则使用了类似的结构。 source 为了方便&…...
OpenCV官方教程中文版 —— 图像去噪
OpenCV官方教程中文版 —— 图像去噪 前言一、原理二、OpenCV 中的图像去噪1.cv2.fastNlMeansDenoisingColored()2.cv2.fastNlMeansDenoisingMulti() 前言 目标 • 学习使用非局部平均值去噪算法去除图像中的噪音 • 学习函数 cv2.fastNlMeansDenoising(),cv2.fa…...
AcWing 102. 最佳牛围栏(前缀和+二分+DP)
AcWing 102. 最佳牛围栏 1、问题 2、分析 (1)暴力做法 看到这道题以后,我们可以先想一个最暴力的做法,就是我们去枚举所有长度至少为 F F F的区间,然后求出这个区间的和,再求出这个区间的平均值。最后在…...
React-表单受控绑定和获取Dom元素
一、表单受控组件 1.声明一个react状态 说明:useState const [value,setValue]useState("") 2.核心绑定流程 2.1绑定react状态 <div><input value{value}type"text"></input> 2.2绑定onChange事件 说明:e.…...
python hashlib模块及实例
hashlib 模块密码加密密码撞库密码加盐 一,hashlib模块 hashlib模块是用来为字符串进行加密的模块,通过该作用就可以为用户的密码进行加密。 通过模块中的hash算法可以为任意长度的字符串加密成长度相同的一串hash值。该hash算法得到的hash值有一下几个…...
垃圾回收GC
为什么要有垃圾回收? JVM之所以要有垃圾回收,是因为它能够自动管理内存,避免内存泄漏和内存溢出的问题,垃圾回收机制会自动检测和清理不再使用的对象,释放内存空间,使得开发者不需要手动管理内存,降低了开发难度和错误风险,同时,垃圾回收还可以优化内存分配,提高程序性能和响…...
kubernetes-service微服务
目录 一、service微服务 二、Ipvs模式 三、ClusterIP 1.ClusterIP 2.headless 四、NodePort 1.NodePort 2.默认端口 五、LoadBalancer 1.LoadBalancer 2.metallb 六、ExternalName 一、service微服务 Kubernetes Service微服务是一种基于Kubernetes的微服务架构&…...
让你笑到不行的笑话短视频接口,快来试试!
11在当今这个快节奏的社会中,笑话成为了许多人调节情绪的有效方法。如今,短视频平台已经成为了最受欢迎的娱乐方式之一,因此,将笑话和短视频结合起来,成为了一种很有趣的方式来带给我们欢乐。今天我们要介绍的是挖数据…...
系列四十五、Spring的事务传播行为案例演示(五)#MANDATORY
一、演示Spring的传播行为(MANDATORY) 1.1、StockServiceImplMANDATORY /*** Author : 一叶浮萍归大海* Date: 2023/10/30 15:43* Description: 演示MANDAORY的传播行为* 外部不存在事务:抛出异常 No existing transaction found for…...
idea插件(二)-- String Manipulation(字符串处理工具)
目录 1. 安装 String Manipulation 2. 默认快捷键 3. 操作说明 3.1 变量名的形式处理 3.2 文本形式的转化...
HQChart实战教程67-worker批量计算股票指标
HQChart实战教程67-worker批量计算股票指标 什么是Worker批量指标计算示例地址步骤1. 创建一个后台工作线程类2. 发送指标计算任务3. 接收计算结果数据对接 完整源码demo_workerthread_sina.htmlhqchart_worker_sina.js HQChart插件源码地址 什么是Worker Worker 接口是 Web W…...
博客系统自动化测试项目实践
文章目录 一.测试需求分析1.功能分析2.非功能分析 二.制定测试方案(计划 策略)三.编写测试用例四.执行自动化测试用例五.编写测试报告六.项目总结 一.测试需求分析 1.功能分析 通过功能测试需求分析 2.非功能分析 非功能分析主要从:界面,性能,安全性,…...
软考高级之系统架构师系列之操作系统基础
概念 接口 操作系统为用户提供两类接口:操作一级的接口和程序控制一级的接口。操作一级的接口包括操作控制命令、菜单命令等;程序控制一级的接口包括系统调用。 UMA和NUMA UMA,统一内存访问,Uniform Memory Access,…...
制作一个可以arm架构下运行的docker镜像(for Python)
看完本篇文章,你将得到一个可以arm架构下运行的python 基础镜像。 题外话 这里直接说docker镜像有点儿草率,因为目前很多容器都是Podman了。 podman的介绍 arm和aarch傻傻分不清楚 现在这两个是一样的意思了。 arm64和aarch64之间的区别 开始制作镜…...
Goland连接服务器/虚拟机远程编译开发
创建SSH连接 SSH用于与远程服务器建立连接 Settings -> Tools -> SSH Configurations 添加新的ssh连接,Host为ip地址,Username为用户名,认证方式这里选择密码验证 全部填完后可以点击Test Connection测试连接是否成功 创建Deployment…...
大数据Doris(十四):Doris表中的数据基本概念
文章目录 Doris表中的数据基本概念 一、Row & Column...
【Linux】Linux环境配置以及部署项目后端
🥳🥳Welcome Huihuis Code World ! !🥳🥳 接下来看看由辉辉所写的关于Linux的相关操作吧 目录 🥳🥳Welcome Huihuis Code World ! !🥳🥳 一.Linux环境配置 1.JDK ①上传安装包到…...
RabbitMQ消费者的可靠性
目录 一、消费者确认 二、失败重试机制 2.1、失败处理策略 三、业务幂等性 3.1、唯一消息ID 3.2、业务判断 3.3、兜底方案 一、消费者确认 RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后&#x…...
云计算助力史上首届“云上亚运”圆满成功!
201金,魔幻的BGM,以及崛起的中国科技,让杭州亚运会成功出圈。 很多网友表示太震撼了!开幕式很漂亮,杭州为了奥运造新城真豪横,看完一整个文化自信住! 赛场内外除了无数个令人感动的瞬间&#…...
博彦科技:以金融为起点,凭借创新技术平台真打实干
【科技明说 | 重磅专题】 成立于1995年的博彦科技,已有28年左右的发展历程。 我没有想到,博彦科技也对AIGC领域情有独钟。博彦科技自研的数字人产品SaaS平台,可以接入包括百度文心一言、阿里通义千问等AI大模型产品。可见&#…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
