当前位置: 首页 > news >正文

Flink Table API/SQL 多分支sink

背景

在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);table.insertInto("kafka_sink").execute();table.insertInto("kafka_sink_1").execute();streamStatementSet.execute();}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]

解决

使用 StreamStatementSet. 具体参考官网:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

改良后的代码:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);streamStatementSet.addInsert("kafka_sink", table);streamStatementSet.addInsert("kafka_sink_1", table);streamStatementSet.execute();}

相关文章:

Flink Table API/SQL 多分支sink

背景 在某个场景中&#xff0c;需要从Kafka中获取数据&#xff0c;经过转换处理后&#xff0c;需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错&#xff1a; public static void main(String[] args) {final StreamExecuti…...

Vue3 中 导航守卫 的使用

在Vue 3中&#xff0c;导航守卫&#xff08;Navigation Guards&#xff09;用于在路由切换前后执行一些操作&#xff0c;例如验证用户权限、取消路由导航等。Vue 3中的导航守卫与Vue 2中的导航守卫略有不同。下面是Vue 3中导航守卫的使用方式&#xff1a; 全局前置守卫&#xf…...

云原生概论

云原生是一种新兴的技术趋势&#xff0c;它旨在将应用程序设计和部署方式从传统的基础设施转向云端。云原生应用程序是一种针对云环境进行优化的应用程序&#xff0c;能够充分利用云端提供的弹性和可扩展性。本文将探讨云原生的概念、优势、应用场景以及未来发展方向。 一、云…...

hive-sql

hive-常用SQL汇总 查看数据库 -- 查看所有的数据库 show databases; 使用默认的库 -- 下面的语句可以查看默认的库 use default ;查看某个库下的表 -- 查看所有的表 show tables ; -- 查看包含 stu的表 &#xff0c;这种是通配的方法来查看 show tables like *stu*; 查…...

Rspack 创建 vue2/3 项目接入 antdv(rspack.config.js 配置 less 主题)

一、简介 Rspack CLI 官方文档。 rspack.config.js 官方文档。 二、创建 vue 项目 创建项目&#xff08;文档中还提供了 Rspack 内置 monorepo 框架 Nx 的创建方式&#xff0c;根据需求进行选择&#xff09; # npm 方式 $ npm create rspacklatest# yarn 方式 $ yarn create…...

基于centos7完成docker服务的一些基础操作

目录 要求完成 具体操作 1.安装docker服务&#xff0c;配置镜像加速器 2.下载系统镜像&#xff08;Ubuntu、 centos&#xff09; 3.基于下载的镜像创建两个容器 &#xff08;容器名一个为自己名字全拼&#xff0c;一个为首名字字母&#xff09; 4.容器的启动、 停止及重启…...

Microsoft Visual Studio + Qt插件编程出现错误error MSB4184问题

文章目录 报错解决 报错 C:\Users\Administrator\AppData\Local\QtMsBuild\qt_globals.targets(786,7): error MSB4184: 无法计算表达式“[System.IO.File]::ReadAllText(C:\Users\Administrator\AppData\Local\QtMsBuild\qt.natvis.xml)”。 未能找到文件“C:\Users\Administ…...

QT Quick之quick与C++混合编程

Qt quick能够生成非常绚丽界面&#xff0c;但有其局限性的&#xff0c;对于一些业务逻辑和复杂算法&#xff0c;比如低阶的网络编程如 QTcpSocket &#xff0c;多线程&#xff0c;又如 XML 文档处理类库 QXmlStreamReader / QXmlStreamWriter 等等&#xff0c;在 QML 中要么不可…...

Ros noetic Move_base 相关状态位置的获取 实战使用教程

前言: 有一段时间没有更新,这篇文章是为了后续MPC路径跟踪算法开设的帖子用于更新我自己的思路,由于MPC算法,要镶嵌到整个导航任务中去,就绕不开这个move_base包中相关的参数设置和其中相关状态位置的获取和解读等等。 因为最近遇到小车在其他的环境中有些时候,不需要自己…...

【SpringBoot】SpringBoot项目与Vue对接接口的步骤

下面是SpringBoot项目与Vue对接接口的步骤&#xff1a; 创建SpringBoot项目&#xff0c;在项目中添加依赖&#xff0c;如Spring MVC、MyBatis等框架。 在SpringBoot项目中编写接口方法&#xff0c;使用注解标识请求方式&#xff0c;如GetMapping、PostMapping等&#xff0c;并…...

