当前位置: 首页 > news >正文

【Flink快速入门-1.Flink 简介与环境配置】

Flink 简介与环境配置

实验介绍

在学习一门新的技术之前,我们首先要了解它的历史渊源,也就是说它为什么会出现,它能够解决什么业务痛点。所以本节我们的学习目的是了解 Flink 的背景,并运行第一个 Flink 程序,对它有一个初步的印象。

知识点
  • 流处理概述
  • Flink 简介
  • Flink 批处理 WordCount
  • Flink 流处理 WordCount

流处理简介

流处理并不是一个新概念,但是要做好并不是一件容易的事情。提到流处理,我们最先想到的可能是金融交易、信号检测以及地图导航等领域的应用。但是近年来随着信息技术的发展,除了前面提到的三个领域,其它方向对数据时效性的要求也越来越高。随着 Hadoop 生态的崛起,Storm、Spark Streaming、Samza、MillWheel 等一众流处理技术开始走入大众视野,但是我们最熟悉的应该还是 Storm 和 Spark Streaming。

我们知道,“高吞吐”、“低延迟”和”exactly-once“是衡量一个流处理框架的重要指标。 Storm 虽然提供了低延迟的流处理,但是在高吞吐方面的表现并不算佳,可以说基本满足不了日益暴涨的数据量,而且也没办法保证精准一次消费。Spark Streaming 中通过微批次的批处理来模拟流处理,只要将批处理的批次分的足够小,那么从宏观上来看就是流处理,这也是 Spark Streaming 的核心思想。通过微观批处理的方式,Spark Streaming 也实现了高吞吐和 exactly-once 语义,时效性也有了大幅提升,在很长一段时间里占据流处理榜首。但是受限于其实现方式,依然存在几秒的延迟,对于那些实时性要求较高的领域来说依然不够完美。
在这样的背景下,Flink 应运而生,接下来我们正式进入 Flink 的学习。

Flink 简介

Apache Flink 是为分布式、高性能、随时可用以及准确 的流处理应用程序打造的开源流处理框架,用于对无界和有界数据流进行有状态计算。Flink 最早起源于在 2010 ~ 2014 年,由 3 所地处柏林的大学和欧洲的一些其它大学共同进行研究的名为 Stratosphere 的项目。2014 年 4 月 Stratosphere 将其捐赠给 Apache 软件基 金会, 初始成员是 Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的顶级项目。在 2015 年,阿里也加入到了 Flink 的开发工作中,并贡献了至少 150 万行代码。

Flink 一词在德语中有着“灵巧”、“快速”的意思,它的 logo 原型也是柏林常见的一种松鼠,以身材娇小、灵活著称,为该项目取这样的名字和选定这样的 logo 也正好符合 Flink 的特点和愿景。

在这里插入图片描述

注意,虽然我们说 Flink 是一个流处理框架,但是它同样可以进行批处理。因为在 Flink 的世界观里,批处理是流处理的一种特殊形式,这和 Spark 不同,在 Spark 中,流处理是通过大批量的微批处理实现的。

运行第一个 Flink 程序

接下来我们运行第一个 Flink 程序,感受一下它的魅力,从而对它有一个初步的印象。

搭建开发环境

本课程使用的是本地环境。后续实验不再提示。

首先我们需要在环境中搭建 Flink 运行环境,总共可以分为下面这几步:

  • 安装 jdk 并配置环境变量 【jdk 1.8】
  • 安装 scala 并配置环境变量 【Scala 2.11.12】
  • 安装 maven 并修改中心仓库为阿里云地址
  • 安装 IDEA 开发工具 【IDEA 2022.2.1或更新版】

我们的实验环境已经为大家安装了 jdk、scala、maven 和 IDEA,只需要在 IDEA 里配置使用即可。

双击桌面的 IDEA 程序,启动之后,点击 File -> New -> Project 创建一个新的 Maven 工程 FlinkLearning

【或者New Project->Maven Archetype】
在这里插入图片描述

创建好之后点击左上角 File > Settings 中,将 Maven 的配置文件修改为 D:\programs\apahe-maven-3.6.3\conf\settings.xml,配置之后的 Maven 中心仓库为阿里云,加载依赖会快很多:

在这里插入图片描述

