FlinkSql读取kafka数据流的方法(scala)
我的scala版本为2.12
<scala.binary.version>2.12</scala.binary.version>
我的Flink版本为1.13.6
<flink.version>1.13.6</flink.version>
FlinkSql读取kafka数据流需要如下依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
我们首先建一个kafka主题person_test。然后建立一个scala类作为kafka的生产者,示例内容如下:
package cha01import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import java.util.Properties
import Util._import scala.util.Randomobject FlinkSqlKafkaConnectorProducer {def main(args: Array[String]): Unit = {val producerConf = new Properties()producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092")producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer")producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val topic = "person_test"val producer:KafkaProducer[Integer,String] = new KafkaProducer(producerConf);val rand = new Random()for(i <- 1 to 2000){val line: String = s"$i,Origami$i,${rand.nextInt(30)+18},${if (rand.nextInt(10) >=8) "Male" else "Female"}"val record: ProducerRecord[Integer, String] =new ProducerRecord[Integer, String](topic, 0, System.currentTimeMillis(), i, line)producer.send(record)Thread.sleep(50+rand.nextInt(500))}producer.flush()producer.close()}
}
此kafka生产者会随机生产出一系列类似以下内容的数据,类型为csv:
id,name,age,gender
1,Origami1,25,Female
2,Origami2,30,Male
3,Origami3,22,Female
随后再在工程中建立一个scala类,内容示例如下:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject CSVKafkaSystem {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|id INT,|name STRING,|age INT,|gender STRING|) WITH (|'connector' = 'kafka',|'topic'= 'person_test',|'properties.bootstrap.servers' = 'single01:9092',|'properties.group.id' = 'person_test_group',|'scan.startup.mode' = 'earliest-offset',|'format' = 'csv',|'csv.ignore-parse-errors' = 'true',|'csv.field-delimiter' = ','|)|""".stripMargin)tabEnv.sqlQuery("SELECT * FROM person").execute().print()}
}
其中的一些参数解释如下:'
connector' = 'kafka'
指定连接器类型为kafka
'topic'= 'person_test'
指定要读取的kafka主题为person_test
'properties.bootstrap.servers' = 'single01:9092'
指定kafka所在的服务器的ip地址以及端口号
'properties.group.id' = 'person_test_group'
指定 Kafka 消费者组的 ID为person_test_group
'scan.startup.mode' = 'earliest-offset'
指定了控制 Flink 从 Kafka 中读取数据时的起始位置
earliest-offset表示从 Kafka 中每个分区的最早消息开始读取。latest-offset表示从 Kafka 中每个分区的最新消息开始读取。group-offsets表示使用 Kafka 消费者组的偏移量来恢复上次消费的位置
'format' = 'csv'
指定了 kafka 消息的格式为csv
'csv.ignore-parse-errors' = 'true'
指定了忽略解析异常的信息
'csv.field-delimiter' = '
指定 CSV 数据中字段的分隔符为,
可以看到最终结果如下,数据在源源不断的生成,flinkSQL也在源源不断的读取表内容

相关文章:
FlinkSql读取kafka数据流的方法(scala)
我的scala版本为2.12 <scala.binary.version>2.12</scala.binary.version> 我的Flink版本为1.13.6 <flink.version>1.13.6</flink.version> FlinkSql读取kafka数据流需要如下依赖: <dependency><groupId>org.apache.flink&…...
.NET 9 中 IFormFile 的详细使用讲解
在.NET应用程序中,处理文件上传是一个常见的需求。.NET 9 提供了 IFormFile 接口,它可以帮助我们轻松地处理来自客户端的文件上传。以下是 IFormFile 的详细使用讲解。 IFormFile 接口简介 IFormFile 是一个表示上传文件的接口,它提供了以下…...
使用阿里云远程访问 Synology Web Station 的指南
使用阿里云远程访问 Synology Web Station 的指南 本文将指导如何通过阿里云服务器配置 Nginx 和 FRP,远程访问部署在 Synology NAS 上的 Web Station 服务,同时支持 HTTPS 安全访问。 背景 通过 Synology NAS 的 Web Station,可以部署 Wor…...
LlamaFactory介绍
目录 一、什么是LlamaFactory 1. 安装 LlamaFactory 2. 下载 LLaMA 模型 3. 运行 LLaMA 模型 4. 微调 LLaMA 模型 5. 优化本地运行 6. 推理加速 7. 硬件要求 二、总结 一、什么是LlamaFactory LlamaFactory 是一个用于训练和运行 LLaMA(Meta 的开源大型语言模型)模型…...
vue 项目使用 nginx 部署
前言 记录下使用element-admin-template 改造项目踩过的坑及打包部署过程 一、根据权限增加动态路由不生效 原因是Sidebar中路由取的 this.$router.options.routes,需要在计算路由 permission.js 增加如下代码 // generate accessible routes map based on roles const acce…...
<项目代码>YOLOv8 玉米地杂草识别<目标检测>
YOLOv8是一种单阶段(one-stage)检测算法,它将目标检测问题转化为一个回归问题,能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法(如Faster R-CNN),YOLOv8具有更高的…...
Wxml2Canvas小程序将dom转为图片,bug总结
1.显示文字 标签上面使用 data-type"text" 加上class名 <view data-type"text" class"my_draw_canvas"><text data-type"text" class"center my_draw_canvas" data-text"企业出游证明">企业出游证明…...
[ 网络安全介绍 3 ] 网络安全事件相关案例有哪些?
🍬 博主介绍 👨🎓 博主介绍:大家好,我是 _PowerShell ,很高兴认识大家~ ✨主攻领域:【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 🎉点赞➕评论➕收藏 养成习…...
SpringMVC学习笔记(二)
五、Rest风格编程 (一)Rest风格URL规范介绍 1、什么是restful RESTful架构,就是目前最流行的一种互联网软件架构风格。它结构清晰、符合标准、易于理解、扩展方便,所以正得到越来越多网站的采用。REST这个词,是Roy T…...
51c嵌入式~单片机合集2
我自己的原文哦~ https://blog.51cto.com/whaosoft/12362395 一、不同的电平信号的MCU怎么通信? 下面这个“电平转换”电路,理解后令人心情愉快。电路设计其实也可以很有趣。 先说一说这个电路的用途:当两个MCU在不同的工作电压下工作&…...
JavaScript:浏览器对象模型BOM
BOM介绍 浏览器对象模型(Brower Object Model,BOM)提供了独立于内容而与浏览器窗口进行交互的对象,其核心对象是window BOM由一系列相关的对象构成,并且每个对象都提供了很多方法和属性。 BOM与DOM区别 DOM是文档对…...
Unity音频导入设置
参考:unity官方文档 导入设置 Force To Mono:强制单声道。启用后音频片段将降混为单声道声音。可以节省该资源所占据的空间。 Normalize:峰值归一化。降混过程通常会导致信号比原始信号更安静。峰值归一化的信号为音频源的音量属性提供了后…...
【数据分享】中国对外投资合作发展报告(2013-2023)
数据介绍 绪 论............................................................................................................................. 1 对外投资合作高质量发展迈出新步伐................................................................... 2 第一篇 发…...
java8之Stream流
文章目录 Stream流的定义和特性定义特性中间操作终结操作 生成流forEachmapfilterlimitsorted并行(parallel)程序Collectors Stream流的定义和特性 定义 Stream是Java 8 API添加的一个新的抽象,用于以声明性方式处理数据集合。它…...
pipx安装提示找不到包
执行: pipx install --include-deps --force "ansible6.*"WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by NewConnectionError(<pip._vendor.urllib3.connection.HTTPSConnection …...
Codeforces Round 987 (Div. 2)(前四道)
A. Penchick and Modern Monument 翻译: 在繁华大都市马尼拉的摩天大楼中,菲律宾最新的 Noiph 购物中心刚刚竣工!建筑管理方 Penchick 订购了一座由 n 根支柱组成的先进纪念碑。 纪念碑支柱的高度可以用一个由 n 个正整数组成的数组 h 来表示…...
PCB+SMT线上报价系统+PCB生产ERP系统自动化拼板模块升级
PCB生产ERP系统的智能拼版技术,是基于PCB前端报价系统获取到的用户或市场人员已录入系统的板子尺寸及set参数等,按照最优原则或利用率最大化原则自动进行计算并输出拼版样式图和板材利用率,提高工程人员效率,减少板材的浪费。覆铜…...
微信小程序_小程序视图与逻辑_day3
一、目标 A. 能够知道如何实现页面之间的导航跳转 B. 能够知道如何实现下拉刷新效果 C. 能够知道如何实现上拉加载更多效果 D. 能够知道小程序中常用的生命周期 二、目录 A. 页面导航 B. 页面事件 C. 生命周期 D. WXS脚本 E. 案例-本地生活(列表页面)…...
kubesphere环境-本地Harbor仓库+k8s集群(单master 多master)+Prometheus监控平台部署
前言:半月前在公司生产环境上离线部署了k8s集群Victoria Metrics(二开版)自研版夜莺 监控平台的搭建,下面我租用3台华为云服务器演示部署kubesphere环境-本地Harbor仓库k8s集群(单master节点 & 单master节点)Prometheus监控部…...
【提高篇】3.3 GPIO(三,工作模式详解 上)
目录 一,工作模式介绍 二,输入浮空 2.1 输入浮空简介 2.2 输入浮空特点 2.3 按键检测示例 2.4 高阻态 三,输入上拉 3.1 输入上拉简介 3.2 输入上拉的特点 3.3 按键检测示例 四,输入下拉 4.1 输入下拉简介 4.2 输入下拉特点 4.3 按键检测示例 一,工作模式介绍…...
4 款主流论文降 AI 软件实测对比!谁能 5 分钟把 AI 率降到 10% 以下
4 款主流论文降 AI 软件实测对比!谁能 5 分钟把 AI 率降到 10% 以下 毕业季最焦虑的事——答辩前剩 3 天、AI 率还有 70%、想找一款 5 分钟就能搞定的工具。 市面上很多工具宣称"几分钟出结果"——但实测下来快的快、慢的慢、效果差距更大。这篇文章实测对…...
别再乱写RS485协议了!基于STM32F103C8T6,聊聊工业通讯中帧结构的那些坑
工业级RS485通讯协议设计:从基础到实战的避坑指南 在嘈杂的工厂车间里,一排STM32F103C8T6控制器通过RS485总线连接着二十多台设备。突然,3号节点的温度传感器数据开始随机跳变,而工程师小王发现每当隔壁车间的变频器启动时&#x…...
Molflow仿真结果怎么看?Texture、Profile、Counter Facet全解析,选对方法效率翻倍
Molflow仿真结果解读实战指南:Texture、Profile、Counter Facet深度解析 面对真空系统仿真结果,许多工程师常陷入"数据海洋"的困惑——明明跑完了模拟,却不知如何高效提取关键信息。Molflow作为专业级真空仿真工具,提供…...
办公Agent从0到1落地指南,5个步骤 + 6个避坑
大家好,我是小悟。 一、核心逻辑:Agent不是“对话机器人”,而是“数字执行者” 很多团队误以为采购了某个AI助手(如会议纪要工具、代码生成插件)就是引进了Agent。真正的办公Agent具备“感知-决策-执行”闭环ÿ…...
【亲测免费】 UPX脱壳机资源下载
UPX脱壳机资源下载 【下载地址】UPX脱壳机资源下载 UPX脱壳机资源下载本仓库提供了一个名为“upx脱壳机”的资源文件下载 项目地址: https://gitcode.com/open-source-toolkit/3cfe1 本仓库提供了一个名为“upx脱壳机”的资源文件下载。该资源文件是一个名为“HA_UPXShe…...
避坑指南:为什么你的Realsense D435i视频流用VLC/EasyPlayer打不开?RTSP回传思翼MK15E的正确姿势
深度解析:Realsense D435i视频流RTSP传输的兼容性陷阱与实战解决方案 当你在无人机项目中尝试通过RTSP协议传输Realsense D435i的实时视频流时,是否遇到过VLC或EasyPlayer无法正常播放的困扰?这种看似简单的视频流传输背后,隐藏着…...
中美Agent生态的路径差异——《重构与崛起——OpenClaw时代的中国Agent产业生态报告》解读三
易观分析:面对OpenClaw掀起的全球AI Agent技术浪潮,中美两国走出截然不同的发展路径。美国生态追求底层框架与协议的原创定义;而中国生态以应用驱动、平台绑定和合规先行为核心逻辑,快速将前沿技术转化为可落地的商业现实。这两条…...
告别点点点!用Ranorex Studio录制你的第一个计算器自动化测试(附详细截图)
从零开始:用Ranorex Studio实现计算器自动化测试的完整指南 第一次接触自动化测试时,那种既期待又忐忑的心情我至今记忆犹新。作为一位长期被重复性手工测试困扰的QA工程师,每天面对相同的测试用例,点击相同的按钮,验证…...
ansys网格的一阶和二阶什么区别?
一阶和二阶网格的核心区别在于单元内插值函数的阶次不同,导致精度与计算成本的差异。简单来说,一阶单元用直线描述变形,二阶单元用曲线描述,因此二阶更精确但更耗资源。 一阶网格(Linear Element) 节点分布:仅在单元角点设置节点,如六面体有8个节点(Solid185)。…...
跨平台图形API实战选型:从Vulkan、DirectX到Metal与WebGPU的架构抉择
1. 图形API的演变与现状 十年前我刚入行时,OpenGL还是图形开发的主流选择。记得第一次在Ubuntu上配置GLFW环境就花了整整两天,而现在Vulkan只需要几行命令就能跑起来。这种变化背后是GPU架构的革命性演进——从固定功能管线到可编程着色器,再…...
