当前位置: 首页 > news >正文

FlinkSql读取kafka数据流的方法(scala)

我的scala版本为2.12

<scala.binary.version>2.12</scala.binary.version>

我的Flink版本为1.13.6

<flink.version>1.13.6</flink.version>

FlinkSql读取kafka数据流需要如下依赖:

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

我们首先建一个kafka主题person_test。然后建立一个scala类作为kafka的生产者,示例内容如下:

package cha01import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import java.util.Properties
import Util._import scala.util.Randomobject FlinkSqlKafkaConnectorProducer {def main(args: Array[String]): Unit = {val producerConf = new Properties()producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092")producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer")producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val topic = "person_test"val producer:KafkaProducer[Integer,String] = new KafkaProducer(producerConf);val rand = new Random()for(i <- 1 to 2000){val line: String = s"$i,Origami$i,${rand.nextInt(30)+18},${if (rand.nextInt(10) >=8) "Male" else "Female"}"val record: ProducerRecord[Integer, String] =new ProducerRecord[Integer, String](topic, 0, System.currentTimeMillis(), i, line)producer.send(record)Thread.sleep(50+rand.nextInt(500))}producer.flush()producer.close()}
}

此kafka生产者会随机生产出一系列类似以下内容的数据,类型为csv:

id,name,age,gender
1,Origami1,25,Female
2,Origami2,30,Male
3,Origami3,22,Female

