当前位置: 首页 > news >正文

flinksql kafka到mysql累计指标练习

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {// 设置kafka服务器地址和端口号String kafkaServers = "localhost:9092";// 设置producer属性Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息String topic = "wxt";JSONObject jsonObject = new JSONObject();jsonObject.put("id", 9);jsonObject.put("name", "王大大");jsonObject.put("age", 11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: " + jsonString);ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);producer.send(record);// 关闭producerproducer.close();}
}

kafka topic :wxt1
在这里插入图片描述

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class KafkaToMysqlJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers = "localhost:9092";String kafkaTopic = "wxt";String groupId = "wxt1";// 注册Kafka表tEnv.executeSql("CREATE TABLE kafka_table (\n" +"  id INT,\n" +"  name STRING,\n" +"  age INT,\n" +"  proctime as PROCTIME()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = '" + kafkaTopic + "',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = '" + groupId + "',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'earliest-offset'\n" +")");// 注册Kafka表// latest-offset//earliest-offsettEnv.executeSql("CREATE TABLE kafka_table2 (\n" +"  window_start STRING,\n" +"  window_end STRING,\n" +"  name STRING,\n" +"  age INT\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'wxt2',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = 'kafka_table2',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'value.format' = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +"  window_start String,\n" +"   window_end String,\n" +"    name String,\n" +"    age INT\n" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +"   'username' = 'root',\n" +"   'password' = '12345678',\n" +"   'table-name' = 'leiji_age'\n" +")");tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +"group by  window_start,window_end,name");tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");env.execute("KafkaToMysqlJob");}
}

kafka topic :wxt2
在这里插入图片描述
mysql结果数据:
在这里插入图片描述
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>

相关文章:

flinksql kafka到mysql累计指标练习

flinksql 累计指标练习 数据流向&#xff1a;kafka ->kafka ->mysql 模拟写数据到kafka topic&#xff1a;wxt中 import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Produ…...

pdf转jpg的方法【ps和工具方法】

pdf转jpg的方法&#xff1a; 1.photoshop办法&#xff1a; pdf直接拖入ps中&#xff0c;另存为*.Jpg文件即可 另外注意的时候&#xff0c;有时候别人给你pdf文件中包含你需要的jpg文件&#xff0c;千万不要截图进入ps中&#xff0c;直接把文件拖入ps中&#xff0c;这样的文件…...

【已解决】Qt发送信号后,槽函数没有响应

Qt发送信号后&#xff0c;槽函数没有响应 检查有没有连接正确的信号和槽函数&#xff0c;有时候&#xff0c;大意了&#xff0c;会写错检查connect函数返回值&#xff0c;有没有连接成功检查对象的创建方式&#xff0c;确保在信号发送前&#xff0c;以及槽函数接收前&#xff…...

Kafka入门05——基础知识

目录 副本数据同步原理 HW和LEO的更新流程 第一种情况 第二种情况 数据丢失的情况 解决方案 Leader副本的选举过程 日志清除策略和压缩策略 日志清除策略 日志压缩策略 Kafka存储手段 零拷贝&#xff08;Zero-Copy&#xff09; 页缓存&#xff08;Page Cache&…...

WordPress(7)配置邮箱发送功能

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、注册登陆163邮箱1. 配置SMTP二、开启smtp1.添加授权码二.在网站中配置smtp服务1.在主题的Boxmoe主题设置中开启邮箱设置三.安装所需要的插件1.安装完毕开启插件即可四.SMTP邮箱服务测试总结…...

简化路径(C++解法)

题目 给你一个字符串 path &#xff0c;表示指向某一文件或目录的 Unix 风格 绝对路径 &#xff08;以 / 开头&#xff09;&#xff0c;请你将其转化为更加简洁的规范路径。 在 Unix 风格的文件系统中&#xff0c;一个点&#xff08;.&#xff09;表示当前目录本身&#xff1…...

CS224W1.1——图机器学习介绍

文章目录 1. 介绍2. 主要问题3. 深度学习如何应用在图结构中4. 课程大纲 学习一下斯坦福CS224W的图机器学习&#xff08;2021年&#xff09;&#xff0c;并做一下学习笔记&#xff0c;主要是研究方向与图神经网络相关。这次是第一次笔记&#xff0c;图片很多都是从斯坦福的PPT里…...

docker搭建waline评论系统

我这里是给博客网站嵌入评论系统的 1.登录LeanCloud 国际版&#xff0c;没有账号可以注册个 链接&#xff1a;点击跳转 2.新建应用&#xff0c;选择开发版&#xff08;免费&#xff09;&#xff0c;商用版每个月最低消费5美刀。 3.在设置-应用凭证里面将AppID、AppKey、Maste…...

sql server 生成连续日期和数字

在sqlserver里&#xff0c;可以利用系统表master..spt_values里面存储的连续数字0到2047&#xff0c;结合dateadd&#xff08;&#xff09;函数生成连续的日期 select convert (varchar(10),dateadd(d, number, getdate()),23) as workday from master..spt_values where type…...

