当前位置: 首页 > news >正文

flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作

package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;public class Test41 {//demo 是MySQL中已经创建好的表//create table demo (userId varchar(50) not null,total bigint,avgVal double);private static String FILE_PATH = "info.txt";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path(FILE_PATH)).withFormat(new Csv()).withSchema(new Schema().field("userId", DataTypes.VARCHAR(50)).field("ts", DataTypes.INT()).field("val", DataTypes.DOUBLE())).createTemporaryTable("input");Table dataTable = tableEnv.from("input");Table aggregateTable = dataTable.groupBy("userId").select("userId, userId.count as total, val.avg as avgVal");String sql="create table jdbcOutputTable (" +" userId varchar(50) not null,total bigint,avgVal double " +") with (" +" 'connector.type' = 'jdbc', " +" 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +" 'connector.table' = 'demo', " +" 'connector.driver' = 'com.mysql.jdbc.Driver', " +" 'connector.username' = 'root', " +" 'connector.password' = 123456' )";tableEnv.sqlUpdate(sql);aggregateTable.insertInto("jdbcOutputTable");tableEnv.execute("my job");}
}

文件info.txt

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9

相关文章:

flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionE…...

【Java 多线程 哈希表】 HashTable, HashMap, ConcurrentHashMap 之间的区别

HashTable、HashMap和ConcurrentHashMap都是Java中用于存储键值对的集合框架的一部分,但它们之间存在一些重要的联系和区别。 联系 键值对存储:它们都用于存储键值对,并允许你根据键来检索值。基于哈希:它们内部都使用了哈希表来…...

有趣之matlab-烟花

待整合1 2 3 动态 有趣编程之11 静态 逼真 3 .m文件路径下放back1.jpg back4.jpg…背景照片 点击screen 就会有小白点升起,爆炸 function yanhuamoban()clear all;%定义全局变量global ah ;%坐标轴句柄global styleNum ;%爆炸图案样式global multiColor; %多颜色变换…...

C语言指针与数组(不适合初学者版):一篇文章带你深入了解指针与数组!

🎈个人主页:JAMES别扣了 💕在校大学生一枚。对IT有着极其浓厚的兴趣 ✨系列专栏目前为C语言初阶、后续会更新c语言的学习方法以及c题目分享. 😍希望我的文章对大家有着不一样的帮助,欢迎大家关注我,我也会回…...

springboot Mongo大数据查询优化方案

前言 因为项目需要把传感器的数据保存起来,当时设计的时是mongo来存储,后期需要从mongo DB里查询传感器的数据记录。由于传感器每秒都会像mongo数据库存500条左右的数据,1天就有4320万条数据,要想按照时间条件去查询,…...

Ollama管理本地开源大模型,用Open WebUI访问Ollama接口

现在开源大模型一个接一个的,而且各个都说自己的性能非常厉害,但是对于我们这些使用者,用起来就比较尴尬了。因为一个模型一个调用的方式,先得下载模型,下完模型,写加载代码,麻烦得很。 对于程…...

Linux--基本知识入门

一.几个基本知识 终端: CtrlAltT 或者桌面/文件夹右键,打开终端切换为管理员: sudo su 退出:exit查看内核版本号: uname -a内核版本号含义: 5 代表主版本号;13代表次版本号;0代表修订版本号;30代表修订版本的第几次微调;数字越大表示内核越新. 二.目录…...

基于springboot+vue实现的大学计算机课程管理平台的设计与实现(全套资料)

一、系统架构 前端:vue | antv 后端:springboot | mybatis-plus 环境:jdk17 | mysql | maven | node | redis 二、代码及数据库 三、功能介绍 01. 登录页 02. 首页 03. 系统基础模块-用户管理 04. 系统基础模块-部门…...

LeetCode2115. 从给定原材料中找到所有可以做出的菜

拓扑排序 题面 题目链接:2115. 从给定原材料中找到所有可以做出的菜 - 力扣(LeetCode) 你有 n 道不同菜的信息。给你一个字符串数组 recipes 和一个二维字符串数组 ingredients 。第 i 道菜的名字为 recipes[i] ,如果你有它 所有…...

项目性能优化—性能优化的指标、目标

项目性能优化—性能优化的指标、目标 性能优化的终极目标是什么 性能优化的目标实际上是为了更好的用户体验: 一般我们认为用户体验是下面的公式: 用户体验 产品设计(非技术) 系统性能 ≈ 系统性能 快 那什么样的体验叫快呢…...

