当前位置: 首页 > news >正文

Flink Table API/SQL 多分支sink

背景

在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);table.insertInto("kafka_sink").execute();table.insertInto("kafka_sink_1").execute();streamStatementSet.execute();}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]

解决

使用 StreamStatementSet. 具体参考官网:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

改良后的代码:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);streamStatementSet.addInsert("kafka_sink", table);streamStatementSet.addInsert("kafka_sink_1", table);streamStatementSet.execute();}

相关文章:

Flink Table API/SQL 多分支sink

背景 在某个场景中&#xff0c;需要从Kafka中获取数据&#xff0c;经过转换处理后&#xff0c;需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错&#xff1a; public static void main(String[] args) {final StreamExecuti…...

Vue3 中 导航守卫 的使用

在Vue 3中&#xff0c;导航守卫&#xff08;Navigation Guards&#xff09;用于在路由切换前后执行一些操作&#xff0c;例如验证用户权限、取消路由导航等。Vue 3中的导航守卫与Vue 2中的导航守卫略有不同。下面是Vue 3中导航守卫的使用方式&#xff1a; 全局前置守卫&#xf…...

云原生概论

云原生是一种新兴的技术趋势&#xff0c;它旨在将应用程序设计和部署方式从传统的基础设施转向云端。云原生应用程序是一种针对云环境进行优化的应用程序&#xff0c;能够充分利用云端提供的弹性和可扩展性。本文将探讨云原生的概念、优势、应用场景以及未来发展方向。 一、云…...

hive-sql

hive-常用SQL汇总 查看数据库 -- 查看所有的数据库 show databases; 使用默认的库 -- 下面的语句可以查看默认的库 use default ;查看某个库下的表 -- 查看所有的表 show tables ; -- 查看包含 stu的表 &#xff0c;这种是通配的方法来查看 show tables like *stu*; 查…...

Rspack 创建 vue2/3 项目接入 antdv(rspack.config.js 配置 less 主题)

一、简介 Rspack CLI 官方文档。 rspack.config.js 官方文档。 二、创建 vue 项目 创建项目&#xff08;文档中还提供了 Rspack 内置 monorepo 框架 Nx 的创建方式&#xff0c;根据需求进行选择&#xff09; # npm 方式 $ npm create rspacklatest# yarn 方式 $ yarn create…...

基于centos7完成docker服务的一些基础操作

目录 要求完成 具体操作 1.安装docker服务&#xff0c;配置镜像加速器 2.下载系统镜像&#xff08;Ubuntu、 centos&#xff09; 3.基于下载的镜像创建两个容器 &#xff08;容器名一个为自己名字全拼&#xff0c;一个为首名字字母&#xff09; 4.容器的启动、 停止及重启…...

Microsoft Visual Studio + Qt插件编程出现错误error MSB4184问题

文章目录 报错解决 报错 C:\Users\Administrator\AppData\Local\QtMsBuild\qt_globals.targets(786,7): error MSB4184: 无法计算表达式“[System.IO.File]::ReadAllText(C:\Users\Administrator\AppData\Local\QtMsBuild\qt.natvis.xml)”。 未能找到文件“C:\Users\Administ…...

QT Quick之quick与C++混合编程

Qt quick能够生成非常绚丽界面&#xff0c;但有其局限性的&#xff0c;对于一些业务逻辑和复杂算法&#xff0c;比如低阶的网络编程如 QTcpSocket &#xff0c;多线程&#xff0c;又如 XML 文档处理类库 QXmlStreamReader / QXmlStreamWriter 等等&#xff0c;在 QML 中要么不可…...

Ros noetic Move_base 相关状态位置的获取 实战使用教程

前言: 有一段时间没有更新,这篇文章是为了后续MPC路径跟踪算法开设的帖子用于更新我自己的思路,由于MPC算法,要镶嵌到整个导航任务中去,就绕不开这个move_base包中相关的参数设置和其中相关状态位置的获取和解读等等。 因为最近遇到小车在其他的环境中有些时候,不需要自己…...

【SpringBoot】SpringBoot项目与Vue对接接口的步骤

