当前位置: 首页 > news >正文

Flink SQL kafka连接器

版本说明

Flink和kafka的版本号有一定的匹配关系,操作成功的版本:

  • Flink1.17.1
  • kafka_2.12-3.3.1

添加kafka连接器依赖

将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下

下载flink-sql-connector-kafka连接器jar包

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

上传到flink的lib目录下

[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

分发flink-connector-kafka-1.17.1.jar

xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar

启动yarn-session

[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d

启动kafka集群

[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start

创建kafka主题

查看主题
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
​
如果没有ws1,则创建
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
​

普通Kafka表

'connector' = 'kafka'

进入Flink SQL客户端

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
省略若干日志输出
...
Flink SQL> 

创建Kafka的映射表

CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094','properties.group.id' = 'test',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets''scan.startup.mode' = 'earliest-offset',-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed','topic' = 'ws1','format' = 'json'
);

可以往kafka读数据,也可以往kafka写数据。

插入数据到Kafka表

如果没有source表,先创建source表,如果source表存在则不需要再创建。

CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);

把source表插入t1表

insert into t1(id,ts,vc) select * from source;

如果报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

依然同样错误,还不行,把kafka libs目录下的kafka-clients-3.3.1.jar,把jar包发到Flink的lib目录,同时也注意重启sql-client、yarn-session也要重启(重要)

cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib

查看是否复制成功

$ ls $FLINK_HOME/lib

重启sql-client重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并��度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
​

查询Kafka表

select * from t1;

报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord​

重启yarn session,重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并??度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
​
​
Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.
​
Flink SQL> 
​

 

upsert-kafka表

'connector' = 'upsert-kafka'

如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。

创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED 
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);

如果没有kafka名为ws2的topic,将自动被创建。

插入upsert-kafka表

insert into t2 select id,sum(vc) sumVC  from source group by id;

查询upsert-kafka表

upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。

设置显示模式

SET sql-client.execution.result-mode=tableau;

 查询t2表数据

select * from t2;

如果发现没有输出数据,原因是之前的source表已经生成到end(1000000)就不再生成数据了。

进入Flink Web UI,cancel掉所有running job,重新操作成功如下:

删除表

Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set
​
Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;

创建表

CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='10', 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', 'fields.vc.kind'='random', 'fields.vc.min'='1', 'fields.vc.max'='100'
);
CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED 
)
WITH ('connector' = 'upsert-kafka','properties.bootstrap.servers' = 'node2:9092','topic' = 'ws2','key.format' = 'json','value.format' = 'json'
);

设置显示模式

SET sql-client.execution.result-mode=tableau;

查询表

select * from t2;

 

完成!enjoy it!

相关文章:

Flink SQL kafka连接器

版本说明 Flink和kafka的版本号有一定的匹配关系,操作成功的版本: Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnreposi…...

glm-4 联网搜索 api 测试

今天测试了一下 glm-4 的联网搜索 web_search tool 调用,发现了 web_search 的网页检索返回结果中几个比较诡异的事情,特此记录: 有些检索结果没有 icon、link、media 字段,但从内容上看确实是联网搜索出来的结果,不知…...

Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序

Java毕业设计 基于SSM vue图书管理系统小程序 微信小程序 SSM 图书管理系统小程序 功能介绍 用户 登录 注册 首页 图片轮播 图书信息推荐 图书详情 赞 踩 评论 收藏 系统公告 公告详情 用户信息修改 我的待还 图书归还 催还提醒 我的收藏管理 意见反馈 管理员 登录 个人中心…...

bert训练的一些技巧(rand() < self.skipgram_prb)

rand() < self.skip_gram_prb) 是一个条件表达式&#xff0c;用来判断是否进行skip-gram掩码操作。这种掩码操作通常用于自然语言处理中的数据增强&#xff0c;通过概率决定是否应用skip-gram掩码。下面是对这个表达式的详细解释&#xff1a; 解释 rand(): rand() 是一个随…...

pandas修改时间索引报错处理

