当前位置: 首页 > news >正文

Flink 离线计算

文章目录

      • 一、样例一:读 csv 文件生成 csv 文件
      • 二、样例二:读 starrocks 写 starrocks
      • 三、样例三:DataSet、Table Sql 处理后写入 StarRocks
      • 四、遇到的坑

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.9.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.9.1</version></dependency><!--使用Java编程语言支持DataStream / DataSet API的Table&SQL API--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><!--表程序规划器和运行时--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency>

一、样例一:读 csv 文件生成 csv 文件

  参考:(3)Flink学习- Table API & SQL编程

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;public class SQLWordCount {public static void main(String[] args) throws Exception {// 1、获取执行环境 ExecutionEnvironment (批处理用这个对象)ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );// 注册数据集
//        tEnv.registerDataSet("WordCount", input, "word, frequency");// 2、加载数据源到 DataSetDataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");// 3、将DataSet装换为TableTable students = bTableEnv.fromDataSet(csv);bTableEnv.registerTable("student", students);// 4、注册student表Table result = bTableEnv.sqlQuery("select name,age from student");result.printSchema();DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);System.out.println("count-->" + dset.count());dset.print();// 5、sink输出CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);String[] fieldNames = {"name", "age"};TypeInformation[] fieldTypes = {Types.STRING, Types.INT};bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);result.insertInto("CsvOutPutTable");env.execute("SQL-Batch");}@Datapublic static class Student {private String name;private int age;}
}

  准备测试文件 data.csv

name,age
zhangsan,23
lisi,43
wangwu,12

  运行程序后会生成 D:\\tmp\\result.csv 文件。

二、样例二:读 starrocks 写 starrocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 方式一DataSource s = env.createInput(jdbcInputFormat);s.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());// 方式二
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );env.execute("SQL-Batch");}
}

  数据准备:

CREATE TABLE student (name STRING,age INT
) ENGINE=OLAP 
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);insert into student values('zhangsan', 23);

参考:
flink 读取mysql源 JDBCInputFormat、自定义数据源
flink1.10中三种数据处理方式的连接器说明
flink读写MySQL的两种方式

注意:如果运行 java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount 时报错:Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}

  解决:报错:Flink Could not resolve substitution to a value: ${akka.stream.materializer}

    <build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>flink.KafkaDemo1</mainClass></transformer>--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

三、样例三:DataSet、Table Sql 处理后写入 StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);DataSet<Row> dataSource = env.createInput(jdbcInputFormat);dataSource.print();Table students = bTableEnv.fromDataSet(dataSource);bTableEnv.registerTable("student", students);Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");result.printSchema();DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);dset.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());env.execute("SQL-Batch");}
}

四、遇到的坑

  坑1:Bang equal '!=' is not allowed under the current SQL conformance level
  解决:将 sql 中的 != 修改为 <>

  坑2:java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  解释:在最后一行代码 env.execute() 执行的时候,没有新的数据接收器被定义,对于 Flink 批处理而前一行代码 result.print() 已经触发了代码的执行和输出,所以再执行 env.execute(),就是多余的了,因此报了上面的异常。
  解决方法:去掉最后一行代码 env.execute(); 就可以了。

相关文章:

Flink 离线计算

文章目录 一、样例一&#xff1a;读 csv 文件生成 csv 文件二、样例二&#xff1a;读 starrocks 写 starrocks三、样例三&#xff1a;DataSet、Table Sql 处理后写入 StarRocks四、遇到的坑 <dependency><groupId>org.apache.flink</groupId><artifactId&…...

Git | 理解团队合作中Git分支的合并操作

合并操作 团队合作中Git分支的合并操作分支合并过程1.创建feature/A分支的过程2. 创建分支feature/A-COPY3.合并分支查看代码是否改变 团队合作中Git分支的合并操作 需求 假设团队项目中的主分支是main,团队成员A基于主分支main创建了feature/A&#xff0c;而我又在团队成员A创…...

C++多态的实现原理

【欢迎关注编码小哥&#xff0c;学习更多实用的编程方法和技巧】 1、类的继承 子类对象在创建时会首先调用父类的构造函数 父类构造函数执行结束后&#xff0c;执行子类的构造函数 当父类的构造函数有参数时&#xff0c;需要在子类的初始化列表中显式调用 Child(int i) : …...

[极客大挑战 2019]PHP--详细解析

信息搜集 想查看页面源代码&#xff0c;但是右键没有这个选项。 我们可以ctrlu或者在url前面加view-source:查看&#xff1a; 没什么有用信息。根据页面的hint&#xff0c;我们考虑扫一下目录看看能不能扫出一些文件. 扫到了备份文件www.zip&#xff0c;解压一下查看网站源代码…...

map用于leetcode

//第一种map方法 function groupAnagrams(strs) {let map new Map()for (let str of strs) {let key str ? : str.split().sort().join()if (!map.has(key)) {map.set(key, [])}map.get(key).push(str)} //此时map为Map(3) {aet > [ eat, tea, ate ],ant > [ tan,…...

CommonJS 和 ES Modules 的 区别