太极v14.0.4 免ROOT用Xposed

一个帮助你免 Root、免解锁免刷机使用 Xposed 模块的 APP 框架。 模块通过它改变系统和应用的行为&#xff0c;既能以传统的 Root/ 刷机方式运作&#xff0c; 也能免 Root/ 免刷机运行&#xff1b;并且它支持 Android 5.0 ~ 11。 简单来说&#xff0c;太极就是个 Xposed 框架…...

python DevOps

在云原生中&#xff0c;python扮演的角色是什么&#xff1f; 在云原生环境中&#xff0c;Python 作为一种高级编程语言&#xff0c;在多个方面扮演着重要角色。云原生是指利用云计算的各种优势&#xff08;如弹性、可扩展性和自动化&#xff09;&#xff0c;构建和运行应用程序…...

Git(四)底层命令:git对象、树对象、提交对象

目录 一、知识回顾1.1 Linux 基础命令1.2 .git 文件夹解析 二、git 对象&#xff08;数据对象&#xff09;2.1 hash-object 存储对象2.2 cat-file 查看对象 三、树对象3.1 ls-files 查看暂存区3.2 update-index 创建暂存区3.3 write-tree 生成树对象3.4 更新暂存区&#xff0c;…...

LVS-DR模式+keepalived+nginx+tomcat实现动静分离、负载均衡、高可用实验

实验条件&#xff1a; test2——20.0.0.20——主服务器——ipvsadm、keepalived服务 test3——20.0.0.30——备服务器——ipvsadm、keepalived服务 nginx5——20.0.0.51——后端真实服务器1&#xff08;tomcat的代理服务器&#xff09;——nginx服务 nginx6——20.0.0.61—…...

canvas 状态管理

本文简介 带尬猴&#xff0c;我是德育处主任 canvas 绘图时会根据当前状态来绘制。很多的 canvas 库都利用到这一特性。比如 p5.js 利用了 canvas 状态特性衍生出 push 和 pop 函数实现状态隔离&#xff08;既然提到了&#xff0c;下一篇就讲这个&#xff09;。 有兴趣了解 p…...

vue中如何给后端过来的数组中每一个对象加一个新的属性和新的对象(不影响后端的原始数据)

方法&#xff1a; 先看后端的原数据 1、给数组中每一个对象加一个新的属性&#xff1a; 输出查看数组list的值&#xff1a; 2、给数组list加入新的对象&#xff1a; 输出结果&#xff1a; 3、总结&#xff1a; 如果是数组中每个对象新增属性就用map遍历每个对象加入新增的属性…...

SpringAOP源码解析之TargetSource(四)

前言 在Spring框架中&#xff0c;TargetSource是一个接口&#xff0c;用于封装获取目标对象&#xff08;也就是被代理的对象&#xff09;的逻辑。它的主要作用是提供代理对象使用的目标对象&#xff0c;并且允许在运行时动态地切换目标对象。TargetSource在Spring的AOP&#x…...

Centos7 安装nvidia显卡驱动

参考一&#xff1a;https://blog.csdn.net/awen19921106/article/details/131331450 参考二&#xff1a;https://www.cnblogs.com/lishanyang/p/17326021.html 报错一&#xff1a; ERROR: Unable to find the kernel source tree for the currently running kernel. Please …...

22 行为型模式-状态模式

1 状态模式介绍 2 状态模式结构 3 状态模式实现 代码示例 //抽象状态接口 public interface State {//声明抽象方法,不同具体状态类可以有不同实现void handle(Context context); }...

Jetpack:018-Jetpack中的导航一

文章目录 1. 概念介绍2. 使用方法2.1 基本概念2.2 传统用法2.3 新的用法 3. 示例代码4. 内容总结 我们在上一章回中介绍了Jetpack库中对话框相关的内容&#xff0c;本章回中主要介绍 导航。闲话休提&#xff0c;让我们一起Talk Android Jetpack吧&#xff01; 1. 概念介绍 我…...

Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等)

Linux常见问题解决操作&#xff08;yum被占用、lsb无此命令、Linux开机进入命令界面等&#xff09; 问题一、新安装的Linux使用命令lsb_release提示无此命令&#xff0c;需先安装再使用 Linux安装lsb命令 lsb是Linux Standard Base的缩写&#xff08;Linux基本标准&#xff…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

selenium学习实战【Python爬虫】

selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?

uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件&#xff0c;用于在原生应用中加载 HTML 页面&#xff1a; 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

FFmpeg:Windows系统小白安装及其使用

一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】&#xff0c;注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录&#xff08;即exe所在文件夹&#xff09;加入系统变量…...

掌握 HTTP 请求:理解 cURL GET 语法

cURL 是一个强大的命令行工具&#xff0c;用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中&#xff0c;cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...