Flink之SQL client使用案例
Flink的执行模式有以下三种:
前提是我们已经开启了yarnsession的进程,在下图中可以看到启动的id也就是后续任务需要通过此id进行认证,以及任务分配的master主机。
这里启动时候会报错一个ERROR:org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
查阅资料得知:
该错误是因为,kerberos认证失败,cdh6,并没有启动kerberos。所以该错误可以忽略。但是如果已经开启动了kerberos,这个问题就要解决了。
我们这里没有开启Kerberos,所以这个报错我么可以不管。

Session Mode:会话模式
会话模式需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所有提交的作业会竞争集群中的资源。适合任务规模小,执行时间短的大量作业。
Flink的作业执行环境会一直保留在集群上,直到会话被显式终止。这样,可以提交多个作业,它们可以共享相同的集群资源和状态,从而实现更高的效率和资源利用。
bin/flink run -yid application_1723708102500_0009 examples/batch/WordCount.jar
重要的是要添加 -yid 这个参数,不添加这个参数会执行不成功,会报错找不到执任务的cluster。
脚本执行参数:
-n(--container):TaskManager的数量。(1.10 已经废弃)
-s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-q:显示可用的YARN资源(内存,内核);
-tm:每个TaskManager容器的内存(默认值:MB)
-nm:yarn 的appName(现在yarn的ui上的名字)。
-d:后台执行。
提交flink任务:
bin/flink run examples/batch/WordCount.jar
Per-Job Mode:单作业模式,我们也是更多的使用这种模式,这个模式会将我们的资源更合理的规划使用。
每个Flink应用程序作为一个独立的作业被提交和执行。
每次提交的Flink应用程序都会创建一个独立的作业执行环境,该作业执行环境仅用于执行该特定的作业。
作业完成后,作业执行环境会被释放,集群关闭,资源释放
bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
常用参数:
--p 程序默认并行度
下面的参数仅可用于 -m yarn-cluster 模式
--yjm JobManager可用内存,单位兆
--ynm YARN程序的名称
--yq 查询YARN可用的资源
--yqu 指定YARN队列是哪一个
--ys 每个TM会有多少个Slot
--ytm 每个TM所在的Container可申请多少内存,单位兆
--yD 动态指定Flink参数
-yd 分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)
Application Mode:应用模式
应用模式算是前2种模式的升级,前2种模式中,Flink程序代码是在客户端执行,然后客户端提交给JobManager,客户端需要占用大量网络带宽。
应用模式需要为每一个提交的应用单独启动一个JobManager(应用程序在JobManager执行),也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager关闭。
application 模式使用 bin/flink run-application 提交作业;通过 -t 指定部署环境,目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。
带有 JM 和 TM 内存设置的命令提交,这种方式提交之后会带对应服务器的HDFS的WebUI页面多出一个wordcount_01的文件,该文件记录了程序运行的结果
./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name="MyFlinkWordCount" \
./examples/batch/WordCount.jar --output hdfs://ddp54:8020/wordcount_01
在上面例子 的基础上自己设置 TaskManager slots 个数为3,以及指定并发数为3:
./bin/flink run-application -t yarn-application -p 3 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_52
指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范:
./bin/flink run-application -t yarn-application \
-Dparallelism.default=3 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_53
以上三种模式就先简述这些,其实还有很多参数没有用到,我们更多的只需要用到第二种pre-job的模式即可。
Yarn-session模式开启成功后,我们进入SQL-Client界面,在这个界面我们可以写SQL来实现系统之间的交互,我接下来以MySQL与Kafka的交互为例:
首先是要在MySQL数据库创建一些库和表当作source数据源:
CREATE TABLE src_mysql_order(order_id BIGINT,store_id BIGINT,sales_amt double,PRIMARY KEY (`order_id`) );CREATE TABLE src_mysql_order_detail(order_id BIGINT,store_id BIGINT,goods_id BIGINT,sales_amt double,PRIMARY KEY (order_id,store_id,goods_id) );CREATE TABLE dim_store(store_id BIGINT,store_name varchar(100),PRIMARY KEY (`store_id`) );CREATE TABLE dim_goods(goods_id BIGINT,goods_name varchar(100),PRIMARY KEY (`goods_id`) );CREATE TABLE dwa_mysql_order_analysis (store_id BIGINT,store_name varchar(100),sales_goods_distinct_nums bigint,sales_amt double,order_nums bigint,PRIMARY KEY (store_id,store_name) );
Source:在MySQL中创建完成之后我们要在SQL client界面进行映射在这里以src_mysql_order表为例,执行成功如以下界面:
CREATE TABLE src_mysql_order(
order_id BIGINT,
store_id BIGINT,
sales_amt double,
PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxx',
'port' = '3306',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxx',
'table-name' = 'xxx',
'scan.incremental.snapshot.enabled' = 'false'
);

Sink:对MySQL做完source映射之后,我们要将MySQL的数据导入到Kafka,因此我们也要做一些Kafka表的映射,执行成功界面如下:
CREATE TABLE ods_kafka_order (
order_id BIGINT,
store_id BIGINT,
sales_amt double,
PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'Kafka主题',
'properties.bootstrap.servers' = 'Kafka集群的IP+端口号',
'key.format' = 'json',
'value.format' = 'json'
);

