当前位置: 首页 > news >正文

0基础学习PyFlink——使用DataStream进行字数统计

大纲

  • source
  • Map
    • Splitting
    • Mapping
  • Reduce
    • Keying
    • Reducing
  • 完整代码
  • 结构
  • 参考资料

在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。
在这里插入图片描述
本节介绍的DataStream API,则使用了类似的结构。

source

为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)

可以使用下面指令输出source内容

    source.print()
A C B
A E B
E C D

Map

和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。

Splitting

    def split(line):for s in line.split():yield ssplitted = source.flat_map(split) 

上述splitted的结构输出是

A
C
B
A
E
B
E
C
D

Mapping

Mapping的操作就是将之前的数组结构转换成map结构

mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))

mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。

(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)

Reduce

Keying

这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。

    keyed=mapped.key_by(lambda i: i[0]) 

可以看到keyed数据已经经过排序和聚合了。

(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)

Reducing

 reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

reduce的方法有如下注释

Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The ReduceFunction will receive input values based on the key value.
Only input values with the same key will go to the same reducer.

特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。

(A,2)
(B,2)
(C,2)
(D,1)
(E,2)

完整代码

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted = source.flat_map(split) # splitted.print()mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyed=mapped.key_by(lambda i: i[0]) # keyed.print()reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

结构

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/

相关文章:

0基础学习PyFlink——使用DataStream进行字数统计

大纲 sourceMapSplittingMapping ReduceKeyingReducing 完整代码结构参考资料 在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API,则使用了类似的结构。 source 为了方便&…...

OpenCV官方教程中文版 —— 图像去噪

OpenCV官方教程中文版 —— 图像去噪 前言一、原理二、OpenCV 中的图像去噪1.cv2.fastNlMeansDenoisingColored()2.cv2.fastNlMeansDenoisingMulti() 前言 目标 • 学习使用非局部平均值去噪算法去除图像中的噪音 • 学习函数 cv2.fastNlMeansDenoising(),cv2.fa…...

AcWing 102. 最佳牛围栏(前缀和+二分+DP)

AcWing 102. 最佳牛围栏 1、问题 2、分析 (1)暴力做法 看到这道题以后,我们可以先想一个最暴力的做法,就是我们去枚举所有长度至少为 F F F的区间,然后求出这个区间的和,再求出这个区间的平均值。最后在…...

React-表单受控绑定和获取Dom元素

一、表单受控组件 1.声明一个react状态 说明&#xff1a;useState const [value,setValue]useState("") 2.核心绑定流程 2.1绑定react状态 <div><input value{value}type"text"></input> 2.2绑定onChange事件 说明&#xff1a;e.…...

python hashlib模块及实例

hashlib 模块密码加密密码撞库密码加盐 一&#xff0c;hashlib模块 hashlib模块是用来为字符串进行加密的模块&#xff0c;通过该作用就可以为用户的密码进行加密。 通过模块中的hash算法可以为任意长度的字符串加密成长度相同的一串hash值。该hash算法得到的hash值有一下几个…...

垃圾回收GC

为什么要有垃圾回收? JVM之所以要有垃圾回收,是因为它能够自动管理内存,避免内存泄漏和内存溢出的问题,垃圾回收机制会自动检测和清理不再使用的对象,释放内存空间,使得开发者不需要手动管理内存,降低了开发难度和错误风险,同时,垃圾回收还可以优化内存分配,提高程序性能和响…...

kubernetes-service微服务

目录 一、service微服务 二、Ipvs模式 三、ClusterIP 1.ClusterIP 2.headless 四、NodePort 1.NodePort 2.默认端口 五、LoadBalancer 1.LoadBalancer 2.metallb 六、ExternalName 一、service微服务 Kubernetes Service微服务是一种基于Kubernetes的微服务架构&…...

让你笑到不行的笑话短视频接口,快来试试!

11在当今这个快节奏的社会中&#xff0c;笑话成为了许多人调节情绪的有效方法。如今&#xff0c;短视频平台已经成为了最受欢迎的娱乐方式之一&#xff0c;因此&#xff0c;将笑话和短视频结合起来&#xff0c;成为了一种很有趣的方式来带给我们欢乐。今天我们要介绍的是挖数据…...

系列四十五、Spring的事务传播行为案例演示(五)#MANDATORY

一、演示Spring的传播行为&#xff08;MANDATORY&#xff09; 1.1、StockServiceImplMANDATORY /*** Author : 一叶浮萍归大海* Date: 2023/10/30 15:43* Description: 演示MANDAORY的传播行为* 外部不存在事务&#xff1a;抛出异常 No existing transaction found for…...

idea插件(二)-- String Manipulation(字符串处理工具)

目录 1. 安装 String Manipulation 2. 默认快捷键 3. 操作说明 3.1 变量名的形式处理 3.2 文本形式的转化...

HQChart实战教程67-worker批量计算股票指标

HQChart实战教程67-worker批量计算股票指标 什么是Worker批量指标计算示例地址步骤1. 创建一个后台工作线程类2. 发送指标计算任务3. 接收计算结果数据对接 完整源码demo_workerthread_sina.htmlhqchart_worker_sina.js HQChart插件源码地址 什么是Worker Worker 接口是 Web W…...