项目中点击File-》Project Structure -> Libraries-》加号-》添加Scala SDK-》选择所需要的scala版本-》ok进行下载
在这里插入图片描述

在工程 src/main 目录中创建 scala 文件夹,然后右键,选择 Mark Directory as,并将其标记为 Sources Root。

在这里插入图片描述

在 scala 目录里创建 com.vlab.wc 包,并分别创建 BatchWordCountStreamWordCount 两个 Scala Object,分别代表 Flink 批处理和 Flink 流处理。
在这里插入图片描述

至此,我们的准备工作已经完成,接下来正式进入编码阶段。

Flink 批处理 WordCount

修改 pom.xml 文件中的代码为如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>pblh123.lh</groupId><artifactId>FlinkLearning</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><!--    配置国内 Maven 依赖库的镜像源镜--><repositories><repository><id>aliyun</id><url>https://maven.aliyun.com/repository/public</url></repository></repositories>
<!--配置插件的镜像源--><pluginRepositories><pluginRepository><id>aliyun</id><url>https://maven.aliyun.com/repository/public</url></pluginRepository></pluginRepositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.2</version></dependency></dependencies><build><plugins><!-- 该插件用于将 Scala 代码编译成 class 文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.6.0</version><configuration><archive><manifest><mainClass></mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

修改完成后注意点击加载图标重新 Load。
在这里插入图片描述

BatchWordCount.scala 中的代码如下:

