当前位置: 首页 > news >正文

Flink(一)【WordCount 快速入门】

前言

        学完了 Hadoop、Spark,本想着先把 Kafka、Flume 这些工具先学完的,但想了想还是把核心的技术先学完最后再去把那些工具学学。

        最近心有点累哈哈哈,偷偷立个 flag,反正也没人看,明年的今天来这里还愿哈,愿望这种事情我是从来是不会说出来的,毕竟言以泄败,事以密成嘛。

那我隐晦低表达一下,摘录自《解忧杂货店》的一条句子:

        这是克朗对自己梦想的描述,其实他不是自不量力,而是假如放弃了这个梦想,他的生活就失去了光,他未来的几十年生活会枯燥无味,会活的没有一点激情。
        就像一个曾经自己深爱过的姑娘一样,明明无法在一起,却还是始终记挂着,因为心里眼里只有她,所以别人在你眼中,都会黯然失色的,没有色彩的东西,又怎么能投入激情去爱呢?

        我的愿望有两个,在上面中有所体现,但我希望结果不要是遗憾,第一个愿望明年这会大概知道结果了,第二个愿望应该会晚一点,也许在2025年的春天,也许会更早一点...

API 环境搭建

添加依赖

pom.xml

<properties><flink.version>1.13.0</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入 Flink 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version>
</dependency>
</dependencies>

log4j.properties 

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

 入门案例

0、数据准备

在 根目录下创建 words.txt

hello flink
hello java
hello spark
hello hadoop

1、批处理

批处理所用到的算子API 都继承自 DataSet,而新版的 Flink 已经做到了流批一体,这里只做演示,以后这类 API 应该是要被弃用了。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个执行批式数据处理环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件中读取数据 String类型  批式数据处理环境得到的 DataSource 继承自 DataSetDataSource<String> lineDS = env.readTextFile("input/words.txt");// 3. 将每行数据转换成一个二元组类型// 输入类型: String 输出类型: Tuple2FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =// String lines: 输入数据行  Collector<Tuple2<String,Long>> out: 输出类型lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));  //使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示信息返回返回值类型// 4. 根据 word 分组UnsortedGrouping<Tuple2<String, Long>> wordGroup = wordAndOne.groupBy(0);   // 0 是索引位置// 5. 分组内进行聚合AggregateOperator<Tuple2<String, Long>> res = wordGroup.sum(1); // 1 也是索引位置// 6. 打印结果res.print();}
}

运行结果:

(hadoop,1)
(flink,1)
(hello,4)
(java,1)
(spark,1)Process finished with exit code 0

因为现在已经是流批一体的框架了,所以提交 Flink 批处理任务需要用下面的语句:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2、流处理

2.1、有界数据流处理

这里我们用离线数据(提前创建好的文件)用流处理API DataStream 的算子来做处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个流式的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 2. 流式数据处理环境得到的 DataSource 继承自 DataStreamDataStreamSource<String> lineDS = env.readTextFile("input/words.txt");// 3. flatMap 打散数据 返回元组SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 根据 word 分组KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);// 5. 根据键对索引为 1 处的值进行合并SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);// 6. 输出结果res.print();// 7. 执行env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来}
}

运行结果:

3> (java,1)
13> (flink,1)
1> (spark,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
5> (hello,4)
15> (hadoop,1)

        我们可以发现,输出的单词的顺序是乱序的,因为集群模式下数据流不是在本地执行的,而是在多个节点中执行,所以也就无法保证先输入的单词最先输出。

        Idea下Flink API 会使用多线程来模拟集群下的多节点并行处理,而我们每行数据前面的 "编号>" 代表的就是线程的 id(对应 Flink 运行时占据的最小资源,也叫任务槽),默认使用当前电脑的所有 CPU 数。

        我们还可以发现,hello是同一个节点上处理的,这是因为我们在做分组的时候,把分组后的数据分到了同一个节点(子任务)上。

2.2、无界数据流处理

这里我们使用 netcat 来模拟产生数据流

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class UnBoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个流式的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 2. 流式数据处理环境得到的 DataSource 继承自 DataStreamParameterTool parameterTool = ParameterTool.fromArgs(args);String host = parameterTool.get("host");Integer port = parameterTool.getInt("port");DataStreamSource<String> lineDS = env.socketTextStream(host,port);// 3. flatMap 打散数据 返回元组SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 根据 word 分组KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);// 5. 根据键对索引为 1 处的值进行合并SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);// 6. 输出结果res.print();// 7. 执行env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来}
}

运行结果: 

        可以看到,处理是相当快的,毕竟数据量很小,但是会想到 SparkStreaming 的处理过程,我们之前用 SparkStreaming 的时候还需要设置 Reciver 的接收间隔,而我们的 Flink 则是真正的实时处理。

