当前位置: 首页 > news >正文

FlinkX学习

FlinkX学习

FlinkX安装

由于flinkx已经改名chunjun 官网已不存在

(https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档

1、上传并解压

unzip flinkx-1.10.zip -d /usr/local/soft/

2、配置环境变量

FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH

3、给bin/flinkx这个文件加上执行权限

 chmod +x flinkx

4、修改配置文件,设置运行端口

vim flinkconf/flink-conf.yaml## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888

启动

命令行参数选项

  • model
    • 描述:执行模式,也就是flink集群的工作模式
      • local: 本地模式
      • standalone: 独立部署模式的flink集群
      • yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
      • yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
    • 必选:否
    • 默认值:local
  • job
    • 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
    • 必选:是
    • 默认值:无
  • pluginRoot
    • 描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
    • 必选:是
    • 默认值:无
  • flinkconf
    • 描述:flink配置文件所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/conf
    • 必选:否
    • 默认值:无
  • yarnconf
    • 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
    • 必选:否
    • 默认值:无
  • flinkLibJar
    • 描述:flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib
    • 必选:否
    • 默认值:无
  • confProp
    • 描述:flink相关参数,如{“flink.checkpoint.interval”:200000}
    • 必选:否
    • 默认值:无
  • queue
    • 描述:yarn队列,如default
    • 必选:否
    • 默认值:无
  • pluginLoadMode
    • 描述:yarnPer模式插件加载方式:
      • classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
      • shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
    • 必选:否
    • 默认值:classpath

FlinkX概述

FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

FlinkX是一个基于Flink的批流统一体的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行.

image-20220619223533456.png

FlinkX的简单使用

MySQL2HDFS

场景

将mysql Y1数据库下的Student表数据写入HDFS上的指定路径中

参考文档

mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)

hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)

创建mysql2hdfs.json文件

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hdfswriter","parameter": {"path": "hdfs://master:9000/bigdata30/flinkx/out1","defaultFS": "hdfs://master:9000","column": [{"name": "col1","index": 0,"type": "string"},{"name": "col2","index": 1,"type": "string"},{"name": "col3","index": 2,"type": "string"},{"name": "col4","index": 3,"type": "string"}],"fieldDelimiter": ",","fileType": "text","writeMode": "append"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式
  • yarnPer模式: 对应Flink集群的Per-job模式

运行:

flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

监听日志:

flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件

tail -f nohup.out

通过web界面查看任务运行情况

http://master:8888

hdfs上出现文件:

image.png

查看该文件:

hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0

出现Sid大于05的学生:

image.png

MySQLToHive

hivewrite:(https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)

配置文件:

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hivewriter","parameter": {"jdbcUrl": "jdbc:hive2://master:10000/bigdata30","username": "","password": "","fileType": "text","fieldDelimiter": ",","writeMode": "overwrite","compress": "","charsetName": "UTF-8","maxFileSize": 1073741824,"tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}","defaultFS": "hdfs://master:9000"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

在hive中建表:

