当前位置: 首页 > news >正文

flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作

package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;public class Test41 {//demo 是MySQL中已经创建好的表//create table demo (userId varchar(50) not null,total bigint,avgVal double);private static String FILE_PATH = "info.txt";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path(FILE_PATH)).withFormat(new Csv()).withSchema(new Schema().field("userId", DataTypes.VARCHAR(50)).field("ts", DataTypes.INT()).field("val", DataTypes.DOUBLE())).createTemporaryTable("input");Table dataTable = tableEnv.from("input");Table aggregateTable = dataTable.groupBy("userId").select("userId, userId.count as total, val.avg as avgVal");String sql="create table jdbcOutputTable (" +" userId varchar(50) not null,total bigint,avgVal double " +") with (" +" 'connector.type' = 'jdbc', " +" 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +" 'connector.table' = 'demo', " +" 'connector.driver' = 'com.mysql.jdbc.Driver', " +" 'connector.username' = 'root', " +" 'connector.password' = 123456' )";tableEnv.sqlUpdate(sql);aggregateTable.insertInto("jdbcOutputTable");tableEnv.execute("my job");}
}

文件info.txt

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9

相关文章:

flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionE…...

【Java 多线程 哈希表】 HashTable, HashMap, ConcurrentHashMap 之间的区别

HashTable、HashMap和ConcurrentHashMap都是Java中用于存储键值对的集合框架的一部分,但它们之间存在一些重要的联系和区别。 联系 键值对存储:它们都用于存储键值对,并允许你根据键来检索值。基于哈希:它们内部都使用了哈希表来…...

有趣之matlab-烟花

待整合1 2 3 动态 有趣编程之11 静态 逼真 3 .m文件路径下放back1.jpg back4.jpg…背景照片 点击screen 就会有小白点升起,爆炸 function yanhuamoban()clear all;%定义全局变量global ah ;%坐标轴句柄global styleNum ;%爆炸图案样式global multiColor; %多颜色变换…...

C语言指针与数组(不适合初学者版):一篇文章带你深入了解指针与数组!

🎈个人主页:JAMES别扣了 💕在校大学生一枚。对IT有着极其浓厚的兴趣 ✨系列专栏目前为C语言初阶、后续会更新c语言的学习方法以及c题目分享. 😍希望我的文章对大家有着不一样的帮助,欢迎大家关注我,我也会回…...

springboot Mongo大数据查询优化方案

前言 因为项目需要把传感器的数据保存起来,当时设计的时是mongo来存储,后期需要从mongo DB里查询传感器的数据记录。由于传感器每秒都会像mongo数据库存500条左右的数据,1天就有4320万条数据,要想按照时间条件去查询,…...

Ollama管理本地开源大模型,用Open WebUI访问Ollama接口

现在开源大模型一个接一个的,而且各个都说自己的性能非常厉害,但是对于我们这些使用者,用起来就比较尴尬了。因为一个模型一个调用的方式,先得下载模型,下完模型,写加载代码,麻烦得很。 对于程…...

Linux--基本知识入门

一.几个基本知识 终端: CtrlAltT 或者桌面/文件夹右键,打开终端切换为管理员: sudo su 退出:exit查看内核版本号: uname -a内核版本号含义: 5 代表主版本号;13代表次版本号;0代表修订版本号;30代表修订版本的第几次微调;数字越大表示内核越新. 二.目录…...

基于springboot+vue实现的大学计算机课程管理平台的设计与实现(全套资料)

一、系统架构 前端:vue | antv 后端:springboot | mybatis-plus 环境:jdk17 | mysql | maven | node | redis 二、代码及数据库 三、功能介绍 01. 登录页 02. 首页 03. 系统基础模块-用户管理 04. 系统基础模块-部门…...

LeetCode2115. 从给定原材料中找到所有可以做出的菜

拓扑排序 题面 题目链接:2115. 从给定原材料中找到所有可以做出的菜 - 力扣(LeetCode) 你有 n 道不同菜的信息。给你一个字符串数组 recipes 和一个二维字符串数组 ingredients 。第 i 道菜的名字为 recipes[i] ,如果你有它 所有…...

项目性能优化—性能优化的指标、目标

项目性能优化—性能优化的指标、目标 性能优化的终极目标是什么 性能优化的目标实际上是为了更好的用户体验: 一般我们认为用户体验是下面的公式: 用户体验 产品设计(非技术) 系统性能 ≈ 系统性能 快 那什么样的体验叫快呢…...

