当前位置: 首页 > news >正文

flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作

package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;public class Test41 {//demo 是MySQL中已经创建好的表//create table demo (userId varchar(50) not null,total bigint,avgVal double);private static String FILE_PATH = "info.txt";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path(FILE_PATH)).withFormat(new Csv()).withSchema(new Schema().field("userId", DataTypes.VARCHAR(50)).field("ts", DataTypes.INT()).field("val", DataTypes.DOUBLE())).createTemporaryTable("input");Table dataTable = tableEnv.from("input");Table aggregateTable = dataTable.groupBy("userId").select("userId, userId.count as total, val.avg as avgVal");String sql="create table jdbcOutputTable (" +" userId varchar(50) not null,total bigint,avgVal double " +") with (" +" 'connector.type' = 'jdbc', " +" 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +" 'connector.table' = 'demo', " +" 'connector.driver' = 'com.mysql.jdbc.Driver', " +" 'connector.username' = 'root', " +" 'connector.password' = 123456' )";tableEnv.sqlUpdate(sql);aggregateTable.insertInto("jdbcOutputTable");tableEnv.execute("my job");}
}

文件info.txt

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9

相关文章:

flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionE…...

【Java 多线程 哈希表】 HashTable, HashMap, ConcurrentHashMap 之间的区别

HashTable、HashMap和ConcurrentHashMap都是Java中用于存储键值对的集合框架的一部分,但它们之间存在一些重要的联系和区别。 联系 键值对存储:它们都用于存储键值对,并允许你根据键来检索值。基于哈希:它们内部都使用了哈希表来…...

有趣之matlab-烟花

待整合1 2 3 动态 有趣编程之11 静态 逼真 3 .m文件路径下放back1.jpg back4.jpg…背景照片 点击screen 就会有小白点升起,爆炸 function yanhuamoban()clear all;%定义全局变量global ah ;%坐标轴句柄global styleNum ;%爆炸图案样式global multiColor; %多颜色变换…...

C语言指针与数组(不适合初学者版):一篇文章带你深入了解指针与数组!

🎈个人主页:JAMES别扣了 💕在校大学生一枚。对IT有着极其浓厚的兴趣 ✨系列专栏目前为C语言初阶、后续会更新c语言的学习方法以及c题目分享. 😍希望我的文章对大家有着不一样的帮助,欢迎大家关注我,我也会回…...

springboot Mongo大数据查询优化方案

前言 因为项目需要把传感器的数据保存起来,当时设计的时是mongo来存储,后期需要从mongo DB里查询传感器的数据记录。由于传感器每秒都会像mongo数据库存500条左右的数据,1天就有4320万条数据,要想按照时间条件去查询,…...

Ollama管理本地开源大模型,用Open WebUI访问Ollama接口

现在开源大模型一个接一个的,而且各个都说自己的性能非常厉害,但是对于我们这些使用者,用起来就比较尴尬了。因为一个模型一个调用的方式,先得下载模型,下完模型,写加载代码,麻烦得很。 对于程…...

Linux--基本知识入门

一.几个基本知识 终端: CtrlAltT 或者桌面/文件夹右键,打开终端切换为管理员: sudo su 退出:exit查看内核版本号: uname -a内核版本号含义: 5 代表主版本号;13代表次版本号;0代表修订版本号;30代表修订版本的第几次微调;数字越大表示内核越新. 二.目录…...

基于springboot+vue实现的大学计算机课程管理平台的设计与实现(全套资料)

一、系统架构 前端:vue | antv 后端:springboot | mybatis-plus 环境:jdk17 | mysql | maven | node | redis 二、代码及数据库 三、功能介绍 01. 登录页 02. 首页 03. 系统基础模块-用户管理 04. 系统基础模块-部门…...

LeetCode2115. 从给定原材料中找到所有可以做出的菜

拓扑排序 题面 题目链接:2115. 从给定原材料中找到所有可以做出的菜 - 力扣(LeetCode) 你有 n 道不同菜的信息。给你一个字符串数组 recipes 和一个二维字符串数组 ingredients 。第 i 道菜的名字为 recipes[i] ,如果你有它 所有…...

项目性能优化—性能优化的指标、目标

项目性能优化—性能优化的指标、目标 性能优化的终极目标是什么 性能优化的目标实际上是为了更好的用户体验: 一般我们认为用户体验是下面的公式: 用户体验 产品设计(非技术) 系统性能 ≈ 系统性能 快 那什么样的体验叫快呢…...

蓝桥杯刷题(三)

