当前位置: 首页 > news >正文

FlinkX学习

FlinkX学习

FlinkX安装

由于flinkx已经改名chunjun 官网已不存在

(https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档

1、上传并解压

unzip flinkx-1.10.zip -d /usr/local/soft/

2、配置环境变量

FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH

3、给bin/flinkx这个文件加上执行权限

 chmod +x flinkx

4、修改配置文件,设置运行端口

vim flinkconf/flink-conf.yaml## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888

启动

命令行参数选项

  • model
    • 描述:执行模式,也就是flink集群的工作模式
      • local: 本地模式
      • standalone: 独立部署模式的flink集群
      • yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
      • yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
    • 必选:否
    • 默认值:local
  • job
    • 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
    • 必选:是
    • 默认值:无
  • pluginRoot
    • 描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
    • 必选:是
    • 默认值:无
  • flinkconf
    • 描述:flink配置文件所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/conf
    • 必选:否
    • 默认值:无
  • yarnconf
    • 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
    • 必选:否
    • 默认值:无
  • flinkLibJar
    • 描述:flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib
    • 必选:否
    • 默认值:无
  • confProp
    • 描述:flink相关参数,如{“flink.checkpoint.interval”:200000}
    • 必选:否
    • 默认值:无
  • queue
    • 描述:yarn队列,如default
    • 必选:否
    • 默认值:无
  • pluginLoadMode
    • 描述:yarnPer模式插件加载方式:
      • classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
      • shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
    • 必选:否
    • 默认值:classpath

FlinkX概述

FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

FlinkX是一个基于Flink的批流统一体的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行.

image-20220619223533456.png

FlinkX的简单使用

MySQL2HDFS

场景

将mysql Y1数据库下的Student表数据写入HDFS上的指定路径中

参考文档

mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)

hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)

创建mysql2hdfs.json文件

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hdfswriter","parameter": {"path": "hdfs://master:9000/bigdata30/flinkx/out1","defaultFS": "hdfs://master:9000","column": [{"name": "col1","index": 0,"type": "string"},{"name": "col2","index": 1,"type": "string"},{"name": "col3","index": 2,"type": "string"},{"name": "col4","index": 3,"type": "string"}],"fieldDelimiter": ",","fileType": "text","writeMode": "append"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式
  • yarnPer模式: 对应Flink集群的Per-job模式

运行:

flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

监听日志:

flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件

tail -f nohup.out

通过web界面查看任务运行情况

http://master:8888

hdfs上出现文件:

image.png

查看该文件:

hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0

出现Sid大于05的学生:

image.png

MySQLToHive

hivewrite:(https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)

配置文件:

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hivewriter","parameter": {"jdbcUrl": "jdbc:hive2://master:10000/bigdata30","username": "","password": "","fileType": "text","fieldDelimiter": ",","writeMode": "overwrite","compress": "","charsetName": "UTF-8","maxFileSize": 1073741824,"tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}","defaultFS": "hdfs://master:9000"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

在hive中建表:

