当前位置: 首页 > news >正文

0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。

批处理

    env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()
# 批处理
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

流处理

    env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()
# 流处理
+I[A, 1]
+I[B, 1]
+I[C, 1]
+I[D, 1]
-U[A, 1]
+U[A, 2]
+I[E, 1]
-U[C, 1]
+U[C, 2]
-U[D, 1]
+U[D, 2]
-U[A, 2]
+U[A, 3]

我们看到批处理是一次性的达成了最终计算——只插入了5条数据,且每条数据都是最终结果。
而流处理则是进行了13次操作,其中插入操作5次,删除4次,更新4次。

只有插入操作

Mysql表无主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT
);

Sink表无主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

Sink表有主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT,PRIMARY KEY (`word`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

则对于只有插入操作的Batch模式,不管Sink表有没有主键,每次程序执行时都会插入新数据。比如我们执行两次批处理模式代码,则可以看到5的2倍=10条数据。

select * from WordsCountTable;
+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这个很好理解。

Mysql表有主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT,PRIMARY KEY (word)
);

Sink表无主键

因为word成为主键,不可以重复。第一次执行插入操作时成功了

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

但是第二次执行时,会因为主键冲突报错:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry ‘E’ for key ‘WordsCountTable.PRIMARY’

Sink表有主键

因为Mysql和Sink表里主键一致,不管执行多少次程序,都不会产生多余的数据。

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

有删除和更新操作

在流模式中我们看到,流处理处理有插入操作外,还有其他操作。我们再对比下它们的表现。

Sink表无主键

Mysql表无主键

Mysql有无主键

因为流模式删除和更新操作需要通过主键来寻找对象,所以会报如下错误

java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

Sink表有主键

由于Sink表设置了主键,于是流模式产生的更新和删除操作可以通过其找到对应项,就不会报错。

Mysql表无主键

由于Mysql表没有主键,导致每次执行都会插入一批数据。比如下面是我们执行两次的结果

+------+-------+
| word | count |
+------+-------+
| E    |     1 |
| A    |     3 |
| D    |     2 |
| C    |     2 |
| B    |     1 |
| A    |     3 |
| D    |     2 |
| B    |     1 |
| C    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这从另外一个方面说明:**流模式产生的一系列操作,在Execute环节,最终会对这些操作进行合并,将合并的操作同步给外部系统。**比如之前的流操作实际产生了13个行为,而最终落到数据库里只有5条数据,且第二次操作也是插入了5条新的、最终的数据,这就说明中间的操作在同步给数据库之前已经做了合并处理。

Mysql表有主键

因为Mysql表有主键,Sink过来的操作执行的是“有则更新,无则写入”的模式。
比如我们第一次执行程序时,得到

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

然后我们将数据源中的E改成了A,则这次将出现4个A,但是不会出现E。执行后的结果是

+------+-------+
| word | count |
+------+-------+
| A    |     4 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

这个实验就证明了,当Sink和Mysql表的主键一致时,执行的是insert on duplicate key update操作。

相关文章:

0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。 批处理 env_settings EnvironmentSettings \.new_…...

Java学习笔记(五)——数组、排序和查找

一、数组 (一)数组的使用 1、使用方式1——动态初始化 2、使用方式2——动态初始化 3、使用方式3——静态初始化 (二)数组使用注意事项和细节 (三)数组赋值机制 (四)数组拷贝…...

python输出与数据类型

目标 1、使用print输出内容 2、熟悉字符串类型 3、熟悉数字类型 4、熟悉数字与字符串操作 输出 print可控制输出内容也可配合、-、*、/进行运算,和整数型配合可进行运算和字符型配合有不同效果,如为拼接,*为多次输出注:整数型如&…...

React-Redux总结含购物车案例

React-Redux总结含购物车案例 reduc简介 redux是react全家桶的一员,它为react给i共可预测化的状态管理机制。redux是将整个应用状态存储到一个地方,成为store,里面存放着一颗树状态(state,tree),组件可以派发dispatch行为action给store,而不是直接通知其…...

攻克组合优化问题!美国DARPA选中全栈量子经典计算公司Rigetti

(图片来源:网络) 近日,美国量子计算公司Rigetti宣布,它被美国国防高级研究计划局 (DARPA) 选中,加入想象未来量子实际应用 (IMPAQT) 计划,推进先进量子算法的研发,去解决组合优化问…...

Kafka - 深入了解Kafka基础架构:Kafka的基本概念

文章目录 Kafka的基本概念 Kafka的基本概念 我们首先了解一些Kafka的基本概念。 1)Producer :消息生产者,就是向kafka broker发消息的客户端2)Consumer :消息消费者,向kafka broker获取消息的客户端3&…...

[Docker]二.Docker 镜像,仓库,容器介绍以及详解

