当前位置: 首页 > news >正文

【Apache Flink】Flink DataStream API的基本使用

Flink DataStream API的基本使用

文章目录

  • 前言
  • 1. 基本使用方法
  • 2. 核心示例代码
  • 3. 完成工程代码
    • pom.xml
    • WordCountExample
    • 测试验证
  • 4. Stream 执行环境
  • 5. 参考文档

前言

Flink DataStream API主要用于处理无界和有界数据流 。
无界数据流是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。

有界数据流是一个具有明确开始和结束点的数据集,例如一个文件或数据库表。这种类型的数据流通常在批处理场景中使用,其中所有数据都已经可用,并可以一次性处理。

Flink的DataStream API提供了一套丰富的操作符,如map、filter、reduce、aggregations、windowing、join等,以支持各种复杂的数据处理和分析需求。此外,DataStream API还提供了容错保证,能确保在发生故障时,应用程序能从最近的检查点(checkpoint)恢复,从而实现精确一次(exactly-once)的处理语义。

1. 基本使用方法

  1. 创建执行环境:

    每一个Flink程序都需要创建一个StreamExecutionEnvironment(执行环境),它可以被用来设置参数和创建从外部系统读取数据的流。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. 创建数据流:

    你可以从各种数据源中创建数据流,如本地集合,文件,socket等。下面的代码是从本地集合创建数据流的示例:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. 转换操作:

    Flink提供了丰富的转换操作,如mapfilterreduce等。以下代码首先将每个字符串映射为其长度,然后过滤出长度大于5的元素:

    DataStream<Integer> transformedStream = dataStream.map(s -> s.length()).filter(l -> l > 5);
    
  4. 数据输出:

    Flink支持将数据流输出到各种存储系统,如文件,socket,数据库等。下面的代码将数据流输出到标准输出:

    transformedStream.print();
    
  5. 执行程序:

    将上述所有步骤放在main函数中,并在最后调用env.execute()方法来启动程序。Flink程序是懒加载的,只有在调用execute方法时才会真正开始执行。

    env.execute("Flink Basic API Usage");
    

2. 核心示例代码

使用Flink DataStream API构建一个实时Word Count程序,它会从一个socket端口读取文本数据,统计每个单词的出现次数,并将结果输出到标准输出。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter()) // 将文本行切分为单词.keyBy(0) // 按单词分组.sum(1); // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

3. 完成工程代码

下面是一个基于Apache Flink的实时单词计数应用程序的完整工程代码,包括Pom.xml文件和所有Java类。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flink-wordcount-example</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><properties><flink.version>1.13.2</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter())  // 将文本行切分为单词.keyBy(0)  // 按单词分组.sum(1);  // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

现在,你可以使用Maven编译并运行这个程序。在启动程序之前,你需要在本地启动一个端口为9000的Socket服务器。这可以通过使用Netcat工具 (nc -lk 9000) 或者其他任何能打开端口的工具实现。然后,你可以输入文本行,Flink程序会统计每个单词出现的次数,并实时打印结果。

测试验证

用py在本地启动一个socket服务器,监听9000端口,

python比较简单实现一个socket通信 。写一个Python来验证上面写的例子。

import socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)while True:data = input("Enter text: ")client_socket.sendall(data.encode())

运行Flink程序和Python socket服务器,然后在Python程序中输入文本, 会看到Flink程序实时统计每个单词出现的次数并输出到控制台。

4. Stream 执行环境

开发学习过程中,不需要关注。每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。
在这里插入图片描述

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink runtime: client, job manager, task managers
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

5. 参考文档

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/

相关文章:

【Apache Flink】Flink DataStream API的基本使用

Flink DataStream API的基本使用 文章目录 前言1. 基本使用方法2. 核心示例代码3. 完成工程代码pom.xmlWordCountExample测试验证 4. Stream 执行环境5. 参考文档 前言 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流是一个持续生成数据的数据源&#xff0…...

民安:专业在线教育平台客户满意度调查的引领者

