当前位置: 首页 > news >正文

flink table view datastream互转

case class outer(f1:String,f2:Inner)
case class outerV1(f1:String,f2:Inner,f3:Int)
case class Inner(f3:String,f4:Int)

测试代码

package com.yy.table.convertimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataTypeobject streamPOJO2table {case class outer(f1:String,f2:Inner)case class outerV1(f1:String,f2:Inner,f3:Int)case class Inner(f3:String,f4:Int)def main(args: Array[String]): Unit = {// flink1.13 流处理环境初始化val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)import org.apache.flink.streaming.api.scala._val ds1: DataStream[outer] = env.fromElements(outer("a",Inner("b",2)),outer("d",Inner("e",4)))val table1: Table = tEnv.fromDataStream(ds1)
//    table1
//      .execute()
//      .print()/*+----+--------------------------------+--------------------------------+
| op |                             f1 |                             f2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                   (f3=b, f4=2) |
| +I |                              d |                   (f3=e, f4=4) |
+----+--------------------------------+--------------------------------+*///    table1
//      .print()/*5> +I[d, Inner(e,4)]
4> +I[a, Inner(b,2)]*/tEnv.createTemporaryView("view1", ds1)val tableResult1: TableResult = tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")tableResult1.print()/*+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+*///val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
//    t1.print()//    println(t1.getResolvedSchema)/*
+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
2 rows in set
(`f1` STRING,`f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,`f3` INT NOT NULL
)*/println("---- 1 -------")// tableResult转datastreamval o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1,classOf[outerV1])
//    o1.print()println("---- 2 -------")tEnv.executeSql("""|select|f1|,f2.f3|,f2.f4|from view1|""".stripMargin)
//      .print()/*+----+--------------------------------+--------------------------------+--------------------------------+
| op |                             f1 |                             f3 |                             f4 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                              a |                              b |                              c |
| +I |                              d |                              e |                              f |
+----+--------------------------------+--------------------------------+--------------------------------+*/tEnv.executeSql("""|select|f1|,(f2.f3,f2.f4)|from view1|""".stripMargin)
//      .print()env.execute("jobName1")}}

相关文章:

flink table view datastream互转

case class outer(f1:String,f2:Inner) case class outerV1(f1:String,f2:Inner,f3:Int) case class Inner(f3:String,f4:Int) 测试代码 package com.yy.table.convertimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.tabl…...

redis重启后数据丢失问题解决(亲测好用)

redis修改密码重启后发现redis中的数据丢失了 解决办法&#xff1a; 首先在redis的安装目录下查找重启之前的dump.rdb文件&#xff0c;发现只有当天的一个dump.rdb文件&#xff0c;确认不是重启备份的文件 然后我就全盘找一下dump.rdb的备份文件&#xff0c;找到前一天的备份…...

敬请期待……

敬请期待…… 《Python百宝箱》 序号文章目录直达链接表白系列1无法拒绝的表白界面https://want595.blog.csdn.net/article/details/1347448942满屏飘字表白代码https://want595.blog.csdn.net/article/details/1350373883无限弹窗表白代码...

3.10 Android eBPF HelloWorld调试(四)

一,读取eBPF map的android应用程序示例 1.1 C++源码及源码解读 /system/memory/bpfmapparsed/hello_world_map_parser.cpp //基于aosp android12#define LOG_TAG "BPF_MAP_PARSER"#include <log/log.h> #include <stdlib.h> #include <unistd.h&g…...

PyTorch常用工具(1)数据处理

文章目录 前言1 数据处理1.1 Dataset1.2 DataLoader 前言 在训练神经网络的过程中需要用到很多的工具&#xff0c;最重要的是数据处理、可视化和GPU加速。本章主要介绍PyTorch在这些方面常用的工具模块&#xff0c;合理使用这些工具可以极大地提高编程效率。 由于内容较多&am…...

docker-简单说说cgroup

前面我们简单说了下namespace&#xff0c; 现在我们来接着简单说说cgroup。通过docker-简单说说namespace文章我们知道&#xff1a; namespace 是为了隔离进程组之间的资源&#xff0c;那cgroup就是为了对进程组的监控和限制资源。Cgroup 可以限制进程组使用的资源数量和分配&a…...

印象笔记04: 如何将印象笔记超级会员价值最大化利用?

印象笔记04&#xff1a; 如何将印象笔记超级会员价值最大化利用&#xff1f; 为什么有这个问题 我不知道有没有人一开始接触印象笔记觉得非常好。奈何只能两个设备同步&#xff0c;局限太多。而会员活动比较优惠——就开了会员。而且我开了十年……。只能开发一下看看怎么最大…...

我的JDK动态代理流程

我的JDK动态代理流程 我梳理的动态代理流程大约是&#xff1a; 如果每一个框架都有自己的BPP&#xff0c;且自己的BPP中都有自己的wrapIfNecessory&#xff0c;那样可能就是一个BPP一个代理类。但通常应该都是各自的框架以提供 Advisior&#xff08;切面&#xff09;的方式&am…...

uniapp Vue3 面包屑导航 带动态样式

上干货 <template><view class"bei"><view class"container"><view class"indicator"></view><!-- 遍历路由列表 --><view v-for"(item, index) in routes" :key"index" :class&quo…...

openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作

文章目录 openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作174.1 事务隔离说明174.2 写入和读写操作174.3 并发写入事务的潜在死锁情况 openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作 174.1 事务隔离说…...

数据分析可被划分为4个重要的类别

1、描述型&#xff1a;发生了什么&#xff1f; 全面、准确、实时的数据有效的可视化 2、诊断型&#xff1a;为什么会发生&#xff1f; 能够深入了解问题的根本原因隔离所有混淆信息的能力 3、预测型&#xff1a;可能发生什么&#xff1f; 通过历史数据来预测特定的结果通过…...

爆火小游戏敲木鱼流量主小程序源码系统+完整的代码包以及安装搭建教程

随着移动互联网的快速发展&#xff0c;小程序已成为一种新的应用形态&#xff0c;深入到人们生活的方方面面。其中&#xff0c;小游戏由于其简单、有趣的特点&#xff0c;吸引了大量用户&#xff0c;也成为了许多开发者的首选。敲木鱼小游戏&#xff0c;以其独特的玩法和轻松的…...

Invoke和BeginInvoke的区别

Invoke和BeginInvoke的区别 本文导读&#xff1a;BeginInvoke() 调用时&#xff0c;当前线程会启用线程池中的某个线程来执行此方法&#xff0c;当前线程不被阻塞&#xff0c;继续运行后面的代码&#xff0c; Invoke() 调用时&#xff0c;会阻塞当前线程&#xff0c;等到 Invo…...

3 分钟为英语学习神器 Anki 部署一个专属同步服务器

Anki 介绍 Anki 是一款基于间隔重复&#xff08;Spaced Repetition&#xff09;原理的学习软件&#xff0c;想象一下&#xff0c;你的大脑就像是一个需要定期维护的精密仪器。间隔重复就好比是一种精准的维护计划&#xff0c;它通过在最佳时刻复习信息&#xff0c;来确保知识在…...

<HarmonyOS第一课>应用程序框架

【习题】应用程序框架 目录 判断题 单选题 多选题 判断题 1. 一个应用只能有一个UIAbility。错误 正确(True)错误(False) 2. 创建的Empty Ability模板工程&#xff0c;初始会生成一个UIAbility文件。正确 正确(True)错误(False) 3. 每调用一次router.pushUrl()方法&…...

SQL 解析 — 如何轻松实现新增语句

KaiwuDB 支持多种不同类型的 SQL 语句&#xff0c;例如 create、insert 等。本文将介绍在 KaiwuDB SQL Parser&#xff08;下文统称解析器&#xff09;中添加新语句的过程及其实现。我们将了解如何使用 goyacc 工具更新解析器&#xff0c;以及执行器和查询计划器&#xff08;pl…...

Android集成OpenSSL实现加解密-集成

导入so 将编译生成的 OpenSSL 动态库文件&#xff08;.so 文件&#xff09;复制到你的 Android 项目的 libs 目录中 导入头文件 将编译生成的include文件夹导入到项目中 build.gradle添加配置 defaultConfig {……testInstrumentationRunner "androidx.test.runner…...

代码随想录算法训练营Day18|513.找树左下角的值、112. 路径总和、113. 路径总和ii、106.从中序与后序遍历序列构造二叉树

目录 513.找树左下角的值 前言 层序遍历 递归法 112. 路径总和 前言 递归法 113. 路径总和ii 前言 递归法 106.从中序与后序遍历序列构造二叉树 前言 思路 递归法 总结 513.找树左下角的值 题目链接 文章链接 前言 本题要求得到二叉树最后一行最左边的值&#xf…...

【蓝桥备赛】技能升级——二分查找

题目链接 技能升级 个人思路 需要给n个技能添加技能点&#xff0c;无论技能点加成如何衰减&#xff0c;每次始终都是选择当前技能加点加成最高的那一项技能&#xff0c;所以最后一次的加点一定也是加在当时技能攻击加成最高的那个。此时&#xff0c;我们去寻找最后一次的加点…...

zyqn-arm软中断设置

所有SGI都是边缘触发的&#xff0c;sgi的灵敏度类型是固定的&#xff0c;不能改变。 软中断初始化流程 1、初始化异常处理 2、初始化中断控制器 3、注册异常处理回调函数到CPU 4、连接软中断信号与注册软中断回调函数 5、使能中断控制器中的软中断中断 6、使能异常处理 …...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

JDK 17 新特性

#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持&#xff0c;不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的&#xff…...

LRU 缓存机制详解与实现(Java版) + 力扣解决

&#x1f4cc; LRU 缓存机制详解与实现&#xff08;Java版&#xff09; 一、&#x1f4d6; 问题背景 在日常开发中&#xff0c;我们经常会使用 缓存&#xff08;Cache&#xff09; 来提升性能。但由于内存有限&#xff0c;缓存不可能无限增长&#xff0c;于是需要策略决定&am…...

为什么要创建 Vue 实例

核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...

Sklearn 机器学习 缺失值处理 获取填充失值的统计值

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...