Flink SQL kafka连接器
版本说明
Flink和kafka的版本号有一定的匹配关系,操作成功的版本:
- Flink1.17.1
- kafka_2.12-3.3.1
添加kafka连接器依赖
将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下
下载flink-sql-connector-kafka连接器jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

上传到flink的lib目录下
[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib
分发flink-connector-kafka-1.17.1.jar
xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
启动yarn-session
[hadoop@node2 ~]$ myhadoop.sh start [hadoop@node2 ~]$ yarn-session.sh -d
启动kafka集群
[hadoop@node2 ~]$ zk.sh start [hadoop@node2 ~]$ kf.sh start
创建kafka主题
查看主题 [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list 如果没有ws1,则创建 [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
普通Kafka表
'connector' = 'kafka'
进入Flink SQL客户端
[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session ... 省略若干日志输出 ... Flink SQL>
创建Kafka的映射表
CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,
id int,
ts bigint ,
vc int )
WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094','properties.group.id' = 'test',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets''scan.startup.mode' = 'earliest-offset',-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed','topic' = 'ws1','format' = 'json'
);
可以往kafka读数据,也可以往kafka写数据。
插入数据到Kafka表
如果没有source表,先创建source表,如果source表存在则不需要再创建。
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
把source表插入t1表
insert into t1(id,ts,vc) select * from source;
如果报错
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
依然同样错误,还不行,把kafka libs目录下的kafka-clients-3.3.1.jar,把jar包发到Flink的lib目录,同时也注意重启sql-client、yarn-session也要重启(重要)
cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib
查看是否复制成功
$ ls $FLINK_HOME/lib
重启sql-client重新操作,成功如下:
Flink SQL> CREATE TABLE t1(
> `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
> `partition` BIGINT METADATA VIRTUAL,
> `offset` BIGINT METADATA VIRTUAL,
> id int,
> ts bigint ,
> vc int )
> WITH (
> 'connector' = 'kafka',
> 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
> 'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
> 'scan.startup.mode' = 'earliest-offset',
> -- fixed为flink实现的分区器,一个并��度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
> 'topic' = 'ws1',
> 'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source (
> id INT,
> ts BIGINT,
> vc INT
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second'='1',
> 'fields.id.kind'='random',
> 'fields.id.min'='1',
> 'fields.id.max'='10',
> 'fields.ts.kind'='sequence',
> 'fields.ts.start'='1',
> 'fields.ts.end'='1000000',
> 'fields.vc.kind'='random',
> 'fields.vc.min'='1',
> 'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
查询Kafka表
select * from t1;
报错
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
重启yarn session,重新操作,成功如下:
Flink SQL> CREATE TABLE t1(
> `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
> `partition` BIGINT METADATA VIRTUAL,
> `offset` BIGINT METADATA VIRTUAL,
> id int,
> ts bigint ,
> vc int )
> WITH (
> 'connector' = 'kafka',
> 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
> 'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
> 'scan.startup.mode' = 'earliest-offset',
> -- fixed为flink实现的分区器,一个并??度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
> 'topic' = 'ws1',
> 'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source (
> id INT,
> ts BIGINT,
> vc INT
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second'='1',
> 'fields.id.kind'='random',
> 'fields.id.min'='1',
> 'fields.id.max'='10',
> 'fields.ts.kind'='sequence',
> 'fields.ts.start'='1',
> 'fields.ts.end'='1000000',
> 'fields.vc.kind'='random',
> 'fields.vc.min'='1',
> 'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.
Flink SQL>

upsert-kafka表
'connector' = 'upsert-kafka'
如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。
创建upsert-kafka的映射表(必须定义主键)
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);
如果没有kafka名为ws2的topic,将自动被创建。
插入upsert-kafka表
insert into t2 select id,sum(vc) sumVC from source group by id;
查询upsert-kafka表
upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。
设置显示模式
SET sql-client.execution.result-mode=tableau;
查询t2表数据
select * from t2;
如果发现没有输出数据,原因是之前的source表已经生成到end(1000000)就不再生成数据了。
进入Flink Web UI,cancel掉所有running job,重新操作成功如下:
删除表
Flink SQL> show tables; +------------+ | table name | +------------+ | source | | t1 | | t2 | +------------+ 3 rows in set Flink SQL> drop table source; Flink SQL> drop table t1; Flink SQL> drop table t2;
创建表
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);
设置显示模式
SET sql-client.execution.result-mode=tableau;
查询表
select * from t2;

完成!enjoy it!
相关文章:
Flink SQL kafka连接器
版本说明 Flink和kafka的版本号有一定的匹配关系,操作成功的版本: Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnreposi…...
glm-4 联网搜索 api 测试
今天测试了一下 glm-4 的联网搜索 web_search tool 调用,发现了 web_search 的网页检索返回结果中几个比较诡异的事情,特此记录: 有些检索结果没有 icon、link、media 字段,但从内容上看确实是联网搜索出来的结果,不知…...
Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序
Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序 SSM 图书管理系统小程序 功能介绍 用户 登录 注册 首页 图片轮播 图书信息推荐 图书详情 赞 踩 评论 收藏 系统公告 公告详情 用户信息修改 我的待还 图书归还 催还提醒 我的收藏管理 意见反馈 管理员 登录 个人中心…...
bert训练的一些技巧(rand() < self.skipgram_prb)
rand() < self.skip_gram_prb) 是一个条件表达式,用来判断是否进行skip-gram掩码操作。这种掩码操作通常用于自然语言处理中的数据增强,通过概率决定是否应用skip-gram掩码。下面是对这个表达式的详细解释: 解释 rand(): rand() 是一个随…...
pandas修改时间索引报错处理
import pandas as pd import numpy as np import osdfpd.DataFrame(index[a,b,c],data{序列:[1,2,3]}) df.rename(index{a:a1},inplaceTrue) print(df) print(df.index.dtype)df1pd.DataFrame(index[2024-01-01,2024-01-02,2024-01-03],data{序列:[1,2,3]}) df1.rename(index{2…...
Nginx Bla~Bla~
root 和 alias指令都用于指定服务器上的文件系统路径,但它们在用法和行为上有一些不同 root指令通常用于在Nginx配置中定义一个目录,该目录将作为请求的根目录。 server { location /static/ {root /var/www; 请求 /static/index.html 将映射到 /v…...
java awt和swing介绍
Java AWT(Abstract Window Toolkit)和 Swing 是用于创建图形用户界面(GUI)的 Java API。 AWT AWT 是 Java 最初的平台依赖的窗口图形界面工具包,它提供了一组基本的 GUI 组件、窗口管理、事件处理等。AWT 组件是重量…...
奇怪的错误记录
https://github.com/meta-llama/llama3/issues/80 读模型没问题,推理时出现: RuntimeError: “triu_tril_cuda_template” not implemented for ‘BFloat16’ ———————————————— 事发原因 我尝试了解transformers的AutoProcessor时&a…...
来啦,经典传说大变身牛郎织女后代逗趣日常
《落凡尘:星宿大冒险》来啦! 经典传说大变身,牛郎织女后代金风, 上演一出“星际小侦探”的逗趣日常! 想象一下,二十八星宿那些傲娇的星星们, 居然能“离家出走”,还差点把天给掀了…...
【uniapp-ios】App端与webview端相互通信的方法以及注意事项
前言 在开发中,使用uniapp开发的项目开发效率是极高的,使用一套代码就能够同时在多端上线,像笔者之前写过的使用Flutter端和webview端之间的相互通信方法和问题,这种方式本质上实际上是h5和h5之间的通信,网上有非常多…...
Qt常用基础控件总结—表格控件(QTableWidget类)
表格控件QTableWidget 表格控件最上面一排是只读的水平表头,最左边一列是只读的垂直表头。表头又可以细分为多个分段(section),水平表头的分段就是表格各个列的列首,垂直表头 分段就是表格各个行的行首。表格控件的实体区域是按行、列排布的单元格,单元格内容一般用 QTa…...
笔记:Entity Framework Core 数据库迁移add-migration
一、目的: 数据库迁移是一种管理数据库架构变化的技术,它允许开发者在应用程序的生命周期中安全地更新数据库架构,而不会丢失数据或破坏现有的数据库结构。在Entity Framework Core(EF Core)中,数据库迁移特…...
准备工作+1、请求和响应+2、模型和管理站点
Django快速入门——创建一个基本的投票应用程序 准备工作1、创建虚拟环境2、安装django 1、请求和响应(1)创建项目(2)用于开发的简易服务器(3)创建投票应用(4)编写第一个视图1、编写…...
js 格式化时间
方法一:使用toLocaleString或toLocaleDateString/toLocaleTimeString Date对象提供了toLocaleString()、toLocaleDateString()和toLocaleTimeString()方法,这些方法允许你根据本地时间格式来显示日期和时间。虽然它们不直接提供高度自定义的格式选项&am…...
python 缩放照片
pip install Pillow from PIL import Image 打开一个图片文件 img Image.open(r"C:\Users\Administrator\Desktop\我的证件\证件照.jpg") 设定新的尺寸 new_size (480, 640) 缩放图片 resized_img img.resize(new_size) 显示缩放后的图片 resized_img.sh…...
【C语言】指针(1):入门理解(课堂随笔)
目录 一、内存和地址 二、指针变量和地址 三、指针变量类型的意义 一、内存和地址 只要讲指针就离不开内存 因为指针就是访问内存的 计算上CPU(中央处理器)在处理数据的时候,需要的数据是在内存中读取的,处理后的数 据也会放…...
LLMs可以进行任务规划吗?如果不行,LLMs+GNN可以吗?
深度图学习与大模型LLM(小编): 大家好,今天向大家介绍一篇最新发布的研究论文(20240530)。这篇论文探讨了如何通过引入GNN来提高大模型在任务规划(task planning)中的性能。*论文分析了LLMs在任务规划上的局限性,并提出了一种简单而有效的解决方案。* 1.…...
性价比高充电宝有哪些?充电宝十大最佳品牌大盘点!
在如今这个高度数字化的时代,我们的生活离不开各种电子设备,而充电宝作为保障电子设备续航的重要工具,其地位日益凸显。然而,面对市场上琳琅满目的充电宝品牌和产品,要挑选到一款性价比高的充电宝并非易事。在这篇盘点…...
hnust 1963: 邻接矩阵表示法
hnust 1963: 邻接矩阵表示法 题目描述 输入一个图,用邻接矩阵存储,并实现一些操作。 拷贝下面的代码,按要求完成其中的FirstAdjVex,NextAdjVex和CreateUDG操作,其他地方不得改动。 //邻接矩阵表示图 #include <io…...
Hadoop-15-Hive 元数据管理与存储 Metadata 内嵌模式 本地模式 远程模式 集群规划配置 启动服务 3节点云服务器实测
章节内容 上一节我们完成了: Hive中数据导出:HDFSHQL操作上传内容至Hive、增删改查等操作 背景介绍 这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。 之前已经在 VM 虚拟机上搭建过一次&am…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...
Ubuntu系统复制(U盘-电脑硬盘)
所需环境 电脑自带硬盘:1块 (1T) U盘1:Ubuntu系统引导盘(用于“U盘2”复制到“电脑自带硬盘”) U盘2:Ubuntu系统盘(1T,用于被复制) !!!建议“电脑…...
