当前位置: 首页 > article >正文

Flink Table API 编程入门实践


Flink Table API 编程入门实践

前言

Apache Flink 是目前大数据实时计算领域的明星产品,Flink Table API 则为开发者提供了声明式、类似 SQL 的数据处理能力,兼具 SQL 的易用性与编程 API 的灵活性。本文将带你快速了解 Flink Table API 的基本用法,并通过代码示例帮助你快速上手。


一、环境准备

在 Flink 中,所有 Table API 操作都需要基于 TableEnvironment。对于流处理场景,我们一般这样创建环境:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

二、数据源定义

Table API 支持多种数据源。最常见的两种方式为:

1. 从 DataStream 创建 Table

DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("Alice", 12),new MyPojo("Bob", 10)
);
Table table = tableEnv.fromDataStream(dataStream);

2. 从外部系统注册 Table

比如从 Kafka 注册一张表:

tableEnv.executeSql("CREATE TABLE user_orders (" +" user_id STRING, " +" order_amount DOUBLE " +") WITH (" +" 'connector' = 'kafka', " +" 'topic' = 'orders', " +" 'properties.bootstrap.servers' = 'localhost:9092', " +" 'format' = 'json'" +")"
);

三、Table API 常见操作

Table API 提供了丰富的数据处理能力,如筛选、聚合、分组、连接等。例如:

import static org.apache.flink.table.api.Expressions.$;// 筛选和选择字段
Table result = table.filter($("age").isGreater(10)).select($("name"), $("age"));// 分组聚合
Table agg = table.groupBy($("name")).select($("name"), $("age").avg().as("avg_age"));

四、结果输出

将 Table 转换为 DataStream,方便后续处理或输出:

DataStream<Row> resultStream = tableEnv.toDataStream(result);
resultStream.print();

五、与 SQL API 结合

Table API 与 SQL API 可以无缝结合。例如:

Table sqlResult = tableEnv.sqlQuery("SELECT name, AVG(age) as avg_age FROM my_table GROUP BY name"
);

六、完整示例

下面是一个完整的 Flink Table API 示例,演示数据流到 Table 的转换、聚合与结果输出:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class TableApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 创建数据流DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("Alice", 12),new MyPojo("Bob", 10),new MyPojo("Alice", 15));// 转换为 TableTable table = tableEnv.fromDataStream(dataStream);// Table API 查询Table result = table.groupBy($("name")).select($("name"), $("age").avg().as("avg_age"));// 输出结果DataStream<Row> resultStream = tableEnv.toDataStream(result);resultStream.print();env.execute();}public static class MyPojo {public String name;public Integer age;public MyPojo() {}public MyPojo(String name, Integer age) {this.name = name;this.age = age;}}
}

七、常见问题与建议

  • 字段名区分大小写,需与数据结构一致。
  • Table API 与 SQL API 可混用,灵活应对不同场景。
  • 生产环境推荐结合 Catalog 管理元数据。
  • Flink 1.14 以后批流统一,建议优先采用流模式开发。

结语

Flink Table API 极大地提升了大数据实时处理的开发效率,结合 SQL 的易用性和 API 的灵活性,非常适合复杂业务场景的数据处理。希望本文能帮你快速入门 Flink Table API,后续还可以深入了解窗口聚合、UDF、自定义 Connector 等高级特性。

如果你在学习和实践中遇到问题,欢迎留言交流!

相关文章:

Flink Table API 编程入门实践

Flink Table API 编程入门实践 前言 Apache Flink 是目前大数据实时计算领域的明星产品&#xff0c;Flink Table API 则为开发者提供了声明式、类似 SQL 的数据处理能力&#xff0c;兼具 SQL 的易用性与编程 API 的灵活性。本文将带你快速了解 Flink Table API 的基本用法&am…...

MongoDB 安全机制详解:全方位保障数据安全

在当今数据驱动的时代&#xff0c;数据库安全至关重要。MongoDB 作为一款流行的 NoSQL 数据库&#xff0c;广泛应用于 Web 应用、大数据分析和物联网等领域。然而&#xff0c;随着 MongoDB 的普及&#xff0c;其安全性也面临诸多挑战&#xff0c;如未授权访问、数据泄露和注入攻…...

Teensy LC 一款由 PJRC 公司开发的高性能 32 位微控制器开发板

Teensy LC 是一款由 PJRC 公司开发的高性能 32 位微控制器开发板&#xff0c;具有以下特点&#xff1a; 硬件配置 核心处理器 &#xff1a;采用 MKL26Z64VFT4 ARM Cortex-M0 处理器&#xff0c;运行频率为 48MHz&#xff0c;相较于传统的 8 位 AVR 处理器&#xff0c;速度更快…...

MicroPython 开发ESP32应用教程 之 线程介绍及实例分析

MicroPython ESP32 线程&#xff08;Thread&#xff09;基础 MicroPython 在 ESP32 上支持线程&#xff08;Thread&#xff09;功能&#xff0c;通过 _thread 模块实现。线程允许程序并发执行多个任务&#xff0c;适合处理需要同时运行的场景&#xff0c;例如传感器数据采集和…...

