Flink SQL kafka连接器
版本说明
Flink和kafka的版本号有一定的匹配关系,操作成功的版本:
- Flink1.17.1
- kafka_2.12-3.3.1
添加kafka连接器依赖
将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下
下载flink-sql-connector-kafka连接器jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

上传到flink的lib目录下
[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib
分发flink-connector-kafka-1.17.1.jar
xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
启动yarn-session
[hadoop@node2 ~]$ myhadoop.sh start [hadoop@node2 ~]$ yarn-session.sh -d
启动kafka集群
[hadoop@node2 ~]$ zk.sh start [hadoop@node2 ~]$ kf.sh start
创建kafka主题
查看主题 [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list 如果没有ws1,则创建 [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
普通Kafka表
'connector' = 'kafka'
进入Flink SQL客户端
[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session ... 省略若干日志输出 ... Flink SQL>
创建Kafka的映射表
CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,
id int,
ts bigint ,
vc int )
WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094','properties.group.id' = 'test',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets''scan.startup.mode' = 'earliest-offset',-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed','topic' = 'ws1','format' = 'json'
);
可以往kafka读数据,也可以往kafka写数据。
插入数据到Kafka表
如果没有source表,先创建source表,如果source表存在则不需要再创建。
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
把source表插入t1表
insert into t1(id,ts,vc) select * from source;
如果报错
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
依然同样错误,还不行,把kafka libs目录下的kafka-clients-3.3.1.jar,把jar包发到Flink的lib目录,同时也注意重启sql-client、yarn-session也要重启(重要)
cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib
查看是否复制成功
$ ls $FLINK_HOME/lib
重启sql-client重新操作,成功如下:
Flink SQL> CREATE TABLE t1(
> `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
> `partition` BIGINT METADATA VIRTUAL,
> `offset` BIGINT METADATA VIRTUAL,
> id int,
> ts bigint ,
> vc int )
> WITH (
> 'connector' = 'kafka',
> 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
> 'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
> 'scan.startup.mode' = 'earliest-offset',
> -- fixed为flink实现的分区器,一个并��度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
> 'topic' = 'ws1',
> 'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source (
> id INT,
> ts BIGINT,
> vc INT
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second'='1',
> 'fields.id.kind'='random',
> 'fields.id.min'='1',
> 'fields.id.max'='10',
> 'fields.ts.kind'='sequence',
> 'fields.ts.start'='1',
> 'fields.ts.end'='1000000',
> 'fields.vc.kind'='random',
> 'fields.vc.min'='1',
> 'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
查询Kafka表
select * from t1;
报错
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
重启yarn session,重新操作,成功如下:
Flink SQL> CREATE TABLE t1(
> `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
> `partition` BIGINT METADATA VIRTUAL,
> `offset` BIGINT METADATA VIRTUAL,
> id int,
> ts bigint ,
> vc int )
> WITH (
> 'connector' = 'kafka',
> 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
> 'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
> 'scan.startup.mode' = 'earliest-offset',
> -- fixed为flink实现的分区器,一个并??度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
> 'topic' = 'ws1',
> 'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source (
> id INT,
> ts BIGINT,
> vc INT
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second'='1',
> 'fields.id.kind'='random',
> 'fields.id.min'='1',
> 'fields.id.max'='10',
> 'fields.ts.kind'='sequence',
> 'fields.ts.start'='1',
> 'fields.ts.end'='1000000',
> 'fields.vc.kind'='random',
> 'fields.vc.min'='1',
> 'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.
Flink SQL>

upsert-kafka表
'connector' = 'upsert-kafka'
如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。
创建upsert-kafka的映射表(必须定义主键)
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);
如果没有kafka名为ws2的topic,将自动被创建。
插入upsert-kafka表
insert into t2 select id,sum(vc) sumVC from source group by id;
查询upsert-kafka表
upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。
设置显示模式
SET sql-client.execution.result-mode=tableau;
查询t2表数据
select * from t2;
如果发现没有输出数据,原因是之前的source表已经生成到end(1000000)就不再生成数据了。
进入Flink Web UI,cancel掉所有running job,重新操作成功如下:
删除表
Flink SQL> show tables; +------------+ | table name | +------------+ | source | | t1 | | t2 | +------------+ 3 rows in set Flink SQL> drop table source; Flink SQL> drop table t1; Flink SQL> drop table t2;
创建表
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);
设置显示模式
SET sql-client.execution.result-mode=tableau;
查询表
select * from t2;

完成!enjoy it!
相关文章:
Flink SQL kafka连接器
版本说明 Flink和kafka的版本号有一定的匹配关系,操作成功的版本: Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnreposi…...
glm-4 联网搜索 api 测试
今天测试了一下 glm-4 的联网搜索 web_search tool 调用,发现了 web_search 的网页检索返回结果中几个比较诡异的事情,特此记录: 有些检索结果没有 icon、link、media 字段,但从内容上看确实是联网搜索出来的结果,不知…...
Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序
Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序 SSM 图书管理系统小程序 功能介绍 用户 登录 注册 首页 图片轮播 图书信息推荐 图书详情 赞 踩 评论 收藏 系统公告 公告详情 用户信息修改 我的待还 图书归还 催还提醒 我的收藏管理 意见反馈 管理员 登录 个人中心…...
bert训练的一些技巧(rand() < self.skipgram_prb)
rand() < self.skip_gram_prb) 是一个条件表达式,用来判断是否进行skip-gram掩码操作。这种掩码操作通常用于自然语言处理中的数据增强,通过概率决定是否应用skip-gram掩码。下面是对这个表达式的详细解释: 解释 rand(): rand() 是一个随…...
pandas修改时间索引报错处理
import pandas as pd import numpy as np import osdfpd.DataFrame(index[a,b,c],data{序列:[1,2,3]}) df.rename(index{a:a1},inplaceTrue) print(df) print(df.index.dtype)df1pd.DataFrame(index[2024-01-01,2024-01-02,2024-01-03],data{序列:[1,2,3]}) df1.rename(index{2…...
Nginx Bla~Bla~
root 和 alias指令都用于指定服务器上的文件系统路径,但它们在用法和行为上有一些不同 root指令通常用于在Nginx配置中定义一个目录,该目录将作为请求的根目录。 server { location /static/ {root /var/www; 请求 /static/index.html 将映射到 /v…...
java awt和swing介绍
Java AWT(Abstract Window Toolkit)和 Swing 是用于创建图形用户界面(GUI)的 Java API。 AWT AWT 是 Java 最初的平台依赖的窗口图形界面工具包,它提供了一组基本的 GUI 组件、窗口管理、事件处理等。AWT 组件是重量…...
奇怪的错误记录
https://github.com/meta-llama/llama3/issues/80 读模型没问题,推理时出现: RuntimeError: “triu_tril_cuda_template” not implemented for ‘BFloat16’ ———————————————— 事发原因 我尝试了解transformers的AutoProcessor时&a…...
来啦,经典传说大变身牛郎织女后代逗趣日常
《落凡尘:星宿大冒险》来啦! 经典传说大变身,牛郎织女后代金风, 上演一出“星际小侦探”的逗趣日常! 想象一下,二十八星宿那些傲娇的星星们, 居然能“离家出走”,还差点把天给掀了…...
【uniapp-ios】App端与webview端相互通信的方法以及注意事项
前言 在开发中,使用uniapp开发的项目开发效率是极高的,使用一套代码就能够同时在多端上线,像笔者之前写过的使用Flutter端和webview端之间的相互通信方法和问题,这种方式本质上实际上是h5和h5之间的通信,网上有非常多…...
Qt常用基础控件总结—表格控件(QTableWidget类)
表格控件QTableWidget 表格控件最上面一排是只读的水平表头,最左边一列是只读的垂直表头。表头又可以细分为多个分段(section),水平表头的分段就是表格各个列的列首,垂直表头 分段就是表格各个行的行首。表格控件的实体区域是按行、列排布的单元格,单元格内容一般用 QTa…...
笔记:Entity Framework Core 数据库迁移add-migration
一、目的: 数据库迁移是一种管理数据库架构变化的技术,它允许开发者在应用程序的生命周期中安全地更新数据库架构,而不会丢失数据或破坏现有的数据库结构。在Entity Framework Core(EF Core)中,数据库迁移特…...
准备工作+1、请求和响应+2、模型和管理站点
Django快速入门——创建一个基本的投票应用程序 准备工作1、创建虚拟环境2、安装django 1、请求和响应(1)创建项目(2)用于开发的简易服务器(3)创建投票应用(4)编写第一个视图1、编写…...
js 格式化时间
方法一:使用toLocaleString或toLocaleDateString/toLocaleTimeString Date对象提供了toLocaleString()、toLocaleDateString()和toLocaleTimeString()方法,这些方法允许你根据本地时间格式来显示日期和时间。虽然它们不直接提供高度自定义的格式选项&am…...
python 缩放照片
pip install Pillow from PIL import Image 打开一个图片文件 img Image.open(r"C:\Users\Administrator\Desktop\我的证件\证件照.jpg") 设定新的尺寸 new_size (480, 640) 缩放图片 resized_img img.resize(new_size) 显示缩放后的图片 resized_img.sh…...
【C语言】指针(1):入门理解(课堂随笔)
目录 一、内存和地址 二、指针变量和地址 三、指针变量类型的意义 一、内存和地址 只要讲指针就离不开内存 因为指针就是访问内存的 计算上CPU(中央处理器)在处理数据的时候,需要的数据是在内存中读取的,处理后的数 据也会放…...
LLMs可以进行任务规划吗?如果不行,LLMs+GNN可以吗?
深度图学习与大模型LLM(小编): 大家好,今天向大家介绍一篇最新发布的研究论文(20240530)。这篇论文探讨了如何通过引入GNN来提高大模型在任务规划(task planning)中的性能。*论文分析了LLMs在任务规划上的局限性,并提出了一种简单而有效的解决方案。* 1.…...
性价比高充电宝有哪些?充电宝十大最佳品牌大盘点!
在如今这个高度数字化的时代,我们的生活离不开各种电子设备,而充电宝作为保障电子设备续航的重要工具,其地位日益凸显。然而,面对市场上琳琅满目的充电宝品牌和产品,要挑选到一款性价比高的充电宝并非易事。在这篇盘点…...
hnust 1963: 邻接矩阵表示法
hnust 1963: 邻接矩阵表示法 题目描述 输入一个图,用邻接矩阵存储,并实现一些操作。 拷贝下面的代码,按要求完成其中的FirstAdjVex,NextAdjVex和CreateUDG操作,其他地方不得改动。 //邻接矩阵表示图 #include <io…...
Hadoop-15-Hive 元数据管理与存储 Metadata 内嵌模式 本地模式 远程模式 集群规划配置 启动服务 3节点云服务器实测
章节内容 上一节我们完成了: Hive中数据导出:HDFSHQL操作上传内容至Hive、增删改查等操作 背景介绍 这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。 之前已经在 VM 虚拟机上搭建过一次&am…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...
