当前位置: 首页 > news >正文

Flink使用

Window下启动支持

下载或复制老版本的放在bin目录下即可;

flink.bat

@echo off
setlocalSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\pluginsSET JVM_ARGS=-Xmx512mSET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*endlocal

start-cluster.bat

@echo off
setlocal EnableDelayedExpansionSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\logSET JVM_ARGS=-Xms1024m -Xmx1024mSET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties":: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do ( 
SET /A y = %%x+1 
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!"  2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!"  2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0"  2> nul
RENAME "%logname_tm%" "%logname_tm%.0"  2> nul
RENAME "%outname_jm%" "%outname_jm%.0"  2> nul
RENAME "%outname_tm%" "%outname_tm%.0"  2> nul
DEL "%logname_jm%.6"  2> nul
DEL "%logname_tm%.6"  2> nul
DEL "%outname_jm%.6"  2> nul
DEL "%outname_tm%.6"  2> nulfor %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (echo java.exe was not found in PATH variablegoto :eof
)echo Starting a local cluster with one JobManager process and one TaskManager process.echo You can terminate the processes via CTRL-C in the spawned shell windows.echo Web interface by default on http://localhost:8081/.start java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1endlocal

Flink实战(Java/MongoDB/Mysql)

    <properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version><flink-version>1.14.6</flink-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink-version}</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><optional>true</optional></dependency><!-- mongodb --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.1.0-1.18</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.50</version></dependency><!-- Spring Boot Starter MyBatis --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.6</version></dependency><!-- MySQL Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency></dependencies>

Flink mongodb 

Flink CDC在MongoDB的副本集(replica set)上使用了$changeStream操作,而该操作仅支持副本集。如果你的MongoDB是单个实例(standalone),则不支持$changeStream操作。

MongoDB开启副本集(Replica Set),否则代码运行会报错。

Flink报错记录

[20210910 10:16:40.107] [DEBUG] [main] [StreamGraph.java:391][addOperator] Vertex: 2
java.lang.IllegalStateException: No ExecutorFactory found to execute the application.at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1947)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)

添加

            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency>

[20210910 10:20:02.172] [INFO] [main] [RestServerEndpoint.java:139][start] Starting rest endpoint.
[20210910 10:20:02.203] [DEBUG] [main] [DispatcherRestEndpoint.java:124][initializeWebSubmissionHandlers] Failed to load web based job submission extension.
org.apache.flink.util.FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.at org.apache.flink.runtime.webmonitor.WebMonitorUtils.loadWebSubmissionExtension(WebMonitorUtils.java:197)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeWebSubmissionHandlers(DispatcherRestEndpoint.java:112)at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.initializeHandlers(WebMonitorEndpoint.java:268)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeHandlers(DispatcherRestEndpoint.java:89)at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:144)at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:463)at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:422)at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:366)at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
[20210910 10:20:02.535] [DEBUG] [main] [InternalLoggerFactory.java:45][newDefaultFactory] Using SLF4J as the default logging framework
[20210910 10:20:02.536] [DEBUG] [main] [InternalThreadLocalMap.java:56][<clinit>] -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

 添加

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink-version}</version>
</dependency>

相关文章:

Flink使用

Window下启动支持 下载或复制老版本的放在bin目录下即可&#xff1b; flink.bat echo off setlocalSET bin%~dp0 SET FLINK_HOME%bin%.. SET FLINK_LIB_DIR%FLINK_HOME%\lib SET FLINK_PLUGINS_DIR%FLINK_HOME%\pluginsSET JVM_ARGS-Xmx512mSET FLINK_JM_CLASSPATH%FLINK_LI…...

简易屏幕共享工具-基于WebSocket

前面写了两个简单的屏幕共享工具&#xff0c;不过那只是为了验证通过截屏的方式是否可行&#xff0c;因为通常手动截屏的频率很低&#xff0c;而对于视频来说它的帧率要求就很高了&#xff0c;至少要一秒30帧率左右。所以&#xff0c;经过实际的截屏工具验证&#xff0c;我了解…...

Redis——主从复制模式

文章目录 1. 引入2. 主从复制模式2.1 概念2.2 配置2.3 原理2.3.1 建立连接阶段2.3.2 命令传播阶段2.3.3 心跳检测机制2.3.4 部分重同步机制(1) 主节点通过 复制积压缓冲区 记录写命令(2) 主节点通过 复制偏移量 判断从节点是否满足执行部分重同步的条件(3) 执行部分重同步操作 …...

简历_熟悉缓存高并发场景处理方法,如缓存穿透、缓存击穿、缓存雪崩

系列博客目录 文章目录 系列博客目录1.缓存穿透总结 2.缓存雪崩3.缓存击穿代码总结 1.缓存穿透 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库。 常见的解决方案有两种&#xff1a; 缓存空对…...

阿里云电商平台用户行为分析与人群画像系统设计与实现

通过在阿里云&#xff08;https://baike.baidu.com/item/%E9%98%BF%E9%87%8C%E4%BA%91/297128&#xff09;上构建包含数据源层、数据存储层、数据处理层、数据分析层和数据应用层的系统架构&#xff0c;并设计合理的数据模型、ETL流程、数据质量与性能监控机制以及安全与合规性…...

Go语言的 的输入/输出流(I/O Streams)核心知识

Go语言的输入/输出流&#xff08;I/O Streams&#xff09;核心知识 前言 Go语言是一种现代编程语言&#xff0c;因其高效性、简洁性及强大的并发支持而受到开发者的喜爱。在开发应用程序时&#xff0c;输入/输出&#xff08;I/O&#xff09;操作是一个不可或缺的部分。无论是…...

57.在 Vue 3 中使用 OpenLayers 点击选择 Feature 设置特定颜色