CREATE TABLE `bigdata30`.`Student`(`SId` STRING,`Sname` STRING,`Sage` STRING,`Ssex` STRING)
PARTITIONED BY ( `pt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

启动hiveserver2

启动任务

flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

运行发现报错 无法解决。

翻阅chunjun官网 在hive-sink中发现 只支持hive1.x hive2.x 现hive版本为3.1.2 不支持 猜测报错原因

尝试使用chunjun 解决

MySQLToHBase

场景

将mysql Y1数据库中的Student表数据写入HBase flinkx_Student表中

配置文件

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hbasewriter","parameter": {"hbaseConfig": {"hbase.zookeeper.property.clientPort": "2181","hbase.rootdir": "hdfs://master:9000/hbase","hbase.cluster.distributed": "true","hbase.zookeeper.quorum": "master,node1,node2","zookeeper.znode.parent": "/hbase"},"table": "flinkx_Student","rowkeyColumn": "$(cf1:SId)","column": [{"name": "cf1:SId","type": "string"},{"name": "cf1:Sname","type": "string"},{"name": "cf1:Sage","type": "string"},{"name": "cf1:Ssex","type": "string"}]}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

在hbase中创建flinkx_Student表

create 'flinkx_Student','cf1'

启动

flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

hbase中的flinkx_Student表出现数据

image.png

MySQLToMySQL

场景

将mysql Y1数据库中的Student表数据写入datax1数据库中的Student2表中

配置文件 mysql2mysql.json

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}],"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useSSL=false"],"table": ["Student"]}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false","table": ["Student2"]}],"writeMode": "insert","column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}]}}}],"setting": {"speed": {"channel": 1,"bytes": 0}}}}

在mysql datax1数据库中建表:

create table if not exists datax1.Student2(SID varchar(10),Sname varchar(100),Sage varchar(100),Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;

运行:

flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

进入网页查看:

master:8888

image.png

查看Student2表 数据已导入:

image.png

相关文章:

FlinkX学习

FlinkX学习 FlinkX安装 由于flinkx已经改名chunjun 官网已不存在 (https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档 1、上传并解压 unzip flinkx-1.10.zip -d /usr/local/soft/2、配置环境变量 FLINKX_HOME/usr/local/soft/flinkx-1.10 export PATH$F…...

新书速览|解密AI绘画与修图: Stable Diffusion+Photoshop

《解密AI绘画与修图: Stable DiffusionPhotoshop》 本书内容 《解密AI绘画与修图:Stable DiffusionPhotoshop》全面介绍了Photoshop和Stable Diffusion的交互方式,以及各自的AI功能和具体使用方法。除了讲解功能,还通过实际案例加…...

1111111111111

计算机视觉技术在医疗领域的应用正迅速成为推动医疗进步的关键力量。通过高级图像处理和分析,这项技术在医学影像分析(包括CT、MRI和X光图像)、实时手术辅助、患者监测和护理、以及疾病早期诊断等方面展现出巨大的潜力。然而,随着…...

云原生概念

云原生是一种新型的技术体系和方法论,旨在充分利用云计算环境的优势,使应用程序更具有弹性、可伸缩性、可靠性和效率。以下是云原生的详细解释: 定义: 云原生是一种基于分布部署和统一运管的分布式云,以容器、微服务、…...

NoSQL之Redis高可用与优化

一、Redis高可用 在web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准是在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证…...

MySQL 常见存储引擎详解(一)

本篇主要介绍MySQL中常见的存储引擎。 目录 一、InnoDB引擎 简介 特性 最佳实践 创建InnoDB 存储文件 二、MyISAM存储引擎 简介 特性 创建MyISAM表 存储文件 存储格式 静态格式 动态格式 压缩格式 三、MEMORY存储引擎 简介 特点 创建MEMORY表 存储文件 内…...

Leetcode 股票买卖

买卖股票最佳时机 I II 不限制交易次数 prices [7,1,5,3,6,4] 启发思路:最后一天发生了什么? 从第0天到第5天结束时的利润 从第0天到第4天结束时的利润 第5天的利润 (第5天的利润:0/-4/4) 关键词:天…...

小白学习手册:轻松理解MQ消息队列

目录 # 开篇 RabbitMQ介绍 通讯概念 1. 初始MQ及类型 2. MQ的架构 2.1 RabbitMQ的结构和概念 2.2 RabbitMQ消息流示意图 3. MQ下载使用 3.1 Docker下载MQ参考 3.2 进入RabbitMQ # 开篇 MessagesQueue 是一个抽象概念,用于描述消息队列系统的一般特性和功能…...

electron线上更新

一、安装electron-updater npm install --save electron-updater二、在main.js中引入使用 import { autoUpdater } from electron; if (!isDev) {const serverUrl https://your-update-server.com; // 自定义更新服务器地址或GitHub Releases地址autoUpdater.setFeedURL(${…...

谈谈检测浏览器类型

前几天被问到如何检测浏览器类型,我突然发现我对此并不了解,之前的项目中也没有使用到过,只隐约记得通过一个自带的方法即可获取。所以今天特意来仔细补习一下。 核心:navigator.userAgent 1.正则表达式 2.引用外部库 3.判断浏…...

Django 和 Django REST framework 创建对外 API

1. 环境准备 确保你已经安装了 Python 和 Django。如果尚未安装 Django REST framework,通过 pip 安装它: pip install djangorestframework 2. 创建 Django 项目 如果你还没有 Django 项目,可以通过以下命令创建: django-ad…...

数据结构之“刷链表题”

🌹个人主页🌹:喜欢草莓熊的bear 🌹专栏🌹:数据结构 目录 前言 一、相交链表 题目链接 大致思路 代码实现 二、环形链表1 题目链接 大致思路 代码实现 三、环形链表2 题目链接 大致思路 代码实…...

复分析——第9章——椭圆函数导论(E.M. Stein R. Shakarchi)

第 9 章 椭圆函数导论 (An Introduction to Elliptic Functions) The form that Jacobi had given to the theory of elliptic functions was far from perfection; its flaws are obvious. At the base we find three fundamental functions sn, cn and dn. These functio…...

使用kubeadm安装k8s并部署应用

安装k8s 1. 准备机器 准备三台机器 192.168.136.104 master节点 192.168.136.105 worker节点 192.168.136.106 worker节点2. 安装前配置 1.基础环境 ######################################################################### #关闭防火墙: 如果是云服务器&…...

springMVC学习

概述 Spring MVC(Model-View-Controller,模型-视图-控制器)是Spring框架的一部分,用于构建基于Java的Web应用程序。它遵循MVC设计模式,分离了应用程序的不同方面(输入逻辑、业务逻辑和UI逻辑)&…...

深入探讨光刻技术:半导体制造的关键工艺

前言 光刻(Photolithography)是现代半导体制造过程中不可或缺的一环,它的精度和能力直接决定了芯片的性能和密度。本文将详细介绍光刻技术的基本原理、过程、关键技术及其在半导体制造中的重要性。 光刻技术的基本原理 光刻是一种利用光化…...

CesiumJS【Basic】- #042 绘制纹理线(Primitive方式)

文章目录 绘制纹理线(Primitive方式)1 目标2 代码2.1 main.ts3 资源文件绘制纹理线(Primitive方式) 1 目标 使用Primitive方式绘制纹理线 2 代码 2.1 main.ts var start = Cesium.Cartesian3.fromDegrees(-75.59777, 40.03883);var...

代码随想录第38天|动态规划

1049. 最后一块石头的重量 II 参考 备注: 当物体容量也等同于价值时, 01背包问题的含义则是利用好最大的背包容量sum/2, 使得结果尽可能的接近或者小于 sum/2 等价: 尽可能的平分成相同的两堆, 其差则为结果, 比如 (abc)-d, (ac)-(bd) , 最终的结果是一堆减去另外一堆的和, 问…...

java生成excel,uniapp微信小程序接收excel并打开

java引包&#xff0c;引的是apache.poi <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.3</version></dependency> 写一个测试类&#xff0c;把excel输出到指定路径 public s…...

sam_out 目标检测的应用

缺点参考地址训练验证模型解析 缺点 词表太大量化才可 参考地址 https://aistudio.baidu.com/projectdetail/8103098 训练验证 import os from glob import glob import cv2 import paddle import faiss from out_yolo_model import GPT as GPT13 import pandas as pd imp…...

云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?

大家好&#xff0c;欢迎来到《云原生核心技术》系列的第七篇&#xff01; 在上一篇&#xff0c;我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在&#xff0c;我们就像一个拥有了一块崭新数字土地的农场主&#xff0c;是时…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建&#xff08;全平台详解&#xff09; 在开始使用 React Native 开发移动应用之前&#xff0c;正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南&#xff0c;涵盖 macOS 和 Windows 平台的配置步骤&#xff0c;如何在 Android 和 iOS…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

2.Vue编写一个app

1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

USB Over IP专用硬件的5个特点

USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中&#xff0c;从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备&#xff08;如专用硬件设备&#xff09;&#xff0c;从而消除了直接物理连接的需要。USB over IP的…...