鸿蒙5开发宝藏案例分享---一多断点开发实践

&#x1f31f;【鸿蒙开发实战进阶】六大核心案例深度拆解&#xff0c;带你玩转多端适配&#xff01; &#x1f4d0; 案例4&#xff1a;动态网格布局&#xff08;电商商品列表&#xff09; 应用场景&#xff1a;手机/平板商品展示差异 痛点分析&#xff1a;手机单列→平板多列&…...

嵌入式学习之系统编程(六)线程

目录 一、线程 &#xff08;一&#xff09;线程概念 &#xff08;二&#xff09;特征 &#xff08;三&#xff09;优缺点 二、线程与进程的区别&#xff08;面问&#xff09; 三、多线程程序设计步骤 四、线程的创建&#xff08;相关函数&#xff09; 1、pthread_create…...

分布式常见概念

分布式常见概念 反向代理正向代理 vs 反向代理&#xff08;对比理解名称&#xff09;正向代理示意&#xff08;“我去帮你拿数据”&#xff09;反向代理示意&#xff08;“你找我&#xff0c;我替你联系内部服务器”&#xff09;为什么叫“反向”&#xff1f; API网关一、为什么…...

数据库的事务(Transaction)

在数据库中&#xff0c;事务&#xff08;Transaction&#xff09; 是保证数据操作一致性和完整性的核心机制。它通过一组原子性的操作单元&#xff0c;确保所有操作要么全部成功&#xff08;提交&#xff09;&#xff0c;要么全部失败&#xff08;回滚&#xff09;。以下是数据…...

大语言模型 提示词的少样本案例的 演示选择与排序新突破

提示词中 演示示例的选择与排序 这篇论文《Rapid Selection and Ordering of In-Context Demonstrations via Prompt Embedding Clustering》聚焦于提升大语言模型(LLMs)在自适应上下文学习(ICL)场景中演示示例的选择与排序效率 一、论文要解决的问题 在上下文学习(ICL)…...

【算法篇】二分查找算法:基础篇

题目链接&#xff1a; 34.在排序数组中查找元素的第一个和最后一个位置 题目描述&#xff1a; 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返…...

Qtc++开发遇到的问题-按钮点击不管用?

我在设计自己的控件的时候&#xff0c;遇到了按钮点击不管用的问题&#xff0c;而且是有的自定义控件不管用&#xff0c;有的管用&#xff0c;有的一开始管用&#xff0c;多点几次就不管用了&#xff0c; 它是这样的&#xff0c;一个lineEdit和位于两侧的按钮&#xff0c;分别…...

重磅发布 | 复旦533页《大规模语言模型:从理论到实践(第2版)》(免费下载)

在人工智能浪潮席卷全球的今天&#xff0c;大语言模型正以前所未有的速度推动着科技进步和产业变革。从 ChatGPT 到各类行业应用&#xff0c;LLM 不仅重塑了人机交互的方式&#xff0c;更成为推动学术研究与产业创新的关键技术。 面对这一飞速演进的技术体系&#xff0c;如何系…...

智能体赋能效率,企业知识库沉淀价值:UMI企业智脑的双轮驱动!

智能体企业知识库&#xff1a;UMI企业智脑的核心功能与价值 在人工智能技术飞速发展的今天&#xff0c;企业智能化转型已经成为不可逆转的趋势。作为企业级AI智能体开发平台的佼佼者&#xff0c;优秘智能推出的UMI企业智脑&#xff0c;以其强大的智能体开发能力和全面的企业知…...

STM32CubeMX,arm-none-eabi-gcc简单试用

在windows下&#xff0c;为stm32系列单片机编程&#xff0c;keil有了免费的试用版&#xff0c;有很多开发板示例&#xff0c;给学习单片机编程带来很大的方便。 STM32CubeMX提供了stm32单片机的功能设置&#xff0c;在输出方式上给出了几种方式&#xff0c;有mdk&#xff08;k…...

Spring AI(一)

Spring AI 官网 Spring AI 是一个用于 AI 工程的应用程序框架。其目标是将 Spring 生态系统设计原则(如可移植性和模块化设计)应用于 AI 领域,并将使用 POJO 作为应用程序的构建块推广到 AI 领域。 Spring AI 的核心是解决了 AI 集成的根本挑战:将您的企业数据和 API 与 A…...

Nacos适配GaussDB超详细部署流程

1部署openGauss 官方文档下载 https://support.huaweicloud.com/download_gaussdb/index.html 社区地址 安装包下载 本文主要是以部署轻量级为主要教程 1.1系统环境准备 操作系统选择 系统AARCH64X86-64openEuler√√CentOS7√Docker√√1.2软硬件安装环境 版本轻量版(单…...

vue-pure-admin动态路由无Layout实现解决方案

背景&#xff1a; 最近在使用vue-pure-admin开发后台项目的时候发现作者并没有动态路由的全屏无Layout实现方案。查询作者路由发现&#xff0c;作者只做了静态路由的无Layout方案&#xff0c;其它动态路由&#xff0c;作者在做整合的时候&#xff0c;都放进了 \ 下面的子路由&…...

