【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决
问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下:

Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘’ at 4, the last event read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118, the last byte read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118. Error code: 1236; SQLSTATE: HY000.
问题分析:主要是多个任务都读取的同一个binlog造成serverid冲突;
网上有设置server-id的方法,这个是可以解决的,但是需要分方案来谈;
例如:我们是需要一个任务通用的(7张表重复提交7次,更改flink传参)
这种就不适合我们,因为每次的任务都是同一个jar,读取一段时间后还是会报错;
解决方案:其实我们可以把在同一个mysql库里面的表放到同一个source里面;
解决代码:我们是mysql同步到starrocks里面,使用的sr官方的sink function,不同sink的可以参考这个源代码;
官方是推荐采用StarRocksSink.sink(options)这种方式,我们查看源码,其实是采用的v2这一个fun,点进去发现,sr官方对数据进行了处理,只需要匹配对应的类型即可;
所以我们只需要对mysqlSource进行一个算子转换即可,关键代码如下:
MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();MySqlSource<String> mySqlSource = builder.hostname(srcHost).port(3306).databaseList(srcDb)// 格式 db.table,db.table2.......tableList(srcTable).username(srcUsername).password(srcPassword).jdbcProperties(jbdcProperties).debeziumProperties(properties)// 这里反序列化我进行了StarRocks增删类型处理,具体可以看我这片文章https://blog.csdn.net/JGMa_TiMo/article/details/128327546.deserializer(jsonStringDebeziumDeserializationSchema).serverId("5400-6400").build();DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[<< job: >>" + propKey + "]");
// .setParallelism(parallelism);// todo 这里是关键地方,我们反序列化只能返回flink认可的类型,一般都是string,这里转换成上面sr可以处理的对象 StarRocksSinkRowDataWithMetaSingleOutputStreamOperator<StarRocksSinkRowDataWithMeta> streamOperator = streamSource.flatMap(new FlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {@Overridepublic void flatMap(String value, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {HashMap hashMap = JsonUtils.parseObject(value, HashMap.class);StarRocksSinkRowDataWithMeta sinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();sinkRowDataWithMeta.addDataRow(value);assert hashMap != null;sinkRowDataWithMeta.setTable(hashMap.get("__table").toString());sinkRowDataWithMeta.setDatabase(sinkDb);collector.collect(sinkRowDataWithMeta);}}).name("Data Filtering");StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder().withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai").withProperty("load-url", sinkHost + ":8030").withProperty("database-name", sinkDb).withProperty("username", sinkUsername).withProperty("password", sinkPassword)// 这里的设置会被多srcTable覆盖.withProperty("table-name", "").withProperty("sink.properties.format", "json").withProperty("sink.properties.strip_outer_array", "true").build();// 这里查看sr sink源码实际使用的是这个fun,我们不要使用SinkFunctionFactory生成:会泛型不支持streamOperator.addSink(new StarRocksDynamicSinkFunctionV2<>(sinkOptions)).name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
// streamOperator.addSink(StarRocksSink.sink(sinkOptions))
// .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());env.execute(propKey + "<< stream sync job >>" + srcHost + srcDb);
解读:原理就是我们把原来7张在一个数据库的表放到一个flink source中读取,在指定传输到那个starrocks表时,官方已经实现了代码支持,我们只需要增加一个flink算子转换成sink支持的对象即可,(关联一个source对应多个sink解决思路)
问题二:我们是采用datastream api开发的flink任务,在web-ui界面提交任务,造成taskManager的jvm Metaspace一直增长直到节点挂掉。
报错就不贴了,就是taskmanager会自动挂掉,查看tm的日志是oom异常:jvm metaspace溢出;
问题分析:我们采用web-ui多次提交flink任务时,flink是动态类加载,所以不会释放上一个jar的元空间,才会造成jvm垃圾不回收;
可以看官方的issues:
https://issues.apache.org/jira/browse/FLINK-11205
https://issues.apache.org/jira/browse/FLINK-16408
问题解决:把jar放到flink/lib目录下就可以了,这样flink会优先加载父加载器中的类;
这里需要注意和以前的jar会造成版本冲突,具体解决你可以根据报错信息慢慢调试,我这里贴一个我的环境信息:
当前环境中的lib包

