当前位置: 首页 > news >正文

flink学习(13)—— 重试机制和维表join

重试机制

当任务出现异常的时候,会直接停止任务——解决方式,重试机制

1、设置checkpoint后,会给任务一个重启策略——无限重启

2、可以手动设置任务的重启策略

代码设置

//开启checkpoint后,默认是无限重启,可以设置该值 表示不重启
env.setRestartStrategy(RestartStrategies.noRestart());//作业失败flink中最多重启3次,每次重启的最小间隔是10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内最多重启3次,每次重启的最小间隔是5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);//无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,  // 无限重启次数Time.of(10, TimeUnit.SECONDS)  // 每次重启的延迟时间
));

维表join

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果

那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。

维表一般的特点是变化比较慢。——名词表,维度表。

解决方式

 解决维表join的方式方式一:可以用一个静态代码块,或者在open方法中对一个集合初始化,用于存放想要相关联的数据。缺点:数据不能动态改变了方式二:在open中初始化连接,在map中每拿到流中的一条数据,就去mysql中查找一次缺点:数据可以动态改变,但是去mysql查找的次数太多了方式三:创建一个缓存区,用于存放数据,若过期则再去mysql中查询数据。没有缺点,可以动态获取数据了,也减少了mysql的查询次数(缓冲)唯一的是,若是多线程,可能会去mysql查询多次

方式一

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.List;
import java.util.Map;
import java.util.Properties;/*** 直接从mysql中拿出* 弊端 只能拿到一次 不能实现动态*/
public class _03_维表join_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;List<Map<String, Object>> list = null;@Overridepublic void open(Configuration parameters) throws Exception {// 在open中执行sqlpool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);String sql = "select * from city ";list = queryRunner.query(sql, new MapListHandler());}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {String[] split = line.split(",");Object cityName = "未知";for (Map<String, Object> map : list) {String cityId = (String)map.get("city_id");if (cityId.equals(split[1])){cityName = map.get("city_name");}}return line+","+cityName;}}).print();env.execute();}
}

方式二

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Map;
import java.util.Properties;/*** 每次从kafka中拿到一条数据就从mysql中查一遍* 弊端 对mysql的压力加大*/
public class _03_维表join_02 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;@Overridepublic void open(Configuration parameters) throws Exception {pool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {// 在处理逻辑中执行sqlString[] split = line.split(",");String sql = "select city_name from city where city_id = ?";Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), split[1]);String cityName="未知";if (rs !=null){cityName = (String) rs.get("city_name");}return line+","+cityName;}}).print();env.execute();}
}

方式三

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;/*** 最终 非常好的方式* 现在内存中查 查不到在去mysql中找* 唯一的问题是,假如是多线程情况下,可能会触发多次去mysql中查找的方法*/
public class _03_维表join_03_cache {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);// 记得设置并行度env.setParallelism(1);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;// 定义一个Cache// 第一个是传入的参数类型 第二个是存放的值的类型// 也就是,传入一个参数,根据这个值获取结果,拿的时候通过传入的值 拿存放的值LoadingCache<String, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {pool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(50, TimeUnit.SECONDS)//指定移除通知.removalListener(new RemovalListener<String, String>() {@Overridepublic void onRemoval(RemovalNotification<String, String> removalNotification) {System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}}).build(//指定加载缓存的逻辑new CacheLoader<String, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(String cityId) throws Exception {String sql = "select city_name from city where city_id = ? ";Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), cityId);String cityName = null;if (rs!=null){cityName = (String) rs.get("city_name");}System.out.println("进入数据库查询成功,查询的值为"+cityId+"--"+cityName);return cityName;}});}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {String[] arr = line.split(",");// 使用这种方式取值String cityName = cache.get(arr[1]);return line+","+cityName;}}).print();env.execute();}
}

相关文章:

flink学习(13)—— 重试机制和维表join