两张表都映射完成之后,我们先在MySQL添加一些测试用例:
insert into src_mysql_order values
(20221210001,10000,50),
(20221210002,10000,20),
(20221210003,10001,10);

接下来就将MySQL与Kafka实现交互,即将MySQL数据插入到Kafka作业中:
insert into ods_kafka_order_2 select * from src_mysql_order;

在这个过程中,有可能会报错:

这个报错是找不到表的元数据信息,我这里是将表名写错了,这个是比较庆幸的,但是还有一种原因就是:没有MySQLCDC或者Kafka的依赖,导致连接的元数据信息无法保存到catalog中,因此我们就需要添加MySQLCDC和Kafka的连接依赖:
进入到Flink安装路径的lib目录下:使用 rz 指令将依赖jar包上传,上传完毕之后使用 scp 指令远程复制给集群的其它机器,我们的是ddp54、ddp55:
scp -r lib/flink-sql-connector-kafka-1.16.2.jar root@ddp54:$PWD/lib
scp -r lib/flink-sql-connector-kafka-1.16.2.jar root@ddp55:$PWD/lib

Jar包上传完之后,我们在基础平台将Flink集群重启
集群重启之后,我们重新开启一个yarnsession进程来执行后续提交的任务。
进入yarn的web页面来查看进程启动的状况。

接下来我们重走一遍MySQL的source和Kafka的sink流程,走完之后进入SQL client界面执行交互指令,即MySQL数据插入到Kafka,执行完成之后没有报错,但是查看flink的web页面发现并没有作业在执行或执行完成,于是查看日志得知:问题是MySQL的系统时间跟所在地区时间不匹配导致的,我们可以在命令行进行时区的设置,也可以在配置文件中进行时区的设置,我选择了在my.cnf配置文件中进行时区的更改:在[mysqld]下添加默认时区设置即可,与此同时,MySQL也要开启binlog日志,可以保障数据一致性,主要用于复制和数据恢复。配置完成之后重启MySQL服务。
开启binlog日志
# 服务ID
server-id=1
# binlog 配置 只要配置了log_bin地址 就会开启
log_bin = /var/lib/mysql/mysql_bin
# 日志存储天数 默认0 永久保存
# 如果数据库会定期归档,建议设置一个存储时间不需要一直存储binlog日志,理论上只需要存储归档之后的日志
expire_logs_days = 30
# binlog最大值
max_binlog_size = 1024M
# 规定binlog的格式,binlog有三种格式statement、row、mixad,默认使用statement,建议使用row格式
binlog_format = ROW
# 在提交n次事务后,进行binlog的落盘,0为不进行强行的刷新操作,而是由文件系统控制刷新日志文件,如果是在线交易和账有>关的数据建议设置成1,如果是其他数据可以保持为0即可
sync_binlog = 1
查看日志得知是MySQL的时区问题导致任务提交不成功

在 my.cnf 对时区和binlog日志进行修改

上边的MySQL配置完成之后,需要重启MySQL服务
docker restart mysql
接下来在SQL client界面再次执行指令:
insert into ods_kafka_order select * from src_mysql_order;
打开Flink的web界面,发现Flink的作业任务正在执行:

我们在SQL client界面查询MySQL的数据表信息:
SET sql-client.execution.result-mode=tableau;
select * from src_mysql_order;
可以查看插入到MySQL的数据信息和数据的更新信息[Flink中 +I 代表插入数据 ; +U 代表更新数据 ; -U代表撤回数据]

与此同时,我们去Kafka查看数据是否到来,通过Kafka Tool查看到数据已经成功到Kafka。