一、P8752 [蓝桥杯 2021 省 B2] 特殊年份(洛谷) 题目描述 今年是 2021 年,2021 这个数字非常特殊, 它的千位和十位相等, 个位比百位大 1,我们称满足这样条件的年份为特殊年份。 输入 5 个年份,请计算这里面有多少个…...

20240312-算法复习打卡day21||● 530.二叉搜索树的最小绝对差 ● 501.二叉搜索树中的众数 ● 236. 二叉树的最近公共祖先

530.二叉搜索树的最小绝对差 1.中序遍历得到升序数组 class Solution { private:vector<int> vec;void traversal(TreeNode* root) {if (root NULL) return;if (root->left) traversal(root->left);vec.push_back(root->val);if (root->right) traversal(r…...

今天我们来学习一下关于MySQL数据库

目录 前言: 1.MySQL定义&#xff1a; 1.1基础概念&#xff1a; 1.1.1数据库&#xff08;Database&#xff09;&#xff1a; 1.1.2表&#xff08;Table&#xff09;&#xff1a; 1.1.3记录&#xff08;Record&#xff09;与字段&#xff08;Field&#xff09;&#xff1a; …...

长期护理保险可改善老年人心理健康 | CHARLS CLHLS CFPS 公共数据库周报(3.6)...

欢迎报名2024年“真实世界临床研究”课程&#xff01; 本周郑老师开讲&#xff1a;“真实世界临床研究”培训班&#xff0c;3月16-17日两天&#xff0c;欢迎报名&#xff01; CHARLS公共数据库‍ CHARLS数据库简介中国健康与养老追踪调查(China Health and Retirement Longitud…...

49、C++/友元、常成员函数和常对象、运算符重载学习20240314

一、封装类 用其成员函数实现&#xff08;对该类的&#xff09;数学运算符的重载&#xff08;加法&#xff09;&#xff0c;并封装一个全局函数实现&#xff08;对该类的&#xff09;数学运算符的重载&#xff08;减法&#xff09;。 代码&#xff1a; #include <iostream…...

SQL Server错误:15404

执行维护计划失败&#xff0c;提示SQL Server Error 15404 无法获取有关... 异常如下图&#xff1a; 原因&#xff1a;数据库用户名与计算机名称不一致 解决办法&#xff1a;1.重名称数据库用户名 将前缀改成计算机名 2.重启SQL Server代理...

Halcon文件操作

1、Region读写操作 region&#xff08;区域&#xff09;是一种重要的数据类型&#xff0c;用于表示图像中的特定区域。这些区域可以代表图像中的目标、感兴趣的区域、边缘、形状等等 read_image (Image, printer_chip/printer_chip_01) dev_open_window (0, 0, 512, 512, black…...

【测试知识】业务面试问答突击版1

高内聚低耦合 高内聚指的是将相关的功能或数据组织在一起&#xff0c;使得模块内部的各个元素紧密地联系在一起&#xff0c;完成特定的任务。 低耦合指的是模块之间的依赖关系尽可能地降低&#xff0c;模块之间的接口简单清晰&#xff0c;减少模块之间的相互影响。 文章目录 整…...

使用el-row及el-col页面缩放时出现空行解决方案

问题&#xff1a; 当缩放到90%或者110%&#xff0c;选中下拉后&#xff0c;下方就会出现空行 如下图所示&#xff1a; 关于el-row 和 el-col &#xff1a; 参数说明类型可选值默认值span栅格占据的列数number—24offset栅格左侧的间隔格数number—0push栅格向右移动格数number…...

java中几种对象存储(文件存储)中间件的介绍

一、前言 在博主得到系统中使用的对象存储主要有OSS&#xff08;阿里云的对象存储&#xff09; COS&#xff08;腾讯云的对象存储&#xff09;OBS&#xff08;华为云的对象存储&#xff09;还有就是MinIO 这些玩意。其实这种东西大差不差&#xff0c;几乎实现方式都是一样&…...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化&#xff1a;人工智能的自我改进与监管挑战 文章目录 递归进化&#xff1a;人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管&#xff1f;3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...

【python异步多线程】异步多线程爬虫代码示例

claude生成的python多线程、异步代码示例&#xff0c;模拟20个网页的爬取&#xff0c;每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程&#xff1a;允许程序同时执行多个任务&#xff0c;提高IO密集型任务&#xff08;如网络请求&#xff09;的效率…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

接口自动化测试:HttpRunner基础

相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具&#xff0c;支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议&#xff0c;涵盖接口测试、性能测试、数字体验监测等测试类型…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

tomcat入门

1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效&#xff0c;稳定&#xff0c;易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...