随后再在工程中建立一个scala类,内容示例如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject CSVKafkaSystem {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|id INT,|name STRING,|age INT,|gender STRING|) WITH (|'connector' = 'kafka',|'topic'= 'person_test',|'properties.bootstrap.servers' = 'single01:9092',|'properties.group.id' = 'person_test_group',|'scan.startup.mode' = 'earliest-offset',|'format' = 'csv',|'csv.ignore-parse-errors' = 'true',|'csv.field-delimiter' = ','|)|""".stripMargin)tabEnv.sqlQuery("SELECT * FROM person").execute().print()}
}

其中的一些参数解释如下:'

connector' = 'kafka'

指定连接器类型为kafka

'topic'= 'person_test'

指定要读取的kafka主题为person_test

'properties.bootstrap.servers' = 'single01:9092'

指定kafka所在的服务器的ip地址以及端口号

'properties.group.id' = 'person_test_group'

指定 Kafka 消费者组的 ID为person_test_group

'scan.startup.mode' = 'earliest-offset'

指定了控制 Flink 从 Kafka 中读取数据时的起始位置

  • earliest-offset 表示从 Kafka 中每个分区的最早消息开始读取。
  • latest-offset 表示从 Kafka 中每个分区的最新消息开始读取。
  • group-offsets 表示使用 Kafka 消费者组的偏移量来恢复上次消费的位置

'format' = 'csv'

指定了 kafka 消息的格式为csv

'csv.ignore-parse-errors' = 'true'

指定了忽略解析异常的信息

'csv.field-delimiter' = '

指定 CSV 数据中字段的分隔符为,

可以看到最终结果如下,数据在源源不断的生成,flinkSQL也在源源不断的读取表内容

相关文章:

FlinkSql读取kafka数据流的方法(scala)

我的scala版本为2.12 <scala.binary.version>2.12</scala.binary.version> 我的Flink版本为1.13.6 <flink.version>1.13.6</flink.version> FlinkSql读取kafka数据流需要如下依赖&#xff1a; <dependency><groupId>org.apache.flink&…...

.NET 9 中 IFormFile 的详细使用讲解

在.NET应用程序中&#xff0c;处理文件上传是一个常见的需求。.NET 9 提供了 IFormFile 接口&#xff0c;它可以帮助我们轻松地处理来自客户端的文件上传。以下是 IFormFile 的详细使用讲解。 IFormFile 接口简介 IFormFile 是一个表示上传文件的接口&#xff0c;它提供了以下…...

使用阿里云远程访问 Synology Web Station 的指南

使用阿里云远程访问 Synology Web Station 的指南 本文将指导如何通过阿里云服务器配置 Nginx 和 FRP&#xff0c;远程访问部署在 Synology NAS 上的 Web Station 服务&#xff0c;同时支持 HTTPS 安全访问。 背景 通过 Synology NAS 的 Web Station&#xff0c;可以部署 Wor…...

LlamaFactory介绍

目录 一、什么是LlamaFactory 1. 安装 LlamaFactory 2. 下载 LLaMA 模型 3. 运行 LLaMA 模型 4. 微调 LLaMA 模型 5. 优化本地运行 6. 推理加速 7. 硬件要求 二、总结 一、什么是LlamaFactory LlamaFactory 是一个用于训练和运行 LLaMA(Meta 的开源大型语言模型)模型…...

vue 项目使用 nginx 部署

前言 记录下使用element-admin-template 改造项目踩过的坑及打包部署过程 一、根据权限增加动态路由不生效 原因是Sidebar中路由取的 this.$router.options.routes,需要在计算路由 permission.js 增加如下代码 // generate accessible routes map based on roles const acce…...

<项目代码>YOLOv8 玉米地杂草识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…...

Wxml2Canvas小程序将dom转为图片,bug总结

1.显示文字 标签上面使用 data-type"text" 加上class名 <view data-type"text" class"my_draw_canvas"><text data-type"text" class"center my_draw_canvas" data-text"企业出游证明">企业出游证明…...

[ 网络安全介绍 3 ] 网络安全事件相关案例有哪些?

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…...

SpringMVC学习笔记(二)

五、Rest风格编程 &#xff08;一&#xff09;Rest风格URL规范介绍 1、什么是restful RESTful架构&#xff0c;就是目前最流行的一种互联网软件架构风格。它结构清晰、符合标准、易于理解、扩展方便&#xff0c;所以正得到越来越多网站的采用。REST这个词&#xff0c;是Roy T…...

51c嵌入式~单片机合集2

我自己的原文哦~ https://blog.51cto.com/whaosoft/12362395 一、不同的电平信号的MCU怎么通信&#xff1f; 下面这个“电平转换”电路&#xff0c;理解后令人心情愉快。电路设计其实也可以很有趣。 先说一说这个电路的用途&#xff1a;当两个MCU在不同的工作电压下工作&…...

JavaScript:浏览器对象模型BOM

BOM介绍 浏览器对象模型&#xff08;Brower Object Model&#xff0c;BOM&#xff09;提供了独立于内容而与浏览器窗口进行交互的对象&#xff0c;其核心对象是window BOM由一系列相关的对象构成&#xff0c;并且每个对象都提供了很多方法和属性。 BOM与DOM区别 DOM是文档对…...

Unity音频导入设置

参考&#xff1a;unity官方文档 导入设置 Force To Mono&#xff1a;强制单声道。启用后音频片段将降混为单声道声音。可以节省该资源所占据的空间。 Normalize&#xff1a;峰值归一化。降混过程通常会导致信号比原始信号更安静。峰值归一化的信号为音频源的音量属性提供了后…...

【数据分享】中国对外投资合作发展报告(2013-2023)

数据介绍 绪 论............................................................................................................................. 1 对外投资合作高质量发展迈出新步伐................................................................... 2 第一篇 发…...

java8之Stream流

文章目录 Stream流的定义和特性‌定义特性‌中间操作‌终结操作‌ 生成流forEachmapfilterlimitsorted并行&#xff08;parallel&#xff09;程序Collectors Stream流的定义和特性‌ 定义 Stream是Java 8 API添加的一个新的抽象&#xff0c;用于以声明性方式处理数据集合。它…...

pipx安装提示找不到包

执行&#xff1a; pipx install --include-deps --force "ansible6.*"WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by NewConnectionError(<pip._vendor.urllib3.connection.HTTPSConnection …...

Codeforces Round 987 (Div. 2)(前四道)

A. Penchick and Modern Monument 翻译&#xff1a; 在繁华大都市马尼拉的摩天大楼中&#xff0c;菲律宾最新的 Noiph 购物中心刚刚竣工&#xff01;建筑管理方 Penchick 订购了一座由 n 根支柱组成的先进纪念碑。 纪念碑支柱的高度可以用一个由 n 个正整数组成的数组 h 来表示…...

PCB+SMT线上报价系统+PCB生产ERP系统自动化拼板模块升级

PCB生产ERP系统的智能拼版技术&#xff0c;是基于PCB前端报价系统获取到的用户或市场人员已录入系统的板子尺寸及set参数等&#xff0c;按照最优原则或利用率最大化原则自动进行计算并输出拼版样式图和板材利用率&#xff0c;提高工程人员效率&#xff0c;减少板材的浪费。覆铜…...

微信小程序_小程序视图与逻辑_day3

一、目标 A. 能够知道如何实现页面之间的导航跳转 B. 能够知道如何实现下拉刷新效果 C. 能够知道如何实现上拉加载更多效果 D. 能够知道小程序中常用的生命周期 二、目录 A. 页面导航 B. 页面事件 C. 生命周期 D. WXS脚本 E. 案例-本地生活&#xff08;列表页面&#xff09;…...

kubesphere环境-本地Harbor仓库+k8s集群(单master 多master)+Prometheus监控平台部署

前言&#xff1a;半月前在公司生产环境上离线部署了k8s集群Victoria Metrics(二开版)自研版夜莺 监控平台的搭建&#xff0c;下面我租用3台华为云服务器演示部署kubesphere环境-本地Harbor仓库k8s集群&#xff08;单master节点 & 单master节点&#xff09;Prometheus监控部…...

【提高篇】3.3 GPIO(三,工作模式详解 上)

目录 一,工作模式介绍 二,输入浮空 2.1 输入浮空简介 2.2 输入浮空特点 2.3 按键检测示例 2.4 高阻态 三,输入上拉 3.1 输入上拉简介 3.2 输入上拉的特点 3.3 按键检测示例 四,输入下拉 4.1 输入下拉简介 4.2 输入下拉特点 4.3 按键检测示例 一,工作模式介绍…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件

今天呢&#xff0c;博主的学习进度也是步入了Java Mybatis 框架&#xff0c;目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学&#xff0c;希望能对大家有所帮助&#xff0c;也特别欢迎大家指点不足之处&#xff0c;小生很乐意接受正确的建议&…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

技术栈RabbitMq的介绍和使用

目录 1. 什么是消息队列&#xff1f;2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...

QT3D学习笔记——圆台、圆锥

类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体&#xff08;对象或容器&#xff09;QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质&#xff08;定义颜色、反光等&#xff09;QFirstPersonC…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...