flinksql kafka到mysql累计指标练习
flinksql 累计指标练习
数据流向:kafka ->kafka ->mysql
模拟写数据到kafka topic:wxt中
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {// 设置kafka服务器地址和端口号String kafkaServers = "localhost:9092";// 设置producer属性Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息String topic = "wxt";JSONObject jsonObject = new JSONObject();jsonObject.put("id", 9);jsonObject.put("name", "王大大");jsonObject.put("age", 11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: " + jsonString);ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);producer.send(record);// 关闭producerproducer.close();}
}
kafka topic :wxt1

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class KafkaToMysqlJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers = "localhost:9092";String kafkaTopic = "wxt";String groupId = "wxt1";// 注册Kafka表tEnv.executeSql("CREATE TABLE kafka_table (\n" +" id INT,\n" +" name STRING,\n" +" age INT,\n" +" proctime as PROCTIME()\n" +") WITH (\n" +" 'connector' = 'kafka',\n" +" 'topic' = '" + kafkaTopic + "',\n" +" 'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +" 'properties.group.id' = '" + groupId + "',\n" +" 'format' = 'json',\n" +" 'scan.startup.mode' = 'earliest-offset'\n" +")");// 注册Kafka表// latest-offset//earliest-offsettEnv.executeSql("CREATE TABLE kafka_table2 (\n" +" window_start STRING,\n" +" window_end STRING,\n" +" name STRING,\n" +" age INT\n" +") WITH (\n" +" 'connector' = 'kafka',\n" +" 'topic' = 'wxt2',\n" +" 'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +" 'properties.group.id' = 'kafka_table2',\n" +" 'format' = 'json',\n" +" 'scan.startup.mode' = 'latest-offset',\n" +" 'value.format' = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +" window_start String,\n" +" window_end String,\n" +" name String,\n" +" age INT\n" +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +" 'username' = 'root',\n" +" 'password' = '12345678',\n" +" 'table-name' = 'leiji_age'\n" +")");tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +"group by window_start,window_end,name");tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");env.execute("KafkaToMysqlJob");}
}
kafka topic :wxt2

mysql结果数据:

pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>
相关文章:
flinksql kafka到mysql累计指标练习
flinksql 累计指标练习 数据流向:kafka ->kafka ->mysql 模拟写数据到kafka topic:wxt中 import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Produ…...
pdf转jpg的方法【ps和工具方法】
pdf转jpg的方法: 1.photoshop办法: pdf直接拖入ps中,另存为*.Jpg文件即可 另外注意的时候,有时候别人给你pdf文件中包含你需要的jpg文件,千万不要截图进入ps中,直接把文件拖入ps中,这样的文件…...
【已解决】Qt发送信号后,槽函数没有响应
Qt发送信号后,槽函数没有响应 检查有没有连接正确的信号和槽函数,有时候,大意了,会写错检查connect函数返回值,有没有连接成功检查对象的创建方式,确保在信号发送前,以及槽函数接收前ÿ…...
Kafka入门05——基础知识
目录 副本数据同步原理 HW和LEO的更新流程 第一种情况 第二种情况 数据丢失的情况 解决方案 Leader副本的选举过程 日志清除策略和压缩策略 日志清除策略 日志压缩策略 Kafka存储手段 零拷贝(Zero-Copy) 页缓存(Page Cache&…...
WordPress(7)配置邮箱发送功能
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、注册登陆163邮箱1. 配置SMTP二、开启smtp1.添加授权码二.在网站中配置smtp服务1.在主题的Boxmoe主题设置中开启邮箱设置三.安装所需要的插件1.安装完毕开启插件即可四.SMTP邮箱服务测试总结…...
简化路径(C++解法)
题目 给你一个字符串 path ,表示指向某一文件或目录的 Unix 风格 绝对路径 (以 / 开头),请你将其转化为更加简洁的规范路径。 在 Unix 风格的文件系统中,一个点(.)表示当前目录本身࿱…...
CS224W1.1——图机器学习介绍
文章目录 1. 介绍2. 主要问题3. 深度学习如何应用在图结构中4. 课程大纲 学习一下斯坦福CS224W的图机器学习(2021年),并做一下学习笔记,主要是研究方向与图神经网络相关。这次是第一次笔记,图片很多都是从斯坦福的PPT里…...
docker搭建waline评论系统
我这里是给博客网站嵌入评论系统的 1.登录LeanCloud 国际版,没有账号可以注册个 链接:点击跳转 2.新建应用,选择开发版(免费),商用版每个月最低消费5美刀。 3.在设置-应用凭证里面将AppID、AppKey、Maste…...
sql server 生成连续日期和数字
在sqlserver里,可以利用系统表master..spt_values里面存储的连续数字0到2047,结合dateadd()函数生成连续的日期 select convert (varchar(10),dateadd(d, number, getdate()),23) as workday from master..spt_values where type…...
太极v14.0.4 免ROOT用Xposed
一个帮助你免 Root、免解锁免刷机使用 Xposed 模块的 APP 框架。 模块通过它改变系统和应用的行为,既能以传统的 Root/ 刷机方式运作, 也能免 Root/ 免刷机运行;并且它支持 Android 5.0 ~ 11。 简单来说,太极就是个 Xposed 框架…...
python DevOps
在云原生中,python扮演的角色是什么? 在云原生环境中,Python 作为一种高级编程语言,在多个方面扮演着重要角色。云原生是指利用云计算的各种优势(如弹性、可扩展性和自动化),构建和运行应用程序…...
Git(四)底层命令:git对象、树对象、提交对象
目录 一、知识回顾1.1 Linux 基础命令1.2 .git 文件夹解析 二、git 对象(数据对象)2.1 hash-object 存储对象2.2 cat-file 查看对象 三、树对象3.1 ls-files 查看暂存区3.2 update-index 创建暂存区3.3 write-tree 生成树对象3.4 更新暂存区,…...
LVS-DR模式+keepalived+nginx+tomcat实现动静分离、负载均衡、高可用实验
实验条件: test2——20.0.0.20——主服务器——ipvsadm、keepalived服务 test3——20.0.0.30——备服务器——ipvsadm、keepalived服务 nginx5——20.0.0.51——后端真实服务器1(tomcat的代理服务器)——nginx服务 nginx6——20.0.0.61—…...
canvas 状态管理
本文简介 带尬猴,我是德育处主任 canvas 绘图时会根据当前状态来绘制。很多的 canvas 库都利用到这一特性。比如 p5.js 利用了 canvas 状态特性衍生出 push 和 pop 函数实现状态隔离(既然提到了,下一篇就讲这个)。 有兴趣了解 p…...
vue中如何给后端过来的数组中每一个对象加一个新的属性和新的对象(不影响后端的原始数据)
方法: 先看后端的原数据 1、给数组中每一个对象加一个新的属性: 输出查看数组list的值: 2、给数组list加入新的对象: 输出结果: 3、总结: 如果是数组中每个对象新增属性就用map遍历每个对象加入新增的属性…...
SpringAOP源码解析之TargetSource(四)
前言 在Spring框架中,TargetSource是一个接口,用于封装获取目标对象(也就是被代理的对象)的逻辑。它的主要作用是提供代理对象使用的目标对象,并且允许在运行时动态地切换目标对象。TargetSource在Spring的AOP&#x…...
Centos7 安装nvidia显卡驱动
参考一:https://blog.csdn.net/awen19921106/article/details/131331450 参考二:https://www.cnblogs.com/lishanyang/p/17326021.html 报错一: ERROR: Unable to find the kernel source tree for the currently running kernel. Please …...
22 行为型模式-状态模式
1 状态模式介绍 2 状态模式结构 3 状态模式实现 代码示例 //抽象状态接口 public interface State {//声明抽象方法,不同具体状态类可以有不同实现void handle(Context context); }...
Jetpack:018-Jetpack中的导航一
文章目录 1. 概念介绍2. 使用方法2.1 基本概念2.2 传统用法2.3 新的用法 3. 示例代码4. 内容总结 我们在上一章回中介绍了Jetpack库中对话框相关的内容,本章回中主要介绍 导航。闲话休提,让我们一起Talk Android Jetpack吧! 1. 概念介绍 我…...
Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等)
Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等) 问题一、新安装的Linux使用命令lsb_release提示无此命令,需先安装再使用 Linux安装lsb命令 lsb是Linux Standard Base的缩写(Linux基本标准ÿ…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
关于uniapp展示PDF的解决方案
在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项: 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库: npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...
LOOI机器人的技术实现解析:从手势识别到边缘检测
LOOI机器人作为一款创新的AI硬件产品,通过将智能手机转变为具有情感交互能力的桌面机器人,展示了前沿AI技术与传统硬件设计的完美结合。作为AI与玩具领域的专家,我将全面解析LOOI的技术实现架构,特别是其手势识别、物体识别和环境…...
Linux操作系统共享Windows操作系统的文件
目录 一、共享文件 二、挂载 一、共享文件 点击虚拟机选项-设置 点击选项,设置文件夹共享为总是启用,点击添加,可添加需要共享的文件夹 查询是否共享成功 ls /mnt/hgfs 如果显示Download(这是我共享的文件夹)&…...
EEG-fNIRS联合成像在跨频率耦合研究中的创新应用
摘要 神经影像技术对医学科学产生了深远的影响,推动了许多神经系统疾病研究的进展并改善了其诊断方法。在此背景下,基于神经血管耦合现象的多模态神经影像方法,通过融合各自优势来提供有关大脑皮层神经活动的互补信息。在这里,本研…...
