当前位置: 首页 > news >正文

FlinkSql一个简单的测试程序

FlinkSql一个简单的测试程序

以下是一个简单的 Flink SQL 示例,展示了如何使用 Flink Table API 和 Flink SQL 进行基本的数据流处理。


  1. 定义数据实体 CC :
    - CC 类表示数据流中的元素,包含两个字段: character (字符)和 count (计数)。
    - 提供了无参构造函数和带参构造函数,用于创建 CC 对象。
    // 1. 定义数据实体public static class CC {public String character;public long count;public CC() {}public CC(String character, long count) {this.character = character;this.count = count;}} 

  1. 创建执行环境并模拟数据流:
    - 创建了 Flink 执行环境 StreamExecutionEnvironment 和 StreamTableEnvironment 。
    - 创建了一个包含字符串元素的数据流 inputStream ,其中包括 “hello”, “world” 和 “!!!”。
        // 2. 创建执行环境并模拟数据流StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);DataStream<String> inputStream = env.fromElements("hello","world","!!!").uid("source").name("source");

  1. 对数据流进行 flatMap 操作:
    - 使用 flatMap 对每个输入字符串进行拆分,并将每个字符映射为一个 CC 对象。
        // 3. 对数据流进行flatMap()操作SingleOutputStreamOperator<CC> streamOperator = inputStream.flatMap(new FlatMapFunction<String, CC>() {@Overridepublic void flatMap(String value, Collector<CC> out) throws Exception {for (char c : value.toCharArray()) {out.collect(new CC(c + "",1L));}}});

  1. 将数据流转为 Table :
    - 使用 tableEnv.fromDataStream 将 streamOperator 转换为一个 Table 对象。
        // 4. 将数据流转为TableTable table = tableEnv.fromDataStream(streamOperator);

  1. 使用 Table API 操作数据流:
    - 对 table 进行选择和过滤操作,保留字符不为空的记录。
    - 对过滤后的数据进行分组,并计算每个字符的计数总和,将结果存储在 result 中。
        // 5. 使用tableApi操作数据流,并输出结果Table filter = table.select($("character"), $("count")).filter($("character").isNotEqual(""));Table result = filter.groupBy($("character")).select($("character"), $("count").sum().as("character_count"));tableEnv.toRetractStream(result, Row.class).print();

  1. 使用 Flink SQL 操作数据流:
    - 将 table 注册为临时视图 “CC”。
    - 执行 SQL 查询,对 “CC” 进行分组,计算每个字符的计数总和,并将结果存储在 result2 中。
        // 6. 使用FlinkSql操作数据流,并输出结果tableEnv.createTemporaryView("CC", table);Table result2 = tableEnv.sqlQuery("SELECT `character`, SUM(`count`) FROM CC group by `character`");tableEnv.toRetractStream(result2, Row.class).print();

  1. 执行任务:
    - 使用 env.execute(“Flink Sql Test”) 启动 Flink 作业,处理数据流并输出结果。
        // 7.执行任务env.execute("Flink Sql Test");

  1. 执行结果:
(true,+I[h, 1])
(true,+I[e, 1])
(true,+I[l, 1])
(false,-U[l, 1])
(true,+U[l, 2])
(true,+I[o, 1])
(true,+I[w, 1])
(false,-U[o, 1])
(true,+U[o, 2])
(true,+I[r, 1])
(false,-U[l, 2])
(true,+U[l, 3])
(true,+I[d, 1])
(true,+I[!, 1])
(false,-U[!, 1])
(true,+U[!, 2])
(false,-U[!, 2])
(true,+U[!, 3])Process finished with exit code 0

通过这段代码,您可以了解如何使用 Flink Table API 和 Flink SQL 对数据流进行简单的处理和分析,包括数据拆分、选择、过滤、分组和计算。最后,通过 toRetractStream 方法将结果打印输出。

相关文章:

FlinkSql一个简单的测试程序

FlinkSql一个简单的测试程序 以下是一个简单的 Flink SQL 示例&#xff0c;展示了如何使用 Flink Table API 和 Flink SQL 进行基本的数据流处理。 定义数据实体 CC &#xff1a; - CC 类表示数据流中的元素&#xff0c;包含两个字段&#xff1a; character &#xff08;字符&a…...

二、ActiveMQ安装

ActiveMQ安装 一、相关环境二、安装Java8三、下载安装包四、启动五、其他命令六、开放端口七、后台管理 一、相关环境 环境&#xff1a;Centos7.9安装ActiveMQ版本&#xff1a;5.15.9JDK8 二、安装Java8 安装教程&#xff1a;https://qingsi.blog.csdn.net/article/details/…...

通俗易懂的L0范数和L1范数及其Python实现