在当今的在线教育市场中&#xff0c;客户满意度已成为衡量平台竞争力的关键因素。为了准确了解客户的需求和反馈&#xff0c;某在线教育企业委托民安智库&#xff08;专业市场调查公司&#xff09;对其进行全面的客户满意度调查。 此次调查旨在了解客户对在线教育平台的服务质…...

浅谈新能源汽车充电桩的选型与安装

叶根胜 安科瑞电气股份有限公司 上海嘉定201801 摘要&#xff1a;电动汽车的大力发展和推广是国家为应对日益突出的燃油供需矛盾和环境污染&#xff0c;加强生态环境保护和治理而开发新能源和清洁能源的措施之一&#xff0c;加快了电动汽车的发展。如今&#xff0c;电动汽车已…...

FFmpeg系列索引

第一章 初识FFmpeg https://blog.csdn.net/huantianxidi/article/details/134130159 第二章 ffplay是什么 https://blog.csdn.net/huantianxidi/article/details/134151043...

AWS组件使用

kafka或kinesis 做数据收集 S3redshift 做数仓 EMR做计算 RDS做数据市场 AWS Glue / AWS Data Pipeline 做数据集成 这些组件配合起来&#xff0c;几乎可以做各种方式的数据分析 kinesis还是比较推荐&#xff0c;延迟时间可以配置的算是实时的&#xff0c;而且功能会多一点&am…...

DALLE 3技术分析 - 训练方式/模型结构

DALLE 3技术分析 - 训练方式/模型结构 1. 引言: 从 DALLE 3 开发者技术轨迹中&#xff0c;以及模型的演示视频&#xff0c;我们可以推导 DALLE 3 模型的某些架构信息。 2. DALLE 2 的评价: DALLE 2 的性能不佳&#xff0c;主要归因于 CLIP 模型的限制。 CLIP 在为后续的 diffus…...

Go的自定义错误

在上一篇教程中&#xff0c;我们了解了 Go 中的错误表示以及如何处理标准库中的错误。我们还学习了如何从错误中提取更多信息。 本教程介绍如何创建我们自己的自定义错误&#xff0c;我们可以在函数和包中使用这些错误。我们还将使用标准库所采用的相同技术来提供有关自定义错…...

SpringBoot集成Dubbo

在SpringMVC中Dubbo的使用https://tiantian.blog.csdn.net/article/details/134194696?spm1001.2014.3001.5502 阿里巴巴提供了Dubbo集成SpringBoot开源项目。(这个.....) 地址GitHub https://github.com/apache/dubbo-spring-boot-project 查看入门教程 反正是pilipala一大…...

利用shp文件构建mask【MATLAB和ARCGIS】两种方法

1 ARCGIS &#xff08;推荐&#xff01;&#xff01;&#xff01;-速度很快&#xff09; 利用Polygon to Raster 注意&#xff1a;由于我们想要的mask有效值是1&#xff0c;在进行转换的时候&#xff0c;注意设置转换字段【Value field】 【Value field】通过编辑shp文件属性表…...

Luminar Neo Mac/Windows中文版:引领AI图像编辑的革命性时代

Luminar Neo运用先进的AI技术&#xff0c;能够自动化地完成许多繁琐的编辑任务&#xff0c;如色彩校正、噪点消除、人脸识别等。这不仅大大提高了工作效率&#xff0c;同时也降低了对专业知识和技能的要求。无论你是专业摄影师&#xff0c;还是摄影爱好者&#xff0c;甚至是一个…...

远程设备常用工具:向日葵、Todesk

其实按理说远程工具例如向日葵、Todesk如果是计算机专业、计算机从业者是必须知道的一个东西&#xff0c;但是在大学期间身边知道的人是少之又少的。 向日葵、Todesk工具的优势&#xff1a;方便、快捷、速度快等等我就不过多阐述了 PS:现在我就是在学校用远程写这篇 很多时候…...

JAVA七种常见排序算法