下面是SpringBoot项目与Vue对接接口的步骤&#xff1a; 创建SpringBoot项目&#xff0c;在项目中添加依赖&#xff0c;如Spring MVC、MyBatis等框架。 在SpringBoot项目中编写接口方法&#xff0c;使用注解标识请求方式&#xff0c;如GetMapping、PostMapping等&#xff0c;并…...

Glog安装与使用

安装 脚本 #!/bin/bash git clone https://github.com/google/glog.git cd glog git checkout v0.4.0 mkdir build && cd build cmake .. make -j4 echo "your password" | sudo -S make install使用 main.cc #include <glog/logging.h>int main(i…...

windows开发环境搭建

下载msys2&#xff0c;官网下载即可&#xff1a; MSYS2 安装其他的编译工具&#xff08;貌似不需要把中间的命令全部执行&#xff09;&#xff1a; MSYS2使用教程——win10系统64位安装msys2最新版&#xff08;msys2-x86_xxxx.exe&#xff09;_msys64_Dreamhai的博客-CSDN博…...

8月17日上课内容 第三章 LVS+Keepalived群集

本章结构 Keepalived概述 keepalived 概述 1.服务功能 故障自动切换 健康检查 节点服务器高可用 HA keepalived工作原理 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题 在一个LVS服务集群中通常有主服务器 (MAST…...

Threejs学习05——球缓冲几何体背景贴图和环境贴图

实现随机多个三角形随机位置随机颜色展示效果 这是一个非常简单基础的threejs的学习应用&#xff01;本节主要学习的是球面缓冲几何体的贴图部分&#xff0c;这里有环境贴图以及背景贴图&#xff0c;这样可以有一种身临其境的效果&#xff01;这里环境贴图用的是一个.hdr的文件…...

LVS+Keepalived群集实验

目录 Keepalived 是什么 Keepalived 功能 Keepalived 模块 工作原理 脑裂现象及解决方案 脑裂 形成脑裂的原因 解决脑裂的几种方法&#xff1a; 为了减少或避免HA集群中出现脑裂现象&#xff0c;我们可以采取以下措施&#xff1a; Keepalived服务主要功能&#xff0…...

软考高级之系统架构师之系统开发基础

架构 场景 场景&#xff08;scenarios&#xff09;在进行体系结构评估时&#xff0c;一般首先要精确地得出具体的质量目标&#xff0c;并以之作为判定该体系结构优劣的标准。为得出这些目标而采用的机制做场景。场景是从风险承担者的角度对与系统的交互的简短描述。在体系结构…...

Web 3.0 安全风险,您需要了解这些内容

随着技术的不断发展&#xff0c;Web 3.0 正在逐渐成为现实&#xff0c;为我们带来了许多新的机遇和挑战。然而&#xff0c;与任何新技术一样&#xff0c;Web 3.0 也伴随着一系列安全风险&#xff0c;这些风险需要被认真对待。在这篇文章中&#xff0c;我们将探讨一些与Web 3.0 …...

万宾科技22款产品入选《城市生命线安全工程监测技术产品名录》

2023年8月17日-18日&#xff0c;由北京市地下管线协会主办的2023首届城市生命线安全与发展大会在北京召开&#xff0c;本次大会汇聚中央及地方政府主管领导、院士专家、行业领袖、龙头代表、产业精英等。 大会聚焦安全监管智慧平台和燃气爆炸、城市内涝、地下管线交互风险、第三…...

MFC 隐藏窗口

亲测能用 改变主窗体的创建方式 将 C***App::InitInstance() 函数中的代码 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; INT_PTR nResponse dlg.DoModal(); 替换为 CMFCApplication1Dlg dlg; m_pMainWnd &dlg; //INT_PTR nResponse dlg.DoModal(); INT_PTR nRe…...

Java数据库连接池原理及spring boot使用数据库连接池(HikariCP、Druid)

和线程池类似&#xff0c;数据库连接池的作用是建立一些和数据库的连接供需要连接数据库的业务使用&#xff0c;避免了每次和数据库建立、销毁连接的性能消耗&#xff0c;通过设置连接池参数可以防止建立连接过多导致服务宕机等&#xff0c;以下介绍Java中主要使用的几种数据库…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...