定义 L0 范数&#xff08;L0-Norm&#xff09; L0 范数并不是真正意义上的一个范数&#xff0c;因为它不满足范数的三角不等式性质&#xff0c;但它在数学优化和信号处理等领域有着实际的应用。L0 范数指的是向量中非零元素的个数。它通常用来度量向量的稀疏性。数学上表示为…...

如何在30天内使用python制作一个卡牌游戏

如何在30天内使用python制作一个卡牌游戏 第1-5天&#xff1a;规划和设计第6-10天&#xff1a;搭建游戏框架第11-20天&#xff1a;核心游戏机制开发第21-25天&#xff1a;游戏界面和用户体验第26-30天&#xff1a;测试和发布附加建议游戏类型游戏规则设计界面设计技术选型第6-…...

VsCode指定插件安装目录

VsCode指定插件安装目录 VsCode安装的默认目录是在用户目录(%HomePath%)下的.vscode文件夹下的extensions目录下&#xff0c;随着安装插件越来越多会占用大量C盘空间。 指定VsCode的插件目录 Vscode安装目录&#xff1a; D:\Microsoft VS Code\Code.exeVscode插件安装目录&a…...

解决npm淘宝镜像到期问题

1 背景 由于node安装插件是从国外服务器下载&#xff0c;如果没有“特殊手法”&#xff0c;就可能会遇到下载速度慢、或其它异常问题。 所以如果npm的服务器在中国就好了&#xff0c;于是我们乐于分享的淘宝团队干了这事。你可以用此只读的淘宝服务代替官方版本&#xff0c;且…...

【JAVA】java泛型 详解

java泛型 详解 一、参数化类型&#xff08;Parameterized Type&#xff09;&#xff1a;二. 泛型类&#xff08;Generic Class&#xff09;&#xff1a;三. 泛型方法&#xff08;Generic Method&#xff09;&#xff1a;四. 通配符类型&#xff08;Wildcard Type&#xff09;&a…...

基于RBAC的权限管理的理论实现和权限管理的实现

权限管理的理论 首先需要两个页面支持&#xff0c;分别是角色管理和员工管理&#xff0c;其中角色管理对应的是角色和权限的配合&#xff0c;员工管理则是将登录的员工账号和员工所处的角色进行对应&#xff0c;即通过新增角色这个概念&#xff0c;让权限和员工并不直接关联&a…...

Atcoder ABC340 C - Divide and Divide

Divide and Divide&#xff08;分而治之&#xff09; 时间限制&#xff1a;2s 内存限制&#xff1a;1024MB 【原题地址】 所有图片源自Atcoder&#xff0c;题目译文源自脚本Atcoder Better! 点击此处跳转至原题 【问题描述】 【输入格式】 【输出格式】 【样例1】 【样例…...

趣学贝叶斯统计:概率密度分布(probability density function)

目录 1. 分布:PDF与PMFPDFPMF 2. 将概率密度函数应用于我们的问题用积分量化连续分布积分度量变化率&#xff1a;导数 3. R语言实践4. 小结 1. 分布:PDF与PMF PDF PDF定义在连续值上。在连续型随机变量的情况下&#xff0c;具体取某个数值的概率是0&#xff0c;因此PDF并不直…...

伦敦金行情分析需要学习吗?

对于伦敦金交易来说&#xff0c;目前大致分成两派&#xff0c;一派是实干派&#xff0c;认为做伦敦金交易重要的是实战&#xff0c;不需要学习太多东西&#xff0c;否则容易被理论知识所局限。另一派则是强调学习&#xff0c;没有理论知识&#xff0c;投资者很难做好伦敦金交易…...

Java实现停车场收费系统 JAVA+Vue+SpringBoot+MySQL

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 停车位模块2.2 车辆模块2.3 停车收费模块2.4 IC卡模块2.5 IC卡挂失模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 停车场表3.2.2 车辆表3.2.3 停车收费表3.2.4 IC 卡表3.2.5 IC 卡挂失表 四、系统实现五、核心代码…...

服务器遭受 DDoS 攻击的常见迹象有哪些?

服务器遭受 DDoS 攻击的现象很常见&#xff0c;并且有时不容易预防&#xff0c;有部分原因是它们的形式多种多样&#xff0c;而且黑客手段越来越隐蔽。如果您怀疑自己可能遭受 DDoS 攻击&#xff0c;可以寻找多种迹象。以下是 DDoS 攻击的5个常见迹象&#xff1a; 1.网络流量无…...

【机器学习笔记】 15 机器学习项目流程

