FlinkX学习
FlinkX学习
FlinkX安装
由于flinkx已经改名chunjun 官网已不存在
(https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档
1、上传并解压
unzip flinkx-1.10.zip -d /usr/local/soft/
2、配置环境变量
FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH
3、给bin/flinkx这个文件加上执行权限
chmod +x flinkx
4、修改配置文件,设置运行端口
vim flinkconf/flink-conf.yaml## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888
启动
命令行参数选项
- model
- 描述:执行模式,也就是flink集群的工作模式
- local: 本地模式
- standalone: 独立部署模式的flink集群
- yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
- yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
- 必选:否
- 默认值:local
- 描述:执行模式,也就是flink集群的工作模式
- job
- 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
- 必选:是
- 默认值:无
- pluginRoot
- 描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
- 必选:是
- 默认值:无
- flinkconf
- 描述:flink配置文件所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/conf
- 必选:否
- 默认值:无
- yarnconf
- 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
- 必选:否
- 默认值:无
- flinkLibJar
- 描述:flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib
- 必选:否
- 默认值:无
- confProp
- 描述:flink相关参数,如{“flink.checkpoint.interval”:200000}
- 必选:否
- 默认值:无
- queue
- 描述:yarn队列,如default
- 必选:否
- 默认值:无
- pluginLoadMode
- 描述:yarnPer模式插件加载方式:
- classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
- shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
- 必选:否
- 默认值:classpath
- 描述:yarnPer模式插件加载方式:
FlinkX概述
FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。
不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
FlinkX是一个基于Flink的批流统一体的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等
在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行.

FlinkX的简单使用
MySQL2HDFS
场景
将mysql Y1数据库下的Student表数据写入HDFS上的指定路径中
参考文档
mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)
hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)
创建mysql2hdfs.json文件
{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hdfswriter","parameter": {"path": "hdfs://master:9000/bigdata30/flinkx/out1","defaultFS": "hdfs://master:9000","column": [{"name": "col1","index": 0,"type": "string"},{"name": "col2","index": 1,"type": "string"},{"name": "col3","index": 2,"type": "string"},{"name": "col4","index": 3,"type": "string"}],"fieldDelimiter": ",","fileType": "text","writeMode": "append"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}
运行模式
- 单机模式:对应Flink集群的单机模式
- standalone模式:对应Flink集群的分布式模式
- yarn模式:对应Flink集群的yarn模式
- yarnPer模式: 对应Flink集群的Per-job模式
运行:
flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
监听日志:
flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件
tail -f nohup.out
通过web界面查看任务运行情况
http://master:8888
hdfs上出现文件:
查看该文件:
hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0
出现Sid大于05的学生:

MySQLToHive
hivewrite:(https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)
配置文件:
{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hivewriter","parameter": {"jdbcUrl": "jdbc:hive2://master:10000/bigdata30","username": "","password": "","fileType": "text","fieldDelimiter": ",","writeMode": "overwrite","compress": "","charsetName": "UTF-8","maxFileSize": 1073741824,"tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}","defaultFS": "hdfs://master:9000"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}
在hive中建表:
CREATE TABLE `bigdata30`.`Student`(`SId` STRING,`Sname` STRING,`Sage` STRING,`Ssex` STRING)
PARTITIONED BY ( `pt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
启动hiveserver2
启动任务
flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
运行发现报错 无法解决。
翻阅chunjun官网 在hive-sink中发现 只支持hive1.x hive2.x 现hive版本为3.1.2 不支持 猜测报错原因
尝试使用chunjun 解决
MySQLToHBase
场景
将mysql Y1数据库中的Student表数据写入HBase flinkx_Student表中
配置文件
{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hbasewriter","parameter": {"hbaseConfig": {"hbase.zookeeper.property.clientPort": "2181","hbase.rootdir": "hdfs://master:9000/hbase","hbase.cluster.distributed": "true","hbase.zookeeper.quorum": "master,node1,node2","zookeeper.znode.parent": "/hbase"},"table": "flinkx_Student","rowkeyColumn": "$(cf1:SId)","column": [{"name": "cf1:SId","type": "string"},{"name": "cf1:Sname","type": "string"},{"name": "cf1:Sage","type": "string"},{"name": "cf1:Ssex","type": "string"}]}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}
在hbase中创建flinkx_Student表
create 'flinkx_Student','cf1'
启动
flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
hbase中的flinkx_Student表出现数据
MySQLToMySQL
场景
将mysql Y1数据库中的Student表数据写入datax1数据库中的Student2表中
配置文件 mysql2mysql.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}],"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useSSL=false"],"table": ["Student"]}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false","table": ["Student2"]}],"writeMode": "insert","column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}]}}}],"setting": {"speed": {"channel": 1,"bytes": 0}}}}
在mysql datax1数据库中建表:
create table if not exists datax1.Student2(SID varchar(10),Sname varchar(100),Sage varchar(100),Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;
运行:
flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
进入网页查看:
master:8888

查看Student2表 数据已导入:

相关文章:
FlinkX学习
FlinkX学习 FlinkX安装 由于flinkx已经改名chunjun 官网已不存在 (https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档 1、上传并解压 unzip flinkx-1.10.zip -d /usr/local/soft/2、配置环境变量 FLINKX_HOME/usr/local/soft/flinkx-1.10 export PATH$F…...
新书速览|解密AI绘画与修图: Stable Diffusion+Photoshop
《解密AI绘画与修图: Stable DiffusionPhotoshop》 本书内容 《解密AI绘画与修图:Stable DiffusionPhotoshop》全面介绍了Photoshop和Stable Diffusion的交互方式,以及各自的AI功能和具体使用方法。除了讲解功能,还通过实际案例加…...
1111111111111
计算机视觉技术在医疗领域的应用正迅速成为推动医疗进步的关键力量。通过高级图像处理和分析,这项技术在医学影像分析(包括CT、MRI和X光图像)、实时手术辅助、患者监测和护理、以及疾病早期诊断等方面展现出巨大的潜力。然而,随着…...
云原生概念
云原生是一种新型的技术体系和方法论,旨在充分利用云计算环境的优势,使应用程序更具有弹性、可伸缩性、可靠性和效率。以下是云原生的详细解释: 定义: 云原生是一种基于分布部署和统一运管的分布式云,以容器、微服务、…...
NoSQL之Redis高可用与优化
一、Redis高可用 在web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准是在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证…...
MySQL 常见存储引擎详解(一)
本篇主要介绍MySQL中常见的存储引擎。 目录 一、InnoDB引擎 简介 特性 最佳实践 创建InnoDB 存储文件 二、MyISAM存储引擎 简介 特性 创建MyISAM表 存储文件 存储格式 静态格式 动态格式 压缩格式 三、MEMORY存储引擎 简介 特点 创建MEMORY表 存储文件 内…...
Leetcode 股票买卖
买卖股票最佳时机 I II 不限制交易次数 prices [7,1,5,3,6,4] 启发思路:最后一天发生了什么? 从第0天到第5天结束时的利润 从第0天到第4天结束时的利润 第5天的利润 (第5天的利润:0/-4/4) 关键词:天…...
小白学习手册:轻松理解MQ消息队列
目录 # 开篇 RabbitMQ介绍 通讯概念 1. 初始MQ及类型 2. MQ的架构 2.1 RabbitMQ的结构和概念 2.2 RabbitMQ消息流示意图 3. MQ下载使用 3.1 Docker下载MQ参考 3.2 进入RabbitMQ # 开篇 MessagesQueue 是一个抽象概念,用于描述消息队列系统的一般特性和功能…...
electron线上更新
一、安装electron-updater npm install --save electron-updater二、在main.js中引入使用 import { autoUpdater } from electron; if (!isDev) {const serverUrl https://your-update-server.com; // 自定义更新服务器地址或GitHub Releases地址autoUpdater.setFeedURL(${…...
谈谈检测浏览器类型
前几天被问到如何检测浏览器类型,我突然发现我对此并不了解,之前的项目中也没有使用到过,只隐约记得通过一个自带的方法即可获取。所以今天特意来仔细补习一下。 核心:navigator.userAgent 1.正则表达式 2.引用外部库 3.判断浏…...
Django 和 Django REST framework 创建对外 API
1. 环境准备 确保你已经安装了 Python 和 Django。如果尚未安装 Django REST framework,通过 pip 安装它: pip install djangorestframework 2. 创建 Django 项目 如果你还没有 Django 项目,可以通过以下命令创建: django-ad…...
数据结构之“刷链表题”
🌹个人主页🌹:喜欢草莓熊的bear 🌹专栏🌹:数据结构 目录 前言 一、相交链表 题目链接 大致思路 代码实现 二、环形链表1 题目链接 大致思路 代码实现 三、环形链表2 题目链接 大致思路 代码实…...
复分析——第9章——椭圆函数导论(E.M. Stein R. Shakarchi)
第 9 章 椭圆函数导论 (An Introduction to Elliptic Functions) The form that Jacobi had given to the theory of elliptic functions was far from perfection; its flaws are obvious. At the base we find three fundamental functions sn, cn and dn. These functio…...
使用kubeadm安装k8s并部署应用
安装k8s 1. 准备机器 准备三台机器 192.168.136.104 master节点 192.168.136.105 worker节点 192.168.136.106 worker节点2. 安装前配置 1.基础环境 ######################################################################### #关闭防火墙: 如果是云服务器&…...
springMVC学习
概述 Spring MVC(Model-View-Controller,模型-视图-控制器)是Spring框架的一部分,用于构建基于Java的Web应用程序。它遵循MVC设计模式,分离了应用程序的不同方面(输入逻辑、业务逻辑和UI逻辑)&…...
深入探讨光刻技术:半导体制造的关键工艺
前言 光刻(Photolithography)是现代半导体制造过程中不可或缺的一环,它的精度和能力直接决定了芯片的性能和密度。本文将详细介绍光刻技术的基本原理、过程、关键技术及其在半导体制造中的重要性。 光刻技术的基本原理 光刻是一种利用光化…...
CesiumJS【Basic】- #042 绘制纹理线(Primitive方式)
文章目录 绘制纹理线(Primitive方式)1 目标2 代码2.1 main.ts3 资源文件绘制纹理线(Primitive方式) 1 目标 使用Primitive方式绘制纹理线 2 代码 2.1 main.ts var start = Cesium.Cartesian3.fromDegrees(-75.59777, 40.03883);var...
代码随想录第38天|动态规划
1049. 最后一块石头的重量 II 参考 备注: 当物体容量也等同于价值时, 01背包问题的含义则是利用好最大的背包容量sum/2, 使得结果尽可能的接近或者小于 sum/2 等价: 尽可能的平分成相同的两堆, 其差则为结果, 比如 (abc)-d, (ac)-(bd) , 最终的结果是一堆减去另外一堆的和, 问…...
java生成excel,uniapp微信小程序接收excel并打开
java引包,引的是apache.poi <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.3</version></dependency> 写一个测试类,把excel输出到指定路径 public s…...
sam_out 目标检测的应用
缺点参考地址训练验证模型解析 缺点 词表太大量化才可 参考地址 https://aistudio.baidu.com/projectdetail/8103098 训练验证 import os from glob import glob import cv2 import paddle import faiss from out_yolo_model import GPT as GPT13 import pandas as pd imp…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
如何在看板中体现优先级变化
在看板中有效体现优先级变化的关键措施包括:采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中,设置任务排序规则尤其重要,因为它让看板视觉上直观地体…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...
从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...
高考志愿填报管理系统---开发介绍
高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发,采用现代化的Web技术,为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## 📋 系统概述 ### 🎯 系统定…...
