【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决
问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下:

Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘’ at 4, the last event read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118, the last byte read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118. Error code: 1236; SQLSTATE: HY000.
问题分析:主要是多个任务都读取的同一个binlog造成serverid冲突;
网上有设置server-id的方法,这个是可以解决的,但是需要分方案来谈;
例如:我们是需要一个任务通用的(7张表重复提交7次,更改flink传参)
这种就不适合我们,因为每次的任务都是同一个jar,读取一段时间后还是会报错;
解决方案:其实我们可以把在同一个mysql库里面的表放到同一个source里面;
解决代码:我们是mysql同步到starrocks里面,使用的sr官方的sink function,不同sink的可以参考这个源代码;
官方是推荐采用StarRocksSink.sink(options)这种方式,我们查看源码,其实是采用的v2这一个fun,点进去发现,sr官方对数据进行了处理,只需要匹配对应的类型即可;
所以我们只需要对mysqlSource进行一个算子转换即可,关键代码如下:
MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();MySqlSource<String> mySqlSource = builder.hostname(srcHost).port(3306).databaseList(srcDb)// 格式 db.table,db.table2.......tableList(srcTable).username(srcUsername).password(srcPassword).jdbcProperties(jbdcProperties).debeziumProperties(properties)// 这里反序列化我进行了StarRocks增删类型处理,具体可以看我这片文章https://blog.csdn.net/JGMa_TiMo/article/details/128327546.deserializer(jsonStringDebeziumDeserializationSchema).serverId("5400-6400").build();DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[<< job: >>" + propKey + "]");
// .setParallelism(parallelism);// todo 这里是关键地方,我们反序列化只能返回flink认可的类型,一般都是string,这里转换成上面sr可以处理的对象 StarRocksSinkRowDataWithMetaSingleOutputStreamOperator<StarRocksSinkRowDataWithMeta> streamOperator = streamSource.flatMap(new FlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {@Overridepublic void flatMap(String value, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {HashMap hashMap = JsonUtils.parseObject(value, HashMap.class);StarRocksSinkRowDataWithMeta sinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();sinkRowDataWithMeta.addDataRow(value);assert hashMap != null;sinkRowDataWithMeta.setTable(hashMap.get("__table").toString());sinkRowDataWithMeta.setDatabase(sinkDb);collector.collect(sinkRowDataWithMeta);}}).name("Data Filtering");StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder().withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai").withProperty("load-url", sinkHost + ":8030").withProperty("database-name", sinkDb).withProperty("username", sinkUsername).withProperty("password", sinkPassword)// 这里的设置会被多srcTable覆盖.withProperty("table-name", "").withProperty("sink.properties.format", "json").withProperty("sink.properties.strip_outer_array", "true").build();// 这里查看sr sink源码实际使用的是这个fun,我们不要使用SinkFunctionFactory生成:会泛型不支持streamOperator.addSink(new StarRocksDynamicSinkFunctionV2<>(sinkOptions)).name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
// streamOperator.addSink(StarRocksSink.sink(sinkOptions))
// .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());env.execute(propKey + "<< stream sync job >>" + srcHost + srcDb);
解读:原理就是我们把原来7张在一个数据库的表放到一个flink source中读取,在指定传输到那个starrocks表时,官方已经实现了代码支持,我们只需要增加一个flink算子转换成sink支持的对象即可,(关联一个source对应多个sink解决思路)
问题二:我们是采用datastream api开发的flink任务,在web-ui界面提交任务,造成taskManager的jvm Metaspace一直增长直到节点挂掉。
报错就不贴了,就是taskmanager会自动挂掉,查看tm的日志是oom异常:jvm metaspace溢出;
问题分析:我们采用web-ui多次提交flink任务时,flink是动态类加载,所以不会释放上一个jar的元空间,才会造成jvm垃圾不回收;
可以看官方的issues:
https://issues.apache.org/jira/browse/FLINK-11205
https://issues.apache.org/jira/browse/FLINK-16408
问题解决:把jar放到flink/lib目录下就可以了,这样flink会优先加载父加载器中的类;
这里需要注意和以前的jar会造成版本冲突,具体解决你可以根据报错信息慢慢调试,我这里贴一个我的环境信息:
当前环境中的lib包

我增加的lib包

下面是我的jar包依赖,打完包放到lib中即可
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr><flink.connector.mysql>2.3.0</flink.connector.mysql><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!-- 基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>${flink.connector.sr}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.connector.mysql}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.4</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.3.4</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-api</artifactId><version>2.7.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.5.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>5.3.26</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>2.0</version></dependency></dependencies>
问题二:最后就是在web-ui提交任务,你也可以用命令行,这里我用的源码包,就是我开发的实际编写代码,就几十K

