当前位置: 首页 > news >正文

【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决

问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下:

在这里插入图片描述

Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘’ at 4, the last event read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118, the last byte read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118. Error code: 1236; SQLSTATE: HY000.

问题分析:主要是多个任务都读取的同一个binlog造成serverid冲突;

网上有设置server-id的方法,这个是可以解决的,但是需要分方案来谈;
例如:我们是需要一个任务通用的(7张表重复提交7次,更改flink传参)
这种就不适合我们,因为每次的任务都是同一个jar,读取一段时间后还是会报错;

解决方案:其实我们可以把在同一个mysql库里面的表放到同一个source里面;

解决代码:我们是mysql同步到starrocks里面,使用的sr官方的sink function,不同sink的可以参考这个源代码;

官方是推荐采用StarRocksSink.sink(options)这种方式,我们查看源码,其实是采用的v2这一个fun,点进去发现,sr官方对数据进行了处理,只需要匹配对应的类型即可;
在这里插入图片描述
所以我们只需要对mysqlSource进行一个算子转换即可,关键代码如下:

MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();MySqlSource<String> mySqlSource = builder.hostname(srcHost).port(3306).databaseList(srcDb)// 格式 db.table,db.table2.......tableList(srcTable).username(srcUsername).password(srcPassword).jdbcProperties(jbdcProperties).debeziumProperties(properties)// 这里反序列化我进行了StarRocks增删类型处理,具体可以看我这片文章https://blog.csdn.net/JGMa_TiMo/article/details/128327546.deserializer(jsonStringDebeziumDeserializationSchema).serverId("5400-6400").build();DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[<< job: >>" + propKey + "]");
//                .setParallelism(parallelism);// todo 这里是关键地方,我们反序列化只能返回flink认可的类型,一般都是string,这里转换成上面sr可以处理的对象 StarRocksSinkRowDataWithMetaSingleOutputStreamOperator<StarRocksSinkRowDataWithMeta> streamOperator = streamSource.flatMap(new FlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {@Overridepublic void flatMap(String value, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {HashMap hashMap = JsonUtils.parseObject(value, HashMap.class);StarRocksSinkRowDataWithMeta sinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();sinkRowDataWithMeta.addDataRow(value);assert hashMap != null;sinkRowDataWithMeta.setTable(hashMap.get("__table").toString());sinkRowDataWithMeta.setDatabase(sinkDb);collector.collect(sinkRowDataWithMeta);}}).name("Data Filtering");StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder().withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai").withProperty("load-url", sinkHost + ":8030").withProperty("database-name", sinkDb).withProperty("username", sinkUsername).withProperty("password", sinkPassword)// 这里的设置会被多srcTable覆盖.withProperty("table-name", "").withProperty("sink.properties.format", "json").withProperty("sink.properties.strip_outer_array", "true").build();// 这里查看sr sink源码实际使用的是这个fun,我们不要使用SinkFunctionFactory生成:会泛型不支持streamOperator.addSink(new StarRocksDynamicSinkFunctionV2<>(sinkOptions)).name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
//        streamOperator.addSink(StarRocksSink.sink(sinkOptions))
//                .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());env.execute(propKey + "<< stream sync job >>" + srcHost + srcDb);

解读:原理就是我们把原来7张在一个数据库的表放到一个flink source中读取,在指定传输到那个starrocks表时,官方已经实现了代码支持,我们只需要增加一个flink算子转换成sink支持的对象即可,(关联一个source对应多个sink解决思路)

问题二:我们是采用datastream api开发的flink任务,在web-ui界面提交任务,造成taskManager的jvm Metaspace一直增长直到节点挂掉。

报错就不贴了,就是taskmanager会自动挂掉,查看tm的日志是oom异常:jvm metaspace溢出;

问题分析:我们采用web-ui多次提交flink任务时,flink是动态类加载,所以不会释放上一个jar的元空间,才会造成jvm垃圾不回收;

可以看官方的issues:
https://issues.apache.org/jira/browse/FLINK-11205
https://issues.apache.org/jira/browse/FLINK-16408