CREATE TABLE `bigdata30`.`Student`(`SId` STRING,`Sname` STRING,`Sage` STRING,`Ssex` STRING)
PARTITIONED BY ( `pt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

启动hiveserver2

启动任务

flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

运行发现报错 无法解决。

翻阅chunjun官网 在hive-sink中发现 只支持hive1.x hive2.x 现hive版本为3.1.2 不支持 猜测报错原因

尝试使用chunjun 解决

MySQLToHBase

场景

将mysql Y1数据库中的Student表数据写入HBase flinkx_Student表中

配置文件

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hbasewriter","parameter": {"hbaseConfig": {"hbase.zookeeper.property.clientPort": "2181","hbase.rootdir": "hdfs://master:9000/hbase","hbase.cluster.distributed": "true","hbase.zookeeper.quorum": "master,node1,node2","zookeeper.znode.parent": "/hbase"},"table": "flinkx_Student","rowkeyColumn": "$(cf1:SId)","column": [{"name": "cf1:SId","type": "string"},{"name": "cf1:Sname","type": "string"},{"name": "cf1:Sage","type": "string"},{"name": "cf1:Ssex","type": "string"}]}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

在hbase中创建flinkx_Student表

create 'flinkx_Student','cf1'

启动

flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

hbase中的flinkx_Student表出现数据

image.png

MySQLToMySQL

场景

将mysql Y1数据库中的Student表数据写入datax1数据库中的Student2表中

配置文件 mysql2mysql.json

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}],"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useSSL=false"],"table": ["Student"]}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false","table": ["Student2"]}],"writeMode": "insert","column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}]}}}],"setting": {"speed": {"channel": 1,"bytes": 0}}}}

在mysql datax1数据库中建表:

create table if not exists datax1.Student2(SID varchar(10),Sname varchar(100),Sage varchar(100),Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;

运行:

flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

进入网页查看:

master:8888

image.png

查看Student2表 数据已导入:

image.png

相关文章:

FlinkX学习

FlinkX学习 FlinkX安装 由于flinkx已经改名chunjun 官网已不存在 (https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档 1、上传并解压 unzip flinkx-1.10.zip -d /usr/local/soft/2、配置环境变量 FLINKX_HOME/usr/local/soft/flinkx-1.10 export PATH$F…...

新书速览|解密AI绘画与修图: Stable Diffusion+Photoshop

《解密AI绘画与修图: Stable DiffusionPhotoshop》 本书内容 《解密AI绘画与修图:Stable DiffusionPhotoshop》全面介绍了Photoshop和Stable Diffusion的交互方式,以及各自的AI功能和具体使用方法。除了讲解功能,还通过实际案例加…...

1111111111111

计算机视觉技术在医疗领域的应用正迅速成为推动医疗进步的关键力量。通过高级图像处理和分析,这项技术在医学影像分析(包括CT、MRI和X光图像)、实时手术辅助、患者监测和护理、以及疾病早期诊断等方面展现出巨大的潜力。然而,随着…...

云原生概念

云原生是一种新型的技术体系和方法论,旨在充分利用云计算环境的优势,使应用程序更具有弹性、可伸缩性、可靠性和效率。以下是云原生的详细解释: 定义: 云原生是一种基于分布部署和统一运管的分布式云,以容器、微服务、…...

NoSQL之Redis高可用与优化

一、Redis高可用 在web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准是在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证…...

MySQL 常见存储引擎详解(一)

本篇主要介绍MySQL中常见的存储引擎。 目录 一、InnoDB引擎 简介 特性 最佳实践 创建InnoDB 存储文件 二、MyISAM存储引擎 简介 特性 创建MyISAM表 存储文件 存储格式 静态格式 动态格式 压缩格式 三、MEMORY存储引擎 简介 特点 创建MEMORY表 存储文件 内…...

Leetcode 股票买卖

买卖股票最佳时机 I II 不限制交易次数 prices [7,1,5,3,6,4] 启发思路:最后一天发生了什么? 从第0天到第5天结束时的利润 从第0天到第4天结束时的利润 第5天的利润 (第5天的利润:0/-4/4) 关键词:天…...

小白学习手册:轻松理解MQ消息队列

目录 # 开篇 RabbitMQ介绍 通讯概念 1. 初始MQ及类型 2. MQ的架构 2.1 RabbitMQ的结构和概念 2.2 RabbitMQ消息流示意图 3. MQ下载使用 3.1 Docker下载MQ参考 3.2 进入RabbitMQ # 开篇 MessagesQueue 是一个抽象概念,用于描述消息队列系统的一般特性和功能…...

electron线上更新

一、安装electron-updater npm install --save electron-updater二、在main.js中引入使用 import { autoUpdater } from electron; if (!isDev) {const serverUrl https://your-update-server.com; // 自定义更新服务器地址或GitHub Releases地址autoUpdater.setFeedURL(${…...

谈谈检测浏览器类型

前几天被问到如何检测浏览器类型,我突然发现我对此并不了解,之前的项目中也没有使用到过,只隐约记得通过一个自带的方法即可获取。所以今天特意来仔细补习一下。 核心:navigator.userAgent 1.正则表达式 2.引用外部库 3.判断浏…...

Django 和 Django REST framework 创建对外 API

1. 环境准备 确保你已经安装了 Python 和 Django。如果尚未安装 Django REST framework,通过 pip 安装它: pip install djangorestframework 2. 创建 Django 项目 如果你还没有 Django 项目,可以通过以下命令创建: django-ad…...

数据结构之“刷链表题”

🌹个人主页🌹:喜欢草莓熊的bear 🌹专栏🌹:数据结构 目录 前言 一、相交链表 题目链接 大致思路 代码实现 二、环形链表1 题目链接 大致思路 代码实现 三、环形链表2 题目链接 大致思路 代码实…...

复分析——第9章——椭圆函数导论(E.M. Stein R. Shakarchi)

第 9 章 椭圆函数导论 (An Introduction to Elliptic Functions) The form that Jacobi had given to the theory of elliptic functions was far from perfection; its flaws are obvious. At the base we find three fundamental functions sn, cn and dn. These functio…...

使用kubeadm安装k8s并部署应用

安装k8s 1. 准备机器 准备三台机器 192.168.136.104 master节点 192.168.136.105 worker节点 192.168.136.106 worker节点2. 安装前配置 1.基础环境 ######################################################################### #关闭防火墙: 如果是云服务器&…...

springMVC学习

概述 Spring MVC(Model-View-Controller,模型-视图-控制器)是Spring框架的一部分,用于构建基于Java的Web应用程序。它遵循MVC设计模式,分离了应用程序的不同方面(输入逻辑、业务逻辑和UI逻辑)&…...

深入探讨光刻技术:半导体制造的关键工艺

前言 光刻(Photolithography)是现代半导体制造过程中不可或缺的一环,它的精度和能力直接决定了芯片的性能和密度。本文将详细介绍光刻技术的基本原理、过程、关键技术及其在半导体制造中的重要性。 光刻技术的基本原理 光刻是一种利用光化…...

CesiumJS【Basic】- #042 绘制纹理线(Primitive方式)

文章目录 绘制纹理线(Primitive方式)1 目标2 代码2.1 main.ts3 资源文件绘制纹理线(Primitive方式) 1 目标 使用Primitive方式绘制纹理线 2 代码 2.1 main.ts var start = Cesium.Cartesian3.fromDegrees(-75.59777, 40.03883);var...

代码随想录第38天|动态规划

1049. 最后一块石头的重量 II 参考 备注: 当物体容量也等同于价值时, 01背包问题的含义则是利用好最大的背包容量sum/2, 使得结果尽可能的接近或者小于 sum/2 等价: 尽可能的平分成相同的两堆, 其差则为结果, 比如 (abc)-d, (ac)-(bd) , 最终的结果是一堆减去另外一堆的和, 问…...

java生成excel,uniapp微信小程序接收excel并打开

java引包&#xff0c;引的是apache.poi <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.3</version></dependency> 写一个测试类&#xff0c;把excel输出到指定路径 public s…...

sam_out 目标检测的应用

缺点参考地址训练验证模型解析 缺点 词表太大量化才可 参考地址 https://aistudio.baidu.com/projectdetail/8103098 训练验证 import os from glob import glob import cv2 import paddle import faiss from out_yolo_model import GPT as GPT13 import pandas as pd imp…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

(十)学生端搭建

本次旨在将之前的已完成的部分功能进行拼装到学生端&#xff0c;同时完善学生端的构建。本次工作主要包括&#xff1a; 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​&#xff1a; 下载安装DevEco Studio 4.0&#xff08;支持HarmonyOS 5&#xff09;配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​&#xff1a; ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...