一.Docker 镜像,容器,仓库的简单介绍 通俗来讲:镜像相当于VM虚拟机中的ios文件,容器相当于虚拟机系统,仓库相当于系统中的进程或者执行文件,容器是通过镜像创建的 1.镜像 Docker 镜像就是一个 Linux 的文件系统( Root FileSystem ),这个文…...

软考高级系统架构设计师系列之:案例分析典型试题一

软考高级系统架构设计师系列之:案例分析典型试题一 一、案例分析考试大纲二、结构化软件系统建模1.案例试题2.案例试题分析3.案例试题参考答案三、联合需求分析会议1.案例试题2.案例试题分析3.案例试题参考答案四、电子政务1.案例试题2.案例试题分析3.案例试题参考答案五、软件…...

2023年5个美国代理IP推荐,最佳代理花落谁家?

美国代理IP指的是代理服务器位于美国的IP地址,对于跨境业务来说,这些代理IP地址可以用于隐藏用户的真实IP地址,将其网络流量路由通过美国的服务器,以实现一些特定的目的。由于近年来,面向美国市场的跨境商家越来越多&a…...

github.com/holiman/uint256 源码阅读

github.com/holiman/uint256 源码阅读 // uint256: Fixed size 256-bit math library // Copyright 2018-2020 uint256 Authors // SPDX-License-Identifier: BSD-3-Clause// Package math provides integer math utilities.package uint256import ("encoding/binary&…...

排序-表排序

当我们需要对一个很大的结构体进行排序时,因为正常的排序需要大量的交换,这就会造成时间复杂度的浪费 因此,我们引入指针,通过指针临时变量的方式来避免时间复杂度的浪费 间接排序-排序思路:通过开辟一个指针数组&…...

勒索病毒最新变种.locked1勒索病毒来袭,如何恢复受感染的数据?

引言: 在当今数字化时代,网络威胁不断进化,.locked1勒索病毒就是其中一种常见的恶意软件。这种病毒会加密您的文件,然后勒索赎金以解锁它们。本文将详细介绍.locked1勒索病毒,包括如何恢复被加密的数据文件和如何预防…...

信号补零对信号频谱的影响

文章目录 前言一、 什么是补零二、案例三、补零前仿真及分析1、补零前 MATLAB 源码2、仿真及结果分析①、 x n x_n xn​ 时域图②、 x n x_n xn​ 频谱图 四、补零后仿真及分析1、补6000个零且1000采样点①、 MATLAB 源码②、仿真及结果分析 2、波形分辨率3、补6000个零且7000采…...

【Gan教程 】 什么是变分自动编码器VAE?

名词解释:Variational Autoencoder(VAE) 一、说明 为什么深度学习研究人员和概率机器学习人员在讨论变分自动编码器时会感到困惑?什么是变分自动编码器?为什么围绕这个术语存在不合理的混淆?本文从两个角度…...

T113-S3-buildroot文件系统tar解压缩gz文件

目录 前言 一、现象描述 二、解决方案 三、tar解压缩.gz文件 总结 前言 本文主要介绍全志T113-S3平台官方SDK,buildroot文件系统tar不支持.gz文件解压缩的问题以及如何配置buildroot文件系统解决该问题的方法介绍。 一、现象描述 在buildroot文件系统中&#xff…...

软件测试面试题:压测时,QPS一直上不去,如何排查?

在进行系统压测时,QPS(Queries Per Second)即每秒查询数,无法达到预期值是一个常见的问题,本文就来介绍下QPS一直上不去时应该如何排查。 一. 检查硬件资源 CPU使用率 使用top或nmon命令来查看CPU使用率。如果CPU使…...

探索JavaScript ES6+新特性

JavaScript是一门十分流行的编程语言,它不断发展演变以适应现代Web开发需求。ES6(也称为ECMAScript 2015)是JavaScript的第六个版本,引入了许多令人兴奋的新特性和语法糖。本文将介绍一些ES6中最有趣和实用的特性。 箭头函数 箭…...

Elasticsearch常见错误

一 read_only_allow_delete" : "true" 当我们在向某个索引添加一条数据的时候,可能(极少情况)会碰到下面的报错: {"error": { "root_cause": [ { "type": "cluster_block_exception", "r…...

mysql源码编译安装

下载地址:http://dev.mysql.com/downloads/mysql/5.1.html#downloads 免费版,只能下载mysql社区版。MySQL Community Server 选择合适的版本迚行下载: 安装前,如果不存在mysql 用户,则建立之 [rootlocalhost ~]# useradd mys…...

On Moving Object Segmentation from Monocular Video with Transformers 论文阅读

论文信息 标题:On Moving Object Segmentation from Monocular Video with Transformers 作者: 来源:ICCV 时间:2023 代码地址:暂无 Abstract 通过单个移动摄像机进行移动对象检测和分割是一项具有挑战性的任务&am…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包&#xff1a; for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

微服务通信安全:深入解析mTLS的原理与实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言&#xff1a;微服务时代的通信安全挑战 随着云原生和微服务架构的普及&#xff0c;服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...