当前位置: 首页 > news >正文

【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决

问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下:

在这里插入图片描述

Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘’ at 4, the last event read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118, the last byte read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118. Error code: 1236; SQLSTATE: HY000.

问题分析:主要是多个任务都读取的同一个binlog造成serverid冲突;

网上有设置server-id的方法,这个是可以解决的,但是需要分方案来谈;
例如:我们是需要一个任务通用的(7张表重复提交7次,更改flink传参)
这种就不适合我们,因为每次的任务都是同一个jar,读取一段时间后还是会报错;

解决方案:其实我们可以把在同一个mysql库里面的表放到同一个source里面;

解决代码:我们是mysql同步到starrocks里面,使用的sr官方的sink function,不同sink的可以参考这个源代码;

官方是推荐采用StarRocksSink.sink(options)这种方式,我们查看源码,其实是采用的v2这一个fun,点进去发现,sr官方对数据进行了处理,只需要匹配对应的类型即可;
在这里插入图片描述
所以我们只需要对mysqlSource进行一个算子转换即可,关键代码如下:

MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();MySqlSource<String> mySqlSource = builder.hostname(srcHost).port(3306).databaseList(srcDb)// 格式 db.table,db.table2.......tableList(srcTable).username(srcUsername).password(srcPassword).jdbcProperties(jbdcProperties).debeziumProperties(properties)// 这里反序列化我进行了StarRocks增删类型处理,具体可以看我这片文章https://blog.csdn.net/JGMa_TiMo/article/details/128327546.deserializer(jsonStringDebeziumDeserializationSchema).serverId("5400-6400").build();DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[<< job: >>" + propKey + "]");
//                .setParallelism(parallelism);// todo 这里是关键地方,我们反序列化只能返回flink认可的类型,一般都是string,这里转换成上面sr可以处理的对象 StarRocksSinkRowDataWithMetaSingleOutputStreamOperator<StarRocksSinkRowDataWithMeta> streamOperator = streamSource.flatMap(new FlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {@Overridepublic void flatMap(String value, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {HashMap hashMap = JsonUtils.parseObject(value, HashMap.class);StarRocksSinkRowDataWithMeta sinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();sinkRowDataWithMeta.addDataRow(value);assert hashMap != null;sinkRowDataWithMeta.setTable(hashMap.get("__table").toString());sinkRowDataWithMeta.setDatabase(sinkDb);collector.collect(sinkRowDataWithMeta);}}).name("Data Filtering");StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder().withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai").withProperty("load-url", sinkHost + ":8030").withProperty("database-name", sinkDb).withProperty("username", sinkUsername).withProperty("password", sinkPassword)// 这里的设置会被多srcTable覆盖.withProperty("table-name", "").withProperty("sink.properties.format", "json").withProperty("sink.properties.strip_outer_array", "true").build();// 这里查看sr sink源码实际使用的是这个fun,我们不要使用SinkFunctionFactory生成:会泛型不支持streamOperator.addSink(new StarRocksDynamicSinkFunctionV2<>(sinkOptions)).name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
//        streamOperator.addSink(StarRocksSink.sink(sinkOptions))
//                .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());env.execute(propKey + "<< stream sync job >>" + srcHost + srcDb);

解读:原理就是我们把原来7张在一个数据库的表放到一个flink source中读取,在指定传输到那个starrocks表时,官方已经实现了代码支持,我们只需要增加一个flink算子转换成sink支持的对象即可,(关联一个source对应多个sink解决思路)

问题二:我们是采用datastream api开发的flink任务,在web-ui界面提交任务,造成taskManager的jvm Metaspace一直增长直到节点挂掉。

报错就不贴了,就是taskmanager会自动挂掉,查看tm的日志是oom异常:jvm metaspace溢出;

问题分析:我们采用web-ui多次提交flink任务时,flink是动态类加载,所以不会释放上一个jar的元空间,才会造成jvm垃圾不回收;

可以看官方的issues:
https://issues.apache.org/jira/browse/FLINK-11205
https://issues.apache.org/jira/browse/FLINK-16408

问题解决:把jar放到flink/lib目录下就可以了,这样flink会优先加载父加载器中的类;

