当前位置: 首页 > article >正文

Flink-01学习 介绍Flink及上手小项目之词频统计

flink简介

官网
概述
学习Flink具体包括四个关键概念:流数据的持续处理,事件时间,有状态流处理和状态快照。
Apache Flink 是一个开源的流处理框架,旨在处理批处理和实时数据处理,具有高吞吐量和低延迟的特点。
Apache Flink 是一个用于对无界和有界数据流进行有状态计算的框架和分布式处理引擎。Flink 被设计为可在所有常见的集群环境中运行,并以内存速度和任意规模执行计算。
特点

  • 流处理:Flink 将批处理视为流处理的一种特殊情况。这种方法允许实时数据处理,实现即时的洞察和行动。
  • 有状态计算:Flink 提供强大的状态管理,使得在处理流的过程中可以保持状态。这一特性对于需要容错和一致性的应用至关重要。
  • 事件时间处理:Flink 允许用户基于事件时间来处理数据,即使数据无序到达,也能提供准确及时的结果。
  • 容错性:Flink 的状态管理和检查点机制确保系统在出现故障时能够恢复而不丢失状态,维护数据完整性和应用一致性。
  • 高吞吐量和低延迟:Flink 的架构优化了高吞吐量和低延迟,适合高性能应用。
  • 可扩展性:Flink 可以扩展到数千个节点,能够处理大规模数据处理任务。
  • 灵活的部署选项:Flink 可以部署在各种环境中,包括独立集群、云环境和容器编排平台(如 Kubernetes)。
    应用
    Flink 集群始终由一个 JobManager和一个或多个 Flink TaskManager组成。JobManager 负责处理作业提交、作业监管以及资源管理。Flink TaskManager 是工作进程,负责执行构成 Flink 作业的实际 任务。在本实践中,您将从单个 TaskManager 开始,但稍后可以扩展到更多 TaskManager。

实践

创建一个meven项目
引入pom

  <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.20.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.20.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.20.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.20.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.20.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.20.0</version></dependency><dependency><groupId>org.apache.maven</groupId><artifactId>maven-plugin-api</artifactId><version>2.0</version></dependency><dependency><groupId>org.apache.maven.plugin-tools</groupId><artifactId>maven-plugin-annotations</artifactId><version>3.2</version></dependency><dependency><groupId>org.codehaus.plexus</groupId><artifactId>plexus-utils</artifactId><version>3.0.8</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-plugin-plugin</artifactId><version>3.2</version><executions><execution><phase>package</phase><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins>
</build>
</project>

Flink 中的 DataStream 程序是常规程序,用于对数据流进行转换(例如,过滤、更新状态、定义窗口、聚合)。数据流最初由各种来源(例如,消息队列、套接字流、文件)创建。结果通过接收器返回,接收器可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,独立运行或嵌入到其他程序中。执行可以在本地 JVM 中,也可以在多台机器组成的集群中执行。

代码如下:

package org.example.snow.demo1;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author snowsong*/
public class StartRunApp {public static void main(String[] args) throws Exception {// 数据来源String inPath = "dataMsg/dataSourceFile.txt";// 数据输出String outputPath = "dataMsg/result.csv";// 初始化StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 读取文件信息DataStreamSource<String> textFile = executionEnvironment.readTextFile(inPath);SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = textFile.flatMap(new Splitter()).keyBy(value -> value.f0).sum(1);textFile.print();// 写入数据到 CSV 文件,使用 writeAsText 或 writeAsCsvflatMap.writeAsCsv(outputPath,FileSystem.WriteMode.OVERWRITE," ","\n")// 这是设置并行度的参数,表示执行该操作的任务的数量。在这里,setParallelism(1) 表示该操作会在 单个任务 中执行,而不是并行执行多个任务。通常情况下,Flink 在处理流或批数据时可以通过并行执行来加速处理,而设置并行度为 1 可以强制数据写入在一个线程中进行。.setParallelism(1);// 调用 execute() 后,Flink 会开始处理数据流中的每个操作,并根据定义的逻辑执行数据转换、聚合等操作。// 作业名称:"file.txt -> result.csv" 是为了描述输入文件和输出文件的关系,可以帮助你理解这个作业的目的。这个名称在 Flink 的执行日志和监控界面中会显示executionEnvironment.execute("file.txt -> result.csv");}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word: sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1));}}}
}

运行结果 将文章进行了分词处理
请添加图片描述

相关文章:

Flink-01学习 介绍Flink及上手小项目之词频统计

flink简介 官网 概述&#xff1a; 学习Flink具体包括四个关键概念&#xff1a;流数据的持续处理&#xff0c;事件时间&#xff0c;有状态流处理和状态快照。 Apache Flink 是一个开源的流处理框架&#xff0c;旨在处理批处理和实时数据处理&#xff0c;具有高吞吐量和低延迟的…...

自注意力机制、多头自注意力机制、填充掩码 Python实现

原理讲解 【Transformer系列&#xff08;2&#xff09;】注意力机制、自注意力机制、多头注意力机制、通道注意力机制、空间注意力机制超详细讲解 自注意力机制 import torch import torch.nn as nn# 自注意力机制 class SelfAttention(nn.Module):def __init__(self, input…...

目标检测篇---R-CNN梳理

目标检测系列文章 第一章 R-CNN 目录 目标检测系列文章&#x1f4c4; 论文标题&#x1f9e0; 论文逻辑梳理1. 引言部分梳理 (动机与思想) &#x1f4dd; 三句话总结&#x1f50d; 方法逻辑梳理&#x1f680; 关键创新点&#x1f517; 方法流程图补充边界框回归 (BBR)1. BBR 的…...

C#处理网络传输中不完整的数据流

1、背景 在读取byte数组的场景&#xff08;例如&#xff1a;读取文件、网络传输数据&#xff09;中&#xff0c;特别是网络传输的场景中&#xff0c;非常有可能接收了不完整的byte数组&#xff0c;在将byte数组转换时&#xff0c;因字符的缺失/增多&#xff0c;转为乱码。如下…...

HTML 初识

段落标签 <p><!-- 段落标签 -->Lorem ipsum dolor sit amet consectetur adipisicing elit. Fugiat, voluptate iure. Obcaecati explicabo sint ipsum impedit! Dolorum omnis voluptas sint unde sed, ipsa molestiae quo sapiente quos et ad reprehenderit.&l…...

MATLAB 训练CNN模型 yolo v4

学生对小车控制提出了更好的要求&#xff0c;能否加入深度学习模型。 考虑到小车用matlab来做&#xff0c;yolo v5及以上版本都需要在pytorch下训练&#xff0c;还是用早期版本来演示。 1 yolov4 调用 参考 trainYOLOv4ObjectDetector (mathworks.com) name "tiny-yo…...

【前端】跟着maxkb学习logicflow流程图画法

文章目录 背景1. 选定学习对象-maxkb应用逻辑编排2. 确定实现框架3. 关键逻辑&#xff1a;查看app-node.js4. 学习开始节点绘制流程数据形式 5. 给节点增加表单输入框遇到过的问题 背景 看看前端如何绘制流程图&#xff0c;界面好看点。 "logicflow/core": "1.…...

数据结构-C语言版本(八)字符串

数据结构中的字符串&#xff1a;概念、操作与实战 第一部分 字符串的分类及常见形式 字符串是由零个或多个字符组成的有限序列&#xff0c;是编程中最基础也最重要的数据结构之一。 1. C语言中的字符串表示 字符数组形式 char str1[10] {H, e, l, l, o, \0};字符串字面量…...

Arduino示例代码讲解:Project 07 - Keyboard 键盘

Arduino示例代码讲解:Project 07 - Keyboard 键盘 Project 07 - Keyboard 键盘程序功能概述功能:硬件要求:输出:代码结构全局变量`setup()` 函数`loop()` 函数读取电位器值:打印电位器值:播放音调:运行过程注意事项Project 07 - Keyboard 键盘 /*Arduino Starter Kit e…...

oracle expdp/impdp 用法详解

oracle expdp/impdp 用法详解 创建逻辑目录&#xff0c;该命令不会在操作系统创建真正的目录&#xff0c;最好以system等管理员创建。 create directory db_bak as d:\test\dump; 查看管理理员目录&#xff08;同时查看操作系统是否存在&#xff0c;因为Oracle并不关心该目录是…...

【漏洞复现】CVE-2024-38856(ApacheOfbiz RCE)

【漏洞复现】CVE-2024-38856&#xff08;ApacheOfbiz RCE&#xff09; 1. 漏洞描述 Apache OFBiz 是一个开源的企业资源规划&#xff08;ERP&#xff09;系统。它提供了一套企业应用程序&#xff0c;用于集成和自动化企业的许多业务流程。 这个漏洞是由于对 CVE-2023-51467 的…...

超详细VMware虚拟机扩容磁盘容量-无坑版

1.环境&#xff1a; 虚拟机&#xff1a;VMware Workstation 17 Pro-17.5.2 Linux系统&#xff1a;Ubuntu 22.04 LTS 2.硬盘容量 虚拟机当前硬盘容量180G -> 扩展至 300G 3.操作步骤 &#xff08;1&#xff09;在虚拟机关机的状态下&#xff0c;虚拟机硬盘扩容之前必…...

每日一题算法——移除链表元素、反转链表

移除链表元素 力扣题目链接 我的解法&#xff1a; 注意细节&#xff1a;要删掉移除的元素。 class Solution { public:ListNode* removeElements(ListNode* head, int val) {while(head!nullptr){if(head->valval){headhead->next;}}ListNode* nowhead head;while(n…...

全面理解Linux 系统日志:核心文件与查看方法

全文目录 1 Linux 系统日志分类及功能1.1 通用日志1.1.1 ‌/var/log/messages1.1.2 ‌/var/log/syslog 1.2 安全相关日志1.2.1 ‌/var/log/auth.log‌&#xff08;Debian/Ubuntu&#xff09;或 ‌/var/log/secure‌&#xff08;RHEL/CentOS&#xff09;1.2.2 /var/log/audit/au…...

机器学习-08-关联规则更新

总结 本系列是机器学习课程的系列课程&#xff0c;主要介绍机器学习中关联规则和协同过滤。 参考 机器学习&#xff08;三&#xff09;&#xff1a;Apriori算法&#xff08;算法精讲&#xff09; Apriori 算法 理论 重点 【手撕算法】【Apriori】关联规则Apriori原理、代码…...

Flutter与FastAPI的OSS系统实现

作者&#xff1a;孙嘉成 目录 一、对象存储 二、FastAPI与对象存储 2.1 缤纷云S4服务API对接与鉴权实现 2.2 RESTful接口设计与异步路由优化 三、Flutter界面与数据交互开发 3.1 应用的创建 3.2页面的搭建 3.3 文件的上传 关键词&#xff1a;对象存储、FastAPI、Flutte…...

Kubernetes控制平面组件:API Server详解(二)

云原生学习路线导航页&#xff08;持续更新中&#xff09; kubernetes学习系列快捷链接 Kubernetes架构原则和对象设计&#xff08;一&#xff09;Kubernetes架构原则和对象设计&#xff08;二&#xff09;Kubernetes架构原则和对象设计&#xff08;三&#xff09;Kubernetes控…...

MySQL-锁机制3-意向共享锁与意向排它锁、死锁

文章目录 一、意向锁二、死锁应该如何避免死锁问题&#xff1f; 总结 一、意向锁 在表获取共享锁或者排它锁时&#xff0c;需要先检查该表有没有被其它事务获取过X锁&#xff0c;通过意向锁可以避免大量的行锁扫描&#xff0c;提升表获取锁的效率。意向锁是一种表级锁&#xf…...

报告系统状态的连续日期 mysql + pandas(连续值判断)

本题用到知识点&#xff1a;row_number(), union, date_sub(), to_timedelta()…… 目录 思路 pandas Mysql 思路 链接&#xff1a;报告系统状态的连续日期 思路&#xff1a; 判断连续性常用的一个方法&#xff0c;增量相同的两个列的差值是固定的。 让日期与行号 * 天数…...

pytest自动化中关于使用fixture是否影响用例的独立性

第一个问题&#xff1a;难道使用fixture 会影响用例独立吗&#xff1f; ✅ 简单回答&#xff1a; 使用 fixture ≠ 不独立。 只要你的 fixture 是每次测试都能自己运行、自己产生数据的&#xff0c;那么测试用例依然是“逻辑独立”的。 ✅ 怎么判断 fixture 是否影响独立性&a…...

Token与axios拦截器

目录 一、Token 详解 1. Token 的定义与作用 2. Token 的工作流程 3. Token 的优势 4. Token 的安全实践 5. JWT 结构示例 二、Axios 拦截器详解 1. 拦截器的作用 2. 请求拦截器 3. 响应拦截器 4. 拦截器常见场景 5. 移除拦截器 三、完整代码示例 四、总结 五、…...

unity3d实现物体闪烁

unity3d实现物体闪烁&#xff0c;代码如下: using UnityEngine;public class Test : MonoBehaviour {//创建一个常量&#xff0c;用来接收时间的变化值private float shake;//通过控制物体的MeshRenderer组件的开关来实现物体闪烁的效果private MeshRenderer BoxColliderClick…...

C#—Lazy<T> 类型(延迟初始化/懒加载模式)

C# 的 Lazy<T> 类型 Lazy<T> 是 C# 中的一个类&#xff0c;用于实现延迟初始化&#xff08;懒加载&#xff09;模式。它提供了一种线程安全的方式来延迟创建大型或资源密集型对象&#xff0c;直到第一次实际需要时才进行初始化。 主要特点 延迟初始化&#xff1a…...

Spring Boot 项目启动命令解析

Spring Boot 项目启动命令参数 一、启动命令基础格式 java [JVM参数] [Spring Boot参数] -jar your-project.jar必选部分&#xff1a;java -jar your-project.jar 启动可执行 JAR 包。 可选部分&#xff1a; JVM 参数&#xff1a;控制 Java 虚拟机行为&#xff08;如内存、垃…...

为什么 Docker 容器中有额外的目录(如 `/dev`、`/proc`、`/sys`)?及 `docker run` 详细执行过程

、当你使用 docker run 启动一个基于极简镜像&#xff08;如 scratch 或手动构建的镜像&#xff09;的容器时&#xff0c;发现容器内出现了 /dev、/proc、/sys 等目录&#xff0c;即使你的镜像中并未包含这些目录。这是因为 Docker 在启动容器时&#xff0c;会自动挂载一些必要…...

Tailwind 武林奇谈:bg-blue-400 失效,如何重拾蓝衣神功?

前言 江湖有云,Tailwind CSS,乃前端武林中的轻功秘籍。习得此技,排版如行云流水,配色似御风随形,收放自如,随心所欲。 某日,小侠你奋笔敲码,正欲施展“蓝衣神功”(bg-blue-400),让按钮怒气冲冠、蓝光满面,怎料一招使出,画面竟一片白茫茫大地真干净,毫无半点杀气…...

【Docker 运维】Java 应用在 Docker 容器中启动报错:`unable to allocate file descriptor table`

文章目录 一、根本原因二、判断与排查方法三、解决方法1、限制 Docker 容器的文件描述符上限2、在执行脚本中动态设置ulimit的值3、升级至 Java 11 四、总结 容器内执行脚本时报错如下&#xff0c;Java 进程异常退出&#xff1a; library initialization failed - unable to a…...

开始放飞之先搞个VSCode

文章目录 开始放飞之先搞个VSCode重要提醒安装VSCode下载MinGW-w64回到VSCode中去VSCode原生调试键盘问题遗留问题参考文献 开始放飞之先搞个VSCode 突然发现自己的新台式机上面连个像样的编程环境都没有&#xff0c;全是游戏了&#xff01;&#xff01;&#xff01;&#xff…...

基于SA模拟退火算法的车间调度优化matlab仿真,输出甘特图和优化收敛曲线

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 基于SA模拟退火算法的车间调度优化matlab仿真,输出甘特图和优化收敛曲线。输出指标包括最小平均流动时间&#xff0c;最大完工时间&#xff0c;最小间隙时间。 2…...

【仿Mudou库one thread per loop式并发服务器实现】SERVER服务器模块实现

SERVER服务器模块实现 1. Buffer模块2. Socket模块3. Channel模块4. Poller模块5. EventLoop模块5.1 TimerQueue模块5.2 TimeWheel整合到EventLoop5.1 EventLoop与线程结合5.2 EventLoop线程池 6. Connection模块7. Acceptor模块8. TcpServer模块 1. Buffer模块 Buffer模块&…...