当前位置: 首页 > news >正文

0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。

批处理

    env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()
# 批处理
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

流处理

    env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()
# 流处理
+I[A, 1]
+I[B, 1]
+I[C, 1]
+I[D, 1]
-U[A, 1]
+U[A, 2]
+I[E, 1]
-U[C, 1]
+U[C, 2]
-U[D, 1]
+U[D, 2]
-U[A, 2]
+U[A, 3]

我们看到批处理是一次性的达成了最终计算——只插入了5条数据,且每条数据都是最终结果。
而流处理则是进行了13次操作,其中插入操作5次,删除4次,更新4次。

只有插入操作

Mysql表无主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT
);

Sink表无主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

Sink表有主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT,PRIMARY KEY (`word`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

则对于只有插入操作的Batch模式,不管Sink表有没有主键,每次程序执行时都会插入新数据。比如我们执行两次批处理模式代码,则可以看到5的2倍=10条数据。

select * from WordsCountTable;
+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这个很好理解。

Mysql表有主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT,PRIMARY KEY (word)
);

Sink表无主键

因为word成为主键,不可以重复。第一次执行插入操作时成功了

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

但是第二次执行时,会因为主键冲突报错:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry ‘E’ for key ‘WordsCountTable.PRIMARY’

Sink表有主键

因为Mysql和Sink表里主键一致,不管执行多少次程序,都不会产生多余的数据。

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

有删除和更新操作

在流模式中我们看到,流处理处理有插入操作外,还有其他操作。我们再对比下它们的表现。

Sink表无主键

Mysql表无主键

Mysql有无主键

因为流模式删除和更新操作需要通过主键来寻找对象,所以会报如下错误

java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

Sink表有主键

由于Sink表设置了主键,于是流模式产生的更新和删除操作可以通过其找到对应项,就不会报错。

Mysql表无主键

由于Mysql表没有主键,导致每次执行都会插入一批数据。比如下面是我们执行两次的结果

+------+-------+
| word | count |
+------+-------+
| E    |     1 |
| A    |     3 |
| D    |     2 |
| C    |     2 |
| B    |     1 |
| A    |     3 |
| D    |     2 |
| B    |     1 |
| C    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这从另外一个方面说明:**流模式产生的一系列操作,在Execute环节,最终会对这些操作进行合并,将合并的操作同步给外部系统。**比如之前的流操作实际产生了13个行为,而最终落到数据库里只有5条数据,且第二次操作也是插入了5条新的、最终的数据,这就说明中间的操作在同步给数据库之前已经做了合并处理。

Mysql表有主键

因为Mysql表有主键,Sink过来的操作执行的是“有则更新,无则写入”的模式。
比如我们第一次执行程序时,得到

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

然后我们将数据源中的E改成了A,则这次将出现4个A,但是不会出现E。执行后的结果是

+------+-------+
| word | count |
+------+-------+
| A    |     4 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

这个实验就证明了,当Sink和Mysql表的主键一致时,执行的是insert on duplicate key update操作。

相关文章:

0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。 批处理 env_settings EnvironmentSettings \.new_…...

Java学习笔记(五)——数组、排序和查找

一、数组 (一)数组的使用 1、使用方式1——动态初始化 2、使用方式2——动态初始化 3、使用方式3——静态初始化 (二)数组使用注意事项和细节 (三)数组赋值机制 (四)数组拷贝…...

python输出与数据类型

目标 1、使用print输出内容 2、熟悉字符串类型 3、熟悉数字类型 4、熟悉数字与字符串操作 输出 print可控制输出内容也可配合、-、*、/进行运算,和整数型配合可进行运算和字符型配合有不同效果,如为拼接,*为多次输出注:整数型如&…...

React-Redux总结含购物车案例

React-Redux总结含购物车案例 reduc简介 redux是react全家桶的一员,它为react给i共可预测化的状态管理机制。redux是将整个应用状态存储到一个地方,成为store,里面存放着一颗树状态(state,tree),组件可以派发dispatch行为action给store,而不是直接通知其…...

攻克组合优化问题!美国DARPA选中全栈量子经典计算公司Rigetti

(图片来源:网络) 近日,美国量子计算公司Rigetti宣布,它被美国国防高级研究计划局 (DARPA) 选中,加入想象未来量子实际应用 (IMPAQT) 计划,推进先进量子算法的研发,去解决组合优化问…...

