当前位置: 首页 > news >正文

Flink SQL 从一个SOURCE 写入多个Sink端实例

一. 背景

FLINK 任务从一个数据源读取数据, 写入多个sink端.

二. 官方实例

写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

--源表
CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT
) WITH ('connector' = 'datagen'
);--结果表A
CREATE TEMPORARY TABLE blackhole_sinkA(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--结果表B
CREATE TEMPORARY TABLE blackhole_sinkB(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--DML
BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END;      --写入多个Sink时,必填。

三. 实操

3.1. 启动Standlone集群

进入到flink引擎包目录, 启动Standlone模式.

./bin/start-cluster.sh

3.2. 启动flink sql-client.

./bin/sql-client.sh embedded

3.3. 执行sql

Flink SQL> CREATE TEMPORARY TABLE datagen_source (
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'datagen'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkA(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkB(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> BEGIN STATEMENT SET;
[INFO] Begin a statement set.Flink SQL> INSERT INTO blackhole_sinkA
>   SELECT UPPER(name), sum(score)
>   FROM datagen_source
>   GROUP BY UPPER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> INSERT INTO blackhole_sinkB
>   SELECT LOWER(name), max(score)
>   FROM datagen_source
>   GROUP BY LOWER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> END;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 37a1390129c356374601a267cb8080b6

3.4. 查看flink ui

查看flink ui页面,验证结论.

http://master01:8081/#/job/37a1390129c356374601a267cb8080b6/overview

在这里插入图片描述

相关文章:

Flink SQL 从一个SOURCE 写入多个Sink端实例

一. 背景 FLINK 任务从一个数据源读取数据, 写入多个sink端. 二. 官方实例 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。--源表 CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT ) WITH (connector datagen …...

python飞机大战游戏.py

python飞机大战游戏.py import pygame import random# 游戏窗口大小 WINDOW_WIDTH 600 WINDOW_HEIGHT 800# 颜色定义 BLACK (0, 0, 0) WHITE (255, 255, 255)# 初始化Pygame pygame.init()# 创建游戏窗口 window pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))…...

【C++】14___String容器

目录 一、string基本概念 二、string赋值操作 三、字符串拼接 四、 string查找和替换 五、 string字符串比较 六、string插入和删除 七、string子串 一、string基本概念 本质:string是C风格的字符串,而string本质上是一个类 string和char*区别&am…...

数据特性库 前言

文章目录 一、num-traits库简介二、核心功能三、更新功能四、使用方式五、应用示例六、结论 一、num-traits库简介 num-traits是Rust编程语言中的一个开源库,专注于为数值类型提供一系列的数学运算特性和接口。它支持泛型数学计算,允许开发者在不指定具…...

jdk和cglib动态代理区别

目标类不同 jdk目标类需要实现接口。 cglib不需要。 代理类生成方式不同 jdk内部字节码生成代理类。 cglib使用ASM字节码生成库生成代理类。 代理类和目标类关系不同 jdk代理类实现目标类接口,jdk无法代理目标类中static或private的方法。 cglib 代理类继承目标类…...

部署Mysql、镜像和容器、常见命令

目录 部署Mysql 镜像和容器 常见命令 部署Mysql 可以有多个容器 docker run -d \--name mysql \-p 3306:3306 \-e TZAsia/Shanghai \-e MYSQL_ROOT_PASSWORD123 \mysql docker run -d \--name mysql2 \-p 3307:3307 \-e TZAsia/Shanghai \-e MYSQL_ROOT_PASSWORD123 \mys…...

【数学】P2671 [NOIP2015 普及组] 求和

题目背景 NOIP2015 普及组 T3、深入浅出进阶1-5 题目描述 一条狭长的纸带被均匀划分出了 n n n 个格子,格子编号从 1 1 1 到 n n n。每个格子上都染了一种颜色 c o l o r i color_i colori​ 用 [ 1 , m ] [1,m] [1,m] 当中的一个整数表示)&…...

【AI图像生成网站Golang】项目测试与优化

AI图像生成网站 目录 一、项目介绍 二、雪花算法 三、JWT认证与令牌桶算法 四、项目架构 五、图床上传与图像生成API搭建 六、项目测试与优化 六、项目测试与优化 在开发过程中,性能优化是保证项目可扩展性和用户体验的关键步骤。本文将详细介绍我如何使用一…...

vue常用自定义指令

参考链接1https://blog.csdn.net/m0_67584973/article/details/139300966?spm1001.2014.3001.5501 参考链接2https://juejin.cn/post/7067051410671534116...

以太网帧、IP数据报图解

注:本文为 “以太网帧、IP数据报”图解相关文章合辑。 未整理去重。 以太网帧、IP数据报的图解格式(包含相关例题讲解) Rebecca.Yan已于 2023-05-27 14:13:19 修改 一、基础知识 UDP 段、IP 数据包,以太网帧图示 通信过程中&…...

01.大模型起源与发展

知识点 注意力机制(Attention)的主要用途是什么? 选择重要的信息并忽略不相关的信息 Transformer 模型是基于什么理论构建的? C. 注意力机制(Attention) GPT 和 BERT 的主要区别是什么? C. GPT…...

leetcode刷题日记03——javascript

题目3: 回文数https://leetcode.cn/problems/palindrome-number/ 给你一个整数 x ,如果 x 是一个回文整数,返回 true ;否则,返回 false 。 回文数是指正序(从左向右)和倒序(从右向…...

vue横向滚动日期选择器组件

vue横向滚动日期选择器组件 组件使用到了element-plus组件库和dayjs库,使用前先保证项目中已经下载导入 主要功能:选择日期,点击日期可以让此日期滚动到视图中间,左滑右滑同理,支持跳转至任意日期,支持自…...

【大模型】大模型项目选择 RAGvs微调?

RAG 输入问题,在知识库匹配知识,构建提示词:基于{知识}回答{问题} 微调 用知识问答对重新训练大模型权重,输入问题到调整后的大模型 如何选择 如果业务要求较高,RAG和微调可以一起使用 1-动态数据 选择RAG 原因&a…...

2024年12月CCF-GESP编程能力等级认证Python编程一级真题解析

本文收录于专栏《Python等级认证CCF-GESP真题解析》,专栏总目录:点这里,订阅后可阅读专栏内所有文章。 一、单选题(每题 2 分,共 30 分) 第 1 题 2024年10月8日,诺贝尔物理学奖“意外地”颁给了两位计算机科学家约翰霍普菲尔德(John J. Hopfield)和杰弗里辛顿(Geof…...

【机器学习】元学习(Meta-learning)

云边有个稻草人-CSDN博客 目录 引言 一、元学习的基本概念 1.1 什么是元学习? 1.2 元学习的与少样本学习的关系 二、元学习的核心问题与挑战 2.1 核心问题 2.2 挑战 三、元学习的常见方法 3.1 基于优化的元学习 3.1.1 MAML(Model-Agnostic Meta…...

详解Redis的String类型及相关命令

目录 SET GET MGET MSET SETNX SET和SETNX和SETXX对比 INCR INCRBY DECR DECRBY INCRBYFLOAT APPEND GETRANGE SETRANGE STRLEN 内部编码 SET 将 string 类型的 value 设置到 key 中。如果 key 之前存在,则覆盖,⽆论原来的数据类型是什么…...

android RadioButton + ViewPager+fragment

RadioGroup viewpage fragment 组合显示导航栏 1、首先主界面的布局控件就是RadioGroup viewpage <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:tools…...

给机器装上“脑子”—— 一文带你玩转机器学习

目录 一、引言&#xff1a;AI浪潮中的明星——机器学习 二、机器学习的定义与概念 1. 机器学习与传统编程的区别 2. 机器学习的主要任务类型 3. 机器学习的重要组成部分 三、机器学习的工作原理&#xff1a;从数据到模型的魔法之旅 1. 数据收集与预处理——数据是机器的…...

论文笔记:是什么让多模态学习变得困难?

整理了What Makes Training Multi-modal Classification Networks Hard? 论文的阅读笔记 背景方法OGR基于最小化OGR的多监督信号混合在实践中的应用 实验 背景 直观上&#xff0c;多模态网络接收更多的信息&#xff0c;因此它应该匹配或优于其单峰网络。然而&#xff0c;最好的…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...

掌握 HTTP 请求:理解 cURL GET 语法

cURL 是一个强大的命令行工具&#xff0c;用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中&#xff0c;cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...

32位寻址与64位寻址

32位寻址与64位寻址 32位寻址是什么&#xff1f; 32位寻址是指计算机的CPU、内存或总线系统使用32位二进制数来标识和访问内存中的存储单元&#xff08;地址&#xff09;&#xff0c;其核心含义与能力如下&#xff1a; 1. 核心定义 地址位宽&#xff1a;CPU或内存控制器用32位…...

用鸿蒙HarmonyOS5实现国际象棋小游戏的过程

下面是一个基于鸿蒙OS (HarmonyOS) 的国际象棋小游戏的完整实现代码&#xff0c;使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├── …...

【Pandas】pandas DataFrame dropna

Pandas2.2 DataFrame Missing data handling 方法描述DataFrame.fillna([value, method, axis, …])用于填充 DataFrame 中的缺失值&#xff08;NaN&#xff09;DataFrame.backfill(*[, axis, inplace, …])用于**使用后向填充&#xff08;即“下一个有效观测值”&#xff09…...

可视化预警系统:如何实现生产风险的实时监控?

在生产环境中&#xff0c;风险无处不在&#xff0c;而传统的监控方式往往只能事后补救&#xff0c;难以做到提前预警。但如今&#xff0c;可视化预警系统正在改变这一切&#xff01;它能够实时收集和分析生产数据&#xff0c;通过直观的图表和警报&#xff0c;让管理者第一时间…...

【学习记录】使用 Kali Linux 与 Hashcat 进行 WiFi 安全分析:合法的安全测试指南

文章目录 &#x1f4cc; 前言&#x1f9f0; 一、前期准备✅ 安装 Kali Linux✅ 获取支持监听模式的无线网卡 &#x1f6e0; 二、使用 Kali Linux 进行 WiFi 安全测试步骤 1&#xff1a;插入无线网卡并确认识别步骤 2&#xff1a;开启监听模式步骤 3&#xff1a;扫描附近的 WiFi…...