flinksql kafka到mysql累计指标练习
flinksql 累计指标练习
数据流向:kafka ->kafka ->mysql
模拟写数据到kafka topic:wxt中
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {// 设置kafka服务器地址和端口号String kafkaServers = "localhost:9092";// 设置producer属性Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息String topic = "wxt";JSONObject jsonObject = new JSONObject();jsonObject.put("id", 9);jsonObject.put("name", "王大大");jsonObject.put("age", 11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: " + jsonString);ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);producer.send(record);// 关闭producerproducer.close();}
}
kafka topic :wxt1

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class KafkaToMysqlJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers = "localhost:9092";String kafkaTopic = "wxt";String groupId = "wxt1";// 注册Kafka表tEnv.executeSql("CREATE TABLE kafka_table (\n" +" id INT,\n" +" name STRING,\n" +" age INT,\n" +" proctime as PROCTIME()\n" +") WITH (\n" +" 'connector' = 'kafka',\n" +" 'topic' = '" + kafkaTopic + "',\n" +" 'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +" 'properties.group.id' = '" + groupId + "',\n" +" 'format' = 'json',\n" +" 'scan.startup.mode' = 'earliest-offset'\n" +")");// 注册Kafka表// latest-offset//earliest-offsettEnv.executeSql("CREATE TABLE kafka_table2 (\n" +" window_start STRING,\n" +" window_end STRING,\n" +" name STRING,\n" +" age INT\n" +") WITH (\n" +" 'connector' = 'kafka',\n" +" 'topic' = 'wxt2',\n" +" 'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +" 'properties.group.id' = 'kafka_table2',\n" +" 'format' = 'json',\n" +" 'scan.startup.mode' = 'latest-offset',\n" +" 'value.format' = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +" window_start String,\n" +" window_end String,\n" +" name String,\n" +" age INT\n" +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +" 'username' = 'root',\n" +" 'password' = '12345678',\n" +" 'table-name' = 'leiji_age'\n" +")");tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +"group by window_start,window_end,name");tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");env.execute("KafkaToMysqlJob");}
}
kafka topic :wxt2

mysql结果数据:

pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>
相关文章:
flinksql kafka到mysql累计指标练习
flinksql 累计指标练习 数据流向:kafka ->kafka ->mysql 模拟写数据到kafka topic:wxt中 import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Produ…...
pdf转jpg的方法【ps和工具方法】
pdf转jpg的方法: 1.photoshop办法: pdf直接拖入ps中,另存为*.Jpg文件即可 另外注意的时候,有时候别人给你pdf文件中包含你需要的jpg文件,千万不要截图进入ps中,直接把文件拖入ps中,这样的文件…...
【已解决】Qt发送信号后,槽函数没有响应
Qt发送信号后,槽函数没有响应 检查有没有连接正确的信号和槽函数,有时候,大意了,会写错检查connect函数返回值,有没有连接成功检查对象的创建方式,确保在信号发送前,以及槽函数接收前ÿ…...
Kafka入门05——基础知识
目录 副本数据同步原理 HW和LEO的更新流程 第一种情况 第二种情况 数据丢失的情况 解决方案 Leader副本的选举过程 日志清除策略和压缩策略 日志清除策略 日志压缩策略 Kafka存储手段 零拷贝(Zero-Copy) 页缓存(Page Cache&…...
WordPress(7)配置邮箱发送功能
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、注册登陆163邮箱1. 配置SMTP二、开启smtp1.添加授权码二.在网站中配置smtp服务1.在主题的Boxmoe主题设置中开启邮箱设置三.安装所需要的插件1.安装完毕开启插件即可四.SMTP邮箱服务测试总结…...
简化路径(C++解法)
题目 给你一个字符串 path ,表示指向某一文件或目录的 Unix 风格 绝对路径 (以 / 开头),请你将其转化为更加简洁的规范路径。 在 Unix 风格的文件系统中,一个点(.)表示当前目录本身࿱…...
CS224W1.1——图机器学习介绍
文章目录 1. 介绍2. 主要问题3. 深度学习如何应用在图结构中4. 课程大纲 学习一下斯坦福CS224W的图机器学习(2021年),并做一下学习笔记,主要是研究方向与图神经网络相关。这次是第一次笔记,图片很多都是从斯坦福的PPT里…...
docker搭建waline评论系统
我这里是给博客网站嵌入评论系统的 1.登录LeanCloud 国际版,没有账号可以注册个 链接:点击跳转 2.新建应用,选择开发版(免费),商用版每个月最低消费5美刀。 3.在设置-应用凭证里面将AppID、AppKey、Maste…...
sql server 生成连续日期和数字
在sqlserver里,可以利用系统表master..spt_values里面存储的连续数字0到2047,结合dateadd()函数生成连续的日期 select convert (varchar(10),dateadd(d, number, getdate()),23) as workday from master..spt_values where type…...
太极v14.0.4 免ROOT用Xposed
一个帮助你免 Root、免解锁免刷机使用 Xposed 模块的 APP 框架。 模块通过它改变系统和应用的行为,既能以传统的 Root/ 刷机方式运作, 也能免 Root/ 免刷机运行;并且它支持 Android 5.0 ~ 11。 简单来说,太极就是个 Xposed 框架…...
python DevOps
在云原生中,python扮演的角色是什么? 在云原生环境中,Python 作为一种高级编程语言,在多个方面扮演着重要角色。云原生是指利用云计算的各种优势(如弹性、可扩展性和自动化),构建和运行应用程序…...
Git(四)底层命令:git对象、树对象、提交对象
目录 一、知识回顾1.1 Linux 基础命令1.2 .git 文件夹解析 二、git 对象(数据对象)2.1 hash-object 存储对象2.2 cat-file 查看对象 三、树对象3.1 ls-files 查看暂存区3.2 update-index 创建暂存区3.3 write-tree 生成树对象3.4 更新暂存区,…...
LVS-DR模式+keepalived+nginx+tomcat实现动静分离、负载均衡、高可用实验
实验条件: test2——20.0.0.20——主服务器——ipvsadm、keepalived服务 test3——20.0.0.30——备服务器——ipvsadm、keepalived服务 nginx5——20.0.0.51——后端真实服务器1(tomcat的代理服务器)——nginx服务 nginx6——20.0.0.61—…...
canvas 状态管理
本文简介 带尬猴,我是德育处主任 canvas 绘图时会根据当前状态来绘制。很多的 canvas 库都利用到这一特性。比如 p5.js 利用了 canvas 状态特性衍生出 push 和 pop 函数实现状态隔离(既然提到了,下一篇就讲这个)。 有兴趣了解 p…...
vue中如何给后端过来的数组中每一个对象加一个新的属性和新的对象(不影响后端的原始数据)
方法: 先看后端的原数据 1、给数组中每一个对象加一个新的属性: 输出查看数组list的值: 2、给数组list加入新的对象: 输出结果: 3、总结: 如果是数组中每个对象新增属性就用map遍历每个对象加入新增的属性…...
SpringAOP源码解析之TargetSource(四)
前言 在Spring框架中,TargetSource是一个接口,用于封装获取目标对象(也就是被代理的对象)的逻辑。它的主要作用是提供代理对象使用的目标对象,并且允许在运行时动态地切换目标对象。TargetSource在Spring的AOP&#x…...
Centos7 安装nvidia显卡驱动
参考一:https://blog.csdn.net/awen19921106/article/details/131331450 参考二:https://www.cnblogs.com/lishanyang/p/17326021.html 报错一: ERROR: Unable to find the kernel source tree for the currently running kernel. Please …...
22 行为型模式-状态模式
1 状态模式介绍 2 状态模式结构 3 状态模式实现 代码示例 //抽象状态接口 public interface State {//声明抽象方法,不同具体状态类可以有不同实现void handle(Context context); }...
Jetpack:018-Jetpack中的导航一
文章目录 1. 概念介绍2. 使用方法2.1 基本概念2.2 传统用法2.3 新的用法 3. 示例代码4. 内容总结 我们在上一章回中介绍了Jetpack库中对话框相关的内容,本章回中主要介绍 导航。闲话休提,让我们一起Talk Android Jetpack吧! 1. 概念介绍 我…...
Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等)
Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等) 问题一、新安装的Linux使用命令lsb_release提示无此命令,需先安装再使用 Linux安装lsb命令 lsb是Linux Standard Base的缩写(Linux基本标准ÿ…...
一次 Spring 循环依赖源码走读:从三级缓存误用到 Bean 生命周期深度解析
在团队最近一次架构评审会上,关于 Spring 循环依赖的处理方式爆发了一场激烈争论。 “直接用 Lazy 不就行了?” 小李拍着桌子说,“我上个月在订单服务里就这么干的,上线一点问题没有。” “Lazy 只是绕开问题,不是解决…...
解决UE VR开发痛点:VRExpansionPlugin实战指南与架构优化
解决UE VR开发痛点:VRExpansionPlugin实战指南与架构优化 【免费下载链接】VRExpansionPlugin A UE4 VR framework 项目地址: https://gitcode.com/gh_mirrors/vr/VRExpansionPlugin 在UE VR开发中,开发者常面临手部追踪精度不足、交互系统复杂、…...
高效视频下载工具yt-dlp-gui:图形界面让视频提取更简单
高效视频下载工具yt-dlp-gui:图形界面让视频提取更简单 【免费下载链接】yt-dlp-gui Windows GUI for yt-dlp 项目地址: https://gitcode.com/gh_mirrors/yt/yt-dlp-gui 在数字化时代,网络视频已成为信息获取与娱乐的重要方式,但许多平…...
5种多屏显示优化方案:专业用户的DPI精准控制指南
5种多屏显示优化方案:专业用户的DPI精准控制指南 【免费下载链接】SetDPI 项目地址: https://gitcode.com/gh_mirrors/se/SetDPI 场景痛点:跨行业的显示一致性难题 内容创作者的显示困境 视频剪辑师张明在4K主显示器上精心调整的画面比例&…...
OpenClaw镜像体验:Qwen3-4B-Thinking-2507-GPT-5-Codex-Distill-GGUF云端快速测试方案
OpenClaw镜像体验:Qwen3-4B-Thinking-2507-GPT-5-Codex-Distill-GGUF云端快速测试方案 1. 为什么选择云端体验OpenClaw 第一次接触OpenClaw时,我被它的自动化能力吸引,但本地安装过程却让我望而却步。作为一个经常需要快速验证技术方案的开…...
百度网盘直链解析开源工具完全指南:从入门到精通
百度网盘直链解析开源工具完全指南:从入门到精通 【免费下载链接】baidu-wangpan-parse 获取百度网盘分享文件的下载地址 项目地址: https://gitcode.com/gh_mirrors/ba/baidu-wangpan-parse 你是否曾经历过这样的困扰:明明网络带宽充足ÿ…...
AWPortrait-Z问题解决:图像模糊、速度慢?常见问题一键搞定
AWPortrait-Z问题解决:图像模糊、速度慢?常见问题一键搞定 1. 快速诊断:你的问题属于哪一类? 在使用AWPortrait-Z生成人像时,最常见的问题可以归纳为三类: 图像质量问题:模糊、失真、细节不足…...
OFA图像语义蕴含Web应用5分钟部署教程:图文匹配AI一键搭建
OFA图像语义蕴含Web应用5分钟部署教程:图文匹配AI一键搭建 1. 项目简介与核心价值 OFA(One For All)图像语义蕴含模型是阿里巴巴达摩院研发的多模态深度学习系统,能够智能分析图像内容与文本描述之间的逻辑关系。这个Web应用将强…...
LoRA训练助手效果展示:GPT模型微调前后对比
LoRA训练助手效果展示:GPT模型微调前后对比 1. 引言 你是否曾经遇到过这样的情况:用GPT模型生成的内容总是差那么点意思,要么风格不对,要么专业度不够,要么就是不符合你的特定需求?就像让一个通才来处理专…...
OWL ADVENTURE场景实战:打造你的个人创意图片分析助手
OWL ADVENTURE场景实战:打造你的个人创意图片分析助手 你是否经常遇到这样的情况:看到一张有趣的图片,想知道它背后的故事?或者作为设计师,需要快速分析竞品的视觉风格?又或者只是想找个能"看懂"…...
