当前位置: 首页 > news >正文

Flink 离线计算

文章目录

      • 一、样例一:读 csv 文件生成 csv 文件
      • 二、样例二:读 starrocks 写 starrocks
      • 三、样例三:DataSet、Table Sql 处理后写入 StarRocks
      • 四、遇到的坑

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.9.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.9.1</version></dependency><!--使用Java编程语言支持DataStream / DataSet API的Table&SQL API--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><!--表程序规划器和运行时--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency>

一、样例一:读 csv 文件生成 csv 文件

  参考:(3)Flink学习- Table API & SQL编程

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;public class SQLWordCount {public static void main(String[] args) throws Exception {// 1、获取执行环境 ExecutionEnvironment (批处理用这个对象)ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );// 注册数据集
//        tEnv.registerDataSet("WordCount", input, "word, frequency");// 2、加载数据源到 DataSetDataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");// 3、将DataSet装换为TableTable students = bTableEnv.fromDataSet(csv);bTableEnv.registerTable("student", students);// 4、注册student表Table result = bTableEnv.sqlQuery("select name,age from student");result.printSchema();DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);System.out.println("count-->" + dset.count());dset.print();// 5、sink输出CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);String[] fieldNames = {"name", "age"};TypeInformation[] fieldTypes = {Types.STRING, Types.INT};bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);result.insertInto("CsvOutPutTable");env.execute("SQL-Batch");}@Datapublic static class Student {private String name;private int age;}
}

  准备测试文件 data.csv

name,age
zhangsan,23
lisi,43
wangwu,12

  运行程序后会生成 D:\\tmp\\result.csv 文件。

二、样例二:读 starrocks 写 starrocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 方式一DataSource s = env.createInput(jdbcInputFormat);s.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());// 方式二
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );env.execute("SQL-Batch");}
}

  数据准备:

CREATE TABLE student (name STRING,age INT
) ENGINE=OLAP 
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);insert into student values('zhangsan', 23);

参考:
flink 读取mysql源 JDBCInputFormat、自定义数据源
flink1.10中三种数据处理方式的连接器说明
flink读写MySQL的两种方式

注意:如果运行 java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount 时报错:Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}

  解决:报错:Flink Could not resolve substitution to a value: ${akka.stream.materializer}

    <build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>flink.KafkaDemo1</mainClass></transformer>--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

三、样例三:DataSet、Table Sql 处理后写入 StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);DataSet<Row> dataSource = env.createInput(jdbcInputFormat);dataSource.print();Table students = bTableEnv.fromDataSet(dataSource);bTableEnv.registerTable("student", students);Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");result.printSchema();DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);dset.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());env.execute("SQL-Batch");}
}

四、遇到的坑

  坑1:Bang equal '!=' is not allowed under the current SQL conformance level
  解决:将 sql 中的 != 修改为 <>

  坑2:java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  解释:在最后一行代码 env.execute() 执行的时候,没有新的数据接收器被定义,对于 Flink 批处理而前一行代码 result.print() 已经触发了代码的执行和输出,所以再执行 env.execute(),就是多余的了,因此报了上面的异常。
  解决方法:去掉最后一行代码 env.execute(); 就可以了。

相关文章:

Flink 离线计算

文章目录 一、样例一&#xff1a;读 csv 文件生成 csv 文件二、样例二&#xff1a;读 starrocks 写 starrocks三、样例三&#xff1a;DataSet、Table Sql 处理后写入 StarRocks四、遇到的坑 <dependency><groupId>org.apache.flink</groupId><artifactId&…...

Git | 理解团队合作中Git分支的合并操作

合并操作 团队合作中Git分支的合并操作分支合并过程1.创建feature/A分支的过程2. 创建分支feature/A-COPY3.合并分支查看代码是否改变 团队合作中Git分支的合并操作 需求 假设团队项目中的主分支是main,团队成员A基于主分支main创建了feature/A&#xff0c;而我又在团队成员A创…...

C++多态的实现原理

【欢迎关注编码小哥&#xff0c;学习更多实用的编程方法和技巧】 1、类的继承 子类对象在创建时会首先调用父类的构造函数 父类构造函数执行结束后&#xff0c;执行子类的构造函数 当父类的构造函数有参数时&#xff0c;需要在子类的初始化列表中显式调用 Child(int i) : …...

[极客大挑战 2019]PHP--详细解析

信息搜集 想查看页面源代码&#xff0c;但是右键没有这个选项。 我们可以ctrlu或者在url前面加view-source:查看&#xff1a; 没什么有用信息。根据页面的hint&#xff0c;我们考虑扫一下目录看看能不能扫出一些文件. 扫到了备份文件www.zip&#xff0c;解压一下查看网站源代码…...

map用于leetcode

//第一种map方法 function groupAnagrams(strs) {let map new Map()for (let str of strs) {let key str ? : str.split().sort().join()if (!map.has(key)) {map.set(key, [])}map.get(key).push(str)} //此时map为Map(3) {aet > [ eat, tea, ate ],ant > [ tan,…...

CommonJS 和 ES Modules 的 区别

