FlinkX学习
FlinkX学习
FlinkX安装
由于flinkx已经改名chunjun 官网已不存在
(https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档
1、上传并解压
unzip flinkx-1.10.zip -d /usr/local/soft/
2、配置环境变量
FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH
3、给bin/flinkx这个文件加上执行权限
chmod +x flinkx
4、修改配置文件,设置运行端口
vim flinkconf/flink-conf.yaml## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888
启动
命令行参数选项
- model
- 描述:执行模式,也就是flink集群的工作模式
- local: 本地模式
- standalone: 独立部署模式的flink集群
- yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
- yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
- 必选:否
- 默认值:local
- 描述:执行模式,也就是flink集群的工作模式
- job
- 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
- 必选:是
- 默认值:无
- pluginRoot
- 描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
- 必选:是
- 默认值:无
- flinkconf
- 描述:flink配置文件所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/conf
- 必选:否
- 默认值:无
- yarnconf
- 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
- 必选:否
- 默认值:无
- flinkLibJar
- 描述:flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib
- 必选:否
- 默认值:无
- confProp
- 描述:flink相关参数,如{“flink.checkpoint.interval”:200000}
- 必选:否
- 默认值:无
- queue
- 描述:yarn队列,如default
- 必选:否
- 默认值:无
- pluginLoadMode
- 描述:yarnPer模式插件加载方式:
- classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
- shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
- 必选:否
- 默认值:classpath
- 描述:yarnPer模式插件加载方式:
FlinkX概述
FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。
不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
FlinkX是一个基于Flink的批流统一体的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等
在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行.
FlinkX的简单使用
MySQL2HDFS
场景
将mysql Y1数据库下的Student表数据写入HDFS上的指定路径中
参考文档
mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)
hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)
创建mysql2hdfs.json文件
{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hdfswriter","parameter": {"path": "hdfs://master:9000/bigdata30/flinkx/out1","defaultFS": "hdfs://master:9000","column": [{"name": "col1","index": 0,"type": "string"},{"name": "col2","index": 1,"type": "string"},{"name": "col3","index": 2,"type": "string"},{"name": "col4","index": 3,"type": "string"}],"fieldDelimiter": ",","fileType": "text","writeMode": "append"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}
运行模式
- 单机模式:对应Flink集群的单机模式
- standalone模式:对应Flink集群的分布式模式
- yarn模式:对应Flink集群的yarn模式
- yarnPer模式: 对应Flink集群的Per-job模式
运行:
flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
监听日志:
flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件
tail -f nohup.out
通过web界面查看任务运行情况
http://master:8888
hdfs上出现文件:

查看该文件:
hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0
出现Sid大于05的学生:
MySQLToHive
hivewrite:(https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)
配置文件:
{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hivewriter","parameter": {"jdbcUrl": "jdbc:hive2://master:10000/bigdata30","username": "","password": "","fileType": "text","fieldDelimiter": ",","writeMode": "overwrite","compress": "","charsetName": "UTF-8","maxFileSize": 1073741824,"tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}","defaultFS": "hdfs://master:9000"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}
在hive中建表:
CREATE TABLE `bigdata30`.`Student`(`SId` STRING,`Sname` STRING,`Sage` STRING,`Ssex` STRING)
PARTITIONED BY ( `pt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
启动hiveserver2
启动任务
flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
运行发现报错 无法解决。
翻阅chunjun官网 在hive-sink中发现 只支持hive1.x hive2.x 现hive版本为3.1.2 不支持 猜测报错原因
尝试使用chunjun 解决
MySQLToHBase
场景
将mysql Y1数据库中的Student表数据写入HBase flinkx_Student表中
配置文件
{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hbasewriter","parameter": {"hbaseConfig": {"hbase.zookeeper.property.clientPort": "2181","hbase.rootdir": "hdfs://master:9000/hbase","hbase.cluster.distributed": "true","hbase.zookeeper.quorum": "master,node1,node2","zookeeper.znode.parent": "/hbase"},"table": "flinkx_Student","rowkeyColumn": "$(cf1:SId)","column": [{"name": "cf1:SId","type": "string"},{"name": "cf1:Sname","type": "string"},{"name": "cf1:Sage","type": "string"},{"name": "cf1:Ssex","type": "string"}]}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}
在hbase中创建flinkx_Student表
create 'flinkx_Student','cf1'
启动
flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
hbase中的flinkx_Student表出现数据

MySQLToMySQL
场景
将mysql Y1数据库中的Student表数据写入datax1数据库中的Student2表中
配置文件 mysql2mysql.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}],"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useSSL=false"],"table": ["Student"]}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false","table": ["Student2"]}],"writeMode": "insert","column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}]}}}],"setting": {"speed": {"channel": 1,"bytes": 0}}}}
在mysql datax1数据库中建表:
create table if not exists datax1.Student2(SID varchar(10),Sname varchar(100),Sage varchar(100),Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;
运行:
flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
进入网页查看:
master:8888
查看Student2表 数据已导入:

相关文章:

FlinkX学习
FlinkX学习 FlinkX安装 由于flinkx已经改名chunjun 官网已不存在 (https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档 1、上传并解压 unzip flinkx-1.10.zip -d /usr/local/soft/2、配置环境变量 FLINKX_HOME/usr/local/soft/flinkx-1.10 export PATH$F…...

新书速览|解密AI绘画与修图: Stable Diffusion+Photoshop
《解密AI绘画与修图: Stable DiffusionPhotoshop》 本书内容 《解密AI绘画与修图:Stable DiffusionPhotoshop》全面介绍了Photoshop和Stable Diffusion的交互方式,以及各自的AI功能和具体使用方法。除了讲解功能,还通过实际案例加…...
1111111111111
计算机视觉技术在医疗领域的应用正迅速成为推动医疗进步的关键力量。通过高级图像处理和分析,这项技术在医学影像分析(包括CT、MRI和X光图像)、实时手术辅助、患者监测和护理、以及疾病早期诊断等方面展现出巨大的潜力。然而,随着…...
云原生概念
云原生是一种新型的技术体系和方法论,旨在充分利用云计算环境的优势,使应用程序更具有弹性、可伸缩性、可靠性和效率。以下是云原生的详细解释: 定义: 云原生是一种基于分布部署和统一运管的分布式云,以容器、微服务、…...

NoSQL之Redis高可用与优化
一、Redis高可用 在web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准是在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证…...

MySQL 常见存储引擎详解(一)
本篇主要介绍MySQL中常见的存储引擎。 目录 一、InnoDB引擎 简介 特性 最佳实践 创建InnoDB 存储文件 二、MyISAM存储引擎 简介 特性 创建MyISAM表 存储文件 存储格式 静态格式 动态格式 压缩格式 三、MEMORY存储引擎 简介 特点 创建MEMORY表 存储文件 内…...
Leetcode 股票买卖
买卖股票最佳时机 I II 不限制交易次数 prices [7,1,5,3,6,4] 启发思路:最后一天发生了什么? 从第0天到第5天结束时的利润 从第0天到第4天结束时的利润 第5天的利润 (第5天的利润:0/-4/4) 关键词:天…...

小白学习手册:轻松理解MQ消息队列
目录 # 开篇 RabbitMQ介绍 通讯概念 1. 初始MQ及类型 2. MQ的架构 2.1 RabbitMQ的结构和概念 2.2 RabbitMQ消息流示意图 3. MQ下载使用 3.1 Docker下载MQ参考 3.2 进入RabbitMQ # 开篇 MessagesQueue 是一个抽象概念,用于描述消息队列系统的一般特性和功能…...

electron线上更新
一、安装electron-updater npm install --save electron-updater二、在main.js中引入使用 import { autoUpdater } from electron; if (!isDev) {const serverUrl https://your-update-server.com; // 自定义更新服务器地址或GitHub Releases地址autoUpdater.setFeedURL(${…...
谈谈检测浏览器类型
前几天被问到如何检测浏览器类型,我突然发现我对此并不了解,之前的项目中也没有使用到过,只隐约记得通过一个自带的方法即可获取。所以今天特意来仔细补习一下。 核心:navigator.userAgent 1.正则表达式 2.引用外部库 3.判断浏…...
Django 和 Django REST framework 创建对外 API
1. 环境准备 确保你已经安装了 Python 和 Django。如果尚未安装 Django REST framework,通过 pip 安装它: pip install djangorestframework 2. 创建 Django 项目 如果你还没有 Django 项目,可以通过以下命令创建: django-ad…...

数据结构之“刷链表题”
🌹个人主页🌹:喜欢草莓熊的bear 🌹专栏🌹:数据结构 目录 前言 一、相交链表 题目链接 大致思路 代码实现 二、环形链表1 题目链接 大致思路 代码实现 三、环形链表2 题目链接 大致思路 代码实…...
复分析——第9章——椭圆函数导论(E.M. Stein R. Shakarchi)
第 9 章 椭圆函数导论 (An Introduction to Elliptic Functions) The form that Jacobi had given to the theory of elliptic functions was far from perfection; its flaws are obvious. At the base we find three fundamental functions sn, cn and dn. These functio…...
使用kubeadm安装k8s并部署应用
安装k8s 1. 准备机器 准备三台机器 192.168.136.104 master节点 192.168.136.105 worker节点 192.168.136.106 worker节点2. 安装前配置 1.基础环境 ######################################################################### #关闭防火墙: 如果是云服务器&…...

springMVC学习
概述 Spring MVC(Model-View-Controller,模型-视图-控制器)是Spring框架的一部分,用于构建基于Java的Web应用程序。它遵循MVC设计模式,分离了应用程序的不同方面(输入逻辑、业务逻辑和UI逻辑)&…...
深入探讨光刻技术:半导体制造的关键工艺
前言 光刻(Photolithography)是现代半导体制造过程中不可或缺的一环,它的精度和能力直接决定了芯片的性能和密度。本文将详细介绍光刻技术的基本原理、过程、关键技术及其在半导体制造中的重要性。 光刻技术的基本原理 光刻是一种利用光化…...

CesiumJS【Basic】- #042 绘制纹理线(Primitive方式)
文章目录 绘制纹理线(Primitive方式)1 目标2 代码2.1 main.ts3 资源文件绘制纹理线(Primitive方式) 1 目标 使用Primitive方式绘制纹理线 2 代码 2.1 main.ts var start = Cesium.Cartesian3.fromDegrees(-75.59777, 40.03883);var...

代码随想录第38天|动态规划
1049. 最后一块石头的重量 II 参考 备注: 当物体容量也等同于价值时, 01背包问题的含义则是利用好最大的背包容量sum/2, 使得结果尽可能的接近或者小于 sum/2 等价: 尽可能的平分成相同的两堆, 其差则为结果, 比如 (abc)-d, (ac)-(bd) , 最终的结果是一堆减去另外一堆的和, 问…...

java生成excel,uniapp微信小程序接收excel并打开
java引包,引的是apache.poi <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.3</version></dependency> 写一个测试类,把excel输出到指定路径 public s…...
sam_out 目标检测的应用
缺点参考地址训练验证模型解析 缺点 词表太大量化才可 参考地址 https://aistudio.baidu.com/projectdetail/8103098 训练验证 import os from glob import glob import cv2 import paddle import faiss from out_yolo_model import GPT as GPT13 import pandas as pd imp…...

鸿蒙PC,有什么缺点?
点击上方关注 “终端研发部” 设为“星标”,和你一起掌握更多数据库知识 价格太高,二是部分管理员权限首先,三对于开发者不太友好举个例子:VSCode的兼容性对程序员至关重要。若能支持VSCode,这台电脑将成为大多数开发者…...
程序代码篇---Python串口
在 Python 里,serial库(一般指pyserial)是串口通信的常用工具。下面为你介绍其常用的读取和发送操作函数及使用示例: 1. 初始化串口 要进行串口通信,首先得对串口对象进行初始化,代码如下: i…...

STM32使用土壤湿度传感器
1.1 介绍: 土壤湿度传感器是一种传感装置,主要用于检测土壤湿度的大小,并广泛应用于汽车自动刮水系统、智能灯光系统和智能天窗系统等。传感器采用优质FR-04双料,大面积5.0 * 4.0厘米,镀镍处理面。 它具有抗氧化&…...
PTC过流保护器件工作原理及选型方法
PTC过流保护器件 (Positive Temperature Coefficient,正温度系数热敏电阻)是一种过流保护元件,其工作原理基于电阻值随温度变化的特性。当电路正常工作时,PTC的阻值很小,电流可以顺畅通过;但当…...

从上下文学习和微调看语言模型的泛化:一项对照研究
大型语言模型表现出令人兴奋的能力,但也可以从微调中表现出令人惊讶的狭窄泛化。例如,他们可能无法概括为简单的关系反转,或者无法根据训练信息进行简单的逻辑推理。这些未能从微调中概括出来的失败可能会阻碍这些模型的实际应用。另一方面&a…...
使用 Docker Compose 从零部署 TeamCity + PostgreSQL(详细新手教程)
JetBrains TeamCity 是一款专业的持续集成(CI)服务器工具,支持各种编程语言和构建流程。本文将一步一步带你用 Docker 和 Docker Compose 快速部署 TeamCity,搭配 PostgreSQL 数据库,并确保 所有操作新手可跟着做。 一…...
Ansible自动化运维全解析:从设计哲学到实战演进
一、Ansible的设计哲学:简单即正义 在DevOps工具链中,Ansible以其"无代理架构(Agentless)"设计独树一帜。这个用Python编写的自动化引擎,通过SSH协议与目标主机通信,彻底摒弃了传统配置管理工具…...

NTT印地赛车:数字孪生技术重构赛事体验范式,驱动观众参与度革命
引言:数字孪生技术赋能体育赛事,开启沉浸式观赛新纪元 在传统体育赛事观赛模式遭遇体验天花板之际,NTT与印地赛车系列赛(NTT INDYCAR SERIES)的深度合作,通过数字孪生(Digital Twin)…...

[Spring]-AOP
AOP场景 AOP: Aspect Oriented Programming (面向切面编程) OOP: Object Oriented Programming (面向对象编程) 场景设计 设计: 编写一个计算器接口和实现类,提供加减乘除四则运算 需求: 在加减乘除运算的时候需要记录操作日志(运算前参数、运算后结果)实现方案:…...
【Fiddler抓取手机数据包】
Fiddler抓取手机数据包的配置方法 确保电脑和手机在同一局域网 电脑和手机需连接同一Wi-Fi网络。可通过电脑命令行输入ipconfig查看电脑的本地IP地址(IPv4地址),手机需能ping通该IP。 配置Fiddler允许远程连接 打开Fiddler,进入…...