这里需要注意和以前的jar会造成版本冲突,具体解决你可以根据报错信息慢慢调试,我这里贴一个我的环境信息:
当前环境中的lib包
在这里插入图片描述
我增加的lib包
在这里插入图片描述
下面是我的jar包依赖,打完包放到lib中即可

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr><flink.connector.mysql>2.3.0</flink.connector.mysql><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!-- 基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>${flink.connector.sr}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.connector.mysql}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.4</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.3.4</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-api</artifactId><version>2.7.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.5.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>5.3.26</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>2.0</version></dependency></dependencies>

问题二:最后就是在web-ui提交任务,你也可以用命令行,这里我用的源码包,就是我开发的实际编写代码,就几十K

在这里插入图片描述
最后如果解决了你的问题,请点个赞吧在这里插入图片描述

相关文章:

【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决

问题一&#xff1a;7张表是同一个mysql中的&#xff0c;我们进行增量同步时分别用不同的flink任务读取&#xff0c;造成mysql server-id冲突问题&#xff0c;如下&#xff1a; Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this…...

C#常用多线程(线程同步,事件触发,信号量,互斥锁,共享内存,消息队列)

using System; using System.Threading; using System.Windows.Forms; using UtilForm.Util;namespace UtilForm {// 线程同步&#xff0c;事件触发&#xff0c;信号量&#xff0c;互斥锁&#xff0c;共享内存&#xff0c;消息队列public partial class frmUIThread : Form{ Sy…...

OpenWrt系统开发笔记

openWrt英文官网&#xff1a; https://openwrt.org/ 中文官网&#xff1a; http://www.openwrt.org.cn/ 一、开发环境及编译 在github上有两个源码使用的比较多   一个是lede,地址为&#xff1a;https://github.com/coolsnowwolf/lede   另一个为OpenWrt的官方源码&#…...

实战 - Restful APi 格式规范

文章目录 1. 特征2. 优点3. 动作1. GET 获取资源2. POST 创建资源3. PUT 整体替换4. PATCH 部分替换5. DELETE 删除资源 4. 示例 RESTful是一种API的设计风格&#xff0c;他和GraphQL &#xff0c;JSON-RPC&#xff0c;WebService类似&#xff0c;用于定义在CS、BS架构下暴露服…...

《Linux从练气到飞升》No.21 Linux简单实现一个shell

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的…...

【iVX】iVX的低代码未来发展趋势:加速应用开发的创新之路

简介&#xff1a; 随着数字化转型的飞速发展&#xff0c;企业和组织对快速开发和交付高质量应用的需求越来越迫切。低代码开发平台作为一种创新的解决方案&#xff0c;极大地简化了应用程序的开发过程。在这一领域&#xff0c;iVX低代码平台作为领先的创业公司&#xff0c;正在…...

zookee 安装

1、下载安装包 weget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz 方案1&#xff1a;wget是一个下载指令&#xff0c;后面可以跟下载连接去从服务器上下载东西。 方案2&#xff1a;也可以先下载到windows上&#xff0c;再通…...

OpenWrt编译自己的应用程序

编译OpenWrt的应用程序可以参考OpenWrt内部其他应用程序的例程&#xff0c;来编写成自己的应用程序 一、OpenWrt源代码获取与编译 1.1、搭建环境 下载OpenWrt的官方源码&#xff1a; git clone https://github.com/openwrt/openwrt.git1.2、安装编译依赖项 sudo apt update…...

MySQL 50 题。

MySQL 50 题。 文章目录 MySQL 50 题。数据库。sql。 数据库。 CREATE SCHEMA new_schema DEFAULT CHARACTER SET utf8mb4 ;Operation failed: There was an error while applying the SQL script to the database. Executing: CREATE SCHEMA new_schema DEFAULT CHARACTER SE…...

强化学习算法总结 (1)

强化学习算法总结 (1) 1.综述 强化学习是通过与环境进行交互&#xff0c;来实现目标的一种计算方法。 s − a 1 − r − s ′ s - a_1 - r- s s−a1​−r−s′ 1.1强化学习优化目标 p o l i c y a r g m a x p o l i c y E ( a , s ) [ r e w a r d ( s , a ) ] policy ar…...

Qt应用开发(基础篇)——向导对话框 QWizard

一、前言 QWizard类继承于QDialog&#xff0c;为有向导界面需求的应用环境提供了一个框架。 对话框窗口 QDialog QWizard向导对话框是一个拥有队列界面的特殊对话框&#xff0c;向导的目的是引导用户一步一步的完成预设的流程。向导常用于软件安装界面向导、硬件线路安装向导、…...

Python类的方法

Python类的方法主要分为实例方法、类方法和静态方法三种。 1 实例方法 以self作为第一个参数的方法&#xff0c;就是类的实例方法。该方法由类的实例调用&#xff0c;Python会把调用该方法的实例对象传递给self。 如下代码定义了一个名为A的类。 class A:def __init__(self…...

变电站自动化监控系统

力安科技变电站自动化监控系统是以箱式变电站为管理对象&#xff0c;加装箱变网关&#xff0c;在完成箱变智能化改造的基础上&#xff0c;依托电易云&#xff0c;构建一体化智慧箱变及运维系统。智能箱式变电站被广泛应用于住宅小区、城市公用变压器、工厂、商场、机场、电站等…...

MySql学习笔记11——DBA命令介绍

DBA命令 数据导入 要进入Mysql 创建数据库 create database database_name;使用数据库 use database_name;初始化数据库 source .sql文件地址&#xff0c;不能加双引号&#xff1b;数据导出 要在windows的dos环境下进行 导出数据库 mysqldump database_name > 存放…...

Webpack 复习小结

nodejs学习参考 node常用命令&#xff1a; node xxx.js 执行js文件 npm init -y 初始化package.json npm i 软件包名 下载软件包到本地 npm i 软件包名 -g 下载软件包到全局 npm uni 软件包名 删除软件包 系统优化CDN使用 CDN for free 需求&#xff1a;开发模式使用本地第三…...

Laravel chunk和chunkById的坑

在编写定时任务脚本的时候&#xff0c;经常会用到chunk和chunkById的API。 一、前言 数据库引擎为innodb。 表结构简述&#xff0c;只列出了本文用到的字段。 字段类型注释idint(11)IDtypeint(11)类型mark_timeint(10)标注时间&#xff08;时间戳&#xff09; 索引&#x…...

从零开始学习 Java:简单易懂的入门指南之泛型及set集合(二十二)

泛型及set集合扩展 1.泛型1.1泛型概述 2.Set集合2.1Set集合概述和特点【应用】2.2Set集合的使用【应用】 3.TreeSet集合3.1TreeSet集合概述和特点【应用】3.2TreeSet集合基本使用【应用】3.3自然排序Comparable的使用【应用】3.4比较器排序Comparator的使用【应用】3.5两种比较…...

JVM----GC(垃圾回收)详解

一、Automatic Garbage Collection&#xff08;垃圾回收&#xff09;简介 Automatic Garbage Collection &#xff08;自动垃圾回收&#xff09;是JVM的一个特性&#xff0c;JVM会启动相关的线程&#xff0c;该线程会轮训检查heap memeory&#xff0c;并确定哪些是未被引用的(…...

数据库的三个范式

数据库的三个范式是关系数据库设计中的一组规范&#xff0c;用于确保数据的有效性和一致性。这三个范式分别是&#xff1a; 第一范式&#xff08;1NF&#xff09;&#xff1a;要求数据库表中的每一列都是不可分割的原子值。换句话说&#xff0c;每个表中的每个字段不能包含多个…...

谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行

环境&#xff1a; Win10 专业版 谷歌浏览器 版本 116.0.5845.141&#xff08;正式版本&#xff09; &#xff08;64 位&#xff09; L盾加密终端 问题描述&#xff1a; 谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行&#xff0c;要全部结束谷歌浏览器进程&…...

刚刚!美团开源LongCat-Next,全模态模型保姆级教程(非常详细),从入门到精通,建议收藏!

昨天下午刷到了美团龙猫团队又开源了一个新模型-LongCat-Next。 这次有所不同&#xff0c;是一个原生全模态模型&#xff0c;可以接受文本、语音、图像的输入&#xff0c;生成文本、语音、图像&#xff0c;激活参数3B。 在训练上&#xff0c;通过分词器-反分词器对&#xff0…...

2026生成式引擎优化(GEO)深度实测报告:基于Hakuna Matata平台的五大主流大模型对抗性测试全景分析

摘要&#xff1a;本文以“Hakuna Matata”测试平台为基准场&#xff0c;针对百度文心一言、Moonshot AI&#xff08;Kimi&#xff09;、腾讯元宝、阿里千问、字节豆包五大国内主流生成式AI平台&#xff0c;开展了一场史无前例的生成式引擎优化&#xff08;GEO&#xff09;对抗性…...

RouterOS L2TP服务器搭建与安全优化指南

1. L2TP协议基础与RouterOS适配性 L2TP协议全称为Layer 2 Tunneling Protocol&#xff0c;是一种工作在OSI模型第二层的隧道协议。我第一次接触这个协议是在2015年为企业部署远程办公系统时&#xff0c;当时发现它相比PPTP有着明显的安全优势。简单来说&#xff0c;L2TP就像是在…...

FedProto:跨异构客户端的原型联邦学习实践指南

1. 从零理解FedProto的核心思想 第一次听说FedProto时&#xff0c;我正被一个医疗影像分析项目搞得焦头烂额。五家医院的数据就像五个方言区——同样的病症在CT影像上呈现的特征分布天差地别。传统联邦学习就像让这些医院用各自的方言写报告&#xff0c;再强行翻译成标准语&…...

告别数据丢失!GD32串口DMA双缓冲+内存对齐配置避坑指南

GD32串口DMA双缓冲与内存对齐实战&#xff1a;工业级数据零丢失方案 在工业自动化、高速数据采集等场景中&#xff0c;串口通信的稳定性和效率直接关系到整个系统的可靠性。当波特率提升到921600甚至更高时&#xff0c;传统的轮询或中断方式往往难以应对持续的数据流&#xff0…...

M2LOrder模型管理实战:Python脚本自动扫描/opt目录并生成模型索引表

M2LOrder模型管理实战&#xff1a;Python脚本自动扫描/opt目录并生成模型索引表 1. 项目背景与需求 在实际的AI模型部署和维护过程中&#xff0c;我们经常会遇到模型文件分散存储、版本混乱、信息不透明的问题。M2LOrder情感识别系统就是一个典型的例子&#xff0c;它包含了9…...

PySR社区贡献指南:如何参与这个革命性符号回归开源项目的开发

PySR社区贡献指南&#xff1a;如何参与这个革命性符号回归开源项目的开发 【免费下载链接】PySR High-Performance Symbolic Regression in Python and Julia 项目地址: https://gitcode.com/gh_mirrors/py/PySR 想要为高性能符号回归工具PySR做出贡献吗&#xff1f;这份…...

用快马ai五分钟生成java学习路线可视化原型,清晰规划你的编程进阶之路

今天想和大家分享一个特别实用的Java学习路线可视化工具的开发过程。作为一个Java初学者&#xff0c;我经常被各种知识点搞得晕头转向&#xff0c;直到发现用InsCode(快马)平台可以快速搭建一个学习路线图&#xff0c;整个开发过程只用了不到半小时&#xff0c;效果却出奇地好。…...

深度解析 ConcurrentHashMap 1.8:put 与 get 核心流程全解

在 Java 并发编程中&#xff0c;ConcurrentHashMap 是线程安全的高频使用集合&#xff0c;相比线程不安全的 HashMap、效率低下的 HashTable&#xff08;全锁&#xff09;&#xff0c;JDK 1.8 版本的 ConcurrentHashMap 做了底层结构重构和锁机制优化&#xff0c;成为高并发场景…...

OWASP靶场实战指南:从环境搭建到第一个SQL注入漏洞挖掘(含DVWA通关思路)

OWASP靶场实战指南&#xff1a;从环境搭建到第一个SQL注入漏洞挖掘 网络安全的世界就像一片未知的海洋&#xff0c;而靶场就是我们练习游泳的安全泳池。对于刚入门的新手来说&#xff0c;最大的困扰往往不是缺乏理论知识&#xff0c;而是不知道如何将所学付诸实践。OWASP靶场正…...