当前位置: 首页 > news >正文

Flink 离线计算

文章目录

      • 一、样例一:读 csv 文件生成 csv 文件
      • 二、样例二:读 starrocks 写 starrocks
      • 三、样例三:DataSet、Table Sql 处理后写入 StarRocks
      • 四、遇到的坑

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.9.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.9.1</version></dependency><!--使用Java编程语言支持DataStream / DataSet API的Table&SQL API--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><!--表程序规划器和运行时--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency>

一、样例一:读 csv 文件生成 csv 文件

  参考:(3)Flink学习- Table API & SQL编程

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;public class SQLWordCount {public static void main(String[] args) throws Exception {// 1、获取执行环境 ExecutionEnvironment (批处理用这个对象)ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );// 注册数据集
//        tEnv.registerDataSet("WordCount", input, "word, frequency");// 2、加载数据源到 DataSetDataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");// 3、将DataSet装换为TableTable students = bTableEnv.fromDataSet(csv);bTableEnv.registerTable("student", students);// 4、注册student表Table result = bTableEnv.sqlQuery("select name,age from student");result.printSchema();DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);System.out.println("count-->" + dset.count());dset.print();// 5、sink输出CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);String[] fieldNames = {"name", "age"};TypeInformation[] fieldTypes = {Types.STRING, Types.INT};bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);result.insertInto("CsvOutPutTable");env.execute("SQL-Batch");}@Datapublic static class Student {private String name;private int age;}
}

  准备测试文件 data.csv

name,age
zhangsan,23
lisi,43
wangwu,12

  运行程序后会生成 D:\\tmp\\result.csv 文件。

二、样例二:读 starrocks 写 starrocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 方式一DataSource s = env.createInput(jdbcInputFormat);s.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());// 方式二
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );env.execute("SQL-Batch");}
}

  数据准备:

CREATE TABLE student (name STRING,age INT
) ENGINE=OLAP 
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);insert into student values('zhangsan', 23);

参考:
flink 读取mysql源 JDBCInputFormat、自定义数据源
flink1.10中三种数据处理方式的连接器说明
flink读写MySQL的两种方式

注意:如果运行 java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount 时报错:Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}

  解决:报错:Flink Could not resolve substitution to a value: ${akka.stream.materializer}

    <build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>flink.KafkaDemo1</mainClass></transformer>--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

三、样例三:DataSet、Table Sql 处理后写入 StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);DataSet<Row> dataSource = env.createInput(jdbcInputFormat);dataSource.print();Table students = bTableEnv.fromDataSet(dataSource);bTableEnv.registerTable("student", students);Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");result.printSchema();DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);dset.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());env.execute("SQL-Batch");}
}

四、遇到的坑

  坑1:Bang equal '!=' is not allowed under the current SQL conformance level
  解决:将 sql 中的 != 修改为 <>

  坑2:java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  解释:在最后一行代码 env.execute() 执行的时候,没有新的数据接收器被定义,对于 Flink 批处理而前一行代码 result.print() 已经触发了代码的执行和输出,所以再执行 env.execute(),就是多余的了,因此报了上面的异常。
  解决方法:去掉最后一行代码 env.execute(); 就可以了。

相关文章:

Flink 离线计算

文章目录 一、样例一&#xff1a;读 csv 文件生成 csv 文件二、样例二&#xff1a;读 starrocks 写 starrocks三、样例三&#xff1a;DataSet、Table Sql 处理后写入 StarRocks四、遇到的坑 <dependency><groupId>org.apache.flink</groupId><artifactId&…...

Git | 理解团队合作中Git分支的合并操作

合并操作 团队合作中Git分支的合并操作分支合并过程1.创建feature/A分支的过程2. 创建分支feature/A-COPY3.合并分支查看代码是否改变 团队合作中Git分支的合并操作 需求 假设团队项目中的主分支是main,团队成员A基于主分支main创建了feature/A&#xff0c;而我又在团队成员A创…...

C++多态的实现原理

【欢迎关注编码小哥&#xff0c;学习更多实用的编程方法和技巧】 1、类的继承 子类对象在创建时会首先调用父类的构造函数 父类构造函数执行结束后&#xff0c;执行子类的构造函数 当父类的构造函数有参数时&#xff0c;需要在子类的初始化列表中显式调用 Child(int i) : …...