重试机制 当任务出现异常的时候&#xff0c;会直接停止任务——解决方式&#xff0c;重试机制 1、设置checkpoint后&#xff0c;会给任务一个重启策略——无限重启 2、可以手动设置任务的重启策略 代码设置 //开启checkpoint后&#xff0c;默认是无限重启&#xff0c;可以…...

第三方Cookie的消亡与Google服务器端标记的崛起

随着互联网用户对隐私保护的关注日益增强&#xff0c;各大浏览器正在逐步淘汰第三方Cookie。这一变革深刻影响了广告商和数字营销人员的用户跟踪和数据分析方式。然而&#xff0c;Google推出的服务器端标记技术为这一挑战提供了新的解决方案。 什么是第三方Cookie&#xff1f; …...

微信小程序——文档下载功能分享(含代码)

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

Burp Suite 全面解析:开启你的 Web 安全测试之旅

声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团队无关&a…...

Oracle DataGuard 主备正常切换 (Switchover)

前言 众所周知&#xff0c;DataGuard 的切换分为两种情况&#xff1a; 系统正常情况下的切换&#xff1a;这种方式称为 switchover&#xff0c;是无损切换&#xff0c;不会丢失数据。灾难情况下的切换&#xff1a;这种情况下一般主库已经启动不起来了&#xff0c;称为 failov…...

为什么编程语言会设计不可变的对象?字符串不可变?NSString *s = @“hello“变量s是不可变的吗?Rust内部可变性的意义?

为什么编程语言会设计不可变的对象? Java和C#中String是不可变的&#xff0c;StringBuilder是可变的。Obj-C中NSArray是不可变数组&#xff0c;NSMutableArray是可变数组。编程语言设计不可变的对象其实是为了优化(更高性能和节省存储空间)、安全(包括线程安全)。 字符串不可变…...

安装 RabbitMQ 服务

安装 RabbitMQ 服务 一. RabbitMQ 需要依赖 Erlang/OTP 环境 (1) 先去 RabbitMQ 官网&#xff0c;查看 RabbitMQ 需要的 Erlang 支持&#xff1a;https://www.rabbitmq.com/ 进入官网&#xff0c;在 Docs -> Install and Upgrade -> Erlang Version Requirements (2) …...

爬虫—Scrapy 整合 ChromeDriver 实现动态网页拉取

在进行爬虫开发时&#xff0c;使用 Scrapy 配合 ChromeDriver 来模拟真实浏览器加载 JavaScript 渲染内容是一种常见且高效的方法。Scrapy 本身是一个非常强大的爬虫框架&#xff0c;然而它默认使用的是 requests 库来抓取静态网页内容。对于需要通过 JavaScript 渲染的动态网页…...

Linux 进程管理详解

Linux 进程管理详解 引言 在现代操作系统中&#xff0c;进程是执行程序的基本单位。Linux作为一个强大的多任务操作系统&#xff0c;提供了丰富且灵活的机制来管理和控制进程。本文将详细介绍Linux进程管理的基本概念、核心机制以及常用的管理工具&#xff0c;帮助读者深入了…...

MySQL更新JSON字段key:value形式

MySQL更新JSON字段key:value形式 1. 介绍 ‌MySQL的JSON数据类型‌是MySQL 5.7及以上版本中引入的一种数据类型&#xff0c;用于存储JSON格式的数据。使用JSON数据类型可以自动校验文档是否满足JSON格式的要求&#xff0c;优化存储格式&#xff0c;并允许快速访问文档中的特定…...

vue.js学习(day 18)

实例&#xff1a;面经基础版...

WINDOWS 单链表SLIST_ENTRY使用

1.初始化链表头 //初始化链表头qq1490900437 void InitialGloubleVar() {while (1){G_Handle.SaveProcessThreadHandle (PSLIST_HEADER)_aligned_malloc(sizeof(SLIST_HEADER), MEMORY_ALLOCATION_ALIGNMENT);if (G_Handle.SaveProcessThreadHandle ! NULL){break;}}Initiali…...