至此我们实现了MySQL到Kafka的实时数据的接入以及在这个过程中遇到的一些问题以及解决办法。
相关文章:
Flink之SQL client使用案例
Flink的执行模式有以下三种: 前提是我们已经开启了yarnsession的进程,在下图中可以看到启动的id也就是后续任务需要通过此id进行认证,以及任务分配的master主机。 这里启动时候会报错一个ERROR:org.apache.flink.shaded.curator.org.apache…...
实际开发中的模块化开发 - 应用到直播间
实际开发中的模块化开发 - 模块管理(以直播间为例)-CSDN博客 引言 在前面的两篇博客中,我们已经介绍了直播模块的简单结构,创建了模块管理器和模块抽象基类,并且通过模块化实现了两个小业务功能模块。接下来…...
EmguCV学习笔记 VB.Net 第5章 图像变换
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 EmguCV是一个基于OpenCV的开源免费的跨平台计算机视觉库,它向C#和VB.NET开发者提供了OpenCV库的大部分功能。 教程VB.net版本请访…...
【初阶数据结构】顺序表与链表的比较(附题)
目录 一、顺序表和链表的区别(其他链表存在缺陷,比较意义不大,这里用带头双向循环链表与顺序表进行比较) 1.1插入、扩容与随机访问 二、缓存利用率的比较 2.1前置知识 详解及补充知识(本文仅为比较顺序表及链表&am…...
git-20240822
目录 初始化仓库 Git init Git init project --bare 查看提交的记录 git log --prettyoneline 查看当前git远程库地址 git remote -v 查看详细提交记录 git log 撤出暂存区的文件 git reset HEAD file(.代表全部文件) 提交数据到远程仓库 git config --global push.…...
【时时三省】c语言例题----华为机试题< 数字颠倒>
目录 1,题目 描述 输入描述: 输出描述: 示例1 2,代码...
【前缀和算法】--- 一维和二维前缀和模板
Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏: 算法Journey 本文开始,博主开始讲解有关前缀和的算法,本篇博客我们先来了解一下有关前缀和的两个模板。 🏠 一维前缀和模板 &…...
有些信息注定会丢失
智能在分析问题、做出决策时,总是希望获取尽可能多的信息,以此更加准确地决策。然而,很遗憾的是,有一些信息注定会丢失,不可能获取完全的信息,而且即使能够获取,智能也不能完全利用。 这一点与…...
c#中Task.Run 和使用 Task 构造函数创建任务的区别
Task.Run 和使用 Task 构造函数创建任务是两种不同的方法,它们在某些方面有显著的区别: 启动方式: Task.Run 是一个静态方法,它立即启动一个任务并在后台执行指定的工作。它通常用于快速启动一个简单的后台任务。使用 Task 构造函数创建任务&…...
使用nginx做代理转发
需求1:通过监听服务器的80端口,将请求转发到另一台服务器的8070端口 打开nginx/nginx.conf文件 server {listen 80;server_name localhost;location /analys {proxy_pass http://10.xx.xx.xx:8070/;} }需求2:通过监听服务器的80端口&am…...
Java前端与后端交互:JSON与XML数据交换 - 掌握现代Web开发的核心技能
引言 随着互联网技术的不断进步,Web应用变得越来越复杂,从前端到后端的每一个环节都需要精心设计以保证良好的用户体验。在这个过程中,数据的传递扮演着至关重要的角色。无论是简单的表单提交还是复杂的API调用,都需要一种可靠的…...
网络攻击原理及过程
网络攻击原理表 攻击者 内容 攻击访问 攻击效果 攻击意图 黑客 挑战 间谍 用户命令 破坏信息 好奇 恐怖主义者 脚本或程序 本地访问 信息泄密 获取情报 公司职员 自治主体 远程访问 窃取服务 经济利益 职业犯罪分子 电磁泄露 拒绝服务 恐怖事…...
day30(8/16)——ansible
目录 一、回顾 1、mysql和python 1. mysql5.7 2. 可以使用pymysql非交互的管理mysql 2、mycat中间件 1. 独属于mysql主从的负载均衡策略 2.配置写主读从 3. 步骤 3.1 安装jdk 3.2 mycat 3.3 配置 3.4 启动和调试 二、运维自动化(ansible) 1、任务背…...
fastadmin 安装
环境要求,大家可以参考官方文档的,我这里使用的是phpstudy,很多已经集成了。 注意一点,PHP 版本:PHP 7.4 。 第二步:下载 下载地址:https://www.fastadmin.net/download.html 进入下载地址后…...
Unity动画模块 之 3D模型导入基础设置 Rig页签
本文仅作笔记学习和分享,不用做任何商业用途本文包括但不限于unity官方手册,unity唐老狮等教程知识,如有不足还请斧正 1.Rig页签 Rig 选项卡 - Unity 手册,rig是设置骨骼与替身系统的,工作流程如下 Avatar是什么…...
⭐️Python在Windows命令行(Command Prompt)运行Python脚本或交互式地执行Python代码详解
Python在Windows命令行(Command Prompt)运行Python脚本或交互式地执行Python代码详解 Python在Windows命令行(Command Prompt)运行Python脚本或交互式地执行Python代码详解一、安装Python二、运行Python脚本1. 打开命令行2. 导航到…...
Python | Leetcode Python题解之第355题设计推特
题目: 题解: class Twitter:class Node:def __init__(self):self.followee set()self.tweet list()def __init__(self):self.time 0self.recentMax 10self.tweetTime dict()self.user dict()def postTweet(self, userId: int, tweetId: int) ->…...
D. Beard Graph
https://codeforces.com/problemset/problem/165/D 主要是边转点 后面都是简单的线段树维护 我们维护ok标记,val值,黑(1),白(0) id.okl.ok&r.ok id.vall.valr.val 注意特判如果两个点一样是0,如果dfn[u]1>dfn[v]就不…...
使用预训练的 ONNX 格式的 YOLOv8n 模型进行目标检测,并在图像上绘制检测结果
目录 __init__方法: pre_process方法: run方法: filter_boxes方法: view_img方法: __init__方法: 初始化类的实例时,创建一个onnxruntime的推理会话,加载名为yolo…...
mac安装xmind
文章目录 介绍软件功能下载安装1.下载完成后打开downloads 双击进行安装2.将软件拖到应用程序中3.在启动台中搜索打开4.提示损坏问题解决5.执行完成关闭命令窗口6.打开成功,点击继续,跳过登录7.打开成功后,点击关于 小结 介绍 XMind 是一款流…...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