[极客大挑战 2019]PHP--详细解析

信息搜集 想查看页面源代码&#xff0c;但是右键没有这个选项。 我们可以ctrlu或者在url前面加view-source:查看&#xff1a; 没什么有用信息。根据页面的hint&#xff0c;我们考虑扫一下目录看看能不能扫出一些文件. 扫到了备份文件www.zip&#xff0c;解压一下查看网站源代码…...

map用于leetcode

//第一种map方法 function groupAnagrams(strs) {let map new Map()for (let str of strs) {let key str ? : str.split().sort().join()if (!map.has(key)) {map.set(key, [])}map.get(key).push(str)} //此时map为Map(3) {aet > [ eat, tea, ate ],ant > [ tan,…...

CommonJS 和 ES Modules 的 区别

CommonJS 和 ES Modules 的 区别 1. CommonJS 和 ES Modules 区别?1.1 语法差异CommonJS&#xff1a;ES Modules&#xff1a; 1.2. 加载机制CommonJS&#xff1a;ES Modules&#xff1a; 1.3. 运行时行为CommonJS&#xff1a;ES Modules&#xff1a; 1.4. 兼容性和使用场景Com…...

科技为翼 助残向新 高德地图无障碍导航规划突破1.5亿次

今年12月03日是第33个国际残疾人日。在当下科技发展日新月异的时代&#xff0c;如何让残障人士共享科技红利、平等地参与社会生活&#xff0c;成为当前社会关注的热点。 中国有超过8500万残障人士&#xff0c;其中超过2400万为肢残人群&#xff0c;视力障碍残疾人数超过1700万…...

Flink四大基石之Time (时间语义) 的使用详解

目录 一、引言 二、Time 的分类及 EventTime 的重要性 Time 分类详述 EventTime 重要性凸显 三、Watermark 机制详解 核心原理 Watermark能解决什么问题,如何解决的? Watermark图解原理 举例 总结 多并行度的水印触发 Watermark代码演示 需求 代码演示&#xff…...

Spring WebFlux与Spring MVC

Spring WebFlux 是对 Spring Boot 项目中传统 Spring MVC 部分的一种替代选择&#xff0c;主要是为了解决现代 Web 应用在高并发和低延迟场景下的性能瓶颈。 1.WebFlux 是对 Spring MVC 的替代 架构替代&#xff1a; Spring MVC 使用的是基于 Servlet 规范的阻塞式模型&#xf…...

【深度学习基础】一篇入门模型评估指标(分类篇)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;深度学习_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. 模…...

D80【 python 接口自动化学习】- python基础之HTTP

day80 requests请求加入headers 学习日期&#xff1a;20241126 学习目标&#xff1a;http定义及实战 -- requests请求加入headers 学习笔记&#xff1a; requests请求加入headers import requestsurlhttps://movie.douban.com/j/search_subjects params{"type":…...

⽂件操作详解

⽬录 一 文件操作的引入 1 为什么使⽤⽂件&#xff1f; 2 什么是⽂件&#xff1f; 3 文件分类&#xff08;1 从⽂件功能的⻆度来分类&#xff1a;程序⽂件/数据⽂件 2根据数据的组织形式&#xff1a;为⽂本⽂件/⼆进制⽂件&#xff09; 二 ⽂件的打开和关闭 1 …...

双高(高比例新能源发电和高比例电力电子设备)系统宽频振荡研究现状

1 为什么会形成双高电力系统 &#xff08;1&#xff09;新能源发电比例增加 双碳计划&#xff0c;新能源革命&#xff0c;可再生能源逐步代替传统化石能源&#xff0c;未来新能源发电将成为最终能源需求的主要来源。 &#xff08;2&#xff09;电力电子设备数量增加 为了实…...

TorchMoji使用教程/环境配置(2024)

TorchMoji使用教程/环境配置&#xff08;2024&#xff09; TorchMoji简介 这是一个基于pytorch库&#xff0c;用于将文本分类成不同的多种emoji表情的库&#xff0c;适用于文本的情感分析 配置流程 从Anaconda官网根据提示安装conda git拉取TorchMoji git clone https://gi…...