问题解决:把jar放到flink/lib目录下就可以了,这样flink会优先加载父加载器中的类;

这里需要注意和以前的jar会造成版本冲突,具体解决你可以根据报错信息慢慢调试,我这里贴一个我的环境信息:
当前环境中的lib包
在这里插入图片描述
我增加的lib包
在这里插入图片描述
下面是我的jar包依赖,打完包放到lib中即可

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr><flink.connector.mysql>2.3.0</flink.connector.mysql><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!-- 基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>${flink.connector.sr}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.connector.mysql}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.4</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.3.4</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-api</artifactId><version>2.7.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.5.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>5.3.26</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>2.0</version></dependency></dependencies>

问题二:最后就是在web-ui提交任务,你也可以用命令行,这里我用的源码包,就是我开发的实际编写代码,就几十K

在这里插入图片描述
最后如果解决了你的问题,请点个赞吧在这里插入图片描述

相关文章:

【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决

问题一&#xff1a;7张表是同一个mysql中的&#xff0c;我们进行增量同步时分别用不同的flink任务读取&#xff0c;造成mysql server-id冲突问题&#xff0c;如下&#xff1a; Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this…...

C#常用多线程(线程同步,事件触发,信号量,互斥锁,共享内存,消息队列)

using System; using System.Threading; using System.Windows.Forms; using UtilForm.Util;namespace UtilForm {// 线程同步&#xff0c;事件触发&#xff0c;信号量&#xff0c;互斥锁&#xff0c;共享内存&#xff0c;消息队列public partial class frmUIThread : Form{ Sy…...

OpenWrt系统开发笔记

openWrt英文官网&#xff1a; https://openwrt.org/ 中文官网&#xff1a; http://www.openwrt.org.cn/ 一、开发环境及编译 在github上有两个源码使用的比较多   一个是lede,地址为&#xff1a;https://github.com/coolsnowwolf/lede   另一个为OpenWrt的官方源码&#…...

实战 - Restful APi 格式规范

文章目录 1. 特征2. 优点3. 动作1. GET 获取资源2. POST 创建资源3. PUT 整体替换4. PATCH 部分替换5. DELETE 删除资源 4. 示例 RESTful是一种API的设计风格&#xff0c;他和GraphQL &#xff0c;JSON-RPC&#xff0c;WebService类似&#xff0c;用于定义在CS、BS架构下暴露服…...

《Linux从练气到飞升》No.21 Linux简单实现一个shell

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的…...

【iVX】iVX的低代码未来发展趋势:加速应用开发的创新之路

简介&#xff1a; 随着数字化转型的飞速发展&#xff0c;企业和组织对快速开发和交付高质量应用的需求越来越迫切。低代码开发平台作为一种创新的解决方案&#xff0c;极大地简化了应用程序的开发过程。在这一领域&#xff0c;iVX低代码平台作为领先的创业公司&#xff0c;正在…...

zookee 安装

1、下载安装包 weget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz 方案1&#xff1a;wget是一个下载指令&#xff0c;后面可以跟下载连接去从服务器上下载东西。 方案2&#xff1a;也可以先下载到windows上&#xff0c;再通…...

OpenWrt编译自己的应用程序

编译OpenWrt的应用程序可以参考OpenWrt内部其他应用程序的例程&#xff0c;来编写成自己的应用程序 一、OpenWrt源代码获取与编译 1.1、搭建环境 下载OpenWrt的官方源码&#xff1a; git clone https://github.com/openwrt/openwrt.git1.2、安装编译依赖项 sudo apt update…...

MySQL 50 题。

MySQL 50 题。 文章目录 MySQL 50 题。数据库。sql。 数据库。 CREATE SCHEMA new_schema DEFAULT CHARACTER SET utf8mb4 ;Operation failed: There was an error while applying the SQL script to the database. Executing: CREATE SCHEMA new_schema DEFAULT CHARACTER SE…...

强化学习算法总结 (1)

强化学习算法总结 (1) 1.综述 强化学习是通过与环境进行交互&#xff0c;来实现目标的一种计算方法。 s − a 1 − r − s ′ s - a_1 - r- s s−a1​−r−s′ 1.1强化学习优化目标 p o l i c y a r g m a x p o l i c y E ( a , s ) [ r e w a r d ( s , a ) ] policy ar…...

Qt应用开发(基础篇)——向导对话框 QWizard

一、前言 QWizard类继承于QDialog&#xff0c;为有向导界面需求的应用环境提供了一个框架。 对话框窗口 QDialog QWizard向导对话框是一个拥有队列界面的特殊对话框&#xff0c;向导的目的是引导用户一步一步的完成预设的流程。向导常用于软件安装界面向导、硬件线路安装向导、…...

Python类的方法

Python类的方法主要分为实例方法、类方法和静态方法三种。 1 实例方法 以self作为第一个参数的方法&#xff0c;就是类的实例方法。该方法由类的实例调用&#xff0c;Python会把调用该方法的实例对象传递给self。 如下代码定义了一个名为A的类。 class A:def __init__(self…...

变电站自动化监控系统

力安科技变电站自动化监控系统是以箱式变电站为管理对象&#xff0c;加装箱变网关&#xff0c;在完成箱变智能化改造的基础上&#xff0c;依托电易云&#xff0c;构建一体化智慧箱变及运维系统。智能箱式变电站被广泛应用于住宅小区、城市公用变压器、工厂、商场、机场、电站等…...

MySql学习笔记11——DBA命令介绍

DBA命令 数据导入 要进入Mysql 创建数据库 create database database_name;使用数据库 use database_name;初始化数据库 source .sql文件地址&#xff0c;不能加双引号&#xff1b;数据导出 要在windows的dos环境下进行 导出数据库 mysqldump database_name > 存放…...

Webpack 复习小结

nodejs学习参考 node常用命令&#xff1a; node xxx.js 执行js文件 npm init -y 初始化package.json npm i 软件包名 下载软件包到本地 npm i 软件包名 -g 下载软件包到全局 npm uni 软件包名 删除软件包 系统优化CDN使用 CDN for free 需求&#xff1a;开发模式使用本地第三…...

Laravel chunk和chunkById的坑

在编写定时任务脚本的时候&#xff0c;经常会用到chunk和chunkById的API。 一、前言 数据库引擎为innodb。 表结构简述&#xff0c;只列出了本文用到的字段。 字段类型注释idint(11)IDtypeint(11)类型mark_timeint(10)标注时间&#xff08;时间戳&#xff09; 索引&#x…...

从零开始学习 Java:简单易懂的入门指南之泛型及set集合(二十二)

泛型及set集合扩展 1.泛型1.1泛型概述 2.Set集合2.1Set集合概述和特点【应用】2.2Set集合的使用【应用】 3.TreeSet集合3.1TreeSet集合概述和特点【应用】3.2TreeSet集合基本使用【应用】3.3自然排序Comparable的使用【应用】3.4比较器排序Comparator的使用【应用】3.5两种比较…...

JVM----GC(垃圾回收)详解

一、Automatic Garbage Collection&#xff08;垃圾回收&#xff09;简介 Automatic Garbage Collection &#xff08;自动垃圾回收&#xff09;是JVM的一个特性&#xff0c;JVM会启动相关的线程&#xff0c;该线程会轮训检查heap memeory&#xff0c;并确定哪些是未被引用的(…...

数据库的三个范式

数据库的三个范式是关系数据库设计中的一组规范&#xff0c;用于确保数据的有效性和一致性。这三个范式分别是&#xff1a; 第一范式&#xff08;1NF&#xff09;&#xff1a;要求数据库表中的每一列都是不可分割的原子值。换句话说&#xff0c;每个表中的每个字段不能包含多个…...

谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行

环境&#xff1a; Win10 专业版 谷歌浏览器 版本 116.0.5845.141&#xff08;正式版本&#xff09; &#xff08;64 位&#xff09; L盾加密终端 问题描述&#xff1a; 谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行&#xff0c;要全部结束谷歌浏览器进程&…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...