【Linux 篇】Docker 容器星河与镜像灯塔:Linux 系统下解锁应用部署奇幻征程

文章目录 【Linux 篇】Docker 容器星河与镜像灯塔&#xff1a;Linux 系统下解锁应用部署奇幻征程前言一 、docker上部署mysql1. 拉取mysql镜像2. 创建容器3. 远程登录mysql 二 、docker上部署nginx1. 拉取nginx镜像2. 在dockerTar目录下 上传nginx.tar rz命令3. 创建nginx容器4…...

不同云计算网络安全等级

导读云计算的本质是服务&#xff0c;如果不能将计算资源规模化/大范围的进行共享&#xff0c;如果不能真正以服务的形式提供&#xff0c;就根本算不上云计算。 等级保护定级流程 定级是开展网络安全等级保护工作的 “基本出发点”&#xff0c;虚拟化技术使得传统的网络边界变…...

手机实时提取SIM卡打电话的信令声音-蓝牙电话如何适配eSIM卡的手机

手机实时提取SIM卡打电话的信令声音 --蓝牙电话如何适配eSIM卡的手机 一、前言 蓝牙电话的海外战略中&#xff0c;由于海外智能手机市场中政策的差异性&#xff0c;对内置eSIM卡的手机进行支持是非常合理的需求。Android系列手机中&#xff0c;无论是更换通信运营商&#xf…...

视频流媒体服务解决方案之Liveweb视频汇聚平台

一&#xff0c;Liveweb视频汇聚平台简介: LiveWeb是深圳市好游科技有限公司开发的一套综合视频汇聚管理平台&#xff0c;可提供多协议&#xff08;RTSP/RTMP/GB28181/海康Ehome/大华&#xff0c;海康SDK等&#xff09;的视频设备接入&#xff0c;支持GB/T28181上下级联&#xf…...

【在Linux世界中追寻伟大的One Piece】多线程(三)

目录 1 -> Linux线程同步 1.1 -> 条件变量 1.2 -> 同步概念与竞态条件 1.3 -> 条件变量函数 1.4 -> 为什么pthread_cond_wait需要互斥量 1.5 -> 条件变量使用规范 2 -> 生产者消费者模型 2.1 -> 为什么要使用生产者消费者模型 2.2 -> 生产…...

mvc命令

命令 mvc MVC(Model-View-Controller)是一种软件架构模式,用于组织和管理应用程序的代码mvc重要的三部分 (1)‌模型&#xff08;Model&#xff09;‌&#xff1a;负责存储系统的中心数据&#xff0c;提供访问数据的函数&#xff0c;封装了应用程序的功能内核。 (2)视图&…...

17 go语言(golang) - 错误处理

错误处理 错误处理是编程中用于识别、响应和恢复程序运行时出现的错误和异常情况的过程。其目的是确保程序的鲁棒性&#xff08;一个系统、模型或函数在面对错误输入、工作压力、意外情况或故意攻击时仍能保持稳定性和可靠性的能力&#xff09;&#xff0c;即使在出现错误的情…...

PG 库停库超时异常案例

文章目录 现象官方文档停库底层流程:恢复脚本优化思路总结 现象 停库超时 <2024-11-29 12:50:43.022 UTC 87472 192.167.60.1(54862) PostgreSQL JDBC Driver postgres stk>FATAL: terminating connection due to administrator command <2024-11-29 12:50:43.022 …...

EdgeRemover终极指南:3种简单方法彻底卸载Windows 10/11的Microsoft Edge浏览器

EdgeRemover终极指南&#xff1a;3种简单方法彻底卸载Windows 10/11的Microsoft Edge浏览器 【免费下载链接】EdgeRemover A PowerShell script that correctly uninstalls or reinstalls Microsoft Edge on Windows 10 & 11. 项目地址: https://gitcode.com/gh_mirrors/…...

机器学习---监督学习入门实验全攻略(小白友好版)

