Flink SQL kafka连接器
版本说明
Flink和kafka的版本号有一定的匹配关系,操作成功的版本:
- Flink1.17.1
- kafka_2.12-3.3.1
添加kafka连接器依赖
将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下
下载flink-sql-connector-kafka连接器jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1
上传到flink的lib目录下
[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib
分发flink-connector-kafka-1.17.1.jar
xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
启动yarn-session
[hadoop@node2 ~]$ myhadoop.sh start [hadoop@node2 ~]$ yarn-session.sh -d
启动kafka集群
[hadoop@node2 ~]$ zk.sh start [hadoop@node2 ~]$ kf.sh start
创建kafka主题
查看主题 [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list 如果没有ws1,则创建 [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
普通Kafka表
'connector' = 'kafka'
进入Flink SQL客户端
[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session ... 省略若干日志输出 ... Flink SQL>
创建Kafka的映射表
CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,
id int,
ts bigint ,
vc int )
WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094','properties.group.id' = 'test',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets''scan.startup.mode' = 'earliest-offset',-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed','topic' = 'ws1','format' = 'json'
);
可以往kafka读数据,也可以往kafka写数据。
插入数据到Kafka表
如果没有source表,先创建source表,如果source表存在则不需要再创建。
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
把source表插入t1表
insert into t1(id,ts,vc) select * from source;
如果报错
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
依然同样错误,还不行,把kafka libs目录下的kafka-clients-3.3.1.jar,把jar包发到Flink的lib目录,同时也注意重启sql-client、yarn-session也要重启(重要)
cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib
查看是否复制成功
$ ls $FLINK_HOME/lib
重启sql-client重新操作,成功如下:
Flink SQL> CREATE TABLE t1( > `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', > --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读 > `partition` BIGINT METADATA VIRTUAL, > `offset` BIGINT METADATA VIRTUAL, > id int, > ts bigint , > vc int ) > WITH ( > 'connector' = 'kafka', > 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094', > 'properties.group.id' = 'test', > -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets' > 'scan.startup.mode' = 'earliest-offset', > -- fixed为flink实现的分区器,一个并��度只写往kafka一个分区 > 'sink.partitioner' = 'fixed', > 'topic' = 'ws1', > 'format' = 'json' > ); [INFO] Execute statement succeed. Flink SQL> CREATE TABLE source ( > id INT, > ts BIGINT, > vc INT > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second'='1', > 'fields.id.kind'='random', > 'fields.id.min'='1', > 'fields.id.max'='10', > 'fields.ts.kind'='sequence', > 'fields.ts.start'='1', > 'fields.ts.end'='1000000', > 'fields.vc.kind'='random', > 'fields.vc.min'='1', > 'fields.vc.max'='100' > ); [INFO] Execute statement succeed. Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'. insert into t1(id,ts,vc) select * from source; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: b1765f969c3ae637bd4c8100efbb0c4e
查询Kafka表
select * from t1;
报错
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
重启yarn session,重新操作,成功如下:
Flink SQL> CREATE TABLE t1( > `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', > --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读 > `partition` BIGINT METADATA VIRTUAL, > `offset` BIGINT METADATA VIRTUAL, > id int, > ts bigint , > vc int ) > WITH ( > 'connector' = 'kafka', > 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094', > 'properties.group.id' = 'test', > -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets' > 'scan.startup.mode' = 'earliest-offset', > -- fixed为flink实现的分区器,一个并??度只写往kafka一个分区 > 'sink.partitioner' = 'fixed', > 'topic' = 'ws1', > 'format' = 'json' > ); [INFO] Execute statement succeed. Flink SQL> CREATE TABLE source ( > id INT, > ts BIGINT, > vc INT > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second'='1', > 'fields.id.kind'='random', > 'fields.id.min'='1', > 'fields.id.max'='10', > 'fields.ts.kind'='sequence', > 'fields.ts.start'='1', > 'fields.ts.end'='1000000', > 'fields.vc.kind'='random', > 'fields.vc.min'='1', > 'fields.vc.max'='100' > ); [INFO] Execute statement succeed. Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'. insert into t1(id,ts,vc) select * from source; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 84292f84d1fce4756ccd8ae294b6163a Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'. select * from t1; [INFO] Result retrieval cancelled. Flink SQL>
upsert-kafka表
'connector' = 'upsert-kafka'
如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。
创建upsert-kafka的映射表(必须定义主键)
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);
如果没有kafka名为ws2的topic,将自动被创建。
插入upsert-kafka表
insert into t2 select id,sum(vc) sumVC from source group by id;
查询upsert-kafka表
upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。
设置显示模式
SET sql-client.execution.result-mode=tableau;
查询t2表数据
select * from t2;
如果发现没有输出数据,原因是之前的source表已经生成到end(1000000)就不再生成数据了。
进入Flink Web UI,cancel掉所有running job,重新操作成功如下:
删除表
Flink SQL> show tables; +------------+ | table name | +------------+ | source | | t1 | | t2 | +------------+ 3 rows in set Flink SQL> drop table source; Flink SQL> drop table t1; Flink SQL> drop table t2;
创建表
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);
设置显示模式
SET sql-client.execution.result-mode=tableau;
查询表
select * from t2;
完成!enjoy it!
相关文章:

Flink SQL kafka连接器
版本说明 Flink和kafka的版本号有一定的匹配关系,操作成功的版本: Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnreposi…...

glm-4 联网搜索 api 测试
今天测试了一下 glm-4 的联网搜索 web_search tool 调用,发现了 web_search 的网页检索返回结果中几个比较诡异的事情,特此记录: 有些检索结果没有 icon、link、media 字段,但从内容上看确实是联网搜索出来的结果,不知…...

Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序
Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序 SSM 图书管理系统小程序 功能介绍 用户 登录 注册 首页 图片轮播 图书信息推荐 图书详情 赞 踩 评论 收藏 系统公告 公告详情 用户信息修改 我的待还 图书归还 催还提醒 我的收藏管理 意见反馈 管理员 登录 个人中心…...

bert训练的一些技巧(rand() < self.skipgram_prb)
rand() < self.skip_gram_prb) 是一个条件表达式,用来判断是否进行skip-gram掩码操作。这种掩码操作通常用于自然语言处理中的数据增强,通过概率决定是否应用skip-gram掩码。下面是对这个表达式的详细解释: 解释 rand(): rand() 是一个随…...

pandas修改时间索引报错处理
import pandas as pd import numpy as np import osdfpd.DataFrame(index[a,b,c],data{序列:[1,2,3]}) df.rename(index{a:a1},inplaceTrue) print(df) print(df.index.dtype)df1pd.DataFrame(index[2024-01-01,2024-01-02,2024-01-03],data{序列:[1,2,3]}) df1.rename(index{2…...

Nginx Bla~Bla~
root 和 alias指令都用于指定服务器上的文件系统路径,但它们在用法和行为上有一些不同 root指令通常用于在Nginx配置中定义一个目录,该目录将作为请求的根目录。 server { location /static/ {root /var/www; 请求 /static/index.html 将映射到 /v…...

java awt和swing介绍
Java AWT(Abstract Window Toolkit)和 Swing 是用于创建图形用户界面(GUI)的 Java API。 AWT AWT 是 Java 最初的平台依赖的窗口图形界面工具包,它提供了一组基本的 GUI 组件、窗口管理、事件处理等。AWT 组件是重量…...

奇怪的错误记录
https://github.com/meta-llama/llama3/issues/80 读模型没问题,推理时出现: RuntimeError: “triu_tril_cuda_template” not implemented for ‘BFloat16’ ———————————————— 事发原因 我尝试了解transformers的AutoProcessor时&a…...

来啦,经典传说大变身牛郎织女后代逗趣日常
《落凡尘:星宿大冒险》来啦! 经典传说大变身,牛郎织女后代金风, 上演一出“星际小侦探”的逗趣日常! 想象一下,二十八星宿那些傲娇的星星们, 居然能“离家出走”,还差点把天给掀了…...

【uniapp-ios】App端与webview端相互通信的方法以及注意事项
前言 在开发中,使用uniapp开发的项目开发效率是极高的,使用一套代码就能够同时在多端上线,像笔者之前写过的使用Flutter端和webview端之间的相互通信方法和问题,这种方式本质上实际上是h5和h5之间的通信,网上有非常多…...

Qt常用基础控件总结—表格控件(QTableWidget类)
表格控件QTableWidget 表格控件最上面一排是只读的水平表头,最左边一列是只读的垂直表头。表头又可以细分为多个分段(section),水平表头的分段就是表格各个列的列首,垂直表头 分段就是表格各个行的行首。表格控件的实体区域是按行、列排布的单元格,单元格内容一般用 QTa…...

笔记:Entity Framework Core 数据库迁移add-migration
一、目的: 数据库迁移是一种管理数据库架构变化的技术,它允许开发者在应用程序的生命周期中安全地更新数据库架构,而不会丢失数据或破坏现有的数据库结构。在Entity Framework Core(EF Core)中,数据库迁移特…...

准备工作+1、请求和响应+2、模型和管理站点
Django快速入门——创建一个基本的投票应用程序 准备工作1、创建虚拟环境2、安装django 1、请求和响应(1)创建项目(2)用于开发的简易服务器(3)创建投票应用(4)编写第一个视图1、编写…...

js 格式化时间
方法一:使用toLocaleString或toLocaleDateString/toLocaleTimeString Date对象提供了toLocaleString()、toLocaleDateString()和toLocaleTimeString()方法,这些方法允许你根据本地时间格式来显示日期和时间。虽然它们不直接提供高度自定义的格式选项&am…...

python 缩放照片
pip install Pillow from PIL import Image 打开一个图片文件 img Image.open(r"C:\Users\Administrator\Desktop\我的证件\证件照.jpg") 设定新的尺寸 new_size (480, 640) 缩放图片 resized_img img.resize(new_size) 显示缩放后的图片 resized_img.sh…...

【C语言】指针(1):入门理解(课堂随笔)
目录 一、内存和地址 二、指针变量和地址 三、指针变量类型的意义 一、内存和地址 只要讲指针就离不开内存 因为指针就是访问内存的 计算上CPU(中央处理器)在处理数据的时候,需要的数据是在内存中读取的,处理后的数 据也会放…...

LLMs可以进行任务规划吗?如果不行,LLMs+GNN可以吗?
深度图学习与大模型LLM(小编): 大家好,今天向大家介绍一篇最新发布的研究论文(20240530)。这篇论文探讨了如何通过引入GNN来提高大模型在任务规划(task planning)中的性能。*论文分析了LLMs在任务规划上的局限性,并提出了一种简单而有效的解决方案。* 1.…...

性价比高充电宝有哪些?充电宝十大最佳品牌大盘点!
在如今这个高度数字化的时代,我们的生活离不开各种电子设备,而充电宝作为保障电子设备续航的重要工具,其地位日益凸显。然而,面对市场上琳琅满目的充电宝品牌和产品,要挑选到一款性价比高的充电宝并非易事。在这篇盘点…...

hnust 1963: 邻接矩阵表示法
hnust 1963: 邻接矩阵表示法 题目描述 输入一个图,用邻接矩阵存储,并实现一些操作。 拷贝下面的代码,按要求完成其中的FirstAdjVex,NextAdjVex和CreateUDG操作,其他地方不得改动。 //邻接矩阵表示图 #include <io…...

Hadoop-15-Hive 元数据管理与存储 Metadata 内嵌模式 本地模式 远程模式 集群规划配置 启动服务 3节点云服务器实测
章节内容 上一节我们完成了: Hive中数据导出:HDFSHQL操作上传内容至Hive、增删改查等操作 背景介绍 这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。 之前已经在 VM 虚拟机上搭建过一次&am…...

215.Mit6.S081-实验三-page tables
在本实验室中,您将探索页表并对其进行修改,以简化将数据从用户空间复制到内核空间的函数。 一、实验准备 开始编码之前,请阅读xv6手册的第3章和相关文件: kernel/memlayout.h,它捕获了内存的布局。kernel/vm.c&…...

flask使用定时任务flask_apscheduler(APScheduler)
Flask-APScheduler描述: Flask-APScheduler 是一个 Flask 扩展,增加了对 APScheduler 的支持。 APScheduler 有三个内置的调度系统可供您使用: Cron 式调度(可选开始/结束时间) 基于间隔的执行(以偶数间隔运行作业…...

ApiFox或postman怎么用params类型传输json或集合+json的String类型
你是否碰见过这样的接口? post请求然后传输的参数都要和查询时一样以param形式传参数,那String什么的都好说,传就直接进后台了,那json呢,集合呢,是不是直接给你返400呢. 1.传json如何处理 那我们看看怎么实现,如果你要传json数据,那需要将特殊字符转义,也叫url转码,否则传不…...

数据结构第16节 最大堆
最大堆是一种特殊的完全二叉树数据结构,其中每个父节点的键值都大于或等于其子节点的键值。在Java中,最大堆通常用于实现优先队列,堆排序算法,或者在需要快速访问最大元素的应用场景中。 让我们通过一个具体的案例来说明最大堆的…...

显卡、显卡驱动、cuda、cuDNN之间关系
显卡、显卡驱动、CUDA 和 cuDNN 是构成高性能计算和深度学习环境的关键组件,它们之间有着紧密的联系。下面是对这些组件及其关系的详细介绍: 显卡(GPU) 显卡,全称为图形处理器(Graphics Processing Unit&…...

Rewrk一个更现代的http框架基准测试实用程序
Rewrk一个更现代的http框架基准测试实用程序。HTTP基准测试(HTTP benchmarking)是一种测量和评估HTTP服务器或应用程序性能指标的活动。其目的是在特定条件下模拟大量用户请求,以测量服务器或应用程序的响应能力、吞吐量、延迟等指标…...

【算法】排序算法介绍 附带C#和Python实现代码
1. 冒泡排序(Bubble Sort) 2. 选择排序(Selection Sort) 3. 插入排序(Insertion Sort) 4. 归并排序(Merge Sort) 5. 快速排序(Quick Sort) 排序算法是计算机科学中的一个基础而重要的部分,用于将一组数据按照一定的顺序排列。下面介绍几种常见的排序算法,…...

360安全浏览器就是不行-python秒破解
下面画框都很容易破解,大家试试...

Python实现傅里叶级数可视化工具
Python实现傅里叶级数可视化工具 flyfish 有matlab实现,我没matlab,我有Python,所以我用Python实现。 整个工具的实现代码放在最后,界面使用PyQt5开发 起源 傅里叶级数(Fourier Series)由法国数学家和物理学家让-巴…...

PDF 分割拆分 API 数据接口
PDF 分割拆分 API 数据接口 文件处理,PDF 高效的 PDF 分割工具,高效处理,可永久存储。 1. 产品功能 高效处理大文件;支持多语言字符识别;支持 formdata 格式 PDF 文件流传参;支持设置每个 PDF 文件的页数…...