博客系统自动化测试项目实践

文章目录 一.测试需求分析1.功能分析2.非功能分析 二.制定测试方案&#xff08;计划 策略&#xff09;三.编写测试用例四.执行自动化测试用例五.编写测试报告六.项目总结 一.测试需求分析 1.功能分析 通过功能测试需求分析 2.非功能分析 非功能分析主要从:界面,性能,安全性,…...

软考高级之系统架构师系列之操作系统基础

概念 接口 操作系统为用户提供两类接口&#xff1a;操作一级的接口和程序控制一级的接口。操作一级的接口包括操作控制命令、菜单命令等&#xff1b;程序控制一级的接口包括系统调用。 UMA和NUMA UMA&#xff0c;统一内存访问&#xff0c;Uniform Memory Access&#xff0c…...

制作一个可以arm架构下运行的docker镜像(for Python)

看完本篇文章&#xff0c;你将得到一个可以arm架构下运行的python 基础镜像。 题外话 这里直接说docker镜像有点儿草率&#xff0c;因为目前很多容器都是Podman了。 podman的介绍 arm和aarch傻傻分不清楚 现在这两个是一样的意思了。 arm64和aarch64之间的区别 开始制作镜…...

Goland连接服务器/虚拟机远程编译开发

创建SSH连接 SSH用于与远程服务器建立连接 Settings -> Tools -> SSH Configurations 添加新的ssh连接&#xff0c;Host为ip地址&#xff0c;Username为用户名&#xff0c;认证方式这里选择密码验证 全部填完后可以点击Test Connection测试连接是否成功 创建Deployment…...

大数据Doris(十四):Doris表中的数据基本概念

文章目录 Doris表中的数据基本概念 一、​​​​​​​Row & Column...

【Linux】Linux环境配置以及部署项目后端

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于Linux的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.Linux环境配置 1.JDK ①上传安装包到…...

RabbitMQ消费者的可靠性

目录 一、消费者确认 二、失败重试机制 2.1、失败处理策略 三、业务幂等性 3.1、唯一消息ID 3.2、业务判断 3.3、兜底方案 一、消费者确认 RabbitMQ提供了消费者确认机制&#xff08;Consumer Acknowledgement&#xff09;。即&#xff1a;当消费者处理消息结束后&#x…...

云计算助力史上首届“云上亚运”圆满成功!

201金&#xff0c;魔幻的BGM&#xff0c;以及崛起的中国科技&#xff0c;让杭州亚运会成功出圈。 很多网友表示太震撼了&#xff01;开幕式很漂亮&#xff0c;杭州为了奥运造新城真豪横&#xff0c;看完一整个文化自信住&#xff01; 赛场内外除了无数个令人感动的瞬间&#…...

博彦科技:以金融为起点,凭借创新技术平台真打实干

【科技明说 &#xff5c; 重磅专题】 成立于1995年的博彦科技&#xff0c;已有28年左右的发展历程。 我没有想到&#xff0c;博彦科技也对AIGC领域情有独钟。博彦科技自研的数字人产品SaaS平台&#xff0c;可以接入包括百度文心一言、阿里通义千问等AI大模型产品。可见&#…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

破解路内监管盲区:免布线低位视频桩重塑停车管理新标准

城市路内停车管理常因行道树遮挡、高位设备盲区等问题&#xff0c;导致车牌识别率低、逃费率高&#xff0c;传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法&#xff0c;正成为破局关键。该设备安装于车位侧方0.5-0.7米高度&#xff0c;直接规避树枝遮…...

[论文阅读]TrustRAG: Enhancing Robustness and Trustworthiness in RAG

TrustRAG: Enhancing Robustness and Trustworthiness in RAG [2501.00879] TrustRAG: Enhancing Robustness and Trustworthiness in Retrieval-Augmented Generation 代码&#xff1a;HuichiZhou/TrustRAG: Code for "TrustRAG: Enhancing Robustness and Trustworthin…...

6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础

第三周 Day 3 &#x1f3af; 今日目标 理解类&#xff08;class&#xff09;和对象&#xff08;object&#xff09;的关系学会定义类的属性、方法和构造函数&#xff08;init&#xff09;掌握对象的创建与使用初识封装、继承和多态的基本概念&#xff08;预告&#xff09; &a…...

aardio 自动识别验证码输入

技术尝试 上周在发学习日志时有网友提议“在网页上识别验证码”&#xff0c;于是尝试整合图像识别与网页自动化技术&#xff0c;完成了这套模拟登录流程。核心思路是&#xff1a;截图验证码→OCR识别→自动填充表单→提交并验证结果。 代码在这里 import soImage; import we…...

命令行关闭Windows防火墙

命令行关闭Windows防火墙 引言一、防火墙:被低估的"智能安检员"二、优先尝试!90%问题无需关闭防火墙方案1:程序白名单(解决软件误拦截)方案2:开放特定端口(解决网游/开发端口不通)三、命令行极速关闭方案方法一:PowerShell(推荐Win10/11)​方法二:CMD命令…...