新晋码农一枚&#xff0c;小编会定期整理一些写的比较好的代码和知识点&#xff0c;作为自己的学习笔记&#xff0c;试着做一下批注和补充&#xff0c;转载或者参考他人文献会标明出处&#xff0c;非商用&#xff0c;如有侵权会删改&#xff01;欢迎大家斧正和讨论&#xff01;…...

【独家首发】2026年AI知识管理工具淘汰预警:这7个曾上榜“年度创新”的产品已被头部科技公司集体弃用

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;2026年AI知识管理工具演进全景图 2026年&#xff0c;AI驱动的知识管理工具已从单点智能助手跃迁为组织级认知操作系统。其核心演进体现在三大维度&#xff1a;语义理解深度化、工作流原生融合、以及私有知识资…...

Recipe协议:TEE与RDMA赋能的分布式复制技术

1. 现代硬件加速的复制协议&#xff1a;Recipe在不可信云环境中的应用在分布式系统的世界里&#xff0c;复制协议就像一支交响乐团的指挥&#xff0c;确保每个乐手&#xff08;节点&#xff09;都能在正确的时间演奏正确的音符&#xff08;数据&#xff09;。传统的崩溃容错&am…...

昇腾CANN cann-samples:从示例代码到生产力工具的全路径

CANN 55 个仓库里&#xff0c;cann-samples 是最容易被低估的一个。它不定义新算子、不优化性能、不做架构设计——只提供可运行的代码示例。但正是因为「只提供示例」&#xff0c;cann-samples 是新手最快上手、老手最常查阅的仓库。每个示例都是独立可编译的项目&#xff1a;…...

uv虽快但包管理体验差:命令笨拙、更新不安全,改进之路在何方?

【uv项目承接与特点】自2023年以来&#xff0c;作者首次有空承接新的项目。Astral的uv在Python世界掀起热潮&#xff0c;它速度极快&#xff0c;能轻松处理Python版本&#xff0c;还能用一个二进制文件替代半打工具&#xff0c;作者之前也写过多篇关于它的文章。【uv使用体验问…...

医疗票据 OCR 识别 API 多场景落地指南:医保结算 + 商保理赔 + 医疗信息化(附 Python/Java 完整示例)

《医疗 OCR 识别 API 怎么选&#xff1f;&#xff08;报告单 / 发票 / 检测单&#xff09;》医疗票据 OCR 识别 API 多场景落地指南&#xff1a;医保结算 商保理赔 医疗信息化&#xff08;附 Python/Java 完整示例&#xff09; 导语&#xff1a;每天上万张医疗票据&#xff…...

树莓派Zero 2W + 0.96寸OLED屏保姆级接线与配置教程(附I2C开启与Python库安装)

树莓派Zero 2W与0.96寸OLED屏从接线到显示的完整实战指南 第一次拿到树莓派Zero 2W和0.96寸OLED屏时&#xff0c;那种既兴奋又忐忑的心情我至今记得——这么小的板子真能驱动屏幕吗&#xff1f;接线会不会烧毁设备&#xff1f;经过多次实践和踩坑&#xff0c;我整理出这份真正适…...

AI技术的未来发展方向

AI技术的未来发展方向AI技术的未来发展将围绕以下几个关键领域展开&#xff0c;这些方向不仅推动技术进步&#xff0c;也深刻影响社会和经济结构。通用人工智能&#xff08;AGI&#xff09;的探索AGI旨在实现与人类智能相当的通用性&#xff0c;能够跨领域学习和推理。当前研究…...

AI Agent 架构设计与实现原理深度解析

AI Agent 架构设计与实现原理深度解析 摘要 本文深入解析 AI Agent 的核心架构设计、关键组件原理及主流实现模式。从 ReAct 推理循环到记忆系统设计&#xff0c;从工具调用机制到生产级部署考量&#xff0c;全面剖析构建可靠智能体的技术要点。读者将掌握 AI Agent 的底层原…...