我增加的lib包

下面是我的jar包依赖,打完包放到lib中即可
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr><flink.connector.mysql>2.3.0</flink.connector.mysql><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!-- 基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>${flink.connector.sr}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.connector.mysql}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.4</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.3.4</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-api</artifactId><version>2.7.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.5.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>5.3.26</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>2.0</version></dependency></dependencies>
问题二:最后就是在web-ui提交任务,你也可以用命令行,这里我用的源码包,就是我开发的实际编写代码,就几十K

最后如果解决了你的问题,请点个赞吧
相关文章:
【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决
问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下: Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this…...
C#常用多线程(线程同步,事件触发,信号量,互斥锁,共享内存,消息队列)
using System; using System.Threading; using System.Windows.Forms; using UtilForm.Util;namespace UtilForm {// 线程同步,事件触发,信号量,互斥锁,共享内存,消息队列public partial class frmUIThread : Form{ Sy…...
OpenWrt系统开发笔记
openWrt英文官网: https://openwrt.org/ 中文官网: http://www.openwrt.org.cn/ 一、开发环境及编译 在github上有两个源码使用的比较多 一个是lede,地址为:https://github.com/coolsnowwolf/lede 另一个为OpenWrt的官方源码&#…...
实战 - Restful APi 格式规范
文章目录 1. 特征2. 优点3. 动作1. GET 获取资源2. POST 创建资源3. PUT 整体替换4. PATCH 部分替换5. DELETE 删除资源 4. 示例 RESTful是一种API的设计风格,他和GraphQL ,JSON-RPC,WebService类似,用于定义在CS、BS架构下暴露服…...
《Linux从练气到飞升》No.21 Linux简单实现一个shell
🕺作者: 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇码字不易,你的👍点赞🙌收藏❤️关注对我真的…...
【iVX】iVX的低代码未来发展趋势:加速应用开发的创新之路
简介: 随着数字化转型的飞速发展,企业和组织对快速开发和交付高质量应用的需求越来越迫切。低代码开发平台作为一种创新的解决方案,极大地简化了应用程序的开发过程。在这一领域,iVX低代码平台作为领先的创业公司,正在…...
zookee 安装
1、下载安装包 weget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz 方案1:wget是一个下载指令,后面可以跟下载连接去从服务器上下载东西。 方案2:也可以先下载到windows上,再通…...
OpenWrt编译自己的应用程序
编译OpenWrt的应用程序可以参考OpenWrt内部其他应用程序的例程,来编写成自己的应用程序 一、OpenWrt源代码获取与编译 1.1、搭建环境 下载OpenWrt的官方源码: git clone https://github.com/openwrt/openwrt.git1.2、安装编译依赖项 sudo apt update…...
MySQL 50 题。
MySQL 50 题。 文章目录 MySQL 50 题。数据库。sql。 数据库。 CREATE SCHEMA new_schema DEFAULT CHARACTER SET utf8mb4 ;Operation failed: There was an error while applying the SQL script to the database. Executing: CREATE SCHEMA new_schema DEFAULT CHARACTER SE…...
强化学习算法总结 (1)
强化学习算法总结 (1) 1.综述 强化学习是通过与环境进行交互,来实现目标的一种计算方法。 s − a 1 − r − s ′ s - a_1 - r- s s−a1−r−s′ 1.1强化学习优化目标 p o l i c y a r g m a x p o l i c y E ( a , s ) [ r e w a r d ( s , a ) ] policy ar…...
Qt应用开发(基础篇)——向导对话框 QWizard
一、前言 QWizard类继承于QDialog,为有向导界面需求的应用环境提供了一个框架。 对话框窗口 QDialog QWizard向导对话框是一个拥有队列界面的特殊对话框,向导的目的是引导用户一步一步的完成预设的流程。向导常用于软件安装界面向导、硬件线路安装向导、…...
Python类的方法
Python类的方法主要分为实例方法、类方法和静态方法三种。 1 实例方法 以self作为第一个参数的方法,就是类的实例方法。该方法由类的实例调用,Python会把调用该方法的实例对象传递给self。 如下代码定义了一个名为A的类。 class A:def __init__(self…...
变电站自动化监控系统
力安科技变电站自动化监控系统是以箱式变电站为管理对象,加装箱变网关,在完成箱变智能化改造的基础上,依托电易云,构建一体化智慧箱变及运维系统。智能箱式变电站被广泛应用于住宅小区、城市公用变压器、工厂、商场、机场、电站等…...
MySql学习笔记11——DBA命令介绍
DBA命令 数据导入 要进入Mysql 创建数据库 create database database_name;使用数据库 use database_name;初始化数据库 source .sql文件地址,不能加双引号;数据导出 要在windows的dos环境下进行 导出数据库 mysqldump database_name > 存放…...
Webpack 复习小结
nodejs学习参考 node常用命令: node xxx.js 执行js文件 npm init -y 初始化package.json npm i 软件包名 下载软件包到本地 npm i 软件包名 -g 下载软件包到全局 npm uni 软件包名 删除软件包 系统优化CDN使用 CDN for free 需求:开发模式使用本地第三…...
Laravel chunk和chunkById的坑
在编写定时任务脚本的时候,经常会用到chunk和chunkById的API。 一、前言 数据库引擎为innodb。 表结构简述,只列出了本文用到的字段。 字段类型注释idint(11)IDtypeint(11)类型mark_timeint(10)标注时间(时间戳) 索引&#x…...
从零开始学习 Java:简单易懂的入门指南之泛型及set集合(二十二)
泛型及set集合扩展 1.泛型1.1泛型概述 2.Set集合2.1Set集合概述和特点【应用】2.2Set集合的使用【应用】 3.TreeSet集合3.1TreeSet集合概述和特点【应用】3.2TreeSet集合基本使用【应用】3.3自然排序Comparable的使用【应用】3.4比较器排序Comparator的使用【应用】3.5两种比较…...
JVM----GC(垃圾回收)详解
一、Automatic Garbage Collection(垃圾回收)简介 Automatic Garbage Collection (自动垃圾回收)是JVM的一个特性,JVM会启动相关的线程,该线程会轮训检查heap memeory,并确定哪些是未被引用的(…...
数据库的三个范式
数据库的三个范式是关系数据库设计中的一组规范,用于确保数据的有效性和一致性。这三个范式分别是: 第一范式(1NF):要求数据库表中的每一列都是不可分割的原子值。换句话说,每个表中的每个字段不能包含多个…...
谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行
环境: Win10 专业版 谷歌浏览器 版本 116.0.5845.141(正式版本) (64 位) L盾加密终端 问题描述: 谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行,要全部结束谷歌浏览器进程&…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)
引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)已成为技术领域的焦点。从智能写作到代码生成,LLM 的应用场景不断扩展,深刻改变了我们的工作和生活方式。然而,理解这些模型的内部…...
Python 高效图像帧提取与视频编码:实战指南
Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...
ubuntu系统文件误删(/lib/x86_64-linux-gnu/libc.so.6)修复方案 [成功解决]
报错信息:libc.so.6: cannot open shared object file: No such file or directory: #ls, ln, sudo...命令都不能用 error while loading shared libraries: libc.so.6: cannot open shared object file: No such file or directory重启后报错信息&…...
算术操作符与类型转换:从基础到精通
目录 前言:从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符:、-、*、/、% 赋值操作符:和复合赋值 单⽬操作符:、--、、- 前言:从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...

