Spring Boot中配置Flink的资源管理
在 Spring Boot 中配置 Flink 的资源管理,需要遵循以下步骤:
- 添加 Flink 依赖项
在你的 pom.xml 文件中,添加 Flink 和 Flink-connector-kafka 的依赖项。这里以 Flink 1.14 版本为例:
<!-- Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>1.14.0</version></dependency>
</dependencies>
复制代码
- 创建 Flink 配置类
创建一个名为 FlinkConfiguration 的配置类,用于定义 Flink 的相关配置。
import org.apache.flink.configuration.Configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FlinkConfiguration {@Beanpublic Configuration getFlinkConfiguration() {Configuration configuration = new Configuration();// 设置 Flink 的相关配置,例如:configuration.setString("rest.port", "8081");configuration.setString("taskmanager.numberOfTaskSlots", "4");return configuration;}
}
复制代码
- 创建 Flink 作业管理器
创建一个名为 FlinkJobManager 的类,用于管理 Flink 作业的生命周期。
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class FlinkJobManager {@Autowiredprivate Configuration flinkConfiguration;public JobExecutionResult execute(FlinkJob job) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConfiguration);// 配置 StreamExecutionEnvironment,例如设置 Checkpoint 等job.execute(env);return env.execute(job.getJobName());}
}
复制代码
- 创建 Flink 作业接口
创建一个名为 FlinkJob 的接口,用于定义 Flink 作业的基本方法。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public interface FlinkJob {String getJobName();void execute(StreamExecutionEnvironment env);
}
复制代码
- 实现 Flink 作业
创建一个实现了 FlinkJob 接口的类,用于定义具体的 Flink 作业逻辑。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class MyFlinkJob implements FlinkJob {@Overridepublic String getJobName() {return "My Flink Job";}@Overridepublic void execute(StreamExecutionEnvironment env) {Properties kafkaProperties = new Properties();kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");kafkaProperties.setProperty("group.id", "my-flink-job");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProperties);DataStream<String> stream = env.addSource(kafkaConsumer);// 实现 Flink 作业逻辑// ...}
}
复制代码
- 在 Spring Boot 应用中运行 Flink 作业
在你的 Spring Boot 应用中,使用 FlinkJobManager 运行 Flink 作业。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class MyApplication implements CommandLineRunner {@Autowiredprivate FlinkJobManager flinkJobManager;@Autowiredprivate MyFlinkJob myFlinkJob;public static void main(String[] args) {SpringApplication.run(MyApplication.class, args);}@Overridepublic void run(String... args) throws Exception {flinkJobManager.execute(myFlinkJob);}
}
复制代码
通过以上步骤,你可以在 Spring Boot 中配置和运行 Flink 作业。注意,这里只是一个简单的示例,你可能需要根据实际需求调整代码。
相关文章:
Spring Boot中配置Flink的资源管理
在 Spring Boot 中配置 Flink 的资源管理,需要遵循以下步骤: 添加 Flink 依赖项 在你的 pom.xml 文件中,添加 Flink 和 Flink-connector-kafka 的依赖项。这里以 Flink 1.14 版本为例: <!-- Flink dependencies --><de…...
51单片机从入门到精通:理论与实践指南入门篇(二)
续51单片机从入门到精通:理论与实践指南(一)https://blog.csdn.net/speaking_me/article/details/144067372 第一篇总体给大家在(全局)总体上讲解了一下51单片机,那么接下来几天结束详细讲解,从…...
Notepad++ 替换所有数字给数字加单引号
前言 今天遇到这样一个场景: 要去更新某张表里 code1,2,3,4,5,6 的数据,把它的 name 设置为 ‘张三’ 但是 code在数据库里面的字段类型是 vachar(64),它自身携带索引 原本可以这样写 SQL: update tableA set namezhangsan where code in …...
【CANOE】【Capl】【RS232】控制串口设备
系列文章目录 内置函数,来控制传统的串口设备,比如继电器等 文章目录 系列文章目录前言一、控制串口二、自定义相关的参数RS232Configure**函数语法****函数功能****参数说明****返回值****示例代码** 三、回调函数的使用RS232OnSend**函数语法****函数…...
查找相关题目
1.顺序查找法适合于存储结构为(B )的线性表。 A.散列存储 B.顺序存储或链式存储 C.压缩存储 D.索引存储 顺序查找法的特点 2.适用于折半查找的表的存储方式及元素排列要求为(D ) 。 A.链接方式存储,元素无序 B.链接方式存储࿰…...
《独立开发:Spring 框架的综合应用》
一、Spring 框架概述 Spring 是一个分层的 Java SE/EE full-stack 轻量级开源框架,以 IoC 和 AOP 为内核,具有方便解耦、方便集成优秀框架、降低 Java EE API 使用难度等优点。 Spring 框架因其强大的功能以及卓越的性能而受到众多开发人员的喜爱。它是…...
数据工程流程
** 数据工程流程图** #mermaid-svg-ArT55xCISSfZImy3 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-ArT55xCISSfZImy3 .error-icon{fill:#552222;}#mermaid-svg-ArT55xCISSfZImy3 .error-text{fill:#552222;stroke…...
Linux宝塔部署wordpress网站更换服务器IP后无法访问管理后台和打开网站页面显示错乱
一、背景: wordpress网站搬家,更换服务器IP后,如果没有域名时,使用服务器IP地址无法访问管理后台和打开网站页面显示错乱。 二、解决方法如下: 1.wordpress搬家后,在新服务器上,新建站点时&am…...
区块链知识体系
1. 区块链基础知识 Q: 什么是区块链? A: 区块链是一种去中心化的分布式账本技术,通过加密算法保证数据的不可篡改性和透明性。它由一系列按时间顺序链接的区块组成,每个区块包含一批交易记录。 Q: 区块链的主要特点是什么? 去…...
力扣第 66 题 “加一”
题目描述 给定一个由 非负整数组成的非空数组,表示一个整数。在该整数的基础上加一。 最高位数字在数组的首位,数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外,这个整数不会以零开头。 示例 1: 输入: digits [1,2,3] 输出:…...
C语言数据结构与算法--简单实现队列的入队和出队
(一)队列的基本概念 和栈相反,队列(Queue)是一种先进先出(First In First Out)的线性表。只 允许在表的一端进行插入,而在另一端删除元素,如日常生活中的排队现象。队列中 允许插入的一端叫队尾…...
代码美学:MATLAB制作渐变色
输入颜色个数n,颜色类型: n 2; % 输入颜色个数 colors {[1, 0, 0], [0, 0, 1]}; createGradientHeatmap(n, colors); 调用函数: function createGradientHeatmap(n, colors)% 输入检查if length(colors) ~ nerror(输入的颜色数量与n不一…...
排序算法之冒泡排序篇
冒泡排序的思想: 是一个把元素从小到大排的一个算法思想 相邻的两个元素两两比较,大的那一个元素向后移,小的那个元素向前移 核心逻辑: 比较所有相邻的两个项,如果第一个比第二个大,就交换它们 从头开始…...
WPF ItemsControl控件
ItemsControl 是 WPF 中一个非常灵活的控件,用于显示一组数据项。它是一个基类,许多其他控件(如 ListBox, ListView, ComboBox 等)都是从 ItemsControl 继承而来。ItemsControl 的主要特点是它可以自定义数据项的显示方式…...
CentOS 上安装各种应用的命令行总结
在 CentOS 上安装各种应用的命令行方法可以通过不同的软件包管理工具完成,最常用的是 yum(CentOS 7及以前版本)和 dnf(CentOS 8及以上版本)。以下是一些常见应用的安装命令总结。 目录 1. 基本的包管理命令 2. 安装…...
Java中的JSONObject详解
文章目录 Java中的JSONObject详解一、引言二、JSONObject的创建与基本操作1、创建JSONObject2、添加键值对3、获取值 三、JSONObject的高级特性1、遍历JSONObject2、从字符串创建JSONObject3、JSONObject与JSONArray的结合使用4、更新和删除键值对 四、错误处理1. 键值存在性检…...
音视频流媒体直播/点播系统EasyDSS互联网视频云平台介绍
随着互联网技术的飞速发展,音视频流媒体直播已成为现代社会信息传递与娱乐消费的重要组成部分。在这样的背景下,EasyDSS互联网视频云平台应运而生,它以高效、稳定、便捷的特性,为音视频流媒体直播领域带来了全新的解决方案。 1、产…...
shell编程3,参数传递+算术运算
声明! 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团队无关&#…...
自动泊车“哐哐撞大墙”,小米SU7智驾功能bug缠身?
文/王俣祺 导语:小米SU7,自带热度与科技光环的“流量神车”,近日却以一种极为“狼狈”的方式闯入大众视野。多达70余辆小米SU7陷入“泊车魔咒”,瞬间在网络上炸开了锅。从“科技控”到“惹祸精”的背后,究竟藏着怎样的…...
RAG 与 HyDE
传统 RAG 与 HyDE,直观解释! 传统 RAG 系统的一个关键问题是问题在语义上与答案不相似。 考虑以下示例,您想要找到类似于“什么是 ML?”的句子。 “什么是 AI?” 可能看起来比“机器学习很有趣”更相似。 这种语义差…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)
漏洞概览 漏洞名称:Apache Flink REST API 任意文件读取漏洞CVE编号:CVE-2020-17519CVSS评分:7.5影响版本:Apache Flink 1.11.0、1.11.1、1.11.2修复版本:≥ 1.11.3 或 ≥ 1.12.0漏洞类型:路径遍历&#x…...
20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