CommonJS 和 ES Modules 的 区别 1. CommonJS 和 ES Modules 区别?1.1 语法差异CommonJS&#xff1a;ES Modules&#xff1a; 1.2. 加载机制CommonJS&#xff1a;ES Modules&#xff1a; 1.3. 运行时行为CommonJS&#xff1a;ES Modules&#xff1a; 1.4. 兼容性和使用场景Com…...

科技为翼 助残向新 高德地图无障碍导航规划突破1.5亿次

今年12月03日是第33个国际残疾人日。在当下科技发展日新月异的时代&#xff0c;如何让残障人士共享科技红利、平等地参与社会生活&#xff0c;成为当前社会关注的热点。 中国有超过8500万残障人士&#xff0c;其中超过2400万为肢残人群&#xff0c;视力障碍残疾人数超过1700万…...

Flink四大基石之Time (时间语义) 的使用详解

目录 一、引言 二、Time 的分类及 EventTime 的重要性 Time 分类详述 EventTime 重要性凸显 三、Watermark 机制详解 核心原理 Watermark能解决什么问题,如何解决的? Watermark图解原理 举例 总结 多并行度的水印触发 Watermark代码演示 需求 代码演示&#xff…...

Spring WebFlux与Spring MVC

Spring WebFlux 是对 Spring Boot 项目中传统 Spring MVC 部分的一种替代选择&#xff0c;主要是为了解决现代 Web 应用在高并发和低延迟场景下的性能瓶颈。 1.WebFlux 是对 Spring MVC 的替代 架构替代&#xff1a; Spring MVC 使用的是基于 Servlet 规范的阻塞式模型&#xf…...

【深度学习基础】一篇入门模型评估指标(分类篇)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;深度学习_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. 模…...

D80【 python 接口自动化学习】- python基础之HTTP

day80 requests请求加入headers 学习日期&#xff1a;20241126 学习目标&#xff1a;http定义及实战 -- requests请求加入headers 学习笔记&#xff1a; requests请求加入headers import requestsurlhttps://movie.douban.com/j/search_subjects params{"type":…...

⽂件操作详解

⽬录 一 文件操作的引入 1 为什么使⽤⽂件&#xff1f; 2 什么是⽂件&#xff1f; 3 文件分类&#xff08;1 从⽂件功能的⻆度来分类&#xff1a;程序⽂件/数据⽂件 2根据数据的组织形式&#xff1a;为⽂本⽂件/⼆进制⽂件&#xff09; 二 ⽂件的打开和关闭 1 …...

双高(高比例新能源发电和高比例电力电子设备)系统宽频振荡研究现状

1 为什么会形成双高电力系统 &#xff08;1&#xff09;新能源发电比例增加 双碳计划&#xff0c;新能源革命&#xff0c;可再生能源逐步代替传统化石能源&#xff0c;未来新能源发电将成为最终能源需求的主要来源。 &#xff08;2&#xff09;电力电子设备数量增加 为了实…...

TorchMoji使用教程/环境配置(2024)

TorchMoji使用教程/环境配置&#xff08;2024&#xff09; TorchMoji简介 这是一个基于pytorch库&#xff0c;用于将文本分类成不同的多种emoji表情的库&#xff0c;适用于文本的情感分析 配置流程 从Anaconda官网根据提示安装conda git拉取TorchMoji git clone https://gi…...

使用 Python 中的 TripoSR 根据图像创建 3D 对象

使用 Python 中的 TripoSR 根据图像创建 3D 对象 1. 效果图2. 步骤图像到 3D 对象设置环境导入必要的库设置设备创建计时器实用程序上传并准备图像处理输入图像生成 3D 模型并渲染下载.stl 文件展示结果3. 源码4. 遇到的问题及解决参考这篇博客将引导如何使用Python 及 TripoSR…...

Spring 框架中AOP(面向切面编程)和 IoC(控制反转)

在 Spring 框架中&#xff0c;AOP&#xff08;面向切面编程&#xff09;和 IoC&#xff08;控制反转&#xff09;是两个核心概念&#xff0c;它们分别负责不同的功能。下面我将通过通俗易懂的解释来帮助你理解这两个概念。 IoC&#xff08;控制反转&#xff09; IoC 是 Inver…...

电机瞬态分析基础(7):坐标变换(3)αβ0变换,dq0变换

1. 三相静止坐标系与两相静止坐标系的坐标变换―αβ0坐标变换 若上述x、y坐标系在空间静止不动&#xff0c;且x轴与A轴重合&#xff0c;即&#xff0c;如图1所示&#xff0c;则为两相静止坐标系&#xff0c;常称为坐标系&#xff0c;考虑到零轴分量&#xff0c;也称为αβ0坐标…...

Open3D (C++) 生成任意3D椭圆点云

