当前位置: 首页 > news >正文

flinksql kafka到mysql累计指标练习

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {// 设置kafka服务器地址和端口号String kafkaServers = "localhost:9092";// 设置producer属性Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息String topic = "wxt";JSONObject jsonObject = new JSONObject();jsonObject.put("id", 9);jsonObject.put("name", "王大大");jsonObject.put("age", 11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: " + jsonString);ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);producer.send(record);// 关闭producerproducer.close();}
}

kafka topic :wxt1
在这里插入图片描述

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class KafkaToMysqlJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers = "localhost:9092";String kafkaTopic = "wxt";String groupId = "wxt1";// 注册Kafka表tEnv.executeSql("CREATE TABLE kafka_table (\n" +"  id INT,\n" +"  name STRING,\n" +"  age INT,\n" +"  proctime as PROCTIME()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = '" + kafkaTopic + "',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = '" + groupId + "',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'earliest-offset'\n" +")");// 注册Kafka表// latest-offset//earliest-offsettEnv.executeSql("CREATE TABLE kafka_table2 (\n" +"  window_start STRING,\n" +"  window_end STRING,\n" +"  name STRING,\n" +"  age INT\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'wxt2',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = 'kafka_table2',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'value.format' = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +"  window_start String,\n" +"   window_end String,\n" +"    name String,\n" +"    age INT\n" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +"   'username' = 'root',\n" +"   'password' = '12345678',\n" +"   'table-name' = 'leiji_age'\n" +")");tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +"group by  window_start,window_end,name");tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");env.execute("KafkaToMysqlJob");}
}

kafka topic :wxt2
在这里插入图片描述
mysql结果数据:
在这里插入图片描述
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>

相关文章:

flinksql kafka到mysql累计指标练习

flinksql 累计指标练习 数据流向&#xff1a;kafka ->kafka ->mysql 模拟写数据到kafka topic&#xff1a;wxt中 import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Produ…...

pdf转jpg的方法【ps和工具方法】

pdf转jpg的方法&#xff1a; 1.photoshop办法&#xff1a; pdf直接拖入ps中&#xff0c;另存为*.Jpg文件即可 另外注意的时候&#xff0c;有时候别人给你pdf文件中包含你需要的jpg文件&#xff0c;千万不要截图进入ps中&#xff0c;直接把文件拖入ps中&#xff0c;这样的文件…...

【已解决】Qt发送信号后,槽函数没有响应

Qt发送信号后&#xff0c;槽函数没有响应 检查有没有连接正确的信号和槽函数&#xff0c;有时候&#xff0c;大意了&#xff0c;会写错检查connect函数返回值&#xff0c;有没有连接成功检查对象的创建方式&#xff0c;确保在信号发送前&#xff0c;以及槽函数接收前&#xff…...

Kafka入门05——基础知识

目录 副本数据同步原理 HW和LEO的更新流程 第一种情况 第二种情况 数据丢失的情况 解决方案 Leader副本的选举过程 日志清除策略和压缩策略 日志清除策略 日志压缩策略 Kafka存储手段 零拷贝&#xff08;Zero-Copy&#xff09; 页缓存&#xff08;Page Cache&…...

WordPress(7)配置邮箱发送功能

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、注册登陆163邮箱1. 配置SMTP二、开启smtp1.添加授权码二.在网站中配置smtp服务1.在主题的Boxmoe主题设置中开启邮箱设置三.安装所需要的插件1.安装完毕开启插件即可四.SMTP邮箱服务测试总结…...

简化路径(C++解法)

题目 给你一个字符串 path &#xff0c;表示指向某一文件或目录的 Unix 风格 绝对路径 &#xff08;以 / 开头&#xff09;&#xff0c;请你将其转化为更加简洁的规范路径。 在 Unix 风格的文件系统中&#xff0c;一个点&#xff08;.&#xff09;表示当前目录本身&#xff1…...

CS224W1.1——图机器学习介绍

文章目录 1. 介绍2. 主要问题3. 深度学习如何应用在图结构中4. 课程大纲 学习一下斯坦福CS224W的图机器学习&#xff08;2021年&#xff09;&#xff0c;并做一下学习笔记&#xff0c;主要是研究方向与图神经网络相关。这次是第一次笔记&#xff0c;图片很多都是从斯坦福的PPT里…...

docker搭建waline评论系统

我这里是给博客网站嵌入评论系统的 1.登录LeanCloud 国际版&#xff0c;没有账号可以注册个 链接&#xff1a;点击跳转 2.新建应用&#xff0c;选择开发版&#xff08;免费&#xff09;&#xff0c;商用版每个月最低消费5美刀。 3.在设置-应用凭证里面将AppID、AppKey、Maste…...