总结

        Flink 的学习终于开始了,还是一样的要求,不照搬视频课件内容,每行代码要有自己的思考,每行博客也要是自己思考的总结。

        还有,最近感觉愈发词穷,该多看书了,以后养成每次博客加一条书摘的习惯。

相关文章:

Flink(一)【WordCount 快速入门】

前言 学完了 Hadoop、Spark&#xff0c;本想着先把 Kafka、Flume 这些工具先学完的&#xff0c;但想了想还是把核心的技术先学完最后再去把那些工具学学。 最近心有点累哈哈哈&#xff0c;偷偷立个 flag&#xff0c;反正也没人看&#xff0c;明年的今天来这里还愿哈&#xff0c…...

【Redis】hash数据类型-常用命令

文章目录 前置知识常用命令HSETHGETHEXISTSHDELHKEYSHVALSHGETALLHMGET关于HMSETHLENHSETNXHINCRBYHINCRBYFLOAT 命令小结 前置知识 redis自身就是键值对结构了&#xff0c;哈希类型是指值本⾝⼜是⼀个键值对结构&#xff0c;形如key"key"&#xff0c;value{{field1…...

【大数据】Apache NiFi 数据同步流程实践

Apache NiFi 数据同步流程实践 1.环境2.Apache NIFI 部署2.1 获取安装包2.2 部署 Apache NIFI 3.NIFI 在手&#xff0c;跟我走&#xff01;3.1 准备表结构和数据3.2 新建一个 Process Group3.3 新建一个 GenerateTableFetch 组件3.4 配置 GenerateTableFetch 组件3.5 配置 DBCP…...

git怎么使用 拉取代码

废话不多说 直接开干 Git 是一款十分实用的版本控制工具&#xff0c;非常方便地管理代码的变更。但是&#xff0c;在使用 Git 过程中&#xff0c;不可避免地会遇到一些问题。其中&#xff0c;删除分支是一个常见的问题。 查看引用历史记录&#xff1a; git reflog找到你删除的…...

Apple :苹果将在明年年底推出自己的 AI,预计将随 iOS 18 一起推出

本心、输入输出、结果 文章目录 Apple &#xff1a;苹果将在明年年底推出自己的 AI&#xff0c;预计将随 iOS 18 一起推出前言三星声称库克相关图片弘扬爱国精神 Apple &#xff1a;苹果将在明年年底推出自己的 AI&#xff0c;预计将随 iOS 18 一起推出 编辑&#xff1a;简简单…...

数据结构-双向链表

1.带头双向循环链表&#xff1a; 前面我们已经知道了链表的结构有8种&#xff0c;我们主要学习下面两种&#xff1a; 前面我们已经学习了无头单向非循环链表&#xff0c;今天我们来学习带头双向循环链表&#xff1a; 带头双向循环链表&#xff1a;结构最复杂&#xff0c;一般用…...

CV计算机视觉每日开源代码Paper with code速览-2023.11.6

精华置顶 墙裂推荐&#xff01;小白如何1个月系统学习CV核心知识&#xff1a;链接 点击CV计算机视觉&#xff0c;关注更多CV干货 论文已打包&#xff0c;点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【点云3D目标检测】&#xff08;NeurIPS2023&#xff09;…...

GB28181学习(十五)——流传输方式

前言 基于GB/T28181-2022版本&#xff0c;实时流的传输方式包括3种&#xff1a; UDPTCP被动TCP主动 UDP 流程 注意&#xff1a; m字段指定传输方式为RTP/AVP&#xff1b; 抓包 SIP服务器发送INVITE请求&#xff1b; INVITE sip:xxx192.168.0.111:5060 SIP/2.0 Via: SIP…...

【Linux】:初识git || centos下安装git || 创建本地仓库 || 配置本地仓库 || 认识工作区/暂存区(索引)以及版本库

&#x1f4ee;1.初识git Git 原理与使用 课程⽬标 • 技术⽬标:掌握Git企业级应⽤&#xff0c;深刻理解Git操作过程与操作原理&#xff0c;理解⼯作区&#xff0c;暂存区&#xff0c;版本库的含义 • 技术⽬标:掌握Git版本管理&#xff0c;⾃由进⾏版本回退、撤销、修改等Git操…...

Vue 3 中,watch 和 watchEffect 的区别

结论先行&#xff1a; watch 和 watchEffect 都是监听器&#xff0c;都是用来监听响应式数据的变化并执行相应操作。区别是&#xff1a; watch&#xff1a;需要指明要监听的数据&#xff0c;而且在回调函数中可以获取到属性变化的前后值&#xff1b; 适用于需要精确控制监视…...

