【入门Flink】- 02Flink经典案例-WordCount
WordCount
需求:统计一段文字中,每个单词出现的频次
添加依赖
<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>
1.批处理
基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。
1.1.数据准备
resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt
words.txt
hello flink
hello world
hello java
1.2.代码编写
public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)String filePath = Objects.requireNonNull(BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataSource<String> lineDS = env.readTextFile(filePath);// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}
打印结果如下:(结果正确)

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。
事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
2.流处理
DataStreamAPI可以直接处理批处理和流处理的所有场景
2.1读取文件
还是上述words.txt文件
代码实现:
public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件String filePath = Objects.requireNonNull(StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataStreamSource<String> lineStream = env.readTextFile(filePath);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}
与批处理程序BatchWordCount有几点不同:
- 创建执行环境的不同,流处理程序使用的是
StreamExecutionEnvironment。 - 转换处理之后,得到的数据对象类型不同。
- 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
- 最后执行execute方法,开始执行任务。
2.2读取Socket文件流
实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。
- 简单改动,只需将StreamWordCount 代码中读取文件数据的
readTextFile方法,替换成读取socket文本流的方法socketTextStream。
public class StreamSocketWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}
- 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777
注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。
- 从Linux发送数据
1、输入“hello flink”,输出如下内容