使用 Python 中的 TripoSR 根据图像创建 3D 对象

使用 Python 中的 TripoSR 根据图像创建 3D 对象 1. 效果图2. 步骤图像到 3D 对象设置环境导入必要的库设置设备创建计时器实用程序上传并准备图像处理输入图像生成 3D 模型并渲染下载.stl 文件展示结果3. 源码4. 遇到的问题及解决参考这篇博客将引导如何使用Python 及 TripoSR…...

Spring 框架中AOP(面向切面编程)和 IoC(控制反转)

在 Spring 框架中&#xff0c;AOP&#xff08;面向切面编程&#xff09;和 IoC&#xff08;控制反转&#xff09;是两个核心概念&#xff0c;它们分别负责不同的功能。下面我将通过通俗易懂的解释来帮助你理解这两个概念。 IoC&#xff08;控制反转&#xff09; IoC 是 Inver…...

电机瞬态分析基础(7):坐标变换(3)αβ0变换,dq0变换

1. 三相静止坐标系与两相静止坐标系的坐标变换―αβ0坐标变换 若上述x、y坐标系在空间静止不动&#xff0c;且x轴与A轴重合&#xff0c;即&#xff0c;如图1所示&#xff0c;则为两相静止坐标系&#xff0c;常称为坐标系&#xff0c;考虑到零轴分量&#xff0c;也称为αβ0坐标…...

Open3D (C++) 生成任意3D椭圆点云

目录 一、算法原理1、几何参数2、数学公式二、代码实现三、结果展示一、算法原理 1、几何参数 在三维空间中,椭圆由以下参数定义: 椭圆中心点 c = ( x 0 , y 0 , z...

5.利用Pandas以及Numpy进行数据清洗

1、缺失值处理 import pandas as pd import numpy as np#创建一张7行5列带有缺失值的表&#xff0c;表中的数据0-100随机生成&#xff0c;索引是python1. df pd.DataFrame(datanp.random.randint(0,100,size(7,5)), index [i for i in pythonl])df.iloc[2,3] Nonedf.iloc[4…...

@Bean注解详细介绍以及应用

目录 一、概念二、应用&#xff08;一&#xff09;代码示例1、首先创建一个简单的 Java 类User2、然后创建一个配置类AppConfig3、在其他组件中使用Bean创建的 bean4、通过 Spring 的ApplicationContext来获取UserService并调用其方法 &#xff08;二&#xff09;bean的方法名详…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

SQL慢可能是触发了ring buffer

简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

jmeter聚合报告中参数详解

sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample&#xff08;样本数&#xff09; 表示测试中发送的请求数量&#xff0c;即测试执行了多少次请求。 单位&#xff0c;以个或者次数表示。 示例&#xff1a;…...

【 java 虚拟机知识 第一篇 】

目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...

【Veristand】Veristand环境安装教程-Linux RT / Windows

首先声明&#xff0c;此教程是针对Simulink编译模型并导入Veristand中编写的&#xff0c;同时需要注意的是老用户编译可能用的是Veristand Model Framework&#xff0c;那个是历史版本&#xff0c;且NI不会再维护&#xff0c;新版本编译支持为VeriStand Model Generation Suppo…...

ubuntu系统文件误删(/lib/x86_64-linux-gnu/libc.so.6)修复方案 [成功解决]

报错信息&#xff1a;libc.so.6: cannot open shared object file: No such file or directory&#xff1a; #ls, ln, sudo...命令都不能用 error while loading shared libraries: libc.so.6: cannot open shared object file: No such file or directory重启后报错信息&…...

comfyui 工作流中 图生视频 如何增加视频的长度到5秒

comfyUI 工作流怎么可以生成更长的视频。除了硬件显存要求之外还有别的方法吗&#xff1f; 在ComfyUI中实现图生视频并延长到5秒&#xff0c;需要结合多个扩展和技巧。以下是完整解决方案&#xff1a; 核心工作流配置&#xff08;24fps下5秒120帧&#xff09; #mermaid-svg-yP…...