当前位置: 首页 > news >正文

flink中使用外部定时器实现定时刷新

背景:

我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?

外部定时器定时加载实现

1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示

package wikiedits.schedule;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {// 定时任务执行器private transient ScheduledExecutorService scheduledExecutorService;// 本地变量private int threshold;@Overridepublic void open(Configuration parameters) throws Exception {// 1.从db查询数据初始化本地变量
//        threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
//            for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key", "value");
//            }// 2.2 更新本地变量threshold的值
//            threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {}@Overridepublic void close() throws Exception {ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现
package wikiedits.schedule;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 保存Config信息的本地缓存 ---定时同步DB配置表的数据*/
public class ConfigEntityLocalCache {private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();/*** 获取本地缓存实例*/public static ConfigEntityLocalCache getInstance() {return instance;}/** 缓存内存配置项 */private static Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/*** 更新本地缓存数据*/public boolean update(String key, String value){configCache.put(key, value);return true;}/*** 更新本地缓存数据*/public  String getByKey(String key){return configCache.getIfPresent(key);}}

2.在静态类中通过static语句块创建定时器并定时加载,代码如下

package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 静态类定时加载DB配置表到本地内存中*/
public class StaticLoadUtil {// 定时任务执行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 获取本地缓存*/public static Cache<String, String> getConfigCache() {return configCache;}}

总结:

1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行

2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行

相关文章:

flink中使用外部定时器实现定时刷新

背景&#xff1a; 我们经常会使用到比如数据库中的配置表信息&#xff0c;而我们不希望每次都去查询db&#xff0c;那么我们就想定时把db配置表的数据定时加载到flink的本地内存中&#xff0c;那么如何实现呢&#xff1f; 外部定时器定时加载实现 1.在open函数中进行定时器的…...

Spring Cloud Pipelines 入门实践

文章目录 1. 前言2. Spring Cloud Pipelines 是做什么的2.1. 预定义的流程2.2. 集成测试和契约测试2.3.部署策略 4. Spring Cloud Pipelines的使用示例4.1. 创建一个Spring Boot应用4.2. 将代码托管到GitHub仓库4.3. 添加Spring Cloud Pipelines依赖4.4. 配置Spring Cloud Pipe…...

G1 GC详解及设置

一、概述 G1 GC&#xff0c;全称Garbage-First Garbage Collector&#xff0c;在JDK1.7中引入了G1 GC&#xff0c;从JAVA 9开始&#xff0c;G1 GC是默认的GC算法。通过-XX:UseG1GC参数来启用。G1收集器是工作在堆内不同分区上的收集器&#xff0c;分区既可以是年轻代也可以是老…...

GitHub详细教程

将代码推送到GitHub仓库涉及一系列的步骤。以下是详细的步骤说明&#xff1a; 创建一个新的仓库&#xff08;如果还没有的话&#xff09;&#xff1a; 访问 GitHub。登录您的帐户。点击页面右上角的图标&#xff0c;然后选择“New repository”。填写仓库名称、描述等信息&…...

【小沐学Python】Python实现Web图表功能(Dash)

文章目录 1、简介2、安装3、功能示例3.1 Hello World3.2 连接到数据3.3 可视化数据3.4 控件和回调3.5 设置应用的样式3.5.1 HTML and CSS3.5.2 Dash Design Kit (DDK)3.5.3 Dash Bootstrap Components3.5.4 Dash Mantine Components 4、更多示例4.1 Basic Dashboard4.2 Using C…...

【RabbitMQ】docker rabbitmq集群 docker搭建rabbitmq集群

docker rabbitmq集群 docker搭建rabbitmq集群 RabbitMQ提供了两种常用的集群模式 1.普通集群模式 2.镜像集群模式 普通集群模式只能同步主节点上的交换机和队列信息&#xff0c;但对于队列中的消息不做同步&#xff0c;主节点宕机也不能进行切换&#xff08;故障转移&#xff…...

Linux 网络驱动实验

本文章对Linux 网络驱动实验中的设备树进行介绍&#xff0c;Linux网络驱动程序比较复杂&#xff0c;只要学会应用。 1、I.MX6ULL 网络外设设备树 I.MX6ULL 有两个 10/100M 的网络 MAC 外设&#xff0c;因此 I.MX6ULL 网络驱动主要就是这两个网络 MAC 外设的驱动。 fec1…...

访问Apache Tomcat的虚拟主机管理页面

介绍 通过Tomcat Host Manager应用可以创建、删除、管理Tomcat内的虚拟主机&#xff08;virtual hosts&#xff09;。该应用是Tomcat安装的一部分&#xff0c;默认在<Tomcat安装目录>/webapps/host-manager&#xff1a; 配置用户名、密码、角色 要访问Host Manager应…...

【算法】排序——归并排序和计数排序

主页点击直达&#xff1a;个人主页 我的小仓库&#xff1a;代码仓库 C语言偷着笑&#xff1a;C语言专栏 数据结构挨打小记&#xff1a;初阶数据结构专栏 Linux被操作记&#xff1a;Linux专栏 LeetCode刷题掉发记&#xff1a;LeetCode刷题 算法头疼记&#xff1a;算法专栏…...

discuz封面设置失败的解决办法(centos系统+windows系统)

discuz封面设置失败的解决办法(centos系统windows系统&#xff09; centos系统&#xff1a;1、开启/var/www/html 这个目录的读写权限chmod -R 777 /var/www/html然后重启httpd&#xff1a;service httpd restart如果discuz论坛发布帖子&#xff0c;还是显示封面设置失败的话…...

AI绘画-Stable Diffusion笔记

软件&#xff1a;Stable Diffusion 视频教程来自 https://www.bilibili.com/video/BV1As4y127HW/?spm_id_from333.337.search-card.all.click 提示词 提示词类别 内容型提示词 人物主题特征&#xff1a; 服饰穿搭&#xff1a;white dress 发型发色&#xff1a;blonde hair,l…...

中值滤波算法及例程

中值滤波是一种常用的非线性图像滤波算法&#xff0c;它能够有效去除图像中的椒盐噪声&#xff08;即孤立的亮或暗像素点&#xff09;&#xff0c;同时保持图像边缘和细节的清晰度。中值滤波的主要思想是使用一个滑动窗口&#xff0c;在窗口内对像素值进行排序&#xff0c;并将…...

SpringBoot 如何使用 Ehcache 作为缓存

使用Spring Boot Sleuth进行分布式跟踪 在现代分布式应用程序中&#xff0c;跟踪请求和了解应用程序的性能是至关重要的。Spring Boot Sleuth是一个分布式跟踪解决方案&#xff0c;它可以帮助您在分布式系统中跟踪请求并分析性能问题。本文将介绍如何在Spring Boot应用程序中使…...

Stable Diffusion 图片换脸插件Roop保姆教程 附错误解决办法和API使用

换脸技术已经不是新鲜事物,但如何实现简单、快速、高效的换脸操作呢?Roop插件正是为解决这一问题而生的。 sd-webui-roop 插件适用于已经本地部署了SD的用户。相较于传统的换脸技术,Roop插件几乎不需要训练,只需一张照片,即可在10秒内完成换脸。 但是要注意到是必须注意…...

华为OD机试 - 组成最大数(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#xff09;》…...

十一、2023.10.5.计算机网络(end).11

文章目录 17、说说 TCP 可靠性保证&#xff1f;18、简述 TCP 滑动窗口以及重传机制?19、说说滑动窗口过小怎么办?20、说说如果三次握手时候每次握手信息对方没收到会怎么样&#xff0c;分情况介绍&#xff1f;21、简述 TCP 的 TIME_WAIT&#xff0c;为什么需要有这个状态&…...

基于SpringBoot的网上摄影工作室

目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 作品分类管理 轮播图管理 摄影作品管理 摄影作品收藏 摄影圈 摄影作品发布 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统…...

Spring源码解析——IOC之bean 的初始化

正文 一个 bean 经历了 createBeanInstance() 被创建出来&#xff0c;然后又经过一番属性注入&#xff0c;依赖处理&#xff0c;历经千辛万苦&#xff0c;千锤百炼&#xff0c;终于有点儿 bean 实例的样子&#xff0c;能堪大任了&#xff0c;只需要经历最后一步就破茧成蝶了。…...

互联网摸鱼日报(2023-10-07)

互联网摸鱼日报(2023-10-07) 36氪新闻 小米汽车将研发增程式电动车&#xff0c;产品已有规划&#xff1b;LG新能源和丰田汽车北美公司签署电动汽车电池供应协议&#xff5c;36氪新能源日报1005 详解企业数字化转型建设过程中所需的七种能力 电商平台&#xff0c;如何让丰收「…...

深入理解RBAC

RBAC是一种基于角色实现访问控制的权限管理机制&#xff0c;通过定义角色和权限、用户和角色、角色和角色之间的关系&#xff0c;实现多层次、细粒度、可复用的权限管理系统。原文: Role-based Access Control (RBAC) Model[1] Bernard HermantUnsplash Avery Pennarun写的&quo…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

Python 训练营打卡 Day 47

注意力热力图可视化 在day 46代码的基础上&#xff0c;对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...

AD学习(3)

1 PCB封装元素组成及简单的PCB封装创建 封装的组成部分&#xff1a; &#xff08;1&#xff09;PCB焊盘&#xff1a;表层的铜 &#xff0c;top层的铜 &#xff08;2&#xff09;管脚序号&#xff1a;用来关联原理图中的管脚的序号&#xff0c;原理图的序号需要和PCB封装一一…...

前端调试HTTP状态码

1xx&#xff08;信息类状态码&#xff09; 这类状态码表示临时响应&#xff0c;需要客户端继续处理请求。 100 Continue 服务器已收到请求的初始部分&#xff0c;客户端应继续发送剩余部分。 2xx&#xff08;成功类状态码&#xff09; 表示请求已成功被服务器接收、理解并处…...

[特殊字符] 手撸 Redis 互斥锁那些坑

&#x1f4d6; 手撸 Redis 互斥锁那些坑 最近搞业务遇到高并发下同一个 key 的互斥操作&#xff0c;想实现分布式环境下的互斥锁。于是私下顺手手撸了个基于 Redis 的简单互斥锁&#xff0c;也顺便跟 Redisson 的 RLock 机制对比了下&#xff0c;记录一波&#xff0c;别踩我踩过…...