机器学习的一般步骤 数据清洗 数据清洗是指发现并纠正数据文件中可识别的错误的最后一道程序&#xff0c;包括检查数据一致性&#xff0c;处理无效值和缺失值等。与问卷审核不同&#xff0c;录入后的数据清理一般是由计算机而不是人工完成。 探索性数据分析(EDA 探索性数据…...

【C语言】位操作符与移位操作符练习

目录 前言&#xff1a; 1.一道变态的面试题 2.输入一个整数 n &#xff0c;输出该数32位二进制表示中1的个数。其中负数用补码表示。 方法一&#xff1a; 方法二&#xff1a; 方法三&#xff1a; 3.打印整数二进制的奇数位和偶数位 前言&#xff1a; 前篇我们学习过C语言…...

第十四届“中关村青联杯”全国研究生数学建模竞赛-A题:无人机在抢险救灾中的优化运用

目录 摘 要: 1 问题重述 1.1 问题背景 1.2 待解决的问题 2 模型假设及符号说明...

Android 9.0 Launcher3桌面显示多个相同app图标的解决办法

1.前言 在9.0的系统ROM定制化开发中,在Launcher3的系统原生桌面中,在显示桌面的时候,在禁用和启用app的功能测试的时候,会发现有多个相同app的图标显示在桌面 这对Launcher3的体验效果不是很好,所以为了优化产品,需要解决这个bug,然后让产品更完善 2.桌面显示多个相同…...

WordPress主题YIA在广告位添加图片广告时下方有空白怎么办?

YIA主题设置中默认有4个广告位&#xff0c;而侧边栏的广告位由站长自行添加。boke112百科在这些广告位添加图片广告后发现图片下方有空白&#xff0c;导致下方的两个角没有变圆角&#xff0c;看起来也有点不好看。具体如下图所示&#xff1a; 其实&#xff0c;这个问题就是典型…...

5.15 BCC工具之kvm_hypercall.py解读

一,工具简介 在该示例中,我们可以了解到如何使用eBPF(扩展BPF,Berkeley Packet Filter的扩展)和bcc(BPF Compiler Collection)来分析KVM(Kernel-based Virtual Machine)中的超级调用(hypercall)。 即当exit_reason为VMCALL时,有状态的kvm_entry和kvm_exit记录以及…...

git 解除本地分支与其它分支(远程分支)的关联

开发中&#xff0c;我在同事的分支开一条分支&#xff0c;并将同事的分支作为关联分支&#xff0c;前两天还好&#xff0c;我一个人在干活&#xff0c;然而第3天&#xff0c;同事回来了&#xff0c;他在他那条分支也开发&#xff0c;这时就会出现2种情况&#xff0c; 1. 同时修…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

拉力测试cuda pytorch 把 4070显卡拉满

import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试&#xff0c;通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小&#xff0c;增大可提高计算复杂度duration: 测试持续时间&#xff08;秒&…...

安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖

在Vuzix M400 AR智能眼镜的助力下&#xff0c;卢森堡罗伯特舒曼医院&#xff08;the Robert Schuman Hospitals, HRS&#xff09;凭借在无菌制剂生产流程中引入增强现实技术&#xff08;AR&#xff09;创新项目&#xff0c;荣获了2024年6月7日由卢森堡医院药剂师协会&#xff0…...

LOOI机器人的技术实现解析:从手势识别到边缘检测

LOOI机器人作为一款创新的AI硬件产品&#xff0c;通过将智能手机转变为具有情感交互能力的桌面机器人&#xff0c;展示了前沿AI技术与传统硬件设计的完美结合。作为AI与玩具领域的专家&#xff0c;我将全面解析LOOI的技术实现架构&#xff0c;特别是其手势识别、物体识别和环境…...

离线语音识别方案分析

随着人工智能技术的不断发展&#xff0c;语音识别技术也得到了广泛的应用&#xff0c;从智能家居到车载系统&#xff0c;语音识别正在改变我们与设备的交互方式。尤其是离线语音识别&#xff0c;由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力&#xff0c;广…...

【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统

Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...

电脑桌面太单调,用Python写一个桌面小宠物应用。

下面是一个使用Python创建的简单桌面小宠物应用。这个小宠物会在桌面上游荡&#xff0c;可以响应鼠标点击&#xff0c;并且有简单的动画效果。 import tkinter as tk import random import time from PIL import Image, ImageTk import os import sysclass DesktopPet:def __i…...

背包问题双雄:01 背包与完全背包详解(Java 实现)

一、背包问题概述 背包问题是动态规划领域的经典问题&#xff0c;其核心在于如何在有限容量的背包中选择物品&#xff0c;使得总价值最大化。根据物品选择规则的不同&#xff0c;主要分为两类&#xff1a; 01 背包&#xff1a;每件物品最多选 1 次&#xff08;选或不选&#…...