package com.vlab.wcimport org.apache.flink.api.scala._/*** @projectName FlinkLearning* @package com.vlab.wc* @className com.vlab.wc.BatchWordCount* @description Flink Batch Word Count Example* @author pblh123* @date 2025/2/7 14:41* @version 1.17.2*/
object BatchWordCount {def main(args: Array[String]): Unit = {// 判断输入参数数量是否正确if (args.length != 1) {System.err.println("Usage: BatchWordCount <input path>")System.exit(5)}// 获取输入路径val inputPath = args(0)// 创建执行环境val env = ExecutionEnvironment.getExecutionEnvironment// 读取输入数据val inputDS: DataSet[String] = env.readTextFile(inputPath)// 计算词频val wordCountDS: DataSet[(String, Int)] = inputDS.flatMap(_.split("\\s+")) // 扁平化,并且处理多个空格作为分隔符.map((_, 1)) // 转换为 (word, 1).groupBy(0) // 按第一个字段(word)进行分组.sum(1) // 对第二个字段(计数)求和// 打印输出wordCountDS.print()}
}

datas/words.txt 路径下创建 words.txt 文件并在其中加入如下内容(注意每个单词之间使用空格分隔):

在这里插入图片描述

hello world
hello flink
hello spark
hello java

右键选中 BatchWordCount.scala,点击 Run 运行,将会看到如下输出:

如果出现一些其他的报错和警告可以忽略。

(java,1)
(world,1)
(flink,1)
(hello,4)
(spark,1)
Flink 流处理 WordCount

StreamWordCount.scala 中加入如下代码:

package com.vlab.wcimport org.apache.flink.streaming.api.scala._/*** @projectName FlinkLearning  * @package com.vlab.wc  * @className com.vlab.wc.StreamWordCount  * @description ${description}  * @author pblh123* @date 2025/2/7 14:41* @version 1.0**/object StreamWordCount {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//  监控Socket数据val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)// 导入隐式转换import org.apache.flink.api.scala._// 计算逻辑val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split(" ")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)// 设置并行度dataStream.print().setParallelism(1)// 执行env.execute("Socket stream word count")}
}

打开终端,并输入 nc -l -p 9999,然后输入以下内容:

hello world
hello flink
hello spark
hello java

运行 StreamWordCount.scala 将会看到如下输出:

(hello,1)
(flink,1)
(hello,2)
(spark,1)
(java,1)
(hello,3)
...

至此,我们的第一个 Flink 实验就已经完成了。

实验总结

本节实验中,我们介绍了 Flink 出现的背景,并和 Storm、Spark Streaming 做了简单对比,然后在实验环境下安装了 idea 开发工具并运行了第一个 Flink 程序。至此,相信大家已经对 Flink 已经有了一个初步的认识。

相关文章:

【Flink快速入门-1.Flink 简介与环境配置】

Flink 简介与环境配置 实验介绍 在学习一门新的技术之前&#xff0c;我们首先要了解它的历史渊源&#xff0c;也就是说它为什么会出现&#xff0c;它能够解决什么业务痛点。所以本节我们的学习目的是了解 Flink 的背景&#xff0c;并运行第一个 Flink 程序&#xff0c;对它有…...

硬盘修复后,文件隐身之谜

在数字时代&#xff0c;硬盘作为数据存储的重要载体&#xff0c;承载着无数珍贵的信息与回忆。然而&#xff0c;当硬盘遭遇故障并经过修复后&#xff0c;有时我们会遇到这样一个棘手问题&#xff1a;硬盘修复后&#xff0c;文件却神秘地“隐身”&#xff0c;无法正常显示。这一…...

如何处理网络连接错误导致的fetch失败?

处理由于网络连接错误导致的 fetch 失败通常涉及捕获网络错误并提供适当的用户反馈。以下是如何在 Vue 3 中实现这一点的步骤和示例。 一、更新 useFetch 函数 在 useFetch 函数中,需要捕获网络错误,并设置相应的错误信息。网络错误通常会抛出一个 TypeError,可以根据这个…...

Qt之设置QToolBar上的按钮样式

通常给QAction设置icon后,菜单栏的菜单项和工具栏(QToolBar)上对应的按钮会同时显示该icon。工具栏还可以使用setToolButtonStyle函数设置按钮样式,其参数为枚举值: enum ToolButtonStyle {ToolButtonIconOnly,ToolButtonTextOnly,ToolButtonTextBesideIcon,ToolButtonTe…...

责任链模式(Chain Responsibility)

一、定义&#xff1a;属于行为型设计模式&#xff0c;包含传递的数据、创建处理的抽象和实现、创建链条、将数据传递给顶端节点&#xff1b; 二、UML图 三、实现 1、需要传递处理的数据类 import java.util.Date;/*** 需要处理的数据信息*/ public class RequestData {priva…...

docker安装 mongodb

1、拉取镜像 docker run -dit --name mongo \ -p 17017:27017 \ -e MONGO_INITDB_ROOT_USERNAMEadmin \ -e MONGO_INITDB_ROOT_PASSWORD2018 \ --restartalways \ mongo2、进入容器 docker exec -it mongo bash 3、进入mongo ./bin/mongosh -u admin -p 2018 --authenticat…...

RabbitMQ 从入门到精通:从工作模式到集群部署实战(五)

#作者&#xff1a;闫乾苓 系列前几篇&#xff1a; 《RabbitMQ 从入门到精通&#xff1a;从工作模式到集群部署实战&#xff08;一&#xff09;》&#xff1a;link 《RabbitMQ 从入门到精通&#xff1a;从工作模式到集群部署实战&#xff08;二&#xff09;》&#xff1a; lin…...

salesforce SF CLI 数据运维经验分享

SF CLI data默认使用bulk api v2, 数据操作效率有了极大的提高。 Bulk api v2的优点&#xff1a; 执行结果可以很直观的从Bulk Data Load Jobs中看到。相较于bulk api v1&#xff0c;只能看到job执行in progress&#xff0c;或者closed的状态&#xff0c;有了很大的改善。执行…...

5.2Internet及其作用

5.2.1Internet概述 Internet称为互联网&#xff0c;又称英特网&#xff0c;始于1969年的美国ARPANET&#xff08;阿帕网&#xff09;&#xff0c;是全球性的网络。 互连网指的是两个或多个不同类型的网络通过路由器等网络设备连接起来&#xff0c;形成一个更大的网络结构。互连…...

【蓝桥杯—单片机】第十一届省赛真题代码题解题笔记 | 省赛 | 真题 | 代码题 | 刷题 | 笔记

第十一届省赛真题代码部分 前言赛题代码思路笔记竞赛板配置内部振荡器频率设定键盘工作模式跳线扩展方式跳线 建立模板明确设计要求和初始状态显示功能部分数据界面第一部分第二部分第三部分调试时发现的问题 参数设置界面第一部分第二部分和第四部分第三部分和第五部分 按键功…...

数据分析:企业数字化转型的金钥匙

引言&#xff1a;数字化浪潮下的数据金矿 在数字化浪潮席卷全球的背景下&#xff0c;有研究表明&#xff0c;只有不到30%的企业能够充分利用手中掌握的数据&#xff0c;这是否让人深思&#xff1f;数据已然成为企业最为宝贵的资产之一。然而&#xff0c;企业是否真正准备好从数…...

网络工程师 (23)OSI模型层次结构

前言 OSI&#xff08;Open System Interconnect&#xff09;模型&#xff0c;即开放式系统互联模型&#xff0c;是一个完整的、完善的宏观模型&#xff0c;它将计算机网络体系结构划分为7层。 OSI七层模型 1. 物理层&#xff08;Physical Layer&#xff09; 功能&#xff1a;负…...

DeepSeek添加知识库

1、下载dify 项目地址:https://github.com/langgenius/dify 2、通过docker安装 端口报错 修改端口 .env文件下所有80端口替换成了其它端口 执行正常了 查看 docker容器 <...

2、k8s的cni网络插件和基本操作命令

kube-prxoy属于节点组件&#xff0c;网络代理&#xff0c;实现服务的自动发现和负载均衡。 k8s的内部网络模式 1、pod内的容器于容器之间的通信。 2、一个节点上的pod之间的通信&#xff0c;docker0网桥直接通信。 3、不同节点上的pod之间的通信&#xff1a; 通过物理网卡的…...

Next.js简介:现代 Web 开发的强大框架(ChatGPT-4o回答)

prompt: 你是一位专业的技术博客撰稿人&#xff0c;你将写一篇关于介绍next.js这个开发框架的技术博文&#xff0c;语言是中文&#xff0c;风格专业严谨&#xff0c;用词自然、引人入胜且饶有趣味 在现代 Web 开发的世界中&#xff0c;选择合适的框架可以显著提升开发效率和应用…...

【DeepSeek:国产大模型的崛起与ChatGPT的全面对比】

DeepSeek&#xff1a;国产大模型的崛起与ChatGPT的全面对比 目录 引言DeepSeek的技术架构 2.1 混合专家&#xff08;MoE&#xff09;架构2.2 动态路由机制2.3 训练数据与成本 ChatGPT的技术架构 3.1 Transformer架构3.2 训练数据与成本 性能对比 4.1 推理能力4.2 语言处理4.3…...

input 超出maxlength限制后,输入框变红

一、前言 最近收到产品的一个需求&#xff1a;输入框限制了maxlength“11”&#xff0c;需要在输入第12位时&#xff0c;输入框变红&#xff1b;当然&#xff0c;第12位是不能真正输入到输入框中的。 二、实现难点 其实&#xff0c;单纯的要监听 字母和数字以及字符 还是比较容…...

Docker 构建镜像并搭建私人镜像仓库教程

构建镜像教程 步骤 1&#xff1a;安装 Docker #在安装 Docker 之前&#xff0c;建议先更新系统软件包。 sudo yum update -y # 移除旧的Docker版本和Podman、runc软件包及其相关依赖。 yum remove -y docker docker-client docker-client-latest docker-ce-cli docker-commo…...

doris:MySQL Dump

Doris 在 0.15 之后的版本已经支持通过 mysqldump 工具导出数据或者表结构 使用示例​ 导出​ 导出 test 数据库中的 table1 表&#xff1a;mysqldump -h127.0.0.1 -P9030 -uroot --no-tablespaces --databases test --tables table1 导出 test 数据库中的 table1 表结构&am…...

OpenBMC:通过qemu-system-arm运行编译好的image

OpenBMC&#xff1a;编译_openbmc meson.build file-CSDN博客 讲述了如何编译生成openbmc的image 完成编译后可以通过qemu-system-arm进行模拟加载&#xff0c;以便在没有BMC硬件的情况下进行调试 1.下载qemu-system-arm 在openbmc的上级目录上执行 wget https://jenkins.op…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版​分享

平时用 iPhone 的时候&#xff0c;难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵&#xff0c;或者买了二手 iPhone 却被原来的 iCloud 账号锁住&#xff0c;这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

MODBUS TCP转CANopen 技术赋能高效协同作业

在现代工业自动化领域&#xff0c;MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步&#xff0c;这两种通讯协议也正在被逐步融合&#xff0c;形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...