flink设置保存点和恢复保存点
增加了hdfs
package com.qyt;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/*** DataStreamSource API使用*/
public class StreamWordCount {public static void main(String[] args) throws Exception {//TODO 1、获取流的类final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();System.setProperty("HADOOP_USER_NAME", "root");env.enableCheckpointing(3000);// 配置存储检查点到文件系统env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop01:9000/flink"));env.getCheckpointConfig().setCheckpointTimeout(2000l);env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//TODO 2、获取无界流DataStreamSource<String> stringDataStreamSource = env.socketTextStream("192.168.1.10", 9000, "\n");//TODO 3 ETL//TODO 3.1 转换成二元数组,简单ETL的过程SingleOutputStreamOperator<Tuple2<String, Integer>> process = stringDataStreamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {@Overridepublic void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);out.collect(tuple2);}}}).uid("etl");//TODO 3.1 分组KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = process.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//TODO 3.2 聚合计算SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);//TODO 4、打印sum.print();//TODO 5、无界流需要这个不断执行的方法env.execute();}
}
要增加hadoop客户端的使用
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="http://maven.apache.org/POM/4.0.0"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>
</project>
提交flink集群
#生成对应的任务
./flink run -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar
# 恢复上一次保存点,bc5fae2e282247486003ed259f2f37a7为jobID
./flink run -s hdfs://hadoop01:9000/flink/bc5fae2e282247486003ed259f2f37a7/chk-33 -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar
查看对应的jobId

相关文章:
flink设置保存点和恢复保存点
增加了hdfs package com.qyt;import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;import org.apache.flink.streaming.api.datastream.Dat…...
使用python获取百度一下,热搜TOP数据详情
一、查找对应链接 # 警告:以下代码仅供学习和交流使用,严禁用于任何违法活动。 # 本代码旨在帮助理解和学习编程概念,不得用于侵犯他人权益或违反法律法规的行为。 1、打开百度页面 百度一下,你就知道 2、点击F12 或 右键鼠标…...
Go conc库学习与使用
文章目录 主要功能和特点conc 的安装典型使用场景示例代码并行执行多个 Goroutines错误处理限制并发 Goroutines 数量使用 context.Context 进行任务控制 常见问题1. **任务中发生 panic**原因:解决方法: 2. **conc.Group 重复调用 Wait()**原因…...
大模型prompt先关
对于未出现的任务,prompt编写技巧: 1、假设你是资深的摘要生成专家,根据提供的内容,总结对应的摘要信息。请生成一个指令,指令中带有一个使用例子。直接提供给大型模型以执行此任务。 2、基于大模型提供的内容再进行二…...
尚品汇-自动化部署-Jenkins的安装与环境配置(五十六)
目录: 自动化持续集成 (1)环境准备 (2)初始化 Jenkins 插件和管理员用户 (3)工作流程 (4)配置 Jenkins 构建工具 自动化持续集成 互联网软件的开发和发布…...
【尚跑】2024铜川红色照金半程马拉松赛,大爬坡152安全完赛
1、赛事背景 2024年9月22日8点,2024铜川红色照金半程马拉松赛于照金1933广场鸣枪起跑! 起跑仪式上,6000位选手们合唱《歌唱祖国》,熟悉的旋律响彻陕甘边革命根据地照金纪念馆前,激昂的歌声凝聚心中不变的热爱。随着国…...
WPS中让两列数据合并的方法
有这样一个需求,就是把A列数据和B列数据进行合并(空单元格略过)具体实现效果如图下: 该如何操作呢? 首先在新的一列第一个单元格中输入公式"A1&B1" 然后回车,就出现了两列单元格数据合并的效…...
使用yum为centos系统安装软件以及使用(包含阿里云yum源配置)
centos系统配置阿里云yum源 因为centos7官方停止维护,自带yum源用不了了,所以可以更换成阿里云yum源 方法: 使用root权限执行以下语句 curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo CentOS…...
《深度学习》【项目】OpenCV 发票识别 透视变换、轮廓检测解析及案例解析
目录 一、透视变换 1、什么是透视变换 2、操作步骤 1)选择透视变换的源图像和目标图像 2)确定透视变换所需的关键点 3)计算透视变换的变换矩阵 4)对源图像进行透视变换 5)对变换后的图像进行插值处理 二、轮廓检测…...
Linux 线程互斥
前言 对于初学线程的伙伴来讲,多线程并发访问导致的数据不一致问题,总是让人陷入怀疑,很多人只是给你说加锁!但没有人告诉你为什么?本篇博客将详解! 目录 前言 一、线程互斥 • 为什么票会出现负数的情…...
【Redis 源码】6AOF持久化
1 AOF功能说明 aof.c 文件是 Redis 中负责 AOF(Append-Only File)持久化的核心文件。AOF 持久化通过记录服务器接收到的每个写命令来实现数据的持久化。这样,在 Redis 重启时,可以通过重放这些命令来恢复数据。 2 AOF相关配置 a…...
6.MySQL基本查询
目录 表的增删查改Insert(插入)插入替换插入替换2 Retrieve(查找)SELECT 列全列查找指定列查询查询字段为表达式为查询结果指定别名结果去重 WHERE 条件order by子句筛选分页结果 Update(更新)delete&#…...
Linux字符设备驱动开发
Linux 字符设备驱动开发是内核模块开发中的一个重要部分,主要用于处理字节流数据设备(如串口、键盘、鼠标等)。字符设备驱动的核心任务是定义如何与用户空间程序交互,通常通过一组文件操作函数进行。这些函数会映射到 open、read、…...
HTML5+JavaScript绘制闪烁的网格错觉
HTML5JavaScript绘制闪烁的网格错觉 闪烁的网格错觉(scintillating grid illusion)是一种视觉错觉,通过简单的黑白方格网格和少量的精心设计,能够使人眼前出现动态变化的效果。 闪烁的栅格错觉,是一种经典的视觉错觉…...
每日OJ题_牛客_拼三角_枚举/DFS_C++_Java
目录 牛客_拼三角_枚举/DFS 题目解析 C代码1 C代码2 Java代码 牛客_拼三角_枚举/DFS 拼三角_枚举/DFS 题目解析 简单枚举,不过有很多种枚举方法,这里直接用简单粗暴的枚举方式。 C代码1 #include <iostream> #include <algorithm> …...
[uni-app]小兔鲜-01项目起步
项目介绍 效果演示 技术架构 创建项目 HBuilderX创建 下载HBuilderX编辑器 HBuilderX/创建项目: 选择模板/选择Vue版本/创建 安装插件: 工具/插件安装/uni-app(Vue3)编译器 vue代码不能直接运行在小程序环境, 编译插件帮助我们进行代码转换 绑定微信开发者工具: 指定微信开…...
安全的价值:构建现代企业的基础
物理安全对于组织来说并不是事后才考虑的问题:它是关键的基础设施。零售商、医疗保健提供商、市政当局、学校和所有其他类型的组织都依赖安全系统来保障其人员和场所的安全。 随着安全技术能力的不断发展,许多组织正在以更广泛的视角看待他们的投资&am…...
门面(外观)模式
简介 门面模式(Facade Pattern)又叫作外观模式,提供了一个统一的接口,用来访问子系统中的一群接口。其主要特征是定义了一个高层接口,让子系统更容易使用,属于结构型设计模式。 通用模板 创建子系统角色类…...
kotlin flow 使用
1 创建flow 方式1 通过携程扩展函数FlowKt中的flow扩展函数可以直接构建flow,只需要传递FlowCollector收集器实现类就可以了 private fun create1(){val intFlow createFlow()println("创建int flow: $intFlow")runBlocking {println("开始收集&…...
vue3 实现文本内容超过N行折叠并显示“...展开”组件
1. 实现效果 组件内文字样式取决与外侧定义 组件大小发生变化时,文本仍可以省略到指定行数 文本不超过时, 无展开,收起按钮 传入文本发生改变后, 组件展示新的文本 2. 代码 文件名TextEllipsis.vue <template><div ref"compRef" class"wq-text-ellip…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)
目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 (1)输入单引号 (2)万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
