flink设置保存点和恢复保存点
增加了hdfs
package com.qyt;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/*** DataStreamSource API使用*/
public class StreamWordCount {public static void main(String[] args) throws Exception {//TODO 1、获取流的类final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();System.setProperty("HADOOP_USER_NAME", "root");env.enableCheckpointing(3000);// 配置存储检查点到文件系统env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop01:9000/flink"));env.getCheckpointConfig().setCheckpointTimeout(2000l);env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//TODO 2、获取无界流DataStreamSource<String> stringDataStreamSource = env.socketTextStream("192.168.1.10", 9000, "\n");//TODO 3 ETL//TODO 3.1 转换成二元数组,简单ETL的过程SingleOutputStreamOperator<Tuple2<String, Integer>> process = stringDataStreamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {@Overridepublic void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);out.collect(tuple2);}}}).uid("etl");//TODO 3.1 分组KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = process.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//TODO 3.2 聚合计算SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);//TODO 4、打印sum.print();//TODO 5、无界流需要这个不断执行的方法env.execute();}
}
要增加hadoop客户端的使用
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="http://maven.apache.org/POM/4.0.0"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>
</project>
提交flink集群
#生成对应的任务
./flink run -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar
# 恢复上一次保存点,bc5fae2e282247486003ed259f2f37a7为jobID
./flink run -s hdfs://hadoop01:9000/flink/bc5fae2e282247486003ed259f2f37a7/chk-33 -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar
查看对应的jobId

相关文章:
flink设置保存点和恢复保存点
增加了hdfs package com.qyt;import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;import org.apache.flink.streaming.api.datastream.Dat…...
使用python获取百度一下,热搜TOP数据详情
一、查找对应链接 # 警告:以下代码仅供学习和交流使用,严禁用于任何违法活动。 # 本代码旨在帮助理解和学习编程概念,不得用于侵犯他人权益或违反法律法规的行为。 1、打开百度页面 百度一下,你就知道 2、点击F12 或 右键鼠标…...
Go conc库学习与使用
文章目录 主要功能和特点conc 的安装典型使用场景示例代码并行执行多个 Goroutines错误处理限制并发 Goroutines 数量使用 context.Context 进行任务控制 常见问题1. **任务中发生 panic**原因:解决方法: 2. **conc.Group 重复调用 Wait()**原因…...
大模型prompt先关
对于未出现的任务,prompt编写技巧: 1、假设你是资深的摘要生成专家,根据提供的内容,总结对应的摘要信息。请生成一个指令,指令中带有一个使用例子。直接提供给大型模型以执行此任务。 2、基于大模型提供的内容再进行二…...
尚品汇-自动化部署-Jenkins的安装与环境配置(五十六)
目录: 自动化持续集成 (1)环境准备 (2)初始化 Jenkins 插件和管理员用户 (3)工作流程 (4)配置 Jenkins 构建工具 自动化持续集成 互联网软件的开发和发布…...
【尚跑】2024铜川红色照金半程马拉松赛,大爬坡152安全完赛
1、赛事背景 2024年9月22日8点,2024铜川红色照金半程马拉松赛于照金1933广场鸣枪起跑! 起跑仪式上,6000位选手们合唱《歌唱祖国》,熟悉的旋律响彻陕甘边革命根据地照金纪念馆前,激昂的歌声凝聚心中不变的热爱。随着国…...
WPS中让两列数据合并的方法
有这样一个需求,就是把A列数据和B列数据进行合并(空单元格略过)具体实现效果如图下: 该如何操作呢? 首先在新的一列第一个单元格中输入公式"A1&B1" 然后回车,就出现了两列单元格数据合并的效…...
使用yum为centos系统安装软件以及使用(包含阿里云yum源配置)
centos系统配置阿里云yum源 因为centos7官方停止维护,自带yum源用不了了,所以可以更换成阿里云yum源 方法: 使用root权限执行以下语句 curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo CentOS…...
《深度学习》【项目】OpenCV 发票识别 透视变换、轮廓检测解析及案例解析
目录 一、透视变换 1、什么是透视变换 2、操作步骤 1)选择透视变换的源图像和目标图像 2)确定透视变换所需的关键点 3)计算透视变换的变换矩阵 4)对源图像进行透视变换 5)对变换后的图像进行插值处理 二、轮廓检测…...
Linux 线程互斥
前言 对于初学线程的伙伴来讲,多线程并发访问导致的数据不一致问题,总是让人陷入怀疑,很多人只是给你说加锁!但没有人告诉你为什么?本篇博客将详解! 目录 前言 一、线程互斥 • 为什么票会出现负数的情…...
【Redis 源码】6AOF持久化
1 AOF功能说明 aof.c 文件是 Redis 中负责 AOF(Append-Only File)持久化的核心文件。AOF 持久化通过记录服务器接收到的每个写命令来实现数据的持久化。这样,在 Redis 重启时,可以通过重放这些命令来恢复数据。 2 AOF相关配置 a…...
6.MySQL基本查询
目录 表的增删查改Insert(插入)插入替换插入替换2 Retrieve(查找)SELECT 列全列查找指定列查询查询字段为表达式为查询结果指定别名结果去重 WHERE 条件order by子句筛选分页结果 Update(更新)delete&#…...
Linux字符设备驱动开发
Linux 字符设备驱动开发是内核模块开发中的一个重要部分,主要用于处理字节流数据设备(如串口、键盘、鼠标等)。字符设备驱动的核心任务是定义如何与用户空间程序交互,通常通过一组文件操作函数进行。这些函数会映射到 open、read、…...
HTML5+JavaScript绘制闪烁的网格错觉
HTML5JavaScript绘制闪烁的网格错觉 闪烁的网格错觉(scintillating grid illusion)是一种视觉错觉,通过简单的黑白方格网格和少量的精心设计,能够使人眼前出现动态变化的效果。 闪烁的栅格错觉,是一种经典的视觉错觉…...
每日OJ题_牛客_拼三角_枚举/DFS_C++_Java
目录 牛客_拼三角_枚举/DFS 题目解析 C代码1 C代码2 Java代码 牛客_拼三角_枚举/DFS 拼三角_枚举/DFS 题目解析 简单枚举,不过有很多种枚举方法,这里直接用简单粗暴的枚举方式。 C代码1 #include <iostream> #include <algorithm> …...
[uni-app]小兔鲜-01项目起步
项目介绍 效果演示 技术架构 创建项目 HBuilderX创建 下载HBuilderX编辑器 HBuilderX/创建项目: 选择模板/选择Vue版本/创建 安装插件: 工具/插件安装/uni-app(Vue3)编译器 vue代码不能直接运行在小程序环境, 编译插件帮助我们进行代码转换 绑定微信开发者工具: 指定微信开…...
安全的价值:构建现代企业的基础
物理安全对于组织来说并不是事后才考虑的问题:它是关键的基础设施。零售商、医疗保健提供商、市政当局、学校和所有其他类型的组织都依赖安全系统来保障其人员和场所的安全。 随着安全技术能力的不断发展,许多组织正在以更广泛的视角看待他们的投资&am…...
门面(外观)模式
简介 门面模式(Facade Pattern)又叫作外观模式,提供了一个统一的接口,用来访问子系统中的一群接口。其主要特征是定义了一个高层接口,让子系统更容易使用,属于结构型设计模式。 通用模板 创建子系统角色类…...
kotlin flow 使用
1 创建flow 方式1 通过携程扩展函数FlowKt中的flow扩展函数可以直接构建flow,只需要传递FlowCollector收集器实现类就可以了 private fun create1(){val intFlow createFlow()println("创建int flow: $intFlow")runBlocking {println("开始收集&…...
vue3 实现文本内容超过N行折叠并显示“...展开”组件
1. 实现效果 组件内文字样式取决与外侧定义 组件大小发生变化时,文本仍可以省略到指定行数 文本不超过时, 无展开,收起按钮 传入文本发生改变后, 组件展示新的文本 2. 代码 文件名TextEllipsis.vue <template><div ref"compRef" class"wq-text-ellip…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
《基于Apache Flink的流处理》笔记
思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...
3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
服务器--宝塔命令
一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
什么是VR全景技术
VR全景技术,全称为虚拟现实全景技术,是通过计算机图像模拟生成三维空间中的虚拟世界,使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验,结合图文、3D、音视频等多媒体元素…...
Python竞赛环境搭建全攻略
Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型(算法、数据分析、机器学习等)不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...
ubuntu22.04有线网络无法连接,图标也没了
今天突然无法有线网络无法连接任何设备,并且图标都没了 错误案例 往上一顿搜索,试了很多博客都不行,比如 Ubuntu22.04右上角网络图标消失 最后解决的办法 下载网卡驱动,重新安装 操作步骤 查看自己网卡的型号 lspci | gre…...