在 Web 开发中&#xff0c;地图应用是非常常见的需求&#xff0c;而 OpenLayers 是一个非常强大的地图库&#xff0c;它提供了丰富的地图操作功能。今天&#xff0c;我们将一起学习如何在 Vue 3 中结合 OpenLayers 使用点击事件来选择地图上的 Feature&#xff0c;并设置特定的…...

数据结构C语言描述8(图文结合)--哈希、哈希冲突、开放地址法、链地址法等实现

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…...

自动化立体库安全使用管理制度完整版

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。欢迎大家到本文底部评论区留言。 新书《智能物流系统构成与技术实践》人俱乐部 完整版文件和更多学习资料&#xff0c;请球友到知识星球【智能仓储物流技术研习社】自行下载。 以下是《…...

云打印之拼多多打印组件交互协议

拼多多打印组件交互协议相关介绍如下&#xff1a; 1、打印组件下载地址 http://meta.pinduoduo.com/api/one/app/v1/lateststable?appIdcom.xunmeng.pddprint&platformwindows&subTypemain 2、socket连接端口 如果是http的话&#xff0c;端口是5000 socket new …...

TCP 演进之路:软硬件跷跷板与新征程

今天依旧是与 TCP 相关的一个短评。 先看软硬件间的胶着。晶体管诞生以来&#xff0c;硬件一直在突飞猛进发展&#xff0c;后来这个事被摩尔定律正则化&#xff0c;人们开始可以预测未来&#xff0c;但即便如此&#xff0c;软件依然跟不上来&#xff0c;不过几年&#xff0c;老…...

React最小状态管理Jotai

Jotai 状态管理 1. 简介 Jotai 是一个基于原子 atom 概念的 React 状态管理库&#xff0c;它提供了简单且灵活的方式来管理应用状态, 而且非常轻量&#xff0c; 大厂用的非常多。 JotaiRedux适合单个页面&#xff0c;多次用到的属性适合全局公共属性超级轻量&#xff08;与use…...

计算机网络 —— 网络编程(TCP)

计算机网络 —— 网络编程&#xff08;TCP&#xff09; TCP和UDP的区别TCP (Transmission Control Protocol)UDP (User Datagram Protocol) 前期准备listen &#xff08;服务端&#xff09;函数原型返回值使用示例注意事项 accpect &#xff08;服务端&#xff09;函数原型返回…...

字玩FontPlayer开发笔记4 性能优化 首屏加载时间优化

字玩FontPlayer开发笔记4 性能优化 首屏加载时间优化 字玩FontPlayer是笔者开源的一款字体设计工具&#xff0c;使用Vue3 ElementUI开发&#xff0c;源代码&#xff1a; github: https://github.com/HiToysMaker/fontplayer gitee: https://gitee.com/toysmaker/fontplayer …...

RabbitMQ案例

1. 导入依赖 <!--AMQP依赖&#xff0c;包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency> 发送消息 注入RabbitTemplate Autowired RabbitT…...

智能工厂的设计软件 应用场景的一个例子:为AI聊天工具添加一个知识系统 之13 方案再探之4:特定于领域的模板 之 div模型(完整版)

前景提要 整个“方案再探”篇 围绕着如何将项目附件文档中Part 1 部分中给出的零散问题讨论整理、重组为一个结构化的设计文档。为此提出了讨论题目&#xff1a; 特定于领域的模板--一个三套接的hoc结构 它是本项目actors 的剧本原型。其地位&#xff1a; 祖传代码脚本模板…...

WebRtc02:WebRtc架构、目录结构、运行机制

整体架构 WebRtc主要分为三层&#xff1a; CAPI层&#xff1a;外层调用Session管理核心层&#xff1a;包括视频引擎、音频引擎、网络传输 可由使用者重写视频引擎&#xff1a;编解码器、视频缓存、视频增强音频引擎&#xff1a;编解码器、音频缓存、回音消除、降噪传输&#x…...

数据结构复习 (顺序查找,对半查找,斐波那契查找,插值查找,分块查找)

查找&#xff08;检索&#xff09;&#xff1a; 定义&#xff1a;从给定的数据中找到对应的K 1&#xff0c;顺序查找&#xff1a; O(n)的从前向后的遍历 2&#xff0c;对半查找&#xff0c;要求有序 从中间开始查找&#xff0c;每次检查中间的是否正确&#xff0c;不正确就…...

el-input输入框需要支持多输入,最后传输给后台的字段值以逗号分割

需求&#xff1a;一个输入框字段需要支持多次输入&#xff0c;最后传输给后台的字段值以逗号分割 解决方案&#xff1a;结合了el-tag组件的动态编辑标签 那块的代码 //子组件 <template><div class"input-multiple-box" idinputMultipleBox><div>…...

C# 枚举格式字符串

总目录 前言 当前文章为 C# 中的格式设置(格式化字符串) 大全 中的一个小章节。 一、概述 1. 基本信息 可以使用 Enum.ToString 方法&#xff0c;新建表示枚举成员的数字值、十六进制值或字符串值的字符串对象。枚举格式说明符不区分大小写。 二、自定义数字格式说明符详解…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

拉力测试cuda pytorch 把 4070显卡拉满

import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试&#xff0c;通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小&#xff0c;增大可提高计算复杂度duration: 测试持续时间&#xff08;秒&…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...

免费PDF转图片工具

免费PDF转图片工具 一款简单易用的PDF转图片工具&#xff0c;可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件&#xff0c;也不需要在线上传文件&#xff0c;保护您的隐私。 工具截图 主要特点 &#x1f680; 快速转换&#xff1a;本地转换&#xff0c;无需等待上…...

Java数值运算常见陷阱与规避方法

整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...