当前位置: 首页 > news >正文

Flink使用

Window下启动支持

下载或复制老版本的放在bin目录下即可;

flink.bat

@echo off
setlocalSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\pluginsSET JVM_ARGS=-Xmx512mSET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*endlocal

start-cluster.bat

@echo off
setlocal EnableDelayedExpansionSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\logSET JVM_ARGS=-Xms1024m -Xmx1024mSET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties":: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do ( 
SET /A y = %%x+1 
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!"  2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!"  2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0"  2> nul
RENAME "%logname_tm%" "%logname_tm%.0"  2> nul
RENAME "%outname_jm%" "%outname_jm%.0"  2> nul
RENAME "%outname_tm%" "%outname_tm%.0"  2> nul
DEL "%logname_jm%.6"  2> nul
DEL "%logname_tm%.6"  2> nul
DEL "%outname_jm%.6"  2> nul
DEL "%outname_tm%.6"  2> nulfor %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (echo java.exe was not found in PATH variablegoto :eof
)echo Starting a local cluster with one JobManager process and one TaskManager process.echo You can terminate the processes via CTRL-C in the spawned shell windows.echo Web interface by default on http://localhost:8081/.start java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1endlocal

Flink实战(Java/MongoDB/Mysql)

    <properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version><flink-version>1.14.6</flink-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink-version}</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><optional>true</optional></dependency><!-- mongodb --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.1.0-1.18</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.50</version></dependency><!-- Spring Boot Starter MyBatis --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.6</version></dependency><!-- MySQL Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency></dependencies>

Flink mongodb 

Flink CDC在MongoDB的副本集(replica set)上使用了$changeStream操作,而该操作仅支持副本集。如果你的MongoDB是单个实例(standalone),则不支持$changeStream操作。

MongoDB开启副本集(Replica Set),否则代码运行会报错。

Flink报错记录

[20210910 10:16:40.107] [DEBUG] [main] [StreamGraph.java:391][addOperator] Vertex: 2
java.lang.IllegalStateException: No ExecutorFactory found to execute the application.at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1947)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)

添加

            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency>

[20210910 10:20:02.172] [INFO] [main] [RestServerEndpoint.java:139][start] Starting rest endpoint.
[20210910 10:20:02.203] [DEBUG] [main] [DispatcherRestEndpoint.java:124][initializeWebSubmissionHandlers] Failed to load web based job submission extension.
org.apache.flink.util.FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.at org.apache.flink.runtime.webmonitor.WebMonitorUtils.loadWebSubmissionExtension(WebMonitorUtils.java:197)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeWebSubmissionHandlers(DispatcherRestEndpoint.java:112)at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.initializeHandlers(WebMonitorEndpoint.java:268)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeHandlers(DispatcherRestEndpoint.java:89)at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:144)at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:463)at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:422)at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:366)at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
[20210910 10:20:02.535] [DEBUG] [main] [InternalLoggerFactory.java:45][newDefaultFactory] Using SLF4J as the default logging framework
[20210910 10:20:02.536] [DEBUG] [main] [InternalThreadLocalMap.java:56][<clinit>] -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

 添加

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink-version}</version>
</dependency>

相关文章:

Flink使用

Window下启动支持 下载或复制老版本的放在bin目录下即可&#xff1b; flink.bat echo off setlocalSET bin%~dp0 SET FLINK_HOME%bin%.. SET FLINK_LIB_DIR%FLINK_HOME%\lib SET FLINK_PLUGINS_DIR%FLINK_HOME%\pluginsSET JVM_ARGS-Xmx512mSET FLINK_JM_CLASSPATH%FLINK_LI…...

简易屏幕共享工具-基于WebSocket

前面写了两个简单的屏幕共享工具&#xff0c;不过那只是为了验证通过截屏的方式是否可行&#xff0c;因为通常手动截屏的频率很低&#xff0c;而对于视频来说它的帧率要求就很高了&#xff0c;至少要一秒30帧率左右。所以&#xff0c;经过实际的截屏工具验证&#xff0c;我了解…...

Redis——主从复制模式

文章目录 1. 引入2. 主从复制模式2.1 概念2.2 配置2.3 原理2.3.1 建立连接阶段2.3.2 命令传播阶段2.3.3 心跳检测机制2.3.4 部分重同步机制(1) 主节点通过 复制积压缓冲区 记录写命令(2) 主节点通过 复制偏移量 判断从节点是否满足执行部分重同步的条件(3) 执行部分重同步操作 …...

简历_熟悉缓存高并发场景处理方法,如缓存穿透、缓存击穿、缓存雪崩

系列博客目录 文章目录 系列博客目录1.缓存穿透总结 2.缓存雪崩3.缓存击穿代码总结 1.缓存穿透 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库。 常见的解决方案有两种&#xff1a; 缓存空对…...

阿里云电商平台用户行为分析与人群画像系统设计与实现

通过在阿里云&#xff08;https://baike.baidu.com/item/%E9%98%BF%E9%87%8C%E4%BA%91/297128&#xff09;上构建包含数据源层、数据存储层、数据处理层、数据分析层和数据应用层的系统架构&#xff0c;并设计合理的数据模型、ETL流程、数据质量与性能监控机制以及安全与合规性…...

Go语言的 的输入/输出流(I/O Streams)核心知识

Go语言的输入/输出流&#xff08;I/O Streams&#xff09;核心知识 前言 Go语言是一种现代编程语言&#xff0c;因其高效性、简洁性及强大的并发支持而受到开发者的喜爱。在开发应用程序时&#xff0c;输入/输出&#xff08;I/O&#xff09;操作是一个不可或缺的部分。无论是…...