最后如果解决了你的问题,请点个赞吧
相关文章:
【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决
问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下: Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this…...
C#常用多线程(线程同步,事件触发,信号量,互斥锁,共享内存,消息队列)
using System; using System.Threading; using System.Windows.Forms; using UtilForm.Util;namespace UtilForm {// 线程同步,事件触发,信号量,互斥锁,共享内存,消息队列public partial class frmUIThread : Form{ Sy…...
OpenWrt系统开发笔记
openWrt英文官网: https://openwrt.org/ 中文官网: http://www.openwrt.org.cn/ 一、开发环境及编译 在github上有两个源码使用的比较多 一个是lede,地址为:https://github.com/coolsnowwolf/lede 另一个为OpenWrt的官方源码&#…...
实战 - Restful APi 格式规范
文章目录 1. 特征2. 优点3. 动作1. GET 获取资源2. POST 创建资源3. PUT 整体替换4. PATCH 部分替换5. DELETE 删除资源 4. 示例 RESTful是一种API的设计风格,他和GraphQL ,JSON-RPC,WebService类似,用于定义在CS、BS架构下暴露服…...
《Linux从练气到飞升》No.21 Linux简单实现一个shell
🕺作者: 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇码字不易,你的👍点赞🙌收藏❤️关注对我真的…...
【iVX】iVX的低代码未来发展趋势:加速应用开发的创新之路
简介: 随着数字化转型的飞速发展,企业和组织对快速开发和交付高质量应用的需求越来越迫切。低代码开发平台作为一种创新的解决方案,极大地简化了应用程序的开发过程。在这一领域,iVX低代码平台作为领先的创业公司,正在…...
zookee 安装
1、下载安装包 weget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz 方案1:wget是一个下载指令,后面可以跟下载连接去从服务器上下载东西。 方案2:也可以先下载到windows上,再通…...
OpenWrt编译自己的应用程序
编译OpenWrt的应用程序可以参考OpenWrt内部其他应用程序的例程,来编写成自己的应用程序 一、OpenWrt源代码获取与编译 1.1、搭建环境 下载OpenWrt的官方源码: git clone https://github.com/openwrt/openwrt.git1.2、安装编译依赖项 sudo apt update…...
MySQL 50 题。
MySQL 50 题。 文章目录 MySQL 50 题。数据库。sql。 数据库。 CREATE SCHEMA new_schema DEFAULT CHARACTER SET utf8mb4 ;Operation failed: There was an error while applying the SQL script to the database. Executing: CREATE SCHEMA new_schema DEFAULT CHARACTER SE…...
强化学习算法总结 (1)
强化学习算法总结 (1) 1.综述 强化学习是通过与环境进行交互,来实现目标的一种计算方法。 s − a 1 − r − s ′ s - a_1 - r- s s−a1−r−s′ 1.1强化学习优化目标 p o l i c y a r g m a x p o l i c y E ( a , s ) [ r e w a r d ( s , a ) ] policy ar…...
Qt应用开发(基础篇)——向导对话框 QWizard
一、前言 QWizard类继承于QDialog,为有向导界面需求的应用环境提供了一个框架。 对话框窗口 QDialog QWizard向导对话框是一个拥有队列界面的特殊对话框,向导的目的是引导用户一步一步的完成预设的流程。向导常用于软件安装界面向导、硬件线路安装向导、…...
Python类的方法
Python类的方法主要分为实例方法、类方法和静态方法三种。 1 实例方法 以self作为第一个参数的方法,就是类的实例方法。该方法由类的实例调用,Python会把调用该方法的实例对象传递给self。 如下代码定义了一个名为A的类。 class A:def __init__(self…...
变电站自动化监控系统
力安科技变电站自动化监控系统是以箱式变电站为管理对象,加装箱变网关,在完成箱变智能化改造的基础上,依托电易云,构建一体化智慧箱变及运维系统。智能箱式变电站被广泛应用于住宅小区、城市公用变压器、工厂、商场、机场、电站等…...
MySql学习笔记11——DBA命令介绍
DBA命令 数据导入 要进入Mysql 创建数据库 create database database_name;使用数据库 use database_name;初始化数据库 source .sql文件地址,不能加双引号;数据导出 要在windows的dos环境下进行 导出数据库 mysqldump database_name > 存放…...
Webpack 复习小结
nodejs学习参考 node常用命令: node xxx.js 执行js文件 npm init -y 初始化package.json npm i 软件包名 下载软件包到本地 npm i 软件包名 -g 下载软件包到全局 npm uni 软件包名 删除软件包 系统优化CDN使用 CDN for free 需求:开发模式使用本地第三…...
Laravel chunk和chunkById的坑
在编写定时任务脚本的时候,经常会用到chunk和chunkById的API。 一、前言 数据库引擎为innodb。 表结构简述,只列出了本文用到的字段。 字段类型注释idint(11)IDtypeint(11)类型mark_timeint(10)标注时间(时间戳) 索引&#x…...
从零开始学习 Java:简单易懂的入门指南之泛型及set集合(二十二)
泛型及set集合扩展 1.泛型1.1泛型概述 2.Set集合2.1Set集合概述和特点【应用】2.2Set集合的使用【应用】 3.TreeSet集合3.1TreeSet集合概述和特点【应用】3.2TreeSet集合基本使用【应用】3.3自然排序Comparable的使用【应用】3.4比较器排序Comparator的使用【应用】3.5两种比较…...
JVM----GC(垃圾回收)详解
一、Automatic Garbage Collection(垃圾回收)简介 Automatic Garbage Collection (自动垃圾回收)是JVM的一个特性,JVM会启动相关的线程,该线程会轮训检查heap memeory,并确定哪些是未被引用的(…...
数据库的三个范式
数据库的三个范式是关系数据库设计中的一组规范,用于确保数据的有效性和一致性。这三个范式分别是: 第一范式(1NF):要求数据库表中的每一列都是不可分割的原子值。换句话说,每个表中的每个字段不能包含多个…...
谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行
环境: Win10 专业版 谷歌浏览器 版本 116.0.5845.141(正式版本) (64 位) L盾加密终端 问题描述: 谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行,要全部结束谷歌浏览器进程&…...
雀魂智能辅助:从零构建你的AI麻将教练系统
雀魂智能辅助:从零构建你的AI麻将教练系统 【免费下载链接】Akagi A helper client for Majsoul 项目地址: https://gitcode.com/gh_mirrors/ak/Akagi 想在雀魂对局中获得实时AI分析与策略指导?雀魂智能辅助系统通过深度学习技术,为玩…...
避坑指南:Synopsys VCS工具安装中的5个常见错误及解决方案
Synopsys VCS工具安装避坑实战:从报错排查到环境调优 在芯片设计领域,Synopsys VCS作为业界标准的仿真工具,其安装过程却常常成为工程师们的"第一道门槛"。不同于简单的解压即用软件,VCS的安装涉及复杂的依赖关系、权限…...
李慕婉-仙逆-造相Z-Turbo在C语言项目中的集成方案
李慕婉-仙逆-造相Z-Turbo在C语言项目中的集成方案 将AI图像生成能力无缝集成到C语言项目中,为传统应用注入智能创作活力 1. 为什么要在C项目中集成图像生成能力 在当今的软件开发领域,C语言仍然是系统级编程、嵌入式设备和性能敏感应用的首选语言。虽然…...
Apache Arrow Rust社区与生态:参与开源项目的最佳路径
Apache Arrow Rust社区与生态:参与开源项目的最佳路径 【免费下载链接】arrow-rs Apache Arrow Rust: 一个Rust语言实现的Apache Arrow数据交换格式,可用于高效地在不同计算引擎之间传输和操作大规模数据。它支持多种数据类型和编码方式,并提…...
家庭实验室:树莓派控制OpenClaw调用远程Qwen3-32B
家庭实验室:树莓派控制OpenClaw调用远程Qwen3-32B 1. 为什么选择树莓派OpenClaw组合 去年冬天,我在整理家庭实验室设备时发现一个闲置的树莓派4B。这台信用卡大小的电脑曾经用来跑Home Assistant控制智能家居,但后来换了NUC主机就被束之高阁…...
从轨迹到网络:广州休闲步行空间格局刻画 | 论文全解析与方法论深度拆解
从轨迹到网络:广州休闲步行空间格局刻画 | 论文全解析与方法论拆解 原文:From trajectories to network: Delineating the spatial pattern of recreational walking in Guangzhou》 一、论文核心概览:摘要与关键词 1.1 核心摘要解析 本文的核心内容可拆解为5个核心模块,…...
探索前沿技术趋势:2024年最值得关注的创新应用场景
1. 生成式AI的爆发式应用 2024年最让人兴奋的技术趋势,莫过于生成式AI从实验室走向千家万户。我最近测试了十几个主流AI创作工具,发现它们已经能完成许多过去认为"只有人类能做到"的任务。比如用Midjourney生成产品设计图,只需要简…...
AI Agent 的动态知识更新:保持 LLM 知识的实时性
AI Agent 的动态知识更新:保持 LLM 知识的实时性 关键词:AI Agent、动态知识更新、大语言模型(LLM)、实时性、知识图谱 摘要:本文聚焦于 AI Agent 的动态知识更新,旨在探讨如何保持大语言模型(LLM)知识的实时性。首先介绍了相关背景,包括目的、预期读者等。接着阐述了…...
KiCanvas:浏览器中的KiCAD设计查看器,5分钟快速入门指南
KiCanvas:浏览器中的KiCAD设计查看器,5分钟快速入门指南 【免费下载链接】kicanvas The KiCAD web viewer 项目地址: https://gitcode.com/gh_mirrors/ki/kicanvas 想要在浏览器中直接查看KiCAD电路设计文件,无需安装任何软件…...
别再死记公式了!用Python+Matplotlib亲手仿真LC并联谐振,直观理解选频原理
用PythonMatplotlib动态仿真LC并联谐振:从代码到物理直觉的沉浸式探索 当教科书上的LC并联谐振公式变成屏幕上跳动的曲线,当抽象的Q值概念转化为滑块调节时的实时波形变化,电子工程的学习便从枯燥的符号演算升维为一场充满探索乐趣的科学实验…...

