当前位置: 首页 > news >正文

FlinkX学习

FlinkX学习

FlinkX安装

由于flinkx已经改名chunjun 官网已不存在

(https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档

1、上传并解压

unzip flinkx-1.10.zip -d /usr/local/soft/

2、配置环境变量

FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH

3、给bin/flinkx这个文件加上执行权限

 chmod +x flinkx

4、修改配置文件,设置运行端口

vim flinkconf/flink-conf.yaml## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888

启动

命令行参数选项

  • model
    • 描述:执行模式,也就是flink集群的工作模式
      • local: 本地模式
      • standalone: 独立部署模式的flink集群
      • yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
      • yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
    • 必选:否
    • 默认值:local
  • job
    • 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
    • 必选:是
    • 默认值:无
  • pluginRoot
    • 描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
    • 必选:是
    • 默认值:无
  • flinkconf
    • 描述:flink配置文件所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/conf
    • 必选:否
    • 默认值:无
  • yarnconf
    • 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
    • 必选:否
    • 默认值:无
  • flinkLibJar
    • 描述:flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib
    • 必选:否
    • 默认值:无
  • confProp
    • 描述:flink相关参数,如{“flink.checkpoint.interval”:200000}
    • 必选:否
    • 默认值:无
  • queue
    • 描述:yarn队列,如default
    • 必选:否
    • 默认值:无
  • pluginLoadMode
    • 描述:yarnPer模式插件加载方式:
      • classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
      • shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
    • 必选:否
    • 默认值:classpath

FlinkX概述

FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

FlinkX是一个基于Flink的批流统一体的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行.

image-20220619223533456.png

FlinkX的简单使用

MySQL2HDFS

场景

将mysql Y1数据库下的Student表数据写入HDFS上的指定路径中

参考文档

mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)

hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)

创建mysql2hdfs.json文件

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hdfswriter","parameter": {"path": "hdfs://master:9000/bigdata30/flinkx/out1","defaultFS": "hdfs://master:9000","column": [{"name": "col1","index": 0,"type": "string"},{"name": "col2","index": 1,"type": "string"},{"name": "col3","index": 2,"type": "string"},{"name": "col4","index": 3,"type": "string"}],"fieldDelimiter": ",","fileType": "text","writeMode": "append"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式
  • yarnPer模式: 对应Flink集群的Per-job模式

运行:

flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

监听日志:

flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件

tail -f nohup.out

通过web界面查看任务运行情况

http://master:8888

hdfs上出现文件:

image.png

查看该文件:

hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0

出现Sid大于05的学生:

image.png

MySQLToHive

hivewrite:(https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)

配置文件:

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hivewriter","parameter": {"jdbcUrl": "jdbc:hive2://master:10000/bigdata30","username": "","password": "","fileType": "text","fieldDelimiter": ",","writeMode": "overwrite","compress": "","charsetName": "UTF-8","maxFileSize": 1073741824,"tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}","defaultFS": "hdfs://master:9000"}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

在hive中建表:

CREATE TABLE `bigdata30`.`Student`(`SId` STRING,`Sname` STRING,`Sage` STRING,`Ssex` STRING)
PARTITIONED BY ( `pt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

启动hiveserver2

启动任务

flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

运行发现报错 无法解决。

翻阅chunjun官网 在hive-sink中发现 只支持hive1.x hive2.x 现hive版本为3.1.2 不支持 猜测报错原因

尝试使用chunjun 解决

MySQLToHBase

场景

将mysql Y1数据库中的Student表数据写入HBase flinkx_Student表中

配置文件

{"job": {"content": [{"reader": {"parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"],"table": ["Student"]}],"column": ["*"],"where": "Sid > 05 ","requestAccumulatorInterval": 2},"name": "mysqlreader"},"writer": {"name": "hbasewriter","parameter": {"hbaseConfig": {"hbase.zookeeper.property.clientPort": "2181","hbase.rootdir": "hdfs://master:9000/hbase","hbase.cluster.distributed": "true","hbase.zookeeper.quorum": "master,node1,node2","zookeeper.znode.parent": "/hbase"},"table": "flinkx_Student","rowkeyColumn": "$(cf1:SId)","column": [{"name": "cf1:SId","type": "string"},{"name": "cf1:Sname","type": "string"},{"name": "cf1:Sage","type": "string"},{"name": "cf1:Ssex","type": "string"}]}}}],"setting": {"restore": {"isRestore": false,"isStream": false},"errorLimit": {},"speed": {"channel": 1}}}
}

在hbase中创建flinkx_Student表

create 'flinkx_Student','cf1'

启动

flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

hbase中的flinkx_Student表出现数据

image.png

MySQLToMySQL

场景

将mysql Y1数据库中的Student表数据写入datax1数据库中的Student2表中

配置文件 mysql2mysql.json

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}],"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useSSL=false"],"table": ["Student"]}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false","table": ["Student2"]}],"writeMode": "insert","column": [{"name": "SId","type": "string"},{"name": "Sname","type": "string"},{"name": "Sage","type": "string"},{"name": "Ssex","type": "string"}]}}}],"setting": {"speed": {"channel": 1,"bytes": 0}}}}

在mysql datax1数据库中建表:

create table if not exists datax1.Student2(SID varchar(10),Sname varchar(100),Sage varchar(100),Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;

运行:

flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

进入网页查看:

master:8888

image.png

查看Student2表 数据已导入:

image.png

相关文章:

FlinkX学习

FlinkX学习 FlinkX安装 由于flinkx已经改名chunjun 官网已不存在 (https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档 1、上传并解压 unzip flinkx-1.10.zip -d /usr/local/soft/2、配置环境变量 FLINKX_HOME/usr/local/soft/flinkx-1.10 export PATH$F…...

新书速览|解密AI绘画与修图: Stable Diffusion+Photoshop

《解密AI绘画与修图: Stable DiffusionPhotoshop》 本书内容 《解密AI绘画与修图:Stable DiffusionPhotoshop》全面介绍了Photoshop和Stable Diffusion的交互方式,以及各自的AI功能和具体使用方法。除了讲解功能,还通过实际案例加…...

1111111111111

计算机视觉技术在医疗领域的应用正迅速成为推动医疗进步的关键力量。通过高级图像处理和分析,这项技术在医学影像分析(包括CT、MRI和X光图像)、实时手术辅助、患者监测和护理、以及疾病早期诊断等方面展现出巨大的潜力。然而,随着…...

云原生概念

云原生是一种新型的技术体系和方法论,旨在充分利用云计算环境的优势,使应用程序更具有弹性、可伸缩性、可靠性和效率。以下是云原生的详细解释: 定义: 云原生是一种基于分布部署和统一运管的分布式云,以容器、微服务、…...

NoSQL之Redis高可用与优化

一、Redis高可用 在web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准是在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证…...

MySQL 常见存储引擎详解(一)

本篇主要介绍MySQL中常见的存储引擎。 目录 一、InnoDB引擎 简介 特性 最佳实践 创建InnoDB 存储文件 二、MyISAM存储引擎 简介 特性 创建MyISAM表 存储文件 存储格式 静态格式 动态格式 压缩格式 三、MEMORY存储引擎 简介 特点 创建MEMORY表 存储文件 内…...

Leetcode 股票买卖

买卖股票最佳时机 I II 不限制交易次数 prices [7,1,5,3,6,4] 启发思路:最后一天发生了什么? 从第0天到第5天结束时的利润 从第0天到第4天结束时的利润 第5天的利润 (第5天的利润:0/-4/4) 关键词:天…...

小白学习手册:轻松理解MQ消息队列

目录 # 开篇 RabbitMQ介绍 通讯概念 1. 初始MQ及类型 2. MQ的架构 2.1 RabbitMQ的结构和概念 2.2 RabbitMQ消息流示意图 3. MQ下载使用 3.1 Docker下载MQ参考 3.2 进入RabbitMQ # 开篇 MessagesQueue 是一个抽象概念,用于描述消息队列系统的一般特性和功能…...

electron线上更新

一、安装electron-updater npm install --save electron-updater二、在main.js中引入使用 import { autoUpdater } from electron; if (!isDev) {const serverUrl https://your-update-server.com; // 自定义更新服务器地址或GitHub Releases地址autoUpdater.setFeedURL(${…...

谈谈检测浏览器类型

前几天被问到如何检测浏览器类型,我突然发现我对此并不了解,之前的项目中也没有使用到过,只隐约记得通过一个自带的方法即可获取。所以今天特意来仔细补习一下。 核心:navigator.userAgent 1.正则表达式 2.引用外部库 3.判断浏…...

Django 和 Django REST framework 创建对外 API

1. 环境准备 确保你已经安装了 Python 和 Django。如果尚未安装 Django REST framework,通过 pip 安装它: pip install djangorestframework 2. 创建 Django 项目 如果你还没有 Django 项目,可以通过以下命令创建: django-ad…...

数据结构之“刷链表题”

🌹个人主页🌹:喜欢草莓熊的bear 🌹专栏🌹:数据结构 目录 前言 一、相交链表 题目链接 大致思路 代码实现 二、环形链表1 题目链接 大致思路 代码实现 三、环形链表2 题目链接 大致思路 代码实…...

复分析——第9章——椭圆函数导论(E.M. Stein R. Shakarchi)

第 9 章 椭圆函数导论 (An Introduction to Elliptic Functions) The form that Jacobi had given to the theory of elliptic functions was far from perfection; its flaws are obvious. At the base we find three fundamental functions sn, cn and dn. These functio…...

使用kubeadm安装k8s并部署应用

安装k8s 1. 准备机器 准备三台机器 192.168.136.104 master节点 192.168.136.105 worker节点 192.168.136.106 worker节点2. 安装前配置 1.基础环境 ######################################################################### #关闭防火墙: 如果是云服务器&…...

springMVC学习

概述 Spring MVC(Model-View-Controller,模型-视图-控制器)是Spring框架的一部分,用于构建基于Java的Web应用程序。它遵循MVC设计模式,分离了应用程序的不同方面(输入逻辑、业务逻辑和UI逻辑)&…...

深入探讨光刻技术:半导体制造的关键工艺

前言 光刻(Photolithography)是现代半导体制造过程中不可或缺的一环,它的精度和能力直接决定了芯片的性能和密度。本文将详细介绍光刻技术的基本原理、过程、关键技术及其在半导体制造中的重要性。 光刻技术的基本原理 光刻是一种利用光化…...

CesiumJS【Basic】- #042 绘制纹理线(Primitive方式)

文章目录 绘制纹理线(Primitive方式)1 目标2 代码2.1 main.ts3 资源文件绘制纹理线(Primitive方式) 1 目标 使用Primitive方式绘制纹理线 2 代码 2.1 main.ts var start = Cesium.Cartesian3.fromDegrees(-75.59777, 40.03883);var...

代码随想录第38天|动态规划

1049. 最后一块石头的重量 II 参考 备注: 当物体容量也等同于价值时, 01背包问题的含义则是利用好最大的背包容量sum/2, 使得结果尽可能的接近或者小于 sum/2 等价: 尽可能的平分成相同的两堆, 其差则为结果, 比如 (abc)-d, (ac)-(bd) , 最终的结果是一堆减去另外一堆的和, 问…...

java生成excel,uniapp微信小程序接收excel并打开

java引包&#xff0c;引的是apache.poi <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.3</version></dependency> 写一个测试类&#xff0c;把excel输出到指定路径 public s…...

sam_out 目标检测的应用

缺点参考地址训练验证模型解析 缺点 词表太大量化才可 参考地址 https://aistudio.baidu.com/projectdetail/8103098 训练验证 import os from glob import glob import cv2 import paddle import faiss from out_yolo_model import GPT as GPT13 import pandas as pd imp…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

PAN/FPN

import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程&#xff0c;系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发&#xff0c;旨在打造一个互动性强的购物平台&#xff0c;让用户在购物的同时&#xff0c;能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机&#xff0c;实现旋转、抽拉等动作&#xff0c;增…...

前端高频面试题2:浏览器/计算机网络

本专栏相关链接 前端高频面试题1&#xff1a;HTML/CSS 前端高频面试题2&#xff1a;浏览器/计算机网络 前端高频面试题3&#xff1a;JavaScript 1.什么是强缓存、协商缓存&#xff1f; 强缓存&#xff1a; 当浏览器请求资源时&#xff0c;首先检查本地缓存是否命中。如果命…...

13.10 LangGraph多轮对话系统实战:Ollama私有部署+情感识别优化全解析

LangGraph多轮对话系统实战:Ollama私有部署+情感识别优化全解析 LanguageMentor 对话式训练系统架构与实现 关键词:多轮对话系统设计、场景化提示工程、情感识别优化、LangGraph 状态管理、Ollama 私有化部署 1. 对话训练系统技术架构 采用四层架构实现高扩展性的对话训练…...