当前位置: 首页 > news >正文

Flink构造宽表实时入库案例介绍

1. 安装包准备

Flink 1.15.4 安装包

Flink cdc的mysql连接器

Flink sql的sdb连接器

MySQL驱动

SDB驱动

Flink jdbc的mysql连接器

 

2. 入库流程图

3. Flink安装部署

  1. 上传Flink压缩包到服务器,并解压

tar -zxvf  flink-1.14.5-bin-scala_2.11.tgz  -C /opt/

  1. 复制依赖至Flink中

cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib
cp sequoiadb-driver-3.4.8.jre8.jar /opt/flink-1.14.5/lib
cp flink-sql-connector-mysql-cdc-2.2.1.jar /opt/flink-1.14.5/lib
cp flink-connector-jdbc_2.11-1.14.6.jar /opt/flink-1.14.5/lib

  1. 修改flink-conf.yaml文件

vi conf/flink-conf.yaml

### 配置Master的机器名(IP地址)
jobmanager.rpc.address: sdb1
### 配置每个taskmanager 生成的临时文件夹
io.tmp.dirs: /opt/flink-1.14.5/tmp

  1. 修改master文件

vi conf/masters

#作为master的ip和端口号
upgrade1:8081

  1. 修改worker文件

vi conf/workers

#集群主机名
upgrade1
upgrade2
upgrade3

  1. 拷贝到集群其他机器

scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/
scp -r /opt/flink-1.14.5 sdbadmin@upgrade3:/opt/

  1. 启动flink集群

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh

  1. 启动flink-SQL

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh

4. 实时入库

编写造数程序进行造数

4.1 环境准备

4.1.1 开启mysql的binlog

  1. 创建binlog文件夹

[sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog

  1. 开启binlog

vim /opt/sequoiasql/mysql/database/3306/auto.cnf

>>配置以下内容:
log_bin=/opt/sequoiasql/mysql/database/3306/binlog
binlog_format=ROW
expire_logs_days=1
server_id=1

配置完成之后,重启mysql

[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst
[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl start myinst

4.1.2 创建mysql表

创建库

create database sbtest;
use sbtest;

创建表

CREATE TABLE sbtest1 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest2 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name2 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest3 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

创建flink入库表

CREATE TABLE sbtest4 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

4.1.3 创建flink映射表

需要用到flink-sql-connector-mysql-cdc-2.2.1.jar

CREATE TABLE sbtest1_mysql (
    id INT,
    uuid INT,
    name1 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest1'
);

CREATE TABLE sbtest2_mysql (
    id INT,
    uuid INT,
    name2 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest2'
);

CREATE TABLE sbtest3_mysql (
    id INT,
    uuid INT,
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest3'
);

创建flink -->  mysql入库映射表

需要用到flink-connector-jdbc_2.11-1.14.6.jar

CREATE TABLE sbtest4_mysql (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.223.135:3306/sbtest',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'sbtest4'
);

创建flink -->  mysql入库映射表

需要用到sdb-flink-connector-3.4.8-jar-with-dependencies.jar

CREATE TABLE sbtest_sdb (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sequoiadb',
    'bulksize' = '1',
    'hosts' = '192.168.223.135:11810',
    'collectionspace' = 'sbtest',
    'collection' = 'sbtest4'
);

4.2 MySQL实时入库

4.2.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.2.2 mysql实时入库

insert into sbtest4_mysql select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

查看可以成功入库

4.3 SDB实时入库

4.3.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.3.2 sdb实时入库

insert into sbtest_sdb select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

显示已经成功入库

相关文章:

Flink构造宽表实时入库案例介绍

1. 安装包准备 Flink 1.15.4 安装包 Flink cdc的mysql连接器 Flink sql的sdb连接器 MySQL驱动 SDB驱动 Flink jdbc的mysql连接器 2. 入库流程图 3. Flink安装部署 上传Flink压缩包到服务器,并解压 tar -zxvf flink-1.14.5-bin-scala_2.11.tgz -C /opt/ 复…...

【Kubernetes】K8s 查看 Pod 的状态

K8s 查看 Pod 的状态 [rootk8s-master1 ~]# kubectl get pods NAME READY STATUS RESTARTS AGE nginx-3 1/1 Running 2 (34m ago) 14hNAME:Pod 的名称。READY:代表 Pod 里面有几个容器,前面是启动的,后面…...

Linux系统操作命令

Linux管理 在线查询Linux命令: https://www.runoob.com/linux/linux-install.htmlhttps://www.linuxcool.com/https://man.linuxde.net/ 1.Linux系统目录结构 Linux系统的目录结构是一个树状结构,每一个文件或目录都从根目录开始,并且根目…...

大模型学习与实践笔记(五)

一、环境配置 1. huggingface 镜像下载 sentence-transformers 开源词向量模型 import os# 设置环境变量 os.environ[HF_ENDPOINT] https://hf-mirror.com# 下载模型 os.system(huggingface-cli download --resume-download sentence-transformers/paraphrase-multilingual-…...

100个GEO基因表达芯片或转录组数据处理之GSE126848(003)

写在前边 虽然现在是高通量测序的时代,但是GEO、ArrayExpress等数据库储存并公开大量的基因表达芯片数据,还是会有大量的需求去处理芯片数据,并且建模或验证自己所研究基因的表达情况,芯片数据的处理也可能是大部分刚学生信的道友…...

1. Presto基础

该笔记来源于网络,仅用于搜索学习,不保证所有内容正确。文章目录 一、presto基础操作二、时间函数0、当前日期/当前时间1、转时间戳1)字符串转时间戳 (推荐)2)按照format指定的格式,将字符串str…...

ChatGPT可以帮你做什么?

学习 利用ChatGPT学习有很多,比如:语言学习、编程学习、论文学习拆解、推荐学习资源等,使用方法大同小异,这里以语言学习为例。 在开始前先给GPT充分的信息:(举例) 【角色】充当一名有丰富经验…...

20240111在ubuntu20.04.6下解压缩RAR格式的压缩包

20240111在ubuntu20.04.6下解压缩RAR格式的压缩包 2024/1/11 18:25 百度搜搜:ubuntu rar文件怎么解压 rootrootrootroot-X99-Turbo:~/temp$ ll total 2916 drwx------ 3 rootroot rootroot 4096 1月 11 18:28 ./ drwxr-xr-x 25 rootroot rootroot 4096 1月…...

YOLOv5改进 | 检测头篇 | ASFFHead自适应空间特征融合检测头(全网首发)

一、本文介绍 本文给大家带来的改进机制是利用ASFF改进YOLOv5的检测头形成新的检测头Detect_ASFF,其主要创新是引入了一种自适应的空间特征融合方式,有效地过滤掉冲突信息,从而增强了尺度不变性。经过我的实验验证,修改后的检测头在所有的检测目标上均有大幅度的涨点效果,…...

第十三章 接口测试(笔记)

一、接口测试分类 内部接口:测试被测系统各个子模块之间的接口,或者被测系统提供给内部系统使用的接口 外部接口: 1.被测系统调用外部的接口 2.系统对外提供的接口 接口测试重点:检查接口参数传递的正确性,接口功能的正确性,输出结果的正确性,以及对各种异常情况的容错…...

Github搭建图床 github搭建静态资源库 免费CDN加速 github搭建图床使用 jsdelivr CDN免费加速访问

Github搭建图床 github搭建静态资源库 免费CDN加速 github搭建图床使用 jsdelivr CDN免费加速访问 前言1、创建仓库2、开启 gh-pages页面功能3、访问测试 前言 写博客文章时,图片的上传和存放是一个问题,使用小众第三方图床,怕不稳定和倒闭&…...

Airtest-Selenium实操小课②:刷B站视频

1. 前言 上一课我们讲到用Airtest-Selenium爬取网站上我们需要的信息数据,还没看的同学可以戳这里看看~ 那么今天的推文,我们就来说说看,怎么实现看b站、刷b站的日常操作,包括点击暂停,发弹幕,点赞&#…...

Linux chmod命令详解

Linux chmod(英文全拼:change mode)命令是控制用户对文件的权限的命令 Linux/Unix 的文件调用权限分为三级 : 文件所有者(Owner)、用户组(Group)、其它用户(Other Users&#xff09…...

求幸存数之和 - 华为OD统一考试

OD统一考试(C卷) 分值: 100分 题解: Java / Python / C++ 题目描述 给一个正整数列nums,一个跳数jump,及幸存数量left。运算过程为:从索引为0的位置开始向后跳,中间跳过 J 个数字,命中索引为 J+1 的数字,该数被敲出,并从该点起跳,以此类推,直到幸存left个数为止。…...

【QML COOK】- 008-自定义属性

前面介绍了用C定义QML类型,通常在使用Qt Quick开发项目时,C定义后端数据类型,前端则完全使用QML实现。而QML类型或Qt Quick中的类型时不免需要为对象增加一些属性,本篇就来介绍如何自定义属性。 1. 创建项目,并编辑Ma…...

前端页面优化做的工作

1.分析模块占用空间 new (require(webpack-bundle-analyzer).BundleAnalyzerPlugin)() 2.使用谷歌浏览器中的layers,看下有没有影响性能的模块,或者应该销毁没销毁的 3.由于我们页面中含有很大的序列帧动画,所以会导致页面性能低&#xff0…...

Spark六:Spark 底层执行原理SparkContext、DAG、TaskScheduler

Spark底层执行原理 学习Spark运行流程 学习链接:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ 一、Spark运行流程 流程: SparkContext向管理器注册并向资源管理器申请运行Executor资源管理器分配Executor,然后资源管理器启动Execut…...

关于鸿蒙的笔记整理

提示:有使用过 vue 或 react 的小伙伴更容易理解 知识点强调: ArkTS所有内容都不支持深层数据更新 UI渲染 文章目录 一、关于样式1 . 默认单位 vp2 . 写公共样式 二 、 加载图片三 、 自定义构建函数 Builder四、构建函数-BuilderParam 传递UI五 、 父子…...

【漏洞复现】先锋WEB燃气收费系统文件上传漏洞 1day

漏洞描述 /AjaxService/Upload.aspx 存在任意文件上传漏洞 免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩序,尊重社会公德,不得利用网络从事危害国家安全、荣誉和利益,未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作…...

MYSQL篇--锁机制高频面试题

Mysql锁机制 1对mysql的锁有了解吗? 首先我们要知道,mysql的锁 其实是为了解决在并发事务时所导致的数据不一致问题的一种处理机制,也就是说 在事务的隔离级别实现中,就需要利用锁来解决幻读问题 然后我们可以聊到锁的分类 按锁…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...

JavaSec-RCE

简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性&#xff0c…...

TDengine 快速体验(Docker 镜像方式)

简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...

在rocky linux 9.5上在线安装 docker

前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定,这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中,积分电荷法最为常用,其原理是通过测量在电容器上积累的热释电电荷,从而确定热释电系数…...