蓝桥杯刷题(三)

一、P8752 [蓝桥杯 2021 省 B2] 特殊年份(洛谷) 题目描述 今年是 2021 年,2021 这个数字非常特殊, 它的千位和十位相等, 个位比百位大 1,我们称满足这样条件的年份为特殊年份。 输入 5 个年份,请计算这里面有多少个…...

20240312-算法复习打卡day21||● 530.二叉搜索树的最小绝对差 ● 501.二叉搜索树中的众数 ● 236. 二叉树的最近公共祖先

530.二叉搜索树的最小绝对差 1.中序遍历得到升序数组 class Solution { private:vector<int> vec;void traversal(TreeNode* root) {if (root NULL) return;if (root->left) traversal(root->left);vec.push_back(root->val);if (root->right) traversal(r…...

今天我们来学习一下关于MySQL数据库

目录 前言: 1.MySQL定义&#xff1a; 1.1基础概念&#xff1a; 1.1.1数据库&#xff08;Database&#xff09;&#xff1a; 1.1.2表&#xff08;Table&#xff09;&#xff1a; 1.1.3记录&#xff08;Record&#xff09;与字段&#xff08;Field&#xff09;&#xff1a; …...

长期护理保险可改善老年人心理健康 | CHARLS CLHLS CFPS 公共数据库周报(3.6)...

欢迎报名2024年“真实世界临床研究”课程&#xff01; 本周郑老师开讲&#xff1a;“真实世界临床研究”培训班&#xff0c;3月16-17日两天&#xff0c;欢迎报名&#xff01; CHARLS公共数据库‍ CHARLS数据库简介中国健康与养老追踪调查(China Health and Retirement Longitud…...

49、C++/友元、常成员函数和常对象、运算符重载学习20240314

一、封装类 用其成员函数实现&#xff08;对该类的&#xff09;数学运算符的重载&#xff08;加法&#xff09;&#xff0c;并封装一个全局函数实现&#xff08;对该类的&#xff09;数学运算符的重载&#xff08;减法&#xff09;。 代码&#xff1a; #include <iostream…...

SQL Server错误:15404

执行维护计划失败&#xff0c;提示SQL Server Error 15404 无法获取有关... 异常如下图&#xff1a; 原因&#xff1a;数据库用户名与计算机名称不一致 解决办法&#xff1a;1.重名称数据库用户名 将前缀改成计算机名 2.重启SQL Server代理...

Halcon文件操作

1、Region读写操作 region&#xff08;区域&#xff09;是一种重要的数据类型&#xff0c;用于表示图像中的特定区域。这些区域可以代表图像中的目标、感兴趣的区域、边缘、形状等等 read_image (Image, printer_chip/printer_chip_01) dev_open_window (0, 0, 512, 512, black…...

【测试知识】业务面试问答突击版1

高内聚低耦合 高内聚指的是将相关的功能或数据组织在一起&#xff0c;使得模块内部的各个元素紧密地联系在一起&#xff0c;完成特定的任务。 低耦合指的是模块之间的依赖关系尽可能地降低&#xff0c;模块之间的接口简单清晰&#xff0c;减少模块之间的相互影响。 文章目录 整…...

使用el-row及el-col页面缩放时出现空行解决方案

问题&#xff1a; 当缩放到90%或者110%&#xff0c;选中下拉后&#xff0c;下方就会出现空行 如下图所示&#xff1a; 关于el-row 和 el-col &#xff1a; 参数说明类型可选值默认值span栅格占据的列数number—24offset栅格左侧的间隔格数number—0push栅格向右移动格数number…...

java中几种对象存储(文件存储)中间件的介绍

一、前言 在博主得到系统中使用的对象存储主要有OSS&#xff08;阿里云的对象存储&#xff09; COS&#xff08;腾讯云的对象存储&#xff09;OBS&#xff08;华为云的对象存储&#xff09;还有就是MinIO 这些玩意。其实这种东西大差不差&#xff0c;几乎实现方式都是一样&…...

Jupyter C内核:在Notebook中实现C语言交互式编程的完整指南

Jupyter C内核&#xff1a;在Notebook中实现C语言交互式编程的完整指南 【免费下载链接】jupyter-c-kernel Minimal Jupyter C kernel 项目地址: https://gitcode.com/gh_mirrors/ju/jupyter-c-kernel Jupyter C内核是一个开源项目&#xff0c;为Jupyter Notebook提供完…...

别再只看benchmark!Claude的“类人延迟响应”背后藏着7种语境锚定策略

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;类人延迟响应的本质&#xff1a;为什么“慢”才是更高级的智能 人类在面对复杂问题时&#xff0c;并非即时作答&#xff0c;而是经历感知、检索、权衡、修正等多阶段认知循环——这种可观察的“延迟”&…...

军规零外源设备要求,无感定位完全替代UWB硬件堆叠方案

军规零外源设备要求&#xff0c;无感定位完全替代UWB硬件堆叠方案军队营区管控、战备执勤、野外演训、涉密阵地等场景&#xff0c;严格遵循军规装备管理准则&#xff0c;奉行零外源附加设备硬性管控标准&#xff0c;严禁额外加装大量外置终端、基站、线缆类附属设施。传统UWB定…...

2026第四届“盘古石杯“晋级赛 手机取证 手搓复盘(write up)

手机取证1. 分析黄志远phone.E01检材&#xff0c;黄志远手机总共安装了多少款短视频应用&#xff1f;[答案格式&#xff1a;1]apk 分析里面&#xff0c;4 个。当时把 b 站也算上了2. 分析黄志远phone.E01检材&#xff0c;黄志远手机安装的龙虾应用的包名是什么&#xff1f;[答案…...

【2026年华为暑期实习-非AI方向(通软嵌软测试算法数据科学)- 5月22日-第二题- 建筑物的安全视野】(题目+思路+JavaC++Python解析+在线测试)

题目内容 在城市规划中,建筑师需要分析建筑物之间的视野关系。给出一条街道上的一排建筑物,每个建筑物有一定的高度。对于每个建筑物,我们定义一个安全视野距离:从该建筑物向右看,能看到的建筑物的数量。 一个建筑物 AAA 能够看到另一个建筑物 BBB 的条件是: BB...

【稀缺首发】全球仅12家头部科技公司验证的AI Agent机器学习架构(附可复用决策树模板)

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;AI Agent机器学习应用的范式跃迁 传统机器学习系统通常以静态模型为中心&#xff0c;依赖人工特征工程、离线训练与固定推理流程。而AI Agent的兴起正推动一场根本性范式跃迁&#xff1a;从“被动预测”转向“…...

机房动环监控系统是什么?其主要功能及智能运用方向有哪些?

机房动环监控系统的重要性 为信息技术的稳健运行提供了保障&#xff0c;尤其是在现代社会日益依赖数据和信息的背景下。利用实时监测电力、温度、湿度环境参数发生&#xff0c;还大幅度降低了因停机带来的经济损失。以大榕树科技为例&#xff0c;他们采用先进的监控系统&#x…...

工业AI数字孪生技术:工业制造的虚拟革命 数字孪生(Digital Twin)通过实时数据采集、三维建模和AI仿真,为物理设备创建动态虚拟副本,实现工业全生命周期的监控与优化的方案

CSDN标签&#xff1a; 数字孪生 Digital Twin 工业AI 虚拟仿真 Unity3D BIM 引言&#xff1a;当工厂有了自己的"虚拟分身" 想象一下&#xff0c;如果你有一个和你一模一样的"克隆体"——它知道你的心跳、呼吸、每一个动作&#xff0c;甚至能预测你下一秒会…...

深度解析 | SRE 核心机制:如何通过“错误预算”平衡速度与稳定性?

在网站可靠性工程 (SRE) 的世界中&#xff0c;在创新的速度与系统的稳定性之间找到完美的平衡是一项持续的挑战。虽然开发团队致力于快速发布新功能&#xff0c;但运维团队和 SRE 的目标则是保持系统平稳运行且不中断。这种利益冲突常常导致团队之间的摩擦。而这正是错误预算 (…...

FactoryBluePrints项目深度解析:戴森球计划终极工厂蓝图优化指南

FactoryBluePrints项目深度解析&#xff1a;戴森球计划终极工厂蓝图优化指南 【免费下载链接】FactoryBluePrints 游戏戴森球计划的**工厂**蓝图仓库 项目地址: https://gitcode.com/GitHub_Trending/fa/FactoryBluePrints FactoryBluePrints项目是戴森球计划游戏中最为…...