import pandas as pd import numpy as np import osdfpd.DataFrame(index[a,b,c],data{序列:[1,2,3]}) df.rename(index{a:a1},inplaceTrue) print(df) print(df.index.dtype)df1pd.DataFrame(index[2024-01-01,2024-01-02,2024-01-03],data{序列:[1,2,3]}) df1.rename(index{2…...

Nginx Bla~Bla~

root 和 alias指令都用于指定服务器上的文件系统路径&#xff0c;但它们在用法和行为上有一些不同 root指令通常用于在Nginx配置中定义一个目录&#xff0c;该目录将作为请求的根目录。 server { location /static/ {root /var/www; 请求 /static/index.html 将映射到 /v…...

java awt和swing介绍

Java AWT&#xff08;Abstract Window Toolkit&#xff09;和 Swing 是用于创建图形用户界面&#xff08;GUI&#xff09;的 Java API。 AWT AWT 是 Java 最初的平台依赖的窗口图形界面工具包&#xff0c;它提供了一组基本的 GUI 组件、窗口管理、事件处理等。AWT 组件是重量…...

奇怪的错误记录

https://github.com/meta-llama/llama3/issues/80 读模型没问题&#xff0c;推理时出现&#xff1a; RuntimeError: “triu_tril_cuda_template” not implemented for ‘BFloat16’ ———————————————— 事发原因 我尝试了解transformers的AutoProcessor时&a…...

来啦,经典传说大变身牛郎织女后代逗趣日常

《落凡尘&#xff1a;星宿大冒险》来啦&#xff01; 经典传说大变身&#xff0c;牛郎织女后代金风&#xff0c; 上演一出“星际小侦探”的逗趣日常&#xff01; 想象一下&#xff0c;二十八星宿那些傲娇的星星们&#xff0c; 居然能“离家出走”&#xff0c;还差点把天给掀了…...

【uniapp-ios】App端与webview端相互通信的方法以及注意事项

前言 在开发中&#xff0c;使用uniapp开发的项目开发效率是极高的&#xff0c;使用一套代码就能够同时在多端上线&#xff0c;像笔者之前写过的使用Flutter端和webview端之间的相互通信方法和问题&#xff0c;这种方式本质上实际上是h5和h5之间的通信&#xff0c;网上有非常多…...

Qt常用基础控件总结—表格控件(QTableWidget类)

表格控件QTableWidget 表格控件最上面一排是只读的水平表头,最左边一列是只读的垂直表头。表头又可以细分为多个分段(section),水平表头的分段就是表格各个列的列首,垂直表头 分段就是表格各个行的行首。表格控件的实体区域是按行、列排布的单元格,单元格内容一般用 QTa…...

笔记:Entity Framework Core 数据库迁移add-migration

一、目的&#xff1a; 数据库迁移是一种管理数据库架构变化的技术&#xff0c;它允许开发者在应用程序的生命周期中安全地更新数据库架构&#xff0c;而不会丢失数据或破坏现有的数据库结构。在Entity Framework Core&#xff08;EF Core&#xff09;中&#xff0c;数据库迁移特…...

准备工作+1、请求和响应+2、模型和管理站点

Django快速入门——创建一个基本的投票应用程序 准备工作1、创建虚拟环境2、安装django 1、请求和响应&#xff08;1&#xff09;创建项目&#xff08;2&#xff09;用于开发的简易服务器&#xff08;3&#xff09;创建投票应用&#xff08;4&#xff09;编写第一个视图1、编写…...

js 格式化时间

方法一&#xff1a;使用toLocaleString或toLocaleDateString/toLocaleTimeString Date对象提供了toLocaleString()、toLocaleDateString()和toLocaleTimeString()方法&#xff0c;这些方法允许你根据本地时间格式来显示日期和时间。虽然它们不直接提供高度自定义的格式选项&am…...

python 缩放照片

pip install Pillow from PIL import Image 打开一个图片文件 img Image.open(r"C:\Users\Administrator\Desktop\我的证件\证件照.jpg") 设定新的尺寸 new_size (480, 640) 缩放图片 resized_img img.resize(new_size) 显示缩放后的图片 resized_img.sh…...

【C语言】指针(1):入门理解(课堂随笔)

目录 一、内存和地址 二、指针变量和地址 三、指针变量类型的意义 一、内存和地址 只要讲指针就离不开内存 因为指针就是访问内存的 计算上CPU&#xff08;中央处理器&#xff09;在处理数据的时候&#xff0c;需要的数据是在内存中读取的&#xff0c;处理后的数 据也会放…...

LLMs可以进行任务规划吗?如果不行,LLMs+GNN可以吗?

深度图学习与大模型LLM(小编): 大家好,今天向大家介绍一篇最新发布的研究论文&#xff08;20240530&#xff09;。这篇论文探讨了如何通过引入GNN来提高大模型在任务规划(task planning)中的性能。*论文分析了LLMs在任务规划上的局限性,并提出了一种简单而有效的解决方案。* 1.…...

性价比高充电宝有哪些?充电宝十大最佳品牌大盘点!

在如今这个高度数字化的时代&#xff0c;我们的生活离不开各种电子设备&#xff0c;而充电宝作为保障电子设备续航的重要工具&#xff0c;其地位日益凸显。然而&#xff0c;面对市场上琳琅满目的充电宝品牌和产品&#xff0c;要挑选到一款性价比高的充电宝并非易事。在这篇盘点…...

hnust 1963: 邻接矩阵表示法

hnust 1963: 邻接矩阵表示法 题目描述 输入一个图&#xff0c;用邻接矩阵存储&#xff0c;并实现一些操作。 拷贝下面的代码&#xff0c;按要求完成其中的FirstAdjVex&#xff0c;NextAdjVex和CreateUDG操作&#xff0c;其他地方不得改动。 //邻接矩阵表示图 #include <io…...

Hadoop-15-Hive 元数据管理与存储 Metadata 内嵌模式 本地模式 远程模式 集群规划配置 启动服务 3节点云服务器实测

章节内容 上一节我们完成了&#xff1a; Hive中数据导出&#xff1a;HDFSHQL操作上传内容至Hive、增删改查等操作 背景介绍 这里是三台公网云服务器&#xff0c;每台 2C4G&#xff0c;搭建一个Hadoop的学习环境&#xff0c;供我学习。 之前已经在 VM 虚拟机上搭建过一次&am…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验

系列回顾&#xff1a; 在上一篇中&#xff0c;我们成功地为应用集成了数据库&#xff0c;并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了&#xff01;但是&#xff0c;如果你仔细审视那些 API&#xff0c;会发现它们还很“粗糙”&#xff1a;有…...

GC1808高性能24位立体声音频ADC芯片解析

1. 芯片概述 GC1808是一款24位立体声音频模数转换器&#xff08;ADC&#xff09;&#xff0c;支持8kHz~96kHz采样率&#xff0c;集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器&#xff0c;适用于高保真音频采集场景。 2. 核心特性 高精度&#xff1a;24位分辨率&#xff0c…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

​​企业大模型服务合规指南:深度解析备案与登记制度​​

伴随AI技术的爆炸式发展&#xff0c;尤其是大模型&#xff08;LLM&#xff09;在各行各业的深度应用和整合&#xff0c;企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者&#xff0c;还是积极拥抱AI转型的传统企业&#xff0c;在面向公众…...

解析“道作为序位生成器”的核心原理

解析“道作为序位生成器”的核心原理 以下完整展开道函数的零点调控机制&#xff0c;重点解析"道作为序位生成器"的核心原理与实现框架&#xff1a; 一、道函数的零点调控机制 1. 道作为序位生成器 道在认知坐标系$(x_{\text{物}}, y_{\text{意}}, z_{\text{文}}…...

Springboot 高校报修与互助平台小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;高校报修与互助平台小程序被用户普遍使用&#xff0c;为…...

DriveGPT4: Interpretable End-to-end Autonomous Driving via Large Language Model

一、研究背景与创新点 (一)现有方法的局限性 当前智驾系统面临两大核心挑战:一是长尾问题,即系统在遇到新场景时可能失效,例如突发交通状况或非常规道路环境;二是可解释性问题,传统方法无法解释智驾系统的决策过程,用户难以理解车辆行为的依据。传统语言模型(如 BERT…...

Redis专题-实战篇一-基于Session和Redis实现登录业务

GitHub项目地址&#xff1a;https://github.com/whltaoin/redisLearningProject_hm-dianping 基于Session实现登录业务功能提交版本码&#xff1a;e34399f 基于Redis实现登录业务提交版本码&#xff1a;60bf740 一、导入黑马点评后端项目 项目架构图 1. 前期阶段2. 后续阶段导…...

组合模式:构建树形结构的艺术

引言:处理复杂对象结构的挑战 在软件开发中,我们常遇到需要处理部分-整体层次结构的场景: 文件系统中的文件与文件夹GUI中的容器与组件组织结构中的部门与员工菜单系统中的子菜单与菜单项组合模式正是为解决这类问题而生的设计模式。它允许我们将对象组合成树形结构来表示&…...