当前位置: 首页 > news >正文

Flink SQL 从一个SOURCE 写入多个Sink端实例

一. 背景

FLINK 任务从一个数据源读取数据, 写入多个sink端.

二. 官方实例

写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

--源表
CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT
) WITH ('connector' = 'datagen'
);--结果表A
CREATE TEMPORARY TABLE blackhole_sinkA(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--结果表B
CREATE TEMPORARY TABLE blackhole_sinkB(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--DML
BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END;      --写入多个Sink时,必填。

三. 实操

3.1. 启动Standlone集群

进入到flink引擎包目录, 启动Standlone模式.

./bin/start-cluster.sh

3.2. 启动flink sql-client.

./bin/sql-client.sh embedded

3.3. 执行sql

Flink SQL> CREATE TEMPORARY TABLE datagen_source (
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'datagen'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkA(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkB(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> BEGIN STATEMENT SET;
[INFO] Begin a statement set.Flink SQL> INSERT INTO blackhole_sinkA
>   SELECT UPPER(name), sum(score)
>   FROM datagen_source
>   GROUP BY UPPER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> INSERT INTO blackhole_sinkB
>   SELECT LOWER(name), max(score)
>   FROM datagen_source
>   GROUP BY LOWER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> END;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 37a1390129c356374601a267cb8080b6

3.4. 查看flink ui

查看flink ui页面,验证结论.

http://master01:8081/#/job/37a1390129c356374601a267cb8080b6/overview

在这里插入图片描述

相关文章:

Flink SQL 从一个SOURCE 写入多个Sink端实例

一. 背景 FLINK 任务从一个数据源读取数据, 写入多个sink端. 二. 官方实例 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。--源表 CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT ) WITH (connector datagen …...

python飞机大战游戏.py

python飞机大战游戏.py import pygame import random# 游戏窗口大小 WINDOW_WIDTH 600 WINDOW_HEIGHT 800# 颜色定义 BLACK (0, 0, 0) WHITE (255, 255, 255)# 初始化Pygame pygame.init()# 创建游戏窗口 window pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))…...

【C++】14___String容器

目录 一、string基本概念 二、string赋值操作 三、字符串拼接 四、 string查找和替换 五、 string字符串比较 六、string插入和删除 七、string子串 一、string基本概念 本质:string是C风格的字符串,而string本质上是一个类 string和char*区别&am…...

数据特性库 前言

文章目录 一、num-traits库简介二、核心功能三、更新功能四、使用方式五、应用示例六、结论 一、num-traits库简介 num-traits是Rust编程语言中的一个开源库,专注于为数值类型提供一系列的数学运算特性和接口。它支持泛型数学计算,允许开发者在不指定具…...

jdk和cglib动态代理区别

目标类不同 jdk目标类需要实现接口。 cglib不需要。 代理类生成方式不同 jdk内部字节码生成代理类。 cglib使用ASM字节码生成库生成代理类。 代理类和目标类关系不同 jdk代理类实现目标类接口,jdk无法代理目标类中static或private的方法。 cglib 代理类继承目标类…...

部署Mysql、镜像和容器、常见命令

目录 部署Mysql 镜像和容器 常见命令 部署Mysql 可以有多个容器 docker run -d \--name mysql \-p 3306:3306 \-e TZAsia/Shanghai \-e MYSQL_ROOT_PASSWORD123 \mysql docker run -d \--name mysql2 \-p 3307:3307 \-e TZAsia/Shanghai \-e MYSQL_ROOT_PASSWORD123 \mys…...

【数学】P2671 [NOIP2015 普及组] 求和

题目背景 NOIP2015 普及组 T3、深入浅出进阶1-5 题目描述 一条狭长的纸带被均匀划分出了 n n n 个格子,格子编号从 1 1 1 到 n n n。每个格子上都染了一种颜色 c o l o r i color_i colori​ 用 [ 1 , m ] [1,m] [1,m] 当中的一个整数表示)&…...

【AI图像生成网站Golang】项目测试与优化

AI图像生成网站 目录 一、项目介绍 二、雪花算法 三、JWT认证与令牌桶算法 四、项目架构 五、图床上传与图像生成API搭建 六、项目测试与优化 六、项目测试与优化 在开发过程中,性能优化是保证项目可扩展性和用户体验的关键步骤。本文将详细介绍我如何使用一…...

vue常用自定义指令

参考链接1https://blog.csdn.net/m0_67584973/article/details/139300966?spm1001.2014.3001.5501 参考链接2https://juejin.cn/post/7067051410671534116...

以太网帧、IP数据报图解

注:本文为 “以太网帧、IP数据报”图解相关文章合辑。 未整理去重。 以太网帧、IP数据报的图解格式(包含相关例题讲解) Rebecca.Yan已于 2023-05-27 14:13:19 修改 一、基础知识 UDP 段、IP 数据包,以太网帧图示 通信过程中&…...

01.大模型起源与发展

知识点 注意力机制(Attention)的主要用途是什么? 选择重要的信息并忽略不相关的信息 Transformer 模型是基于什么理论构建的? C. 注意力机制(Attention) GPT 和 BERT 的主要区别是什么? C. GPT…...

leetcode刷题日记03——javascript

题目3: 回文数https://leetcode.cn/problems/palindrome-number/ 给你一个整数 x ,如果 x 是一个回文整数,返回 true ;否则,返回 false 。 回文数是指正序(从左向右)和倒序(从右向…...

vue横向滚动日期选择器组件

vue横向滚动日期选择器组件 组件使用到了element-plus组件库和dayjs库,使用前先保证项目中已经下载导入 主要功能:选择日期,点击日期可以让此日期滚动到视图中间,左滑右滑同理,支持跳转至任意日期,支持自…...

【大模型】大模型项目选择 RAGvs微调?

RAG 输入问题,在知识库匹配知识,构建提示词:基于{知识}回答{问题} 微调 用知识问答对重新训练大模型权重,输入问题到调整后的大模型 如何选择 如果业务要求较高,RAG和微调可以一起使用 1-动态数据 选择RAG 原因&a…...

2024年12月CCF-GESP编程能力等级认证Python编程一级真题解析

本文收录于专栏《Python等级认证CCF-GESP真题解析》,专栏总目录:点这里,订阅后可阅读专栏内所有文章。 一、单选题(每题 2 分,共 30 分) 第 1 题 2024年10月8日,诺贝尔物理学奖“意外地”颁给了两位计算机科学家约翰霍普菲尔德(John J. Hopfield)和杰弗里辛顿(Geof…...

【机器学习】元学习(Meta-learning)

云边有个稻草人-CSDN博客 目录 引言 一、元学习的基本概念 1.1 什么是元学习? 1.2 元学习的与少样本学习的关系 二、元学习的核心问题与挑战 2.1 核心问题 2.2 挑战 三、元学习的常见方法 3.1 基于优化的元学习 3.1.1 MAML(Model-Agnostic Meta…...

详解Redis的String类型及相关命令

目录 SET GET MGET MSET SETNX SET和SETNX和SETXX对比 INCR INCRBY DECR DECRBY INCRBYFLOAT APPEND GETRANGE SETRANGE STRLEN 内部编码 SET 将 string 类型的 value 设置到 key 中。如果 key 之前存在,则覆盖,⽆论原来的数据类型是什么…...

android RadioButton + ViewPager+fragment

RadioGroup viewpage fragment 组合显示导航栏 1、首先主界面的布局控件就是RadioGroup viewpage <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:tools…...

给机器装上“脑子”—— 一文带你玩转机器学习

目录 一、引言&#xff1a;AI浪潮中的明星——机器学习 二、机器学习的定义与概念 1. 机器学习与传统编程的区别 2. 机器学习的主要任务类型 3. 机器学习的重要组成部分 三、机器学习的工作原理&#xff1a;从数据到模型的魔法之旅 1. 数据收集与预处理——数据是机器的…...

论文笔记:是什么让多模态学习变得困难?

整理了What Makes Training Multi-modal Classification Networks Hard? 论文的阅读笔记 背景方法OGR基于最小化OGR的多监督信号混合在实践中的应用 实验 背景 直观上&#xff0c;多模态网络接收更多的信息&#xff0c;因此它应该匹配或优于其单峰网络。然而&#xff0c;最好的…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

python如何将word的doc另存为docx

将 DOCX 文件另存为 DOCX 格式&#xff08;Python 实现&#xff09; 在 Python 中&#xff0c;你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是&#xff0c;.doc 是旧的 Word 格式&#xff0c;而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

Java 二维码

Java 二维码 **技术&#xff1a;**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

如何更改默认 Crontab 编辑器 ?

在 Linux 领域中&#xff0c;crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用&#xff0c;用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益&#xff0c;允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...

C++ 设计模式 《小明的奶茶加料风波》

&#x1f468;‍&#x1f393; 模式名称&#xff1a;装饰器模式&#xff08;Decorator Pattern&#xff09; &#x1f466; 小明最近上线了校园奶茶配送功能&#xff0c;业务火爆&#xff0c;大家都在加料&#xff1a; 有的同学要加波霸 &#x1f7e4;&#xff0c;有的要加椰果…...