sql server 生成连续日期和数字

在sqlserver里&#xff0c;可以利用系统表master..spt_values里面存储的连续数字0到2047&#xff0c;结合dateadd&#xff08;&#xff09;函数生成连续的日期 select convert (varchar(10),dateadd(d, number, getdate()),23) as workday from master..spt_values where type…...

太极v14.0.4 免ROOT用Xposed

一个帮助你免 Root、免解锁免刷机使用 Xposed 模块的 APP 框架。 模块通过它改变系统和应用的行为&#xff0c;既能以传统的 Root/ 刷机方式运作&#xff0c; 也能免 Root/ 免刷机运行&#xff1b;并且它支持 Android 5.0 ~ 11。 简单来说&#xff0c;太极就是个 Xposed 框架…...

python DevOps

在云原生中&#xff0c;python扮演的角色是什么&#xff1f; 在云原生环境中&#xff0c;Python 作为一种高级编程语言&#xff0c;在多个方面扮演着重要角色。云原生是指利用云计算的各种优势&#xff08;如弹性、可扩展性和自动化&#xff09;&#xff0c;构建和运行应用程序…...

Git(四)底层命令:git对象、树对象、提交对象

目录 一、知识回顾1.1 Linux 基础命令1.2 .git 文件夹解析 二、git 对象&#xff08;数据对象&#xff09;2.1 hash-object 存储对象2.2 cat-file 查看对象 三、树对象3.1 ls-files 查看暂存区3.2 update-index 创建暂存区3.3 write-tree 生成树对象3.4 更新暂存区&#xff0c;…...

LVS-DR模式+keepalived+nginx+tomcat实现动静分离、负载均衡、高可用实验

实验条件&#xff1a; test2——20.0.0.20——主服务器——ipvsadm、keepalived服务 test3——20.0.0.30——备服务器——ipvsadm、keepalived服务 nginx5——20.0.0.51——后端真实服务器1&#xff08;tomcat的代理服务器&#xff09;——nginx服务 nginx6——20.0.0.61—…...

canvas 状态管理

本文简介 带尬猴&#xff0c;我是德育处主任 canvas 绘图时会根据当前状态来绘制。很多的 canvas 库都利用到这一特性。比如 p5.js 利用了 canvas 状态特性衍生出 push 和 pop 函数实现状态隔离&#xff08;既然提到了&#xff0c;下一篇就讲这个&#xff09;。 有兴趣了解 p…...

vue中如何给后端过来的数组中每一个对象加一个新的属性和新的对象(不影响后端的原始数据)

方法&#xff1a; 先看后端的原数据 1、给数组中每一个对象加一个新的属性&#xff1a; 输出查看数组list的值&#xff1a; 2、给数组list加入新的对象&#xff1a; 输出结果&#xff1a; 3、总结&#xff1a; 如果是数组中每个对象新增属性就用map遍历每个对象加入新增的属性…...

SpringAOP源码解析之TargetSource(四)

前言 在Spring框架中&#xff0c;TargetSource是一个接口&#xff0c;用于封装获取目标对象&#xff08;也就是被代理的对象&#xff09;的逻辑。它的主要作用是提供代理对象使用的目标对象&#xff0c;并且允许在运行时动态地切换目标对象。TargetSource在Spring的AOP&#x…...

Centos7 安装nvidia显卡驱动

参考一&#xff1a;https://blog.csdn.net/awen19921106/article/details/131331450 参考二&#xff1a;https://www.cnblogs.com/lishanyang/p/17326021.html 报错一&#xff1a; ERROR: Unable to find the kernel source tree for the currently running kernel. Please …...

22 行为型模式-状态模式

1 状态模式介绍 2 状态模式结构 3 状态模式实现 代码示例 //抽象状态接口 public interface State {//声明抽象方法,不同具体状态类可以有不同实现void handle(Context context); }...

Jetpack:018-Jetpack中的导航一

文章目录 1. 概念介绍2. 使用方法2.1 基本概念2.2 传统用法2.3 新的用法 3. 示例代码4. 内容总结 我们在上一章回中介绍了Jetpack库中对话框相关的内容&#xff0c;本章回中主要介绍 导航。闲话休提&#xff0c;让我们一起Talk Android Jetpack吧&#xff01; 1. 概念介绍 我…...

Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等)

Linux常见问题解决操作&#xff08;yum被占用、lsb无此命令、Linux开机进入命令界面等&#xff09; 问题一、新安装的Linux使用命令lsb_release提示无此命令&#xff0c;需先安装再使用 Linux安装lsb命令 lsb是Linux Standard Base的缩写&#xff08;Linux基本标准&#xff…...