57.在 Vue 3 中使用 OpenLayers 点击选择 Feature 设置特定颜色

在 Web 开发中&#xff0c;地图应用是非常常见的需求&#xff0c;而 OpenLayers 是一个非常强大的地图库&#xff0c;它提供了丰富的地图操作功能。今天&#xff0c;我们将一起学习如何在 Vue 3 中结合 OpenLayers 使用点击事件来选择地图上的 Feature&#xff0c;并设置特定的…...

数据结构C语言描述8(图文结合)--哈希、哈希冲突、开放地址法、链地址法等实现

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…...

自动化立体库安全使用管理制度完整版

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。欢迎大家到本文底部评论区留言。 新书《智能物流系统构成与技术实践》人俱乐部 完整版文件和更多学习资料&#xff0c;请球友到知识星球【智能仓储物流技术研习社】自行下载。 以下是《…...

云打印之拼多多打印组件交互协议

拼多多打印组件交互协议相关介绍如下&#xff1a; 1、打印组件下载地址 http://meta.pinduoduo.com/api/one/app/v1/lateststable?appIdcom.xunmeng.pddprint&platformwindows&subTypemain 2、socket连接端口 如果是http的话&#xff0c;端口是5000 socket new …...

TCP 演进之路:软硬件跷跷板与新征程

今天依旧是与 TCP 相关的一个短评。 先看软硬件间的胶着。晶体管诞生以来&#xff0c;硬件一直在突飞猛进发展&#xff0c;后来这个事被摩尔定律正则化&#xff0c;人们开始可以预测未来&#xff0c;但即便如此&#xff0c;软件依然跟不上来&#xff0c;不过几年&#xff0c;老…...

React最小状态管理Jotai

Jotai 状态管理 1. 简介 Jotai 是一个基于原子 atom 概念的 React 状态管理库&#xff0c;它提供了简单且灵活的方式来管理应用状态, 而且非常轻量&#xff0c; 大厂用的非常多。 JotaiRedux适合单个页面&#xff0c;多次用到的属性适合全局公共属性超级轻量&#xff08;与use…...

计算机网络 —— 网络编程(TCP)

计算机网络 —— 网络编程&#xff08;TCP&#xff09; TCP和UDP的区别TCP (Transmission Control Protocol)UDP (User Datagram Protocol) 前期准备listen &#xff08;服务端&#xff09;函数原型返回值使用示例注意事项 accpect &#xff08;服务端&#xff09;函数原型返回…...

字玩FontPlayer开发笔记4 性能优化 首屏加载时间优化

字玩FontPlayer开发笔记4 性能优化 首屏加载时间优化 字玩FontPlayer是笔者开源的一款字体设计工具&#xff0c;使用Vue3 ElementUI开发&#xff0c;源代码&#xff1a; github: https://github.com/HiToysMaker/fontplayer gitee: https://gitee.com/toysmaker/fontplayer …...

RabbitMQ案例

1. 导入依赖 <!--AMQP依赖&#xff0c;包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency> 发送消息 注入RabbitTemplate Autowired RabbitT…...

智能工厂的设计软件 应用场景的一个例子:为AI聊天工具添加一个知识系统 之13 方案再探之4:特定于领域的模板 之 div模型(完整版)

前景提要 整个“方案再探”篇 围绕着如何将项目附件文档中Part 1 部分中给出的零散问题讨论整理、重组为一个结构化的设计文档。为此提出了讨论题目&#xff1a; 特定于领域的模板--一个三套接的hoc结构 它是本项目actors 的剧本原型。其地位&#xff1a; 祖传代码脚本模板…...

WebRtc02:WebRtc架构、目录结构、运行机制

整体架构 WebRtc主要分为三层&#xff1a; CAPI层&#xff1a;外层调用Session管理核心层&#xff1a;包括视频引擎、音频引擎、网络传输 可由使用者重写视频引擎&#xff1a;编解码器、视频缓存、视频增强音频引擎&#xff1a;编解码器、音频缓存、回音消除、降噪传输&#x…...

数据结构复习 (顺序查找,对半查找,斐波那契查找,插值查找,分块查找)

查找&#xff08;检索&#xff09;&#xff1a; 定义&#xff1a;从给定的数据中找到对应的K 1&#xff0c;顺序查找&#xff1a; O(n)的从前向后的遍历 2&#xff0c;对半查找&#xff0c;要求有序 从中间开始查找&#xff0c;每次检查中间的是否正确&#xff0c;不正确就…...

el-input输入框需要支持多输入,最后传输给后台的字段值以逗号分割

需求&#xff1a;一个输入框字段需要支持多次输入&#xff0c;最后传输给后台的字段值以逗号分割 解决方案&#xff1a;结合了el-tag组件的动态编辑标签 那块的代码 //子组件 <template><div class"input-multiple-box" idinputMultipleBox><div>…...

C# 枚举格式字符串

总目录 前言 当前文章为 C# 中的格式设置(格式化字符串) 大全 中的一个小章节。 一、概述 1. 基本信息 可以使用 Enum.ToString 方法&#xff0c;新建表示枚举成员的数字值、十六进制值或字符串值的字符串对象。枚举格式说明符不区分大小写。 二、自定义数字格式说明符详解…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法

树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作&#xff0c;无需更改相机配置。但是&#xff0c;一…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

七、数据库的完整性

七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...

协议转换利器,profinet转ethercat网关的两大派系,各有千秋

随着工业以太网的发展&#xff0c;其高效、便捷、协议开放、易于冗余等诸多优点&#xff0c;被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口&#xff0c;具有实时性、开放性&#xff0c;使用TCP/IP和IT标准&#xff0c;符合基于工业以太网的…...

Python 高效图像帧提取与视频编码:实战指南

Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...