当前位置: 首页 > news >正文

0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。

批处理

    env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()
# 批处理
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

流处理

    env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()
# 流处理
+I[A, 1]
+I[B, 1]
+I[C, 1]
+I[D, 1]
-U[A, 1]
+U[A, 2]
+I[E, 1]
-U[C, 1]
+U[C, 2]
-U[D, 1]
+U[D, 2]
-U[A, 2]
+U[A, 3]

我们看到批处理是一次性的达成了最终计算——只插入了5条数据,且每条数据都是最终结果。
而流处理则是进行了13次操作,其中插入操作5次,删除4次,更新4次。

只有插入操作

Mysql表无主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT
);

Sink表无主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

Sink表有主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT,PRIMARY KEY (`word`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

则对于只有插入操作的Batch模式,不管Sink表有没有主键,每次程序执行时都会插入新数据。比如我们执行两次批处理模式代码,则可以看到5的2倍=10条数据。

select * from WordsCountTable;
+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这个很好理解。

Mysql表有主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT,PRIMARY KEY (word)
);

Sink表无主键

因为word成为主键,不可以重复。第一次执行插入操作时成功了

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

但是第二次执行时,会因为主键冲突报错:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry ‘E’ for key ‘WordsCountTable.PRIMARY’

Sink表有主键

因为Mysql和Sink表里主键一致,不管执行多少次程序,都不会产生多余的数据。

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

有删除和更新操作

在流模式中我们看到,流处理处理有插入操作外,还有其他操作。我们再对比下它们的表现。

Sink表无主键

Mysql表无主键

Mysql有无主键

因为流模式删除和更新操作需要通过主键来寻找对象,所以会报如下错误

java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

Sink表有主键

由于Sink表设置了主键,于是流模式产生的更新和删除操作可以通过其找到对应项,就不会报错。

Mysql表无主键

由于Mysql表没有主键,导致每次执行都会插入一批数据。比如下面是我们执行两次的结果

+------+-------+
| word | count |
+------+-------+
| E    |     1 |
| A    |     3 |
| D    |     2 |
| C    |     2 |
| B    |     1 |
| A    |     3 |
| D    |     2 |
| B    |     1 |
| C    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这从另外一个方面说明:**流模式产生的一系列操作,在Execute环节,最终会对这些操作进行合并,将合并的操作同步给外部系统。**比如之前的流操作实际产生了13个行为,而最终落到数据库里只有5条数据,且第二次操作也是插入了5条新的、最终的数据,这就说明中间的操作在同步给数据库之前已经做了合并处理。

Mysql表有主键

因为Mysql表有主键,Sink过来的操作执行的是“有则更新,无则写入”的模式。
比如我们第一次执行程序时,得到

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

然后我们将数据源中的E改成了A,则这次将出现4个A,但是不会出现E。执行后的结果是

+------+-------+
| word | count |
+------+-------+
| A    |     4 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

这个实验就证明了,当Sink和Mysql表的主键一致时,执行的是insert on duplicate key update操作。

相关文章:

0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。 批处理 env_settings EnvironmentSettings \.new_…...

Java学习笔记(五)——数组、排序和查找

一、数组 (一)数组的使用 1、使用方式1——动态初始化 2、使用方式2——动态初始化 3、使用方式3——静态初始化 (二)数组使用注意事项和细节 (三)数组赋值机制 (四)数组拷贝…...

python输出与数据类型

目标 1、使用print输出内容 2、熟悉字符串类型 3、熟悉数字类型 4、熟悉数字与字符串操作 输出 print可控制输出内容也可配合、-、*、/进行运算,和整数型配合可进行运算和字符型配合有不同效果,如为拼接,*为多次输出注:整数型如&…...

React-Redux总结含购物车案例

React-Redux总结含购物车案例 reduc简介 redux是react全家桶的一员,它为react给i共可预测化的状态管理机制。redux是将整个应用状态存储到一个地方,成为store,里面存放着一颗树状态(state,tree),组件可以派发dispatch行为action给store,而不是直接通知其…...

攻克组合优化问题!美国DARPA选中全栈量子经典计算公司Rigetti

(图片来源:网络) 近日,美国量子计算公司Rigetti宣布,它被美国国防高级研究计划局 (DARPA) 选中,加入想象未来量子实际应用 (IMPAQT) 计划,推进先进量子算法的研发,去解决组合优化问…...

Kafka - 深入了解Kafka基础架构:Kafka的基本概念

文章目录 Kafka的基本概念 Kafka的基本概念 我们首先了解一些Kafka的基本概念。 1)Producer :消息生产者,就是向kafka broker发消息的客户端2)Consumer :消息消费者,向kafka broker获取消息的客户端3&…...

[Docker]二.Docker 镜像,仓库,容器介绍以及详解