vue项目 build时@vue-office/docx报错

我在打包vue项目时&#xff0c; 开始用的npm run build和cnpm run build&#xff0c;总是提示 vue-office/docx 错误&#xff0c;尝试过用cnpm重新安装node_modules几次都没用。类似下面的提示一直有。 Error: [commonjs--resolver] Failed to resolve entry for package "…...

卓力达蚀刻工艺:精密制造的跨行业赋能者

引言 蚀刻技术作为现代精密制造的核心工艺之一&#xff0c;通过化学或物理方法对金属材料进行选择性去除&#xff0c;实现微米级复杂结构的加工。南通卓力达凭借20余年技术积淀与全产业链布局&#xff0c;成为全球高端制造领域的重要支撑力量。本文将从蚀刻技术的多领域应用与…...

【大模型面试每日一题】Day 30:解释一下 FlashAttention 技术,并对比其与传统注意力在显存效率和计算性能上的差异。

【大模型面试每日一题】Day 30&#xff1a;解释一下 FlashAttention 技术&#xff0c;并对比其与传统注意力在显存效率和计算性能上的差异。 &#x1f4cc; 题目重现 &#x1f31f;&#x1f31f; 面试官&#xff1a;解释一下 FlashAttention 技术&#xff0c;并对比其与传统注…...

#RabbitMQ# 消息队列入门

目录 一 MQ技术选型 1 运行rabbitmq 2 基本介绍 3 快速入门 1 交换机负责路由消息给队列 2 数据隔离 二 Java客户端 1 快速入门 2 WorkQueue 3 FanOut交换机 4 Direct交换机 5 Topic交换机 *6 声明队列交换机 1 在配置类当中声明 2 使用注解的方式指定 7 消息转…...

在promise中,多个then如何传值

在 JavaScript 中&#xff0c;Promise 的多个 .then() 是链式调用的&#xff0c;值可以通过返回值的方式&#xff0c;在多个 .then() 之间传递。这是 Promise 链式调用的核心机制。 基本原理&#xff1a;每个 then 接收上一个 then 的返回值 new Promise((resolve, reject) &g…...

TCP 三次握手过程详解

TCP 三次握手过程详解 一、TCP握手基础概念 1.1 什么是TCP握手 TCP三次握手是传输控制协议(Transmission Control Protocol)在建立连接时的标准过程,目的是确保通信双方具备可靠的双向通信能力。 关键结论:三次握手的本质是通过序列号同步和能力协商建立可靠的逻辑连接。 …...

EPT(Efficient Prompt Tuning)方法,旨在解决提示调优(Prompt Tuning)中效率与准确性平衡和跨任务一致性的问题

EPT(Efficient Prompt Tuning)方法,旨在解决提示调优(Prompt Tuning)中效率与准确性平衡和跨任务一致性的问题 一、核心原理:分解提示与多空间投影 1. 提示分解:用低秩矩阵压缩长提示 传统问题: 长提示(如100个token)精度高但训练慢,短提示(如20个token)速度快但…...

云原生安全核心:云安全责任共担模型(Shared Responsibility Model)详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 1. 基础概念 什么是云安全责任共担模型&#xff1f; 云安全责任共担模型&#xff08;Shared Responsibility Model, SRM&#xff09;是云服务提供商&…...

go并发与锁之sync.Mutex入门

sync.Mutex 原理&#xff1a;一个共享的变量&#xff0c;哪个线程握到了&#xff0c;哪个线程可以执行代码 功能&#xff1a;一个性能不错的悲观锁&#xff0c;使用方式和Java的ReentrantLock很像&#xff0c;就是手动Lock&#xff0c;手动UnLock。 使用例子&#xff1a; v…...

[Java恶补day8] 3. 无重复字符的最长子串

给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长 子串 的长度。 示例 1: 输入: s “abcabcbb” 输出: 3 解释: 因为无重复字符的最长子串是 “abc”&#xff0c;所以其长度为 3。 示例 2: 输入: s “bbbbb” 输出: 1 解释: 因为无重复字符的最长子串是 “…...

LabVIEW教学用开发平台

一、培训目标 基础编程&#xff1a;掌握 LabVIEW 数据类型、程序结构、子 VI 设计与调试技巧。 硬件通信&#xff1a;精通 RS-232/485、TCP/IP、Modbus、PLC 等工业通信协议及实现。 高级设计模式&#xff1a;熟练运用状态机、生产者 - 消费者模式构建复杂测控系统。 项目实…...

Package Size Comparison – 6 Leads

Package Size Comparison 6 LeadsTSOP SOT SM SMT SOT23 SC-74 SC-59 SC-88 SOT363 US6 UMT6 SC-70 SOT563 ES EMT SC-75-6...

python打卡day38

Dataset和DataLoader 知识点回顾&#xff1a; Dataset类的__getitem__和__len__方法&#xff08;本质是python的特殊方法&#xff09;Dataloader类minist手写数据集的了解 作业&#xff1a;了解下cifar数据集&#xff0c;尝试获取其中一张图片 在遇到大规模数据集时&#xff0c…...