Glog安装与使用

安装 脚本 #!/bin/bash git clone https://github.com/google/glog.git cd glog git checkout v0.4.0 mkdir build && cd build cmake .. make -j4 echo "your password" | sudo -S make install使用 main.cc #include <glog/logging.h>int main(i…...

windows开发环境搭建

下载msys2&#xff0c;官网下载即可&#xff1a; MSYS2 安装其他的编译工具&#xff08;貌似不需要把中间的命令全部执行&#xff09;&#xff1a; MSYS2使用教程——win10系统64位安装msys2最新版&#xff08;msys2-x86_xxxx.exe&#xff09;_msys64_Dreamhai的博客-CSDN博…...

8月17日上课内容 第三章 LVS+Keepalived群集

本章结构 Keepalived概述 keepalived 概述 1.服务功能 故障自动切换 健康检查 节点服务器高可用 HA keepalived工作原理 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题 在一个LVS服务集群中通常有主服务器 (MAST…...

Threejs学习05——球缓冲几何体背景贴图和环境贴图

实现随机多个三角形随机位置随机颜色展示效果 这是一个非常简单基础的threejs的学习应用&#xff01;本节主要学习的是球面缓冲几何体的贴图部分&#xff0c;这里有环境贴图以及背景贴图&#xff0c;这样可以有一种身临其境的效果&#xff01;这里环境贴图用的是一个.hdr的文件…...

LVS+Keepalived群集实验

目录 Keepalived 是什么 Keepalived 功能 Keepalived 模块 工作原理 脑裂现象及解决方案 脑裂 形成脑裂的原因 解决脑裂的几种方法&#xff1a; 为了减少或避免HA集群中出现脑裂现象&#xff0c;我们可以采取以下措施&#xff1a; Keepalived服务主要功能&#xff0…...

软考高级之系统架构师之系统开发基础

架构 场景 场景&#xff08;scenarios&#xff09;在进行体系结构评估时&#xff0c;一般首先要精确地得出具体的质量目标&#xff0c;并以之作为判定该体系结构优劣的标准。为得出这些目标而采用的机制做场景。场景是从风险承担者的角度对与系统的交互的简短描述。在体系结构…...

Web 3.0 安全风险,您需要了解这些内容

随着技术的不断发展&#xff0c;Web 3.0 正在逐渐成为现实&#xff0c;为我们带来了许多新的机遇和挑战。然而&#xff0c;与任何新技术一样&#xff0c;Web 3.0 也伴随着一系列安全风险&#xff0c;这些风险需要被认真对待。在这篇文章中&#xff0c;我们将探讨一些与Web 3.0 …...

万宾科技22款产品入选《城市生命线安全工程监测技术产品名录》

2023年8月17日-18日&#xff0c;由北京市地下管线协会主办的2023首届城市生命线安全与发展大会在北京召开&#xff0c;本次大会汇聚中央及地方政府主管领导、院士专家、行业领袖、龙头代表、产业精英等。 大会聚焦安全监管智慧平台和燃气爆炸、城市内涝、地下管线交互风险、第三…...

MFC 隐藏窗口

亲测能用 改变主窗体的创建方式 将 C***App::InitInstance() 函数中的代码 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; INT_PTR nResponse dlg.DoModal(); 替换为 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; //INT_PTR nResponse dlg.DoModal(); INT_PTR nRe…...

Java数据库连接池原理及spring boot使用数据库连接池(HikariCP、Druid)

和线程池类似&#xff0c;数据库连接池的作用是建立一些和数据库的连接供需要连接数据库的业务使用&#xff0c;避免了每次和数据库建立、销毁连接的性能消耗&#xff0c;通过设置连接池参数可以防止建立连接过多导致服务宕机等&#xff0c;以下介绍Java中主要使用的几种数据库…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室&#xff08;Algorithms, Machines, and People Lab&#xff09;开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目&#xff0c;8个月后成为Apache顶级项目&#xff0c;速度之快足见过人之处&…...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

微服务商城-商品微服务

数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

掌握 HTTP 请求:理解 cURL GET 语法

cURL 是一个强大的命令行工具&#xff0c;用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中&#xff0c;cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...