CommonJS 和 ES Modules 的 区别 1. CommonJS 和 ES Modules 区别?1.1 语法差异CommonJS&#xff1a;ES Modules&#xff1a; 1.2. 加载机制CommonJS&#xff1a;ES Modules&#xff1a; 1.3. 运行时行为CommonJS&#xff1a;ES Modules&#xff1a; 1.4. 兼容性和使用场景Com…...

科技为翼 助残向新 高德地图无障碍导航规划突破1.5亿次

今年12月03日是第33个国际残疾人日。在当下科技发展日新月异的时代&#xff0c;如何让残障人士共享科技红利、平等地参与社会生活&#xff0c;成为当前社会关注的热点。 中国有超过8500万残障人士&#xff0c;其中超过2400万为肢残人群&#xff0c;视力障碍残疾人数超过1700万…...

Flink四大基石之Time (时间语义) 的使用详解

目录 一、引言 二、Time 的分类及 EventTime 的重要性 Time 分类详述 EventTime 重要性凸显 三、Watermark 机制详解 核心原理 Watermark能解决什么问题,如何解决的? Watermark图解原理 举例 总结 多并行度的水印触发 Watermark代码演示 需求 代码演示&#xff…...

Spring WebFlux与Spring MVC

Spring WebFlux 是对 Spring Boot 项目中传统 Spring MVC 部分的一种替代选择&#xff0c;主要是为了解决现代 Web 应用在高并发和低延迟场景下的性能瓶颈。 1.WebFlux 是对 Spring MVC 的替代 架构替代&#xff1a; Spring MVC 使用的是基于 Servlet 规范的阻塞式模型&#xf…...

【深度学习基础】一篇入门模型评估指标(分类篇)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;深度学习_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. 模…...

D80【 python 接口自动化学习】- python基础之HTTP

day80 requests请求加入headers 学习日期&#xff1a;20241126 学习目标&#xff1a;http定义及实战 -- requests请求加入headers 学习笔记&#xff1a; requests请求加入headers import requestsurlhttps://movie.douban.com/j/search_subjects params{"type":…...

⽂件操作详解

⽬录 一 文件操作的引入 1 为什么使⽤⽂件&#xff1f; 2 什么是⽂件&#xff1f; 3 文件分类&#xff08;1 从⽂件功能的⻆度来分类&#xff1a;程序⽂件/数据⽂件 2根据数据的组织形式&#xff1a;为⽂本⽂件/⼆进制⽂件&#xff09; 二 ⽂件的打开和关闭 1 …...

双高(高比例新能源发电和高比例电力电子设备)系统宽频振荡研究现状

1 为什么会形成双高电力系统 &#xff08;1&#xff09;新能源发电比例增加 双碳计划&#xff0c;新能源革命&#xff0c;可再生能源逐步代替传统化石能源&#xff0c;未来新能源发电将成为最终能源需求的主要来源。 &#xff08;2&#xff09;电力电子设备数量增加 为了实…...

TorchMoji使用教程/环境配置(2024)

TorchMoji使用教程/环境配置&#xff08;2024&#xff09; TorchMoji简介 这是一个基于pytorch库&#xff0c;用于将文本分类成不同的多种emoji表情的库&#xff0c;适用于文本的情感分析 配置流程 从Anaconda官网根据提示安装conda git拉取TorchMoji git clone https://gi…...

使用 Python 中的 TripoSR 根据图像创建 3D 对象

使用 Python 中的 TripoSR 根据图像创建 3D 对象 1. 效果图2. 步骤图像到 3D 对象设置环境导入必要的库设置设备创建计时器实用程序上传并准备图像处理输入图像生成 3D 模型并渲染下载.stl 文件展示结果3. 源码4. 遇到的问题及解决参考这篇博客将引导如何使用Python 及 TripoSR…...

Spring 框架中AOP(面向切面编程)和 IoC(控制反转)

在 Spring 框架中&#xff0c;AOP&#xff08;面向切面编程&#xff09;和 IoC&#xff08;控制反转&#xff09;是两个核心概念&#xff0c;它们分别负责不同的功能。下面我将通过通俗易懂的解释来帮助你理解这两个概念。 IoC&#xff08;控制反转&#xff09; IoC 是 Inver…...

电机瞬态分析基础(7):坐标变换(3)αβ0变换,dq0变换

1. 三相静止坐标系与两相静止坐标系的坐标变换―αβ0坐标变换 若上述x、y坐标系在空间静止不动&#xff0c;且x轴与A轴重合&#xff0c;即&#xff0c;如图1所示&#xff0c;则为两相静止坐标系&#xff0c;常称为坐标系&#xff0c;考虑到零轴分量&#xff0c;也称为αβ0坐标…...

Open3D (C++) 生成任意3D椭圆点云