鲜花展示服务预约小程序的效果如何

鲜花产品的市场需求度非常高&#xff0c;互联网深入各个行业&#xff0c;很多鲜花商家都会通过线上建立平台实现产品销售、获客引流、转化复购、生意增长等&#xff0c;当然除了搭建鲜花商城小程序外&#xff0c;对鲜花供应商及门店还有展示预约方面的需求。 通过【雨科】平台可…...

Linux下多个盘符乱的问题处理

参考文档&#xff1a; linux下man fstab命令查看帮助&#xff0c;有一段说明&#xff0c;可以使用UUID&#xff0c;或者LABEL 来绑定盘。这里使用UUID来绑定 Instead of giving the device explicitly, one may indicate the filesystem that is to be mounted by its UUID …...

uniapp小程序使用web-view组件页面分享后,点击没有home小房子解决办法

uniapp小程序使用web-view组件页面分享后&#xff0c;点击没有home小房子解决办法 小程序 &#xff1a;IOS 测试正常&#xff0c; 安卓 不显示home 微信小程序使用的是全局自定义导航&#xff0c;通过首页 banner 跳转到一个 web-view 页面&#xff0c;展示官网。 web-view 页…...

SLAM_语义SLAM相关论文

目录 1. 综述 2. 相关文章 Probabilistic Data Association for Semantic SLAM VSO:Visual Semantic Odometry 语义信息分割运动物体...

【技巧】并发读取Mysql数据保证读取到的数据不重复

【技巧】并发读取Mysql数据保证读取到的数据不重复 使用场景: 并发场景下, 保证不获取到重复的数据 思路: 先通过 MYSQL锁 去占位打标识,然后再去取数据 相当于几个人抢蛋糕, A先把蛋糕打上记号 蛋糕是A的, 然后再慢慢吃 表结构 表 t_userid name val used_flag 是否使用…...

Lavarel异步队列的使用

系统为window 启动队列&#xff1a; php artisan queue:listen设置队列类 .env文件需设置&#xff1a;QUEUE_CONNECTIONredis <?phpnamespace App\Jobs;use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Disp…...

JVM知识分享(PPT在资源里)

一、前言 1.自动内存管理 有句经典的话是这样说&#xff0c;Java与C之间有一堵由内存动态分配和垃圾收集技术所围成的高墙&#xff0c;墙外面的人想进去&#xff0c;墙里面的人却想出来。对于Java程序员来说&#xff0c;在虚拟机自动内存管理机制的帮助下&#xff0c;不再需要…...

整合Salesforce Org需要避免的3大风险

管理多个Salesforce实例是成长型企业可能遇到的场景&#xff0c;每个Salesforce实例都包含可能需要整合的关键业务数据和流程。 除了整合&#xff0c;组织可能会在不同的发展阶段采用Salesforce(例如CRM、服务、运营)。整合的最终结果是多个Salesforce实例被统一&#xff0c;并…...

viple进阶3:打印不同形状的三角形

&#xff08;1&#xff09;题目&#xff1a;打印实心的三角形&#xff08;正三角&#xff09; 第一步&#xff1a;观察图形。首行是1颗星&#xff0c;其余的每一行都比上一行多1颗星&#xff1b;其次&#xff0c;每一行的星号数和行数值相等&#xff0c;第一行有1颗星&#xff…...

pytest+yaml实现接口自动化框架

前言 httprunner 用 yaml 文件实现接口自动化框架很好用&#xff0c;最近在看 pytest 框架&#xff0c;于是参考 httprunner的用例格式&#xff0c;写了一个差不多的 pytest 版的简易框架 项目结构设计 项目结构完全符合 pytest 的项目结构&#xff0c;pytest 是查找 test_.…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?

在建筑行业&#xff0c;项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升&#xff0c;传统的管理模式已经难以满足现代工程的需求。过去&#xff0c;许多企业依赖手工记录、口头沟通和分散的信息管理&#xff0c;导致效率低下、成本失控、风险频发。例如&#…...

工程地质软件市场:发展现状、趋势与策略建议

一、引言 在工程建设领域&#xff0c;准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具&#xff0c;正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结&#xff1a; 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析&#xff1a; 实际业务去理解体会统一注…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。

1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj&#xff0c;再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

Go 语言并发编程基础:无缓冲与有缓冲通道

在上一章节中&#xff0c;我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道&#xff0c;它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好&#xff0…...

什么是VR全景技术

VR全景技术&#xff0c;全称为虚拟现实全景技术&#xff0c;是通过计算机图像模拟生成三维空间中的虚拟世界&#xff0c;使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验&#xff0c;结合图文、3D、音视频等多媒体元素…...