Flink测试利器之DataGen初探 | 京东云技术团队
什么是 Flinksql
Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如过滤、聚合、连接和转换等。它还提供了窗口操作、时间处理和复杂事件处理等功能,以满足流式数据处理的需求。
Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。他是Flink最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

使用 Flink SQL处理数据的基本步骤:
-
定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。
-
执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。
-
定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。
-
提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。
总而言之,我们可以通过Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。
什么是 connector
Flink Connector 是指用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:
例如:
-
JDBC :用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。
-
JDQ :用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。
-
Elasticsearch :用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。
-
File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。
-
…
还有如HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或将处理结果导出到外部系统。

DataGen Connector
DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。
使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。
demo
以下是一个使用 DataGen 函数的简单示例:
-- 创建输入表
CREATE TABLE input_table (order_number BIGINT,price DECIMAL(32,2),buyer ROW<first_name STRING, last_name STRING>,order_time TIMESTAMP(3)
) WITH ('connector' = 'datagen',
);
在上面的示例中,我们使用 DataGen 连接器创建了一个名为 `input_table` 的输入表。该表包含了 `order_number`、`price` 和 `buyer` ,`order_time`四个字段。默认是random随机生成对应类型的数据,生产速率是10000条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。
生成的数据样例:
{"order_number":-6353089831284155505,"price":253422671148527900374700392448,"buyer":{"first_name":"6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2","last_name":"d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"},"order_time":"2023-09-21 06:22:29.618"}
{"order_number":1102733628546646982,"price":628524591222898424803263250432,"buyer":{"first_name":"4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c","last_name":"7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"},"order_time":"2023-09-21 06:23:01.69"}
支持的类型
| 字段类型 | 数据生成方式 |
|---|---|
| BOOLEAN | random |
| CHAR | random / sequence |
| VARCHAR | random / sequence |
| STRING | random / sequence |
| DECIMAL | random / sequence |
| TINYINT | random / sequence |
| SMALLINT | random / sequence |
| INT | random / sequence |
| BIGINT | random / sequence |
| FLOAT | random / sequence |
| DOUBLE | random / sequence |
| DATE | random |
| TIME | random |
| TIMESTAMP | random |
| TIMESTAMP_LTZ | random |
| INTERVAL YEAR TO MONTH | random |
| INTERVAL DAY TO MONTH | random |
| ROW | random |
| ARRAY | random |
| MAP | random |
| MULTISET | random |
连接器属性
| 属性 | 是否必填 | 默认值 | 类型 | 描述 |
|---|---|---|---|---|
| connector | required | (none) | String | ‘datagen’. |
| rows-per-second | optional | 10000 | Long | 数据生产速率 |
| number-of-rows | optional | (none) | Long | 指定生产的数据条数,默认是不限制。 |
| fields.#.kind | optional | random | String | 指定字段的生产数据的方式 random还是sequence |
| fields.#.min | optional | (Minimum value of type) | (Type of field) | random生成器 指定字段 # 最小值, 支持数字类型 |
| fields.#.max | optional | (Maximum value of type) | (Type of field) | random生成器的指定字段 # 最大值, 支持数字类型 |
| fields.#.length | optional | 100 | Integer | char/varchar/string/array/map/multiset 类型的长度. |
| fields.#.start | optional | (none) | (Type of field) | sequence生成器的开始值 |
| fields.#.end | optional | (none) | (Type of field) | sequence生成器的结束值 |
DataGen使用
了解了dategen的基本使用方法,那么下面来结合其他类型的连接器实践一下吧。
场景1 生成一亿条数据到hive表
CREATE TABLE dataGenSourceTable(order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3))
WITH( 'connector'='datagen', 'number-of-rows'='100000000','rows-per-second' = '100000') ;CREATECATALOG myhive
WITH ('type'='hive','default-database'='default'
);
USECATALOG myhive;
USE dev;
SETtable.sql-dialect=hive;
CREATETABLEifnotexists shipu3_test_0932 (order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file'
);
SETtable.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number,price,buyer,order_time, cast( CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;
当每秒生产10万条数据的时候,17分钟左右就可以完成,当然我们可以通过增加Flink任务的计算节点、并行度、提高生产速率’rows-per-second’的值等来更快速的完成大数据量的生产。
场景2 持续每秒生产10万条数到消息队列
CREATE TABLE dataGenSourceTable (order_number BIGINT,price INT,buyer ROW< first_name STRING, last_name STRING >,order_time TIMESTAMP(3),col_array ARRAY < STRING >,col_map map < STRING, STRING >)
WITH( 'connector'='datagen', --连接器类型'rows-per-second'='100000', --生产速率'fields.order_number.kind'='random', --字段order_number的生产方式'fields.order_number.min'='1', --字段order_number最小值'fields.order_number.max'='1000', --字段order_number最大值'fields.price.kind'='sequence', --字段price的生产方式'fields.price.start'='1', --字段price开始值'fields.price.end'='1000', --字段price最大值'fields.col_array.element.length'='5', --每个元素的长度'fields.col_map.key.length'='5', --map key的长度'fields.col_map.value.length'='5' --map value的长度) ;
CREATE TABLE jdqsink1(order_number BIGINT,price DECIMAL(32, 2),buyer ROW< first_name STRING, last_name STRING >,order_time TIMESTAMP(3),col_ARRAY ARRAY < STRING >,col_map map < STRING, STRING >)
WITH('connector'='jdq','topic'='jrdw-fk-area_info__1','jdq.client.id'='xxxxx','jdq.password'='xxxxxxx','jdq.domain'='db.test.group.com','format'='json') ;
INSERTINTO jdqsink1
SELECT*FROM dataGenSourceTable;
思考
通过以上案例可以看到,通过Datagen结合其他连接器可以模拟各种场景的数据
- 性能测试:我们可以利用Flink的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应;
- 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性;
- 数据完整性测试:我们通过Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink任务在处理数据时是否能够正确地保持数据的完整性。
总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。
作者:京东零售 石朴
来源:京东云开发者社区 转载请注明来源
相关文章:
Flink测试利器之DataGen初探 | 京东云技术团队
什么是 Flinksql Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码…...
linux更换常用软件的默认缓存路径(.conda, .huggingface等)
在使用linux的过程中,我们往往会使用软件安装很多packages,其中的大多数软件(例如conda)会把当前安装的packages缓存起来,以加速之后的相同package的安装。 而很多软件的默认缓存路径是user自己的home路径。下面罗列几…...
Kafka消费者使用案例
本文代码链接:https://download.csdn.net/download/shangjg03/88422633 1.消费者和消费者群组 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka 之所以要引入消费者群组…...
SpringMVC全注解开发
在学习过程中,框架给我们最大的作用,就是想让开发人员尽可能地只将精力放在具体业务功能的实现之上,而对于各种映射关系的配置,统统由框架来进行完成,由此,注解就很好的将映射功能进行实现,并且…...
解决 android Cannot access ‘<init>‘: it is private in
最近要在2个非直接依赖module使用单例,有一种注入依赖的方式可以,但是报了如下错误: Cannot access <init>: it is private in 经过查阅资料,原来是依赖的单例类的构造函数不能使用private,这里做个记录&#…...
不容易解的题10.15
395.至少有K个重复字符的最长字串 395. 至少有 K 个重复字符的最长子串 - 力扣(LeetCode)https://leetcode.cn/problems/longest-substring-with-at-least-k-repeating-characters/description/?envTypelist&envIdZCa7r67M自认为是不好做的题。尤其…...
Megatron-LM GPT 源码分析(二) Sequence Parallel分析
引用 本文基于开源代码 https://github.com/NVIDIA/Megatron-LM ,延续上一篇Megatron-LM GPT 源码分析(一) Tensor Parallel分析 通过对GPT的模型运行示例,从三个维度 - 模型结构、代码运行、代码逻辑说明 对其源码做深入的分析。…...
DNA序列(DNA Consensus String, ACM/ICPC Seoul 2006, UVa1368) rust解法
输入m个长度均为n的DNA序列,求一个DNA序列,到所有序列的总Hamming距离尽量小。两个等长字符串的Hamming距离等于字符不同的位置个数,例如,ACGT和GCGA的Hamming距离为2(左数第1, 4个字符不同)。 输入整数m和…...
如何使用Jmeter进行http接口测试?
前言: 本文主要针对http接口进行测试,使用Jmeter工具实现。 Jmter工具设计之初是用于做性能测试的,它在实现对各种接口的调用方面已经做的比较成熟,因此,本次直接使用Jmeter工具来完成对Http接口的测试。 一、开发接…...
bash一行输入,多行回显demo脚本
效果图: 脚本: #!/bin/bash # 定义一个变量,用来存储输入的内容 input"" # 定义一个变量,用来存储输入的字符 char""# 为了让read能读到空格键 IFS_store$IFS IFS# 提示内容,在while循环中也有&a…...
IDEA spring-boot项目启动,无法加载或找到启动类问题解决
问题描述:找不到或无法加载主类 xxx.xxx.xxx.Classname 解决方案: 1.检查启动设置: 启动类所在包运行环境(一般选择默认即可)设置完成即可进行运行测试 2.如果第一步没有解决问题,试着第二步:…...
【LeetCode刷题(数据结构与算法)】:完全二叉树的节点个数
完全二叉树 的定义如下:在完全二叉树中,除了最底层节点可能没填满外,其余每层节点数都达到最大值,并且最下面一层的节点都集中在该层最左边的若干位置。若最底层为第 h 层,则该层包含 1~ 2h 个节点 输入:r…...
【代码随想录】算法训练营 第一天 第一章 数组 Part 1
目录 数组基础知识补充 704. 二分查找 题目 左闭右闭方法 思路 代码 左闭右开方法 思路 代码 27. 移除元素 题目 暴力解法 思路 代码 双指针法 思路 代码 数组基础知识补充 1. 在leecode中,数组一般是以vector容器的形式出现的,虽然ve…...
286_C++_定时器的其中一个操作,定时重载接口—startTimer循环执行回调(未完全)
1、启动一个定时器,允许在一定时间间隔内执行回调函数startTimer 1、接口函数参数详解 /*** @brief startTimer 定时重载接口* @param interval 定时器触发间隔,单位毫秒 (ms)* @param notify 定时时间到后需要触发的回调* @param type 回调驱动方…...
自动驾驶学习笔记(四)——变道绕行仿真
#Apollo开发者# 学习课程的传送门如下,当您也准备学习自动驾驶时,可以和我一同前往: 《自动驾驶新人之旅》免费课程—> 传送门 《2023星火培训【感知专项营】》免费课程—>传送门 文章目录 前言 仿真内容 启动Dreamview 开启Sim…...
C++位图,布隆过滤器
本期我们来学习位图,布隆过滤器等相关知识,以及模拟实现,需求前置知识 C-哈希Hash-CSDN博客 C-封装unordered_KLZUQ的博客-CSDN博客 目录 位图 布隆过滤器 海量数据面试题 全部代码 位图 我们先来看一道面试题 给 40 亿个不重复的无符号…...
Python多种方法实现九九乘法表
你好,我是悦创。 九九乘法表是一种常见的算术学习工具,通常用于帮助学生记住乘法的基本运算。以下是使用Python实现九九乘法表的几种方法: 1. 使用两个嵌套循环 for i in range(1, 10):for j in range(1, i 1):print(f"{j}x{i}{i * …...
【力扣1876】长度为三且各字符不同的子字符串
👑专栏内容:力扣刷题⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停 目录 一、题目描述二、题目分析 一、题目描述 题目链接:长度为三且各字符不同的子字符串 如果一个字符串不含有任何…...
HSN:微调预训练ViT用于目标检测和语义分割,华南理工和阿里巴巴联合提出
今天跟大家分享华南理工大学和阿里巴巴联合提出的将ViT模型用于下游任务的高效微调方法HSN,该方法在迁移学习、目标检测、实例分割、语义分割等多个下游任务中表现优秀,性能接近甚至在某些任务上超越全参数微调。 论文标题:Hierarchical Side…...
机器学习的原理是什么?
训过小狗没? 没训过的话总见过吧? 你要能理解怎么训狗,就能非常轻易的理解机器学习的原理. 比如你想教小狗学习动作“坐下”一开始小狗根本不知道你在说什么。但是如果你每次都说坐下”然后帮助它坐下,并给它一块小零食作为奖励,经过多次…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机
这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机,因为在使用过程中发现 Airsim 对外部监控相机的描述模糊,而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置,最后在源码示例中找到了,所以感…...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...
PHP 8.5 即将发布:管道操作符、强力调试
前不久,PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5!作为 PHP 语言的又一次重要迭代,PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是,借助强大的本地开发环境 ServBay&am…...
HTML版英语学习系统
HTML版英语学习系统 这是一个完全免费、无需安装、功能完整的英语学习工具,使用HTML CSS JavaScript实现。 功能 文本朗读练习 - 输入英文文章,系统朗读帮助练习听力和发音,适合跟读练习,模仿学习;实时词典查询 - 双…...