蓝桥杯刷题(三)

一、P8752 [蓝桥杯 2021 省 B2] 特殊年份(洛谷) 题目描述 今年是 2021 年,2021 这个数字非常特殊, 它的千位和十位相等, 个位比百位大 1,我们称满足这样条件的年份为特殊年份。 输入 5 个年份,请计算这里面有多少个…...

20240312-算法复习打卡day21||● 530.二叉搜索树的最小绝对差 ● 501.二叉搜索树中的众数 ● 236. 二叉树的最近公共祖先

530.二叉搜索树的最小绝对差 1.中序遍历得到升序数组 class Solution { private:vector<int> vec;void traversal(TreeNode* root) {if (root NULL) return;if (root->left) traversal(root->left);vec.push_back(root->val);if (root->right) traversal(r…...

今天我们来学习一下关于MySQL数据库

目录 前言: 1.MySQL定义&#xff1a; 1.1基础概念&#xff1a; 1.1.1数据库&#xff08;Database&#xff09;&#xff1a; 1.1.2表&#xff08;Table&#xff09;&#xff1a; 1.1.3记录&#xff08;Record&#xff09;与字段&#xff08;Field&#xff09;&#xff1a; …...

长期护理保险可改善老年人心理健康 | CHARLS CLHLS CFPS 公共数据库周报(3.6)...

欢迎报名2024年“真实世界临床研究”课程&#xff01; 本周郑老师开讲&#xff1a;“真实世界临床研究”培训班&#xff0c;3月16-17日两天&#xff0c;欢迎报名&#xff01; CHARLS公共数据库‍ CHARLS数据库简介中国健康与养老追踪调查(China Health and Retirement Longitud…...

49、C++/友元、常成员函数和常对象、运算符重载学习20240314

一、封装类 用其成员函数实现&#xff08;对该类的&#xff09;数学运算符的重载&#xff08;加法&#xff09;&#xff0c;并封装一个全局函数实现&#xff08;对该类的&#xff09;数学运算符的重载&#xff08;减法&#xff09;。 代码&#xff1a; #include <iostream…...

SQL Server错误:15404

执行维护计划失败&#xff0c;提示SQL Server Error 15404 无法获取有关... 异常如下图&#xff1a; 原因&#xff1a;数据库用户名与计算机名称不一致 解决办法&#xff1a;1.重名称数据库用户名 将前缀改成计算机名 2.重启SQL Server代理...

Halcon文件操作

1、Region读写操作 region&#xff08;区域&#xff09;是一种重要的数据类型&#xff0c;用于表示图像中的特定区域。这些区域可以代表图像中的目标、感兴趣的区域、边缘、形状等等 read_image (Image, printer_chip/printer_chip_01) dev_open_window (0, 0, 512, 512, black…...

【测试知识】业务面试问答突击版1

高内聚低耦合 高内聚指的是将相关的功能或数据组织在一起&#xff0c;使得模块内部的各个元素紧密地联系在一起&#xff0c;完成特定的任务。 低耦合指的是模块之间的依赖关系尽可能地降低&#xff0c;模块之间的接口简单清晰&#xff0c;减少模块之间的相互影响。 文章目录 整…...

使用el-row及el-col页面缩放时出现空行解决方案

问题&#xff1a; 当缩放到90%或者110%&#xff0c;选中下拉后&#xff0c;下方就会出现空行 如下图所示&#xff1a; 关于el-row 和 el-col &#xff1a; 参数说明类型可选值默认值span栅格占据的列数number—24offset栅格左侧的间隔格数number—0push栅格向右移动格数number…...

java中几种对象存储(文件存储)中间件的介绍

一、前言 在博主得到系统中使用的对象存储主要有OSS&#xff08;阿里云的对象存储&#xff09; COS&#xff08;腾讯云的对象存储&#xff09;OBS&#xff08;华为云的对象存储&#xff09;还有就是MinIO 这些玩意。其实这种东西大差不差&#xff0c;几乎实现方式都是一样&…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务

通过akshare库&#xff0c;获取股票数据&#xff0c;并生成TabPFN这个模型 可以识别、处理的格式&#xff0c;写一个完整的预处理示例&#xff0c;并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务&#xff0c;进行预测并输…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子&#xff1a; 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...