前言&#xff1a; 排序算法在计算机科学中扮演着至关重要的角色&#xff0c;它们用于将无序数据变为有序数据&#xff0c;以便更有效地检索和处理信息。不同的排序算法适用于不同的情况&#xff0c;因此了解它们的工作原理和性能特点对于选择正确的算法至关重要。本文提供的Jav…...

高质量绝世玄幻小说,情节引人入胜,一读成痴的绝佳选择

《我有一个修仙世界》 在这个高科技后修仙时代&#xff0c;主角拥有资源丰富的原始修仙世界。他需要不断地探索、发掘、修炼&#xff0c;才能成为真正的修仙者。这是一本充满想象力和创意的小说。 《长生武道&#xff1a;从五禽养生拳开始》 林轩修炼养生类功法&#xff0c;通过…...

Flask三种添加路由的方法

Flask 是一个流行的 Python Web 框架&#xff0c;它提供了多种方法来添加路由。路由是将 URL 映射到特定函数的过程&#xff0c;它是构建 Web 应用程序的基础。本文将介绍 Flask 中几种常用的路由添加方法&#xff0c;并附带代码示例。 方法一&#xff1a;使用装饰器 from flas…...

基于layui的select选择框修改为多选框

layui-xm-select 的功能强大&#xff0c;可多选、可下拉树、下拉日期多选、下拉折叠面板、下拉穿梭框、级联模式。 首先在引用layui css和js 的基础上&#xff0c;再引用js&#xff1a;layui-xm-select layui-xm-select点击下载地址 基本使用 第一步: 下载 第二步: 引入 layu…...

【技术分享】RK356X Android 使用 libgpiod 测试gpio

前言 libgpiod 是用于与 Linux GPIO 字符设备交互的 C 库和工具库&#xff1b;此项目包含六种命令行工具&#xff08;gpiodetect、gpioinfo、gpioset、gpioget、gpiomon&#xff09;&#xff0c;使用这些工具可以在命令行设置和获取GPIO的状态信息&#xff1b;在程序开发中也可…...

代碼隨想錄算法訓練營|第五十九天|647. 回文子串、7516.最长回文子序列、动态规划总结篇。刷题心得(c++)

目录 讀題 647. 回文子串 看完代码随想录之后的想法 516.最长回文子序列 看完代码随想录之后的想法 647. 回文子串 - 實作 思路 動態規劃思路 雙指針思路 Code 動態規劃思路 雙指針思路 516.最长回文子序列 - 實作 思路 Code 动态规划 - 總結 動態規劃基礎 動…...

Qt封装的Halcon显示控件,支持ROI绘制

前言 目前机器视觉ROI交互控件在C#上做的比较多&#xff0c;而Qt上做的比较少&#xff0c;根据作者 VSQtHalcon——显示图片&#xff0c;实现鼠标缩放、移动图片的文章&#xff0c;我在显示和移动控件的基础上&#xff0c;增加了ROI设置功能&#xff0c;并封装成了一个独立的Q…...

基于深度学的图像修复 图像补全 计算机竞赛

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于深度学的图像修复 图像补全 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-se…...

vue3框架全局修改样式(字体颜色以及初始化定义基础elemplent颜色)

问题1、全局修改vue管理系统框架的字体颜色&#xff08;index.scss目录下修改&#xff09; 问题2、vue3中使用elemplent-plus中的el-select组件&#xff0c;默认选中二级或三级的一个数据&#xff0c;没有显示label只显示了id 问题如下 原因是因为 这个属性为true了&#xff0…...

在rocky linux 9.5上在线安装 docker

前面是指南&#xff0c;后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

实现弹窗随键盘上移居中

实现弹窗随键盘上移的核心思路 在Android中&#xff0c;可以通过监听键盘的显示和隐藏事件&#xff0c;动态调整弹窗的位置。关键点在于获取键盘高度&#xff0c;并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的

修改bug思路&#xff1a; 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑&#xff1a;async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill

视觉语言模型&#xff08;Vision-Language Models, VLMs&#xff09;&#xff0c;为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展&#xff0c;机器人仍难以胜任复杂的长时程任务&#xff08;如家具装配&#xff09;&#xff0c;主要受限于人…...