目录 一、算法原理1、几何参数2、数学公式二、代码实现三、结果展示一、算法原理 1、几何参数 在三维空间中,椭圆由以下参数定义: 椭圆中心点 c = ( x 0 , y 0 , z...

5.利用Pandas以及Numpy进行数据清洗

1、缺失值处理 import pandas as pd import numpy as np#创建一张7行5列带有缺失值的表&#xff0c;表中的数据0-100随机生成&#xff0c;索引是python1. df pd.DataFrame(datanp.random.randint(0,100,size(7,5)), index [i for i in pythonl])df.iloc[2,3] Nonedf.iloc[4…...

@Bean注解详细介绍以及应用

目录 一、概念二、应用&#xff08;一&#xff09;代码示例1、首先创建一个简单的 Java 类User2、然后创建一个配置类AppConfig3、在其他组件中使用Bean创建的 bean4、通过 Spring 的ApplicationContext来获取UserService并调用其方法 &#xff08;二&#xff09;bean的方法名详…...

实战演练:基于快马平台快速构建一个电商场景的智能客服AI Agent

实战演练&#xff1a;基于快马平台快速构建一个电商场景的智能客服AI Agent 最近在做一个电商项目&#xff0c;需要给平台增加智能客服功能。传统开发流程要写大量业务逻辑代码&#xff0c;还要处理前后端对接&#xff0c;想想就头大。后来发现用InsCode(快马)平台可以快速实现…...

ViGEmBus虚拟手柄驱动全栈技术指南:从内核原理到游戏控制革新

ViGEmBus虚拟手柄驱动全栈技术指南&#xff1a;从内核原理到游戏控制革新 【免费下载链接】ViGEmBus Windows kernel-mode driver emulating well-known USB game controllers. 项目地址: https://gitcode.com/gh_mirrors/vi/ViGEmBus 一、认知虚拟手柄技术&#xff1a;…...

逆向分析WhatsApp的GIF功能:用Frida抓取Tenor API的完整请求与响应数据

逆向工程实战&#xff1a;用Frida解密WhatsApp的GIF数据流 当你在WhatsApp中发送一个GIF表情时&#xff0c;是否好奇过这个动态图片是如何从服务器传输到你的手机上的&#xff1f;今天我们将深入WhatsApp客户端内部&#xff0c;通过动态插桩工具Frida来捕获和分析其背后的Tenor…...

NaViL-9B效果实测:支持中英文混排表格图像的行列结构识别与内容提取

NaViL-9B效果实测&#xff1a;支持中英文混排表格图像的行列结构识别与内容提取 1. 模型介绍 NaViL-9B是新一代原生多模态大语言模型&#xff0c;专为处理复杂视觉-语言任务设计。与常规视觉模型不同&#xff0c;它不仅能够理解图片内容&#xff0c;还能精准解析表格、文档等…...

无线通信入门:为什么说DFT是提升OFDM信道估计性能的“降噪神器”?

无线通信中的降噪艺术&#xff1a;DFT如何让OFDM信道估计更精准 想象一下&#xff0c;你正试图在嘈杂的咖啡馆里听清朋友的谈话。背景音乐、周围人的聊天声、杯盘碰撞声都在干扰你获取清晰的信息。无线通信中的信道估计面临类似的挑战——如何在充满噪声的传输环境中&#xff0…...

FaceFusion项目二次开发踩坑记:深入content_analyser.py,手动修复模型依赖哈希问题

FaceFusion项目二次开发踩坑记&#xff1a;深入content_analyser.py&#xff0c;手动修复模型依赖哈希问题 当你在全新环境中部署经过二次开发的FaceFusion项目时&#xff0c;可能会遇到一个令人头疼的问题——模型文件哈希校验失败。这个问题通常表现为控制台输出类似[FACEFUS…...

ETH-01模块避坑指南:为什么HTTP协议不行而TCP直接监听成功?

ETH-01模块协议选择实战&#xff1a;从HTTP困境到TCP高效监听 第一次拿到ETH-01这个串口转以太网模块时&#xff0c;我和大多数开发者一样&#xff0c;本能地选择了HTTP协议进行通信测试。毕竟在Web开发领域&#xff0c;HTTP就像空气一样无处不在。但当我花了整整两天时间调试…...

6种专业计时模式:让OBS直播时间管理变得如此简单

6种专业计时模式&#xff1a;让OBS直播时间管理变得如此简单 【免费下载链接】obs-advanced-timer 项目地址: https://gitcode.com/gh_mirrors/ob/obs-advanced-timer 想让你的直播画面看起来更加专业吗&#xff1f;OBS高级计时器正是你需要的秘密武器&#xff01;这款…...

别再只盯着Midjourney了!2025年,这5款文生图模型更适合你的具体业务场景

2025年五大文生图模型实战指南&#xff1a;如何为你的业务精准匹配AI工具 当Midjourney成为文生图领域的"网红"时&#xff0c;真正懂行的从业者已经在根据具体业务需求选择更合适的工具了。就像专业摄影师不会只用一款镜头拍所有题材&#xff0c;明智的AI应用者需要建…...

用Node.js和request-promise玩转EduCoder API:手把手教你搭建自己的实训答案库

用Node.js构建EduCoder实训数据采集系统的工程实践 在编程教育平台EduCoder上&#xff0c;实训关卡的设计往往需要学习者反复尝试和验证。作为开发者&#xff0c;我们能否通过技术手段实现实训数据的自动化采集与管理&#xff1f;本文将深入探讨如何基于Node.js生态构建一个稳定…...