2、再输入“hello world”,输出如下内容

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据
相关文章:
【入门Flink】- 02Flink经典案例-WordCount
WordCount 需求:统计一段文字中,每个单词出现的频次 添加依赖 <properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><…...
go语言将cmd stdout和stderr作为字符串返回而不是打印到控制台
go语言将cmd stdout和stderr作为字符串返回而不是打印到控制台 1、直接打印到控制台 从 golang 应用程序中执行 bash 命令,现在 stdout 和 stderr 直接进入控制台: cmd.Stdout os.Stdout cmd.Stderr os.Stderrpackage mainimport ("fmt"…...
OpenGL ES入门教程(二)之绘制一个平面桌子
OpenGL ES入门教程(二)之绘制一个平面桌子 前言0. OpenGL绘制图形的整体框架概述1. 定义顶点2. 定义着色器3. 加载着色器4. 编译着色器5. 将着色器链接为OpenGL程序对象6. 将着色器需要的数据与拷贝到本地的数组相关联7. 在屏幕上绘制图形8. 让桌子有边框…...
el-select 搜索无选项时 请求接口添加输入的值
el-select 搜索无选项时 请求接口添加输入的值 <template><div class"flex"><el-select class"w250" v-model"state.brand.id" placeholder"请选择" clearable filterable :filter-method"handleQu…...
基于单片机的商场防盗防火系统设计
收藏和点赞,您的关注是我创作的动力 文章目录 概要 一、系统分析二、系统总设计2.1基于单片机的商场防火防盗系统的总体功能2.2系统的组成 三 软件设计4.1软件设计思路4.2软件的实现4.2.1主控模块实物 四、 结论五、 文章目录 概要 本课题设计一种商场防火防盗报警…...
【Java|golang】2103. 环和杆---位运算
总计有 n 个环,环的颜色可以是红、绿、蓝中的一种。这些环分别穿在 10 根编号为 0 到 9 的杆上。 给你一个长度为 2n 的字符串 rings ,表示这 n 个环在杆上的分布。rings 中每两个字符形成一个 颜色位置对 ,用于描述每个环: 第 …...
[SSD综述 1.4] SSD固态硬盘的架构和功能导论
依公知及经验整理,原创保护,禁止转载。 专栏 《SSD入门到精通系列》 <<<< 返回总目录 <<<< 前言 机械硬盘的存储系统由于内部结构, 其IO访问性能无法进一步提高,CPU与存储器之间的性能差距逐渐扩大。以Nand Flash为存储介质的固态硬盘技术的发展,…...
【C++那些事儿】类与对象(1)
君兮_的个人主页 即使走的再远,也勿忘启程时的初心 C/C 游戏开发 Hello,米娜桑们,这里是君兮_,我之前看过一套书叫做《明朝那些事儿》,把本来枯燥的历史讲的生动有趣。而C作为一门接近底层的语言,无疑是抽象且难度颇…...
集简云x slack(自建)无需API开发轻松连接OA、电商、营销、CRM、用户运营、推广、客服等近千款系统
slack是一个工作效率管理平台,让每个人都能够使用无代码自动化和 AI 功能,还可以无缝连接搜索和知识共享,并确保团队保持联系和参与。在世界各地,Slack 不仅受到公司的信任,同时也是人们偏好使用的平台。 官网&#x…...
JS模块化,ESM模块规范的 导入、导出、引用、调用详解
JS模块化,ESM模块规范的 导入、导出、引用、调用详解 写在前面实例代码1、模块导出 - export导出之 - 独立导出导出之 - 集中多个导出导出之 - 默认导出导出之 - 集中默认导出导出之 - 混合导出 2、模块导入 - import导入之 - 全部导入导入之 - 默认导入导入之 - 指…...
markdown常用的快捷键
一级标题 #加 空格 是一级标题 二级标题 ##加空格是二级标题 三级标题 字体 * 粗体:两个**号 斜体:一个 斜体加粗:三个 删除:两个~~ 我是字体 我是字体 我是字体 我是字体 引用 箭头符号>加空格 回车 分割线 三个 - …...
VSCode中的任务什么情况下需要配置多个问题匹配器problemMatcher?多个问题匹配器之间的关系是什么?
☞ ░ 前往老猿Python博客 ░ https://blog.csdn.net/LaoYuanPython 一、简介 在 VS Code 中,tasks.json 文件中的 problemMatcher 字段用于定义如何解析任务输出中的问题(错误、警告等)。 problemMatcher是一个描述问题匹配器的接口&…...
C语言鞍点数组改进版
题目内容: 给定一个n*n矩阵A。矩阵A的鞍点是一个位置(i,j),在该位置上的元素是第i行上的最大数,第j列上的最小数。一个矩阵A也可能没有鞍点。 你的任务是找出A的鞍点。 改进目标: 网络上很多…...
K8s:部署 CNI 网络组件+k8s 多master集群部署+负载均衡及Dashboard k8s仪表盘图像化展示
目录 1 部署 CNI 网络组件 1.1 部署 flannel 1.2 部署 Calico 1.3 部署 CoreDNS 2 负载均衡部署 3 部署 Dashboard 1 部署 CNI 网络组件 1.1 部署 flannel K8S 中 Pod 网络通信: ●Pod 内容器与容器之间的通信 在同一个 Pod 内的容器(Pod 内的容…...
【数据结构】树家族
目录 树的相关术语树家族二叉树霍夫曼树二叉查找树 BST平衡二叉树 AVL红黑树伸展树替罪羊树 B树B树B* 树 当谈到数据结构中的树时,我们通常指的是一种分层的数据结构,它由节点(nodes)组成,这些节点之间以边(…...
Vert.x学习笔记-Vert.x的基本处理单元Verticle
Verticle介绍 Verticle是Vert.x的基本处理单元,Vert.x应用程序中存在着处理各种事件的处理单元,比如负责HTTP API响应请求的处理单元、负责数据库存取的处理单元、负责向第三方发送请求的处理单元。Verticle就是对这些功能单元的封装,Vertic…...
干货分享:基于 LSTM 的广告库存预估算法
近年来,随着互联网的发展,在线广告营销成为一种非常重要的商业模式。出于广告流量商业化售卖和日常业务投放精细化运营的目的,需要对广告流量进行更精准的预估,从而更精细的进行广告库存管理。 因此,携程广告纵横平台…...
dataframe删除某一列
drop import pandas as pd data {‘A’: [1, 2, 3], ‘B’: [4, 5, 6], ‘C’: [7, 8, 9]} df pd.DataFrame(data) #使用drop方法删除列 df df.drop(‘B’, axis1) # 通过指定列名和axis1来删除列 del import pandas as pd data {‘A’: [1, 2, 3], ‘B’: [4, 5, 6]…...
提升ChatGPT答案质量和准确性的方法Prompt engineering
文章目录 怎么获得优质的答案设计一个优质prompt的步骤:Prompt公式:示例怎么获得优质的答案 影响模型回答精确度的因素 我们应该知道一个好的提示词,要具备一下要点: 清晰简洁,不要有歧义; 有明确的任务/问题,任务如果太复杂,需要拆分成子任务分步完成; 确保prompt中…...
SpringBoot + Vue2项目打包部署到服务器后,使用Nginx配置SSL证书,配置访问HTTP协议转HTTPS协议
配置nginx.conf文件,这个文件一般在/etc/nginx/...中,由于每个人的体质不一样,也有可能在别的路径里,自己找找... # 配置工作进程的最大连接数 events {worker_connections 1024; }# 配置HTTP服务 http {# 导入mime.types配置文件…...
Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...
React第五十七节 Router中RouterProvider使用详解及注意事项
前言 在 React Router v6.4 中,RouterProvider 是一个核心组件,用于提供基于数据路由(data routers)的新型路由方案。 它替代了传统的 <BrowserRouter>,支持更强大的数据加载和操作功能(如 loader 和…...
centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
大语言模型如何处理长文本?常用文本分割技术详解
为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
DeepSeek源码深度解析 × 华为仓颉语言编程精粹——从MoE架构到全场景开发生态
前言 在人工智能技术飞速发展的今天,深度学习与大模型技术已成为推动行业变革的核心驱动力,而高效、灵活的开发工具与编程语言则为技术创新提供了重要支撑。本书以两大前沿技术领域为核心,系统性地呈现了两部深度技术著作的精华:…...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...