目录 一、算法原理1、几何参数2、数学公式二、代码实现三、结果展示一、算法原理 1、几何参数 在三维空间中,椭圆由以下参数定义: 椭圆中心点 c = ( x 0 , y 0 , z...

5.利用Pandas以及Numpy进行数据清洗

1、缺失值处理 import pandas as pd import numpy as np#创建一张7行5列带有缺失值的表&#xff0c;表中的数据0-100随机生成&#xff0c;索引是python1. df pd.DataFrame(datanp.random.randint(0,100,size(7,5)), index [i for i in pythonl])df.iloc[2,3] Nonedf.iloc[4…...

@Bean注解详细介绍以及应用

目录 一、概念二、应用&#xff08;一&#xff09;代码示例1、首先创建一个简单的 Java 类User2、然后创建一个配置类AppConfig3、在其他组件中使用Bean创建的 bean4、通过 Spring 的ApplicationContext来获取UserService并调用其方法 &#xff08;二&#xff09;bean的方法名详…...

技术债务的职场政治:谁该为历史遗留问题买单

在软件测试从业者的日常工作中&#xff0c;技术债务是一个绕不开的话题。它像一颗隐藏在代码深处的定时炸弹&#xff0c;随时可能在项目推进的某个节点爆发&#xff0c;引发一系列连锁反应。而当技术债务问题浮出水面时&#xff0c;一场关于“谁该为历史遗留问题买单”的职场政…...

OpenClaw:让 AI 从 “对话” 走向 “实干” 的开源智能体

在人工智能技术快速发展的今天&#xff0c;大语言模型的对话能力已日趋成熟&#xff0c;但 “能说不能做” 的痛点始终制约着 AI 的实际应用价值。2026 年&#xff0c;一款名为 OpenClaw&#xff08;社区昵称 “小龙虾 AI”&#xff09;的开源项目迅速走红&#xff0c;它以 “真…...

写论文软件哪个好?2026 实测:真文献 + 实证图表 + 全流程,虎贲等考 AI 才是毕业论文通关王

每到毕业季&#xff0c;“写论文软件哪个好” 就成为本硕生最纠结的问题。市面上工具看似繁多&#xff0c;却大多藏着隐患&#xff1a;通用 AI 编造文献、无实证支撑&#xff1b;小众工具功能碎片化、格式混乱&#xff1b;传统软件效率低、无智能辅助…… 选错软件&#xff0c;…...

计算机毕业设计:Python智慧医疗数据可视化与疾病预测系统 Flask框架 随机森林 机器学习 疾病数据 智慧医疗 深度学习(建议收藏)✅

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ > &#x1f345;想要获取完整文章或者源码&#xff0c;或者代做&#xff0c;拉到文章底部即可与…...

别再死记公式了!用Multisim仿真带你玩转反相/同相比例运算电路

用Multisim仿真解锁比例运算电路的实战奥秘 在电子工程的学习中&#xff0c;运算放大器电路一直是让初学者又爱又恨的内容。传统的学习方法往往从公式推导开始&#xff0c;要求学生死记硬背各种电路配置下的增益公式。但今天&#xff0c;我们要打破这种枯燥的学习方式——通过…...

何为可编程控制器?可编程控制器4大内容介绍

可编程控制器在控制中常为使用&#xff0c;因此本文将从4大方面对可编程控制器予以介绍&#xff0c;以增进大家对可编程控制器的了解。这4大方面包括&#xff1a;1.何为可编程控制器?2. 可编程控制器的基本组成&#xff0c;3. 可编程控制器发展史&#xff0c;以及4. 可编程控制…...

如何快速掌控Windows浏览器自由:3步掌握EdgeRemover终极系统优化工具

如何快速掌控Windows浏览器自由&#xff1a;3步掌握EdgeRemover终极系统优化工具 【免费下载链接】EdgeRemover A PowerShell script that correctly uninstalls or reinstalls Microsoft Edge on Windows 10 & 11. 项目地址: https://gitcode.com/gh_mirrors/ed/EdgeRem…...

STM32F4上跑FreeType:手把手教你为嵌入式GUI添加矢量字体(附源码)

STM32F4实战&#xff1a;FreeType矢量字体移植与GUI深度优化指南 1. 嵌入式矢量字体技术选型与原理 在资源受限的嵌入式环境中实现矢量字体渲染&#xff0c;本质上是一场内存效率与视觉质量的博弈。FreeType作为行业标准的字体引擎&#xff0c;其核心优势在于采用二次贝塞尔曲…...

树莓派4B + MPU9250:从零到一搭建你的第一个姿态传感器(附完整代码与避坑指南)

树莓派4B与MPU9250实战&#xff1a;从硬件连接到姿态解算的全流程指南 1. 准备工作与环境搭建 1.1 硬件清单与连接指南 在开始之前&#xff0c;我们需要准备以下硬件组件&#xff1a; 树莓派4B&#xff08;建议4GB内存版本&#xff09;MPU9250九轴传感器模块杜邦线&#xff08;…...

【Portal实战指南】STEP 7 Basic许可证丢失排查与一键修复

1. 问题现象与紧急处理 当你满心欢喜地打开TIA Portal准备开始一天的工作&#xff0c;突然弹出一个令人窒息的提示框&#xff1a;"找不到许可证STEP 7 Basic"。这种情况我遇到过不下十次&#xff0c;每次都能让工程师血压瞬间飙升。别慌&#xff0c;我们先来快速判断…...