一次 Spring 循环依赖源码走读:从三级缓存误用到 Bean 生命周期深度解析

在团队最近一次架构评审会上&#xff0c;关于 Spring 循环依赖的处理方式爆发了一场激烈争论。 “直接用 Lazy 不就行了&#xff1f;” 小李拍着桌子说&#xff0c;“我上个月在订单服务里就这么干的&#xff0c;上线一点问题没有。” “Lazy 只是绕开问题&#xff0c;不是解决…...

解决UE VR开发痛点:VRExpansionPlugin实战指南与架构优化

解决UE VR开发痛点&#xff1a;VRExpansionPlugin实战指南与架构优化 【免费下载链接】VRExpansionPlugin A UE4 VR framework 项目地址: https://gitcode.com/gh_mirrors/vr/VRExpansionPlugin 在UE VR开发中&#xff0c;开发者常面临手部追踪精度不足、交互系统复杂、…...

高效视频下载工具yt-dlp-gui:图形界面让视频提取更简单

高效视频下载工具yt-dlp-gui&#xff1a;图形界面让视频提取更简单 【免费下载链接】yt-dlp-gui Windows GUI for yt-dlp 项目地址: https://gitcode.com/gh_mirrors/yt/yt-dlp-gui 在数字化时代&#xff0c;网络视频已成为信息获取与娱乐的重要方式&#xff0c;但许多平…...

5种多屏显示优化方案:专业用户的DPI精准控制指南

5种多屏显示优化方案&#xff1a;专业用户的DPI精准控制指南 【免费下载链接】SetDPI 项目地址: https://gitcode.com/gh_mirrors/se/SetDPI 场景痛点&#xff1a;跨行业的显示一致性难题 内容创作者的显示困境 视频剪辑师张明在4K主显示器上精心调整的画面比例&…...

OpenClaw镜像体验:Qwen3-4B-Thinking-2507-GPT-5-Codex-Distill-GGUF云端快速测试方案

OpenClaw镜像体验&#xff1a;Qwen3-4B-Thinking-2507-GPT-5-Codex-Distill-GGUF云端快速测试方案 1. 为什么选择云端体验OpenClaw 第一次接触OpenClaw时&#xff0c;我被它的自动化能力吸引&#xff0c;但本地安装过程却让我望而却步。作为一个经常需要快速验证技术方案的开…...

百度网盘直链解析开源工具完全指南:从入门到精通

百度网盘直链解析开源工具完全指南&#xff1a;从入门到精通 【免费下载链接】baidu-wangpan-parse 获取百度网盘分享文件的下载地址 项目地址: https://gitcode.com/gh_mirrors/ba/baidu-wangpan-parse 你是否曾经历过这样的困扰&#xff1a;明明网络带宽充足&#xff…...

AWPortrait-Z问题解决:图像模糊、速度慢?常见问题一键搞定

AWPortrait-Z问题解决&#xff1a;图像模糊、速度慢&#xff1f;常见问题一键搞定 1. 快速诊断&#xff1a;你的问题属于哪一类&#xff1f; 在使用AWPortrait-Z生成人像时&#xff0c;最常见的问题可以归纳为三类&#xff1a; 图像质量问题&#xff1a;模糊、失真、细节不足…...

OFA图像语义蕴含Web应用5分钟部署教程:图文匹配AI一键搭建

OFA图像语义蕴含Web应用5分钟部署教程&#xff1a;图文匹配AI一键搭建 1. 项目简介与核心价值 OFA&#xff08;One For All&#xff09;图像语义蕴含模型是阿里巴巴达摩院研发的多模态深度学习系统&#xff0c;能够智能分析图像内容与文本描述之间的逻辑关系。这个Web应用将强…...

LoRA训练助手效果展示:GPT模型微调前后对比

LoRA训练助手效果展示&#xff1a;GPT模型微调前后对比 1. 引言 你是否曾经遇到过这样的情况&#xff1a;用GPT模型生成的内容总是差那么点意思&#xff0c;要么风格不对&#xff0c;要么专业度不够&#xff0c;要么就是不符合你的特定需求&#xff1f;就像让一个通才来处理专…...

OWL ADVENTURE场景实战:打造你的个人创意图片分析助手

OWL ADVENTURE场景实战&#xff1a;打造你的个人创意图片分析助手 你是否经常遇到这样的情况&#xff1a;看到一张有趣的图片&#xff0c;想知道它背后的故事&#xff1f;或者作为设计师&#xff0c;需要快速分析竞品的视觉风格&#xff1f;又或者只是想找个能"看懂"…...