flink中使用外部定时器实现定时刷新
背景:
我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?
外部定时器定时加载实现
1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示
package wikiedits.schedule;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {// 定时任务执行器private transient ScheduledExecutorService scheduledExecutorService;// 本地变量private int threshold;@Overridepublic void open(Configuration parameters) throws Exception {// 1.从db查询数据初始化本地变量
// threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key", "value");
// }// 2.2 更新本地变量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {}@Overridepublic void close() throws Exception {ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现
package wikiedits.schedule;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 保存Config信息的本地缓存 ---定时同步DB配置表的数据*/
public class ConfigEntityLocalCache {private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();/*** 获取本地缓存实例*/public static ConfigEntityLocalCache getInstance() {return instance;}/** 缓存内存配置项 */private static Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/*** 更新本地缓存数据*/public boolean update(String key, String value){configCache.put(key, value);return true;}/*** 更新本地缓存数据*/public String getByKey(String key){return configCache.getIfPresent(key);}}
2.在静态类中通过static语句块创建定时器并定时加载,代码如下
package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 静态类定时加载DB配置表到本地内存中*/
public class StaticLoadUtil {// 定时任务执行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 获取本地缓存*/public static Cache<String, String> getConfigCache() {return configCache;}}
总结:
1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行
2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行
相关文章:
flink中使用外部定时器实现定时刷新
背景: 我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢? 外部定时器定时加载实现 1.在open函数中进行定时器的…...
Spring Cloud Pipelines 入门实践
文章目录 1. 前言2. Spring Cloud Pipelines 是做什么的2.1. 预定义的流程2.2. 集成测试和契约测试2.3.部署策略 4. Spring Cloud Pipelines的使用示例4.1. 创建一个Spring Boot应用4.2. 将代码托管到GitHub仓库4.3. 添加Spring Cloud Pipelines依赖4.4. 配置Spring Cloud Pipe…...
G1 GC详解及设置
一、概述 G1 GC,全称Garbage-First Garbage Collector,在JDK1.7中引入了G1 GC,从JAVA 9开始,G1 GC是默认的GC算法。通过-XX:UseG1GC参数来启用。G1收集器是工作在堆内不同分区上的收集器,分区既可以是年轻代也可以是老…...
GitHub详细教程
将代码推送到GitHub仓库涉及一系列的步骤。以下是详细的步骤说明: 创建一个新的仓库(如果还没有的话): 访问 GitHub。登录您的帐户。点击页面右上角的图标,然后选择“New repository”。填写仓库名称、描述等信息&…...
【小沐学Python】Python实现Web图表功能(Dash)
文章目录 1、简介2、安装3、功能示例3.1 Hello World3.2 连接到数据3.3 可视化数据3.4 控件和回调3.5 设置应用的样式3.5.1 HTML and CSS3.5.2 Dash Design Kit (DDK)3.5.3 Dash Bootstrap Components3.5.4 Dash Mantine Components 4、更多示例4.1 Basic Dashboard4.2 Using C…...
【RabbitMQ】docker rabbitmq集群 docker搭建rabbitmq集群
docker rabbitmq集群 docker搭建rabbitmq集群 RabbitMQ提供了两种常用的集群模式 1.普通集群模式 2.镜像集群模式 普通集群模式只能同步主节点上的交换机和队列信息,但对于队列中的消息不做同步,主节点宕机也不能进行切换(故障转移ÿ…...
Linux 网络驱动实验
本文章对Linux 网络驱动实验中的设备树进行介绍,Linux网络驱动程序比较复杂,只要学会应用。 1、I.MX6ULL 网络外设设备树 I.MX6ULL 有两个 10/100M 的网络 MAC 外设,因此 I.MX6ULL 网络驱动主要就是这两个网络 MAC 外设的驱动。 fec1…...
访问Apache Tomcat的虚拟主机管理页面
介绍 通过Tomcat Host Manager应用可以创建、删除、管理Tomcat内的虚拟主机(virtual hosts)。该应用是Tomcat安装的一部分,默认在<Tomcat安装目录>/webapps/host-manager: 配置用户名、密码、角色 要访问Host Manager应…...
【算法】排序——归并排序和计数排序
主页点击直达:个人主页 我的小仓库:代码仓库 C语言偷着笑:C语言专栏 数据结构挨打小记:初阶数据结构专栏 Linux被操作记:Linux专栏 LeetCode刷题掉发记:LeetCode刷题 算法头疼记:算法专栏…...
discuz封面设置失败的解决办法(centos系统+windows系统)
discuz封面设置失败的解决办法(centos系统windows系统) centos系统:1、开启/var/www/html 这个目录的读写权限chmod -R 777 /var/www/html然后重启httpd:service httpd restart如果discuz论坛发布帖子,还是显示封面设置失败的话…...
AI绘画-Stable Diffusion笔记
软件:Stable Diffusion 视频教程来自 https://www.bilibili.com/video/BV1As4y127HW/?spm_id_from333.337.search-card.all.click 提示词 提示词类别 内容型提示词 人物主题特征: 服饰穿搭:white dress 发型发色:blonde hair,l…...
中值滤波算法及例程
中值滤波是一种常用的非线性图像滤波算法,它能够有效去除图像中的椒盐噪声(即孤立的亮或暗像素点),同时保持图像边缘和细节的清晰度。中值滤波的主要思想是使用一个滑动窗口,在窗口内对像素值进行排序,并将…...
SpringBoot 如何使用 Ehcache 作为缓存
使用Spring Boot Sleuth进行分布式跟踪 在现代分布式应用程序中,跟踪请求和了解应用程序的性能是至关重要的。Spring Boot Sleuth是一个分布式跟踪解决方案,它可以帮助您在分布式系统中跟踪请求并分析性能问题。本文将介绍如何在Spring Boot应用程序中使…...
Stable Diffusion 图片换脸插件Roop保姆教程 附错误解决办法和API使用
换脸技术已经不是新鲜事物,但如何实现简单、快速、高效的换脸操作呢?Roop插件正是为解决这一问题而生的。 sd-webui-roop 插件适用于已经本地部署了SD的用户。相较于传统的换脸技术,Roop插件几乎不需要训练,只需一张照片,即可在10秒内完成换脸。 但是要注意到是必须注意…...
华为OD机试 - 组成最大数(Java 2023 B卷 100分)
目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出 华为OD机试 2023B卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(A卷B卷)》…...
十一、2023.10.5.计算机网络(end).11
文章目录 17、说说 TCP 可靠性保证?18、简述 TCP 滑动窗口以及重传机制?19、说说滑动窗口过小怎么办?20、说说如果三次握手时候每次握手信息对方没收到会怎么样,分情况介绍?21、简述 TCP 的 TIME_WAIT,为什么需要有这个状态&…...
基于SpringBoot的网上摄影工作室
目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 作品分类管理 轮播图管理 摄影作品管理 摄影作品收藏 摄影圈 摄影作品发布 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统…...
Spring源码解析——IOC之bean 的初始化
正文 一个 bean 经历了 createBeanInstance() 被创建出来,然后又经过一番属性注入,依赖处理,历经千辛万苦,千锤百炼,终于有点儿 bean 实例的样子,能堪大任了,只需要经历最后一步就破茧成蝶了。…...
互联网摸鱼日报(2023-10-07)
互联网摸鱼日报(2023-10-07) 36氪新闻 小米汽车将研发增程式电动车,产品已有规划;LG新能源和丰田汽车北美公司签署电动汽车电池供应协议|36氪新能源日报1005 详解企业数字化转型建设过程中所需的七种能力 电商平台,如何让丰收「…...
深入理解RBAC
RBAC是一种基于角色实现访问控制的权限管理机制,通过定义角色和权限、用户和角色、角色和角色之间的关系,实现多层次、细粒度、可复用的权限管理系统。原文: Role-based Access Control (RBAC) Model[1] Bernard HermantUnsplash Avery Pennarun写的&quo…...
Kubernetes原生自动化部署工具Keel:实现容器镜像自动更新的最后一公里
1. 项目概述:什么是Keel,以及它解决了什么问题如果你和我一样,在团队里负责过一段时间的应用部署和更新,那你一定对“发布日”的紧张感深有体会。开发那边代码一提交,这边就得开始手动拉取镜像、更新Kubernetes的Deplo…...
基于Kubernetes Lease构建分布式部署锁:解决CI/CD环境下的资源竞争
1. 项目概述:从“clawfight”看一场被遗忘的社区技术博弈看到“2019-02-18/clawfight”这个标题,很多人的第一反应可能是困惑。它不像一个标准的软件项目名,没有清晰的版本号,也没有指明具体的技术栈。但恰恰是这种看似随意的命名…...
如何永久保存你的微信聊天记录?WeChatExporter开源工具完整指南
如何永久保存你的微信聊天记录?WeChatExporter开源工具完整指南 【免费下载链接】WeChatExporter 一个可以快速导出、查看你的微信聊天记录的工具 项目地址: https://gitcode.com/gh_mirrors/wec/WeChatExporter 你是否曾经历过手机丢失、微信重装后珍贵聊天…...
FastAPI+AI应用脚手架:模块化架构与生产级实践指南
1. 项目概述:一个为AI应用量身定制的FastAPI脚手架如果你正在寻找一个能快速启动、结构清晰且功能强大的AI应用后端框架,那么fastapi-genai-boilerplate这个项目绝对值得你花时间研究。它不是一个简单的“Hello World”示例,而是一个面向生产…...
开源大模型推理引擎Takeoff部署指南:从原理到生产实践
1. 项目概述:一个让大模型推理“起飞”的开源引擎 如果你正在为如何将那些动辄几十GB、几百亿参数的大语言模型(LLM)部署到生产环境而头疼,或者厌倦了为每一次API调用支付高昂的费用,那么今天聊的这个项目,…...
ESP32接入ChatGPT API:构建本地化AIoT智能交互终端
1. 项目概述:当ESP32遇见ChatGPT,开启本地化智能交互新玩法最近在捣鼓ESP32开发板,总想着给它加点“智能”的料。传统的物联网项目,比如温湿度监测、远程控制开关,虽然实用,但总觉得少了点“灵魂”。直到我…...
ASPICE汽车软件开发标准:V模型、能力等级与核心过程实战解析
1. 项目概述:为什么我们需要ASPICE这张“汽车软件地图”如果你在汽车行业,尤其是涉及软件、电子电气或系统开发的岗位待过一阵子,大概率会频繁听到一个词:ASPICE。它可能出现在项目启动会上,出现在供应商审核清单里&am…...
AI对话记忆管理实战:memory-organizer库解决长上下文难题
1. 项目概述:一个为AI记忆体“瘦身”与“归档”的利器最近在折腾一些本地大语言模型(LLM)的应用,比如搭建个人知识库助手或者长期对话机器人,一个绕不开的痛点就是“记忆”的管理。模型本身没有持久记忆,每…...
医院内外部人员管理系统
基于计算机视觉技术的医院人员综合管理解决方案,整合人脸识别考勤与行人流量监控两大核心能力,实现内部员工身份验证、自动打卡签到,以及公共区域人流量实时统计与可视化分析,提升医院管理效率与安全保障水平。 [📺 系…...
AI与人类共创:从替代焦虑到协作闭环
GPT-Image 2 与人类创造力的共生:从“替代焦虑”到“协作闭环”(2026 研究视角与可落地实践)当 GPT-Image 2 这样的多模态生成/理解模型进入创作流程后,“竞争还是协作”立刻变成一个绕不开的讨论。直觉上,大家会把它理…...