一.Docker 镜像,容器,仓库的简单介绍 通俗来讲:镜像相当于VM虚拟机中的ios文件,容器相当于虚拟机系统,仓库相当于系统中的进程或者执行文件,容器是通过镜像创建的 1.镜像 Docker 镜像就是一个 Linux 的文件系统( Root FileSystem ),这个文…...

软考高级系统架构设计师系列之:案例分析典型试题一

软考高级系统架构设计师系列之:案例分析典型试题一 一、案例分析考试大纲二、结构化软件系统建模1.案例试题2.案例试题分析3.案例试题参考答案三、联合需求分析会议1.案例试题2.案例试题分析3.案例试题参考答案四、电子政务1.案例试题2.案例试题分析3.案例试题参考答案五、软件…...

2023年5个美国代理IP推荐,最佳代理花落谁家?

美国代理IP指的是代理服务器位于美国的IP地址,对于跨境业务来说,这些代理IP地址可以用于隐藏用户的真实IP地址,将其网络流量路由通过美国的服务器,以实现一些特定的目的。由于近年来,面向美国市场的跨境商家越来越多&a…...

github.com/holiman/uint256 源码阅读

github.com/holiman/uint256 源码阅读 // uint256: Fixed size 256-bit math library // Copyright 2018-2020 uint256 Authors // SPDX-License-Identifier: BSD-3-Clause// Package math provides integer math utilities.package uint256import ("encoding/binary&…...

排序-表排序

当我们需要对一个很大的结构体进行排序时,因为正常的排序需要大量的交换,这就会造成时间复杂度的浪费 因此,我们引入指针,通过指针临时变量的方式来避免时间复杂度的浪费 间接排序-排序思路:通过开辟一个指针数组&…...

勒索病毒最新变种.locked1勒索病毒来袭,如何恢复受感染的数据?

引言: 在当今数字化时代,网络威胁不断进化,.locked1勒索病毒就是其中一种常见的恶意软件。这种病毒会加密您的文件,然后勒索赎金以解锁它们。本文将详细介绍.locked1勒索病毒,包括如何恢复被加密的数据文件和如何预防…...

信号补零对信号频谱的影响

文章目录 前言一、 什么是补零二、案例三、补零前仿真及分析1、补零前 MATLAB 源码2、仿真及结果分析①、 x n x_n xn​ 时域图②、 x n x_n xn​ 频谱图 四、补零后仿真及分析1、补6000个零且1000采样点①、 MATLAB 源码②、仿真及结果分析 2、波形分辨率3、补6000个零且7000采…...

【Gan教程 】 什么是变分自动编码器VAE?

名词解释:Variational Autoencoder(VAE) 一、说明 为什么深度学习研究人员和概率机器学习人员在讨论变分自动编码器时会感到困惑?什么是变分自动编码器?为什么围绕这个术语存在不合理的混淆?本文从两个角度…...

T113-S3-buildroot文件系统tar解压缩gz文件

目录 前言 一、现象描述 二、解决方案 三、tar解压缩.gz文件 总结 前言 本文主要介绍全志T113-S3平台官方SDK,buildroot文件系统tar不支持.gz文件解压缩的问题以及如何配置buildroot文件系统解决该问题的方法介绍。 一、现象描述 在buildroot文件系统中&#xff…...

软件测试面试题:压测时,QPS一直上不去,如何排查?

在进行系统压测时,QPS(Queries Per Second)即每秒查询数,无法达到预期值是一个常见的问题,本文就来介绍下QPS一直上不去时应该如何排查。 一. 检查硬件资源 CPU使用率 使用top或nmon命令来查看CPU使用率。如果CPU使…...

探索JavaScript ES6+新特性

JavaScript是一门十分流行的编程语言,它不断发展演变以适应现代Web开发需求。ES6(也称为ECMAScript 2015)是JavaScript的第六个版本,引入了许多令人兴奋的新特性和语法糖。本文将介绍一些ES6中最有趣和实用的特性。 箭头函数 箭…...

Elasticsearch常见错误

一 read_only_allow_delete" : "true" 当我们在向某个索引添加一条数据的时候,可能(极少情况)会碰到下面的报错: {"error": { "root_cause": [ { "type": "cluster_block_exception", "r…...

mysql源码编译安装

下载地址:http://dev.mysql.com/downloads/mysql/5.1.html#downloads 免费版,只能下载mysql社区版。MySQL Community Server 选择合适的版本迚行下载: 安装前,如果不存在mysql 用户,则建立之 [rootlocalhost ~]# useradd mys…...

On Moving Object Segmentation from Monocular Video with Transformers 论文阅读

论文信息 标题:On Moving Object Segmentation from Monocular Video with Transformers 作者: 来源:ICCV 时间:2023 代码地址:暂无 Abstract 通过单个移动摄像机进行移动对象检测和分割是一项具有挑战性的任务&am…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现

目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比

在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...

基于SpringBoot在线拍卖系统的设计和实现

摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

深度学习水论文:mamba+图像增强

🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号(第三种)后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

CSS | transition 和 transform的用处和区别

省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...