Kafka - 深入了解Kafka基础架构:Kafka的基本概念

文章目录 Kafka的基本概念 Kafka的基本概念 我们首先了解一些Kafka的基本概念。 1)Producer :消息生产者,就是向kafka broker发消息的客户端2)Consumer :消息消费者,向kafka broker获取消息的客户端3&…...

[Docker]二.Docker 镜像,仓库,容器介绍以及详解

一.Docker 镜像,容器,仓库的简单介绍 通俗来讲:镜像相当于VM虚拟机中的ios文件,容器相当于虚拟机系统,仓库相当于系统中的进程或者执行文件,容器是通过镜像创建的 1.镜像 Docker 镜像就是一个 Linux 的文件系统( Root FileSystem ),这个文…...

软考高级系统架构设计师系列之:案例分析典型试题一

软考高级系统架构设计师系列之:案例分析典型试题一 一、案例分析考试大纲二、结构化软件系统建模1.案例试题2.案例试题分析3.案例试题参考答案三、联合需求分析会议1.案例试题2.案例试题分析3.案例试题参考答案四、电子政务1.案例试题2.案例试题分析3.案例试题参考答案五、软件…...

2023年5个美国代理IP推荐,最佳代理花落谁家?

美国代理IP指的是代理服务器位于美国的IP地址,对于跨境业务来说,这些代理IP地址可以用于隐藏用户的真实IP地址,将其网络流量路由通过美国的服务器,以实现一些特定的目的。由于近年来,面向美国市场的跨境商家越来越多&a…...

github.com/holiman/uint256 源码阅读

github.com/holiman/uint256 源码阅读 // uint256: Fixed size 256-bit math library // Copyright 2018-2020 uint256 Authors // SPDX-License-Identifier: BSD-3-Clause// Package math provides integer math utilities.package uint256import ("encoding/binary&…...

排序-表排序

当我们需要对一个很大的结构体进行排序时,因为正常的排序需要大量的交换,这就会造成时间复杂度的浪费 因此,我们引入指针,通过指针临时变量的方式来避免时间复杂度的浪费 间接排序-排序思路:通过开辟一个指针数组&…...

勒索病毒最新变种.locked1勒索病毒来袭,如何恢复受感染的数据?

引言: 在当今数字化时代,网络威胁不断进化,.locked1勒索病毒就是其中一种常见的恶意软件。这种病毒会加密您的文件,然后勒索赎金以解锁它们。本文将详细介绍.locked1勒索病毒,包括如何恢复被加密的数据文件和如何预防…...

信号补零对信号频谱的影响

文章目录 前言一、 什么是补零二、案例三、补零前仿真及分析1、补零前 MATLAB 源码2、仿真及结果分析①、 x n x_n xn​ 时域图②、 x n x_n xn​ 频谱图 四、补零后仿真及分析1、补6000个零且1000采样点①、 MATLAB 源码②、仿真及结果分析 2、波形分辨率3、补6000个零且7000采…...

【Gan教程 】 什么是变分自动编码器VAE?

名词解释:Variational Autoencoder(VAE) 一、说明 为什么深度学习研究人员和概率机器学习人员在讨论变分自动编码器时会感到困惑?什么是变分自动编码器?为什么围绕这个术语存在不合理的混淆?本文从两个角度…...

T113-S3-buildroot文件系统tar解压缩gz文件

目录 前言 一、现象描述 二、解决方案 三、tar解压缩.gz文件 总结 前言 本文主要介绍全志T113-S3平台官方SDK,buildroot文件系统tar不支持.gz文件解压缩的问题以及如何配置buildroot文件系统解决该问题的方法介绍。 一、现象描述 在buildroot文件系统中&#xff…...

软件测试面试题:压测时,QPS一直上不去,如何排查?

在进行系统压测时,QPS(Queries Per Second)即每秒查询数,无法达到预期值是一个常见的问题,本文就来介绍下QPS一直上不去时应该如何排查。 一. 检查硬件资源 CPU使用率 使用top或nmon命令来查看CPU使用率。如果CPU使…...

探索JavaScript ES6+新特性

JavaScript是一门十分流行的编程语言,它不断发展演变以适应现代Web开发需求。ES6(也称为ECMAScript 2015)是JavaScript的第六个版本,引入了许多令人兴奋的新特性和语法糖。本文将介绍一些ES6中最有趣和实用的特性。 箭头函数 箭…...

Elasticsearch常见错误

一 read_only_allow_delete" : "true" 当我们在向某个索引添加一条数据的时候,可能(极少情况)会碰到下面的报错: {"error": { "root_cause": [ { "type": "cluster_block_exception", "r…...

mysql源码编译安装

下载地址:http://dev.mysql.com/downloads/mysql/5.1.html#downloads 免费版,只能下载mysql社区版。MySQL Community Server 选择合适的版本迚行下载: 安装前,如果不存在mysql 用户,则建立之 [rootlocalhost ~]# useradd mys…...

On Moving Object Segmentation from Monocular Video with Transformers 论文阅读

论文信息 标题:On Moving Object Segmentation from Monocular Video with Transformers 作者: 来源:ICCV 时间:2023 代码地址:暂无 Abstract 通过单个移动摄像机进行移动对象检测和分割是一项具有挑战性的任务&am…...

[AutoSar NVM] 存储架构

依AutoSAR及公开知识辛苦整理,禁止转载。 专栏 《深入浅出AutoSAR》, 全文 2900 字. 图片来源: 知乎 汽车的ECU内存中有很多不同类型的变量,这些变量包括了车辆各个系统和功能所需的数据。大部分变量在ECU掉电后就会丢失&#x…...

ES10 新特性

1. Object.fromEntries Object.fromEntries() 方法把可迭代对象的键值对列表转换为一个对象。 语法: Object.fromEntries(iterable)iterable:类似 Array 、 Map 或者其它实现了可迭代协议的可迭代对象。返回值:一个由该迭代对象条目提供对应属性的新对象。相当于 Object.e…...

宝塔安装脚本

Centos安装脚本 yum install -y wget && wget -O install.sh https://download.bt.cn/install/install_6.0.sh && sh install.sh ed8484bec Ubuntu/Deepin安装脚本 wget -O install.sh https://download.bt.cn/install/install-ubuntu_6.0.sh && sud…...

gulp打包vue3+jsx+less插件

最终转换结果如下 在根目录下添加gulpfile.js文件,package.json添加命令npm run gulp var gulp require(gulp) var babel require(gulp-babel) var less require(gulp-less) var del require(del); var spawn require(child_process).spawn;const outDir &…...

华为ICT——第四章深度学习和积卷神经

接第三章的末尾: 目录 接第三章的末尾: 1:自适应阈值分割: 2:形态处理: 4:膨胀: 5:腐蚀 6:开运算 7:闭运算 8:特征描述子 9&#xf…...

MongoDB 学习笔记(基础)

概论 出现背景:MongoDB 是文档型数据库,由于传统的关系型数据库(如 MySQL),在数据操作的“三高”需求以及应对 web 的网站需求面前显得有些吃力,在此环境下 MongoDB 出世了 三高需求: (1) 对数…...

【TGRS 2023】RingMo: A Remote Sensing Foundation ModelWith Masked Image Modeling

RingMo: A Remote Sensing Foundation Model With Masked Image Modeling, TGRS 2023 论文:https://ieeexplore.ieee.org/stamp/stamp.jsp?tp&arnumber9844015 代码:https://github.com/comeony/RingMo MindSpore/RingMo-Framework (gitee.com) …...

性能测试 —— 生成html测试报告、参数化、jvm监控

1.生成HTML的测试报告 1.1配置 (1)找到jmeter 的安装目录,下的bin中的jmeter.properties(jmeter配置文件) (2) ctrl f ,搜索jmeter.save.saveservice.output_format,取消井号 并且 把等号后的xml改为csv,…...

堆(二叉树,带图详解)

一.堆 1.堆的概念 2.堆的存储方式 逻辑结构 物理结构 2.堆的插入问题 3.堆的基本实现(代码)(以小堆为例) 1.堆的初始化 2. 向上调整 3.插入结点 4. 交换函数、堆的打印 5.向下调整 6.删除根节点并调整成小根堆 7.获取堆…...

vue3 code format bug

vue code format bug vue客户端代码格式化缺陷,为了方便阅读和维护,对代码格式化发现这个缺陷 vue.global.min.3.2.26.js var Vuefunction(r){"use strict";function e(e,t){const nObject.create(null);var re.split(",");for(le…...