当前位置: 首页 > news >正文

Flink SQL 从一个SOURCE 写入多个Sink端实例

一. 背景

FLINK 任务从一个数据源读取数据, 写入多个sink端.

二. 官方实例

写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

--源表
CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT
) WITH ('connector' = 'datagen'
);--结果表A
CREATE TEMPORARY TABLE blackhole_sinkA(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--结果表B
CREATE TEMPORARY TABLE blackhole_sinkB(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--DML
BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END;      --写入多个Sink时,必填。

三. 实操

3.1. 启动Standlone集群

进入到flink引擎包目录, 启动Standlone模式.

./bin/start-cluster.sh

3.2. 启动flink sql-client.

./bin/sql-client.sh embedded

3.3. 执行sql

Flink SQL> CREATE TEMPORARY TABLE datagen_source (
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'datagen'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkA(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkB(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> BEGIN STATEMENT SET;
[INFO] Begin a statement set.Flink SQL> INSERT INTO blackhole_sinkA
>   SELECT UPPER(name), sum(score)
>   FROM datagen_source
>   GROUP BY UPPER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> INSERT INTO blackhole_sinkB
>   SELECT LOWER(name), max(score)
>   FROM datagen_source
>   GROUP BY LOWER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> END;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 37a1390129c356374601a267cb8080b6

3.4. 查看flink ui

查看flink ui页面,验证结论.

http://master01:8081/#/job/37a1390129c356374601a267cb8080b6/overview

在这里插入图片描述

相关文章:

Flink SQL 从一个SOURCE 写入多个Sink端实例

一. 背景 FLINK 任务从一个数据源读取数据, 写入多个sink端. 二. 官方实例 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。--源表 CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT ) WITH (connector datagen …...

python飞机大战游戏.py

python飞机大战游戏.py import pygame import random# 游戏窗口大小 WINDOW_WIDTH 600 WINDOW_HEIGHT 800# 颜色定义 BLACK (0, 0, 0) WHITE (255, 255, 255)# 初始化Pygame pygame.init()# 创建游戏窗口 window pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))…...

【C++】14___String容器

目录 一、string基本概念 二、string赋值操作 三、字符串拼接 四、 string查找和替换 五、 string字符串比较 六、string插入和删除 七、string子串 一、string基本概念 本质:string是C风格的字符串,而string本质上是一个类 string和char*区别&am…...

数据特性库 前言

文章目录 一、num-traits库简介二、核心功能三、更新功能四、使用方式五、应用示例六、结论 一、num-traits库简介 num-traits是Rust编程语言中的一个开源库,专注于为数值类型提供一系列的数学运算特性和接口。它支持泛型数学计算,允许开发者在不指定具…...

jdk和cglib动态代理区别

目标类不同 jdk目标类需要实现接口。 cglib不需要。 代理类生成方式不同 jdk内部字节码生成代理类。 cglib使用ASM字节码生成库生成代理类。 代理类和目标类关系不同 jdk代理类实现目标类接口,jdk无法代理目标类中static或private的方法。 cglib 代理类继承目标类…...

部署Mysql、镜像和容器、常见命令

目录 部署Mysql 镜像和容器 常见命令 部署Mysql 可以有多个容器 docker run -d \--name mysql \-p 3306:3306 \-e TZAsia/Shanghai \-e MYSQL_ROOT_PASSWORD123 \mysql docker run -d \--name mysql2 \-p 3307:3307 \-e TZAsia/Shanghai \-e MYSQL_ROOT_PASSWORD123 \mys…...

【数学】P2671 [NOIP2015 普及组] 求和

题目背景 NOIP2015 普及组 T3、深入浅出进阶1-5 题目描述 一条狭长的纸带被均匀划分出了 n n n 个格子,格子编号从 1 1 1 到 n n n。每个格子上都染了一种颜色 c o l o r i color_i colori​ 用 [ 1 , m ] [1,m] [1,m] 当中的一个整数表示)&…...

【AI图像生成网站Golang】项目测试与优化

AI图像生成网站 目录 一、项目介绍 二、雪花算法 三、JWT认证与令牌桶算法 四、项目架构 五、图床上传与图像生成API搭建 六、项目测试与优化 六、项目测试与优化 在开发过程中,性能优化是保证项目可扩展性和用户体验的关键步骤。本文将详细介绍我如何使用一…...

vue常用自定义指令

参考链接1https://blog.csdn.net/m0_67584973/article/details/139300966?spm1001.2014.3001.5501 参考链接2https://juejin.cn/post/7067051410671534116...

以太网帧、IP数据报图解

注:本文为 “以太网帧、IP数据报”图解相关文章合辑。 未整理去重。 以太网帧、IP数据报的图解格式(包含相关例题讲解) Rebecca.Yan已于 2023-05-27 14:13:19 修改 一、基础知识 UDP 段、IP 数据包,以太网帧图示 通信过程中&…...

01.大模型起源与发展

知识点 注意力机制(Attention)的主要用途是什么? 选择重要的信息并忽略不相关的信息 Transformer 模型是基于什么理论构建的? C. 注意力机制(Attention) GPT 和 BERT 的主要区别是什么? C. GPT…...

leetcode刷题日记03——javascript

题目3: 回文数https://leetcode.cn/problems/palindrome-number/ 给你一个整数 x ,如果 x 是一个回文整数,返回 true ;否则,返回 false 。 回文数是指正序(从左向右)和倒序(从右向…...

vue横向滚动日期选择器组件

vue横向滚动日期选择器组件 组件使用到了element-plus组件库和dayjs库,使用前先保证项目中已经下载导入 主要功能:选择日期,点击日期可以让此日期滚动到视图中间,左滑右滑同理,支持跳转至任意日期,支持自…...

【大模型】大模型项目选择 RAGvs微调?

RAG 输入问题,在知识库匹配知识,构建提示词:基于{知识}回答{问题} 微调 用知识问答对重新训练大模型权重,输入问题到调整后的大模型 如何选择 如果业务要求较高,RAG和微调可以一起使用 1-动态数据 选择RAG 原因&a…...

2024年12月CCF-GESP编程能力等级认证Python编程一级真题解析

本文收录于专栏《Python等级认证CCF-GESP真题解析》,专栏总目录:点这里,订阅后可阅读专栏内所有文章。 一、单选题(每题 2 分,共 30 分) 第 1 题 2024年10月8日,诺贝尔物理学奖“意外地”颁给了两位计算机科学家约翰霍普菲尔德(John J. Hopfield)和杰弗里辛顿(Geof…...

【机器学习】元学习(Meta-learning)

云边有个稻草人-CSDN博客 目录 引言 一、元学习的基本概念 1.1 什么是元学习? 1.2 元学习的与少样本学习的关系 二、元学习的核心问题与挑战 2.1 核心问题 2.2 挑战 三、元学习的常见方法 3.1 基于优化的元学习 3.1.1 MAML(Model-Agnostic Meta…...

详解Redis的String类型及相关命令

目录 SET GET MGET MSET SETNX SET和SETNX和SETXX对比 INCR INCRBY DECR DECRBY INCRBYFLOAT APPEND GETRANGE SETRANGE STRLEN 内部编码 SET 将 string 类型的 value 设置到 key 中。如果 key 之前存在,则覆盖,⽆论原来的数据类型是什么…...

android RadioButton + ViewPager+fragment

RadioGroup viewpage fragment 组合显示导航栏 1、首先主界面的布局控件就是RadioGroup viewpage <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:tools…...

给机器装上“脑子”—— 一文带你玩转机器学习

目录 一、引言&#xff1a;AI浪潮中的明星——机器学习 二、机器学习的定义与概念 1. 机器学习与传统编程的区别 2. 机器学习的主要任务类型 3. 机器学习的重要组成部分 三、机器学习的工作原理&#xff1a;从数据到模型的魔法之旅 1. 数据收集与预处理——数据是机器的…...

论文笔记:是什么让多模态学习变得困难?

整理了What Makes Training Multi-modal Classification Networks Hard? 论文的阅读笔记 背景方法OGR基于最小化OGR的多监督信号混合在实践中的应用 实验 背景 直观上&#xff0c;多模态网络接收更多的信息&#xff0c;因此它应该匹配或优于其单峰网络。然而&#xff0c;最好的…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室&#xff08;Algorithms, Machines, and People Lab&#xff09;开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目&#xff0c;8个月后成为Apache顶级项目&#xff0c;速度之快足见过人之处&…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发&#xff0c;旨在打造一个互动性强的购物平台&#xff0c;让用户在购物的同时&#xff0c;能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机&#xff0c;实现旋转、抽拉等动作&#xff0c;增…...

Python 训练营打卡 Day 47

注意力热力图可视化 在day 46代码的基础上&#xff0c;对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...

Xcode 16 集成 cocoapods 报错

基于 Xcode 16 新建工程项目&#xff0c;集成 cocoapods 执行 pod init 报错 ### Error RuntimeError - PBXGroup attempted to initialize an object with unknown ISA PBXFileSystemSynchronizedRootGroup from attributes: {"isa">"PBXFileSystemSynchro…...