flink:通过table api把文件中读取的数据写入MySQL
当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作
package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;public class Test41 {//demo 是MySQL中已经创建好的表//create table demo (userId varchar(50) not null,total bigint,avgVal double);private static String FILE_PATH = "info.txt";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path(FILE_PATH)).withFormat(new Csv()).withSchema(new Schema().field("userId", DataTypes.VARCHAR(50)).field("ts", DataTypes.INT()).field("val", DataTypes.DOUBLE())).createTemporaryTable("input");Table dataTable = tableEnv.from("input");Table aggregateTable = dataTable.groupBy("userId").select("userId, userId.count as total, val.avg as avgVal");String sql="create table jdbcOutputTable (" +" userId varchar(50) not null,total bigint,avgVal double " +") with (" +" 'connector.type' = 'jdbc', " +" 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +" 'connector.table' = 'demo', " +" 'connector.driver' = 'com.mysql.jdbc.Driver', " +" 'connector.username' = 'root', " +" 'connector.password' = 123456' )";tableEnv.sqlUpdate(sql);aggregateTable.insertInto("jdbcOutputTable");tableEnv.execute("my job");}
}
文件info.txt
user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9
相关文章:
flink:通过table api把文件中读取的数据写入MySQL
当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionE…...
【Java 多线程 哈希表】 HashTable, HashMap, ConcurrentHashMap 之间的区别
HashTable、HashMap和ConcurrentHashMap都是Java中用于存储键值对的集合框架的一部分,但它们之间存在一些重要的联系和区别。 联系 键值对存储:它们都用于存储键值对,并允许你根据键来检索值。基于哈希:它们内部都使用了哈希表来…...
有趣之matlab-烟花
待整合1 2 3 动态 有趣编程之11 静态 逼真 3 .m文件路径下放back1.jpg back4.jpg…背景照片 点击screen 就会有小白点升起,爆炸 function yanhuamoban()clear all;%定义全局变量global ah ;%坐标轴句柄global styleNum ;%爆炸图案样式global multiColor; %多颜色变换…...

C语言指针与数组(不适合初学者版):一篇文章带你深入了解指针与数组!
🎈个人主页:JAMES别扣了 💕在校大学生一枚。对IT有着极其浓厚的兴趣 ✨系列专栏目前为C语言初阶、后续会更新c语言的学习方法以及c题目分享. 😍希望我的文章对大家有着不一样的帮助,欢迎大家关注我,我也会回…...
springboot Mongo大数据查询优化方案
前言 因为项目需要把传感器的数据保存起来,当时设计的时是mongo来存储,后期需要从mongo DB里查询传感器的数据记录。由于传感器每秒都会像mongo数据库存500条左右的数据,1天就有4320万条数据,要想按照时间条件去查询,…...

Ollama管理本地开源大模型,用Open WebUI访问Ollama接口
现在开源大模型一个接一个的,而且各个都说自己的性能非常厉害,但是对于我们这些使用者,用起来就比较尴尬了。因为一个模型一个调用的方式,先得下载模型,下完模型,写加载代码,麻烦得很。 对于程…...

Linux--基本知识入门
一.几个基本知识 终端: CtrlAltT 或者桌面/文件夹右键,打开终端切换为管理员: sudo su 退出:exit查看内核版本号: uname -a内核版本号含义: 5 代表主版本号;13代表次版本号;0代表修订版本号;30代表修订版本的第几次微调;数字越大表示内核越新. 二.目录…...

基于springboot+vue实现的大学计算机课程管理平台的设计与实现(全套资料)
一、系统架构 前端:vue | antv 后端:springboot | mybatis-plus 环境:jdk17 | mysql | maven | node | redis 二、代码及数据库 三、功能介绍 01. 登录页 02. 首页 03. 系统基础模块-用户管理 04. 系统基础模块-部门…...

LeetCode2115. 从给定原材料中找到所有可以做出的菜
拓扑排序 题面 题目链接:2115. 从给定原材料中找到所有可以做出的菜 - 力扣(LeetCode) 你有 n 道不同菜的信息。给你一个字符串数组 recipes 和一个二维字符串数组 ingredients 。第 i 道菜的名字为 recipes[i] ,如果你有它 所有…...

项目性能优化—性能优化的指标、目标
项目性能优化—性能优化的指标、目标 性能优化的终极目标是什么 性能优化的目标实际上是为了更好的用户体验: 一般我们认为用户体验是下面的公式: 用户体验 产品设计(非技术) 系统性能 ≈ 系统性能 快 那什么样的体验叫快呢…...
蓝桥杯刷题(三)
一、P8752 [蓝桥杯 2021 省 B2] 特殊年份(洛谷) 题目描述 今年是 2021 年,2021 这个数字非常特殊, 它的千位和十位相等, 个位比百位大 1,我们称满足这样条件的年份为特殊年份。 输入 5 个年份,请计算这里面有多少个…...
20240312-算法复习打卡day21||● 530.二叉搜索树的最小绝对差 ● 501.二叉搜索树中的众数 ● 236. 二叉树的最近公共祖先
530.二叉搜索树的最小绝对差 1.中序遍历得到升序数组 class Solution { private:vector<int> vec;void traversal(TreeNode* root) {if (root NULL) return;if (root->left) traversal(root->left);vec.push_back(root->val);if (root->right) traversal(r…...

今天我们来学习一下关于MySQL数据库
目录 前言: 1.MySQL定义: 1.1基础概念: 1.1.1数据库(Database): 1.1.2表(Table): 1.1.3记录(Record)与字段(Field): …...

长期护理保险可改善老年人心理健康 | CHARLS CLHLS CFPS 公共数据库周报(3.6)...
欢迎报名2024年“真实世界临床研究”课程! 本周郑老师开讲:“真实世界临床研究”培训班,3月16-17日两天,欢迎报名! CHARLS公共数据库 CHARLS数据库简介中国健康与养老追踪调查(China Health and Retirement Longitud…...

49、C++/友元、常成员函数和常对象、运算符重载学习20240314
一、封装类 用其成员函数实现(对该类的)数学运算符的重载(加法),并封装一个全局函数实现(对该类的)数学运算符的重载(减法)。 代码: #include <iostream…...

SQL Server错误:15404
执行维护计划失败,提示SQL Server Error 15404 无法获取有关... 异常如下图: 原因:数据库用户名与计算机名称不一致 解决办法:1.重名称数据库用户名 将前缀改成计算机名 2.重启SQL Server代理...

Halcon文件操作
1、Region读写操作 region(区域)是一种重要的数据类型,用于表示图像中的特定区域。这些区域可以代表图像中的目标、感兴趣的区域、边缘、形状等等 read_image (Image, printer_chip/printer_chip_01) dev_open_window (0, 0, 512, 512, black…...
【测试知识】业务面试问答突击版1
高内聚低耦合 高内聚指的是将相关的功能或数据组织在一起,使得模块内部的各个元素紧密地联系在一起,完成特定的任务。 低耦合指的是模块之间的依赖关系尽可能地降低,模块之间的接口简单清晰,减少模块之间的相互影响。 文章目录 整…...

使用el-row及el-col页面缩放时出现空行解决方案
问题: 当缩放到90%或者110%,选中下拉后,下方就会出现空行 如下图所示: 关于el-row 和 el-col : 参数说明类型可选值默认值span栅格占据的列数number—24offset栅格左侧的间隔格数number—0push栅格向右移动格数number…...

java中几种对象存储(文件存储)中间件的介绍
一、前言 在博主得到系统中使用的对象存储主要有OSS(阿里云的对象存储) COS(腾讯云的对象存储)OBS(华为云的对象存储)还有就是MinIO 这些玩意。其实这种东西大差不差,几乎实现方式都是一样&…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...

短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...

基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...

android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...