当前位置: 首页 > news >正文

FlinkSql读取kafka数据流的方法(scala)

我的scala版本为2.12

<scala.binary.version>2.12</scala.binary.version>

我的Flink版本为1.13.6

<flink.version>1.13.6</flink.version>

FlinkSql读取kafka数据流需要如下依赖:

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

我们首先建一个kafka主题person_test。然后建立一个scala类作为kafka的生产者,示例内容如下:

package cha01import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import java.util.Properties
import Util._import scala.util.Randomobject FlinkSqlKafkaConnectorProducer {def main(args: Array[String]): Unit = {val producerConf = new Properties()producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092")producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer")producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val topic = "person_test"val producer:KafkaProducer[Integer,String] = new KafkaProducer(producerConf);val rand = new Random()for(i <- 1 to 2000){val line: String = s"$i,Origami$i,${rand.nextInt(30)+18},${if (rand.nextInt(10) >=8) "Male" else "Female"}"val record: ProducerRecord[Integer, String] =new ProducerRecord[Integer, String](topic, 0, System.currentTimeMillis(), i, line)producer.send(record)Thread.sleep(50+rand.nextInt(500))}producer.flush()producer.close()}
}

此kafka生产者会随机生产出一系列类似以下内容的数据,类型为csv:

id,name,age,gender
1,Origami1,25,Female
2,Origami2,30,Male
3,Origami3,22,Female

随后再在工程中建立一个scala类,内容示例如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject CSVKafkaSystem {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|id INT,|name STRING,|age INT,|gender STRING|) WITH (|'connector' = 'kafka',|'topic'= 'person_test',|'properties.bootstrap.servers' = 'single01:9092',|'properties.group.id' = 'person_test_group',|'scan.startup.mode' = 'earliest-offset',|'format' = 'csv',|'csv.ignore-parse-errors' = 'true',|'csv.field-delimiter' = ','|)|""".stripMargin)tabEnv.sqlQuery("SELECT * FROM person").execute().print()}
}

其中的一些参数解释如下:'

connector' = 'kafka'

指定连接器类型为kafka

'topic'= 'person_test'

指定要读取的kafka主题为person_test

'properties.bootstrap.servers' = 'single01:9092'

指定kafka所在的服务器的ip地址以及端口号

'properties.group.id' = 'person_test_group'

指定 Kafka 消费者组的 ID为person_test_group

'scan.startup.mode' = 'earliest-offset'

指定了控制 Flink 从 Kafka 中读取数据时的起始位置

  • earliest-offset 表示从 Kafka 中每个分区的最早消息开始读取。
  • latest-offset 表示从 Kafka 中每个分区的最新消息开始读取。
  • group-offsets 表示使用 Kafka 消费者组的偏移量来恢复上次消费的位置

'format' = 'csv'

指定了 kafka 消息的格式为csv

'csv.ignore-parse-errors' = 'true'

指定了忽略解析异常的信息

'csv.field-delimiter' = '

指定 CSV 数据中字段的分隔符为,

可以看到最终结果如下,数据在源源不断的生成,flinkSQL也在源源不断的读取表内容

相关文章:

FlinkSql读取kafka数据流的方法(scala)

我的scala版本为2.12 <scala.binary.version>2.12</scala.binary.version> 我的Flink版本为1.13.6 <flink.version>1.13.6</flink.version> FlinkSql读取kafka数据流需要如下依赖&#xff1a; <dependency><groupId>org.apache.flink&…...

.NET 9 中 IFormFile 的详细使用讲解

在.NET应用程序中&#xff0c;处理文件上传是一个常见的需求。.NET 9 提供了 IFormFile 接口&#xff0c;它可以帮助我们轻松地处理来自客户端的文件上传。以下是 IFormFile 的详细使用讲解。 IFormFile 接口简介 IFormFile 是一个表示上传文件的接口&#xff0c;它提供了以下…...

使用阿里云远程访问 Synology Web Station 的指南

使用阿里云远程访问 Synology Web Station 的指南 本文将指导如何通过阿里云服务器配置 Nginx 和 FRP&#xff0c;远程访问部署在 Synology NAS 上的 Web Station 服务&#xff0c;同时支持 HTTPS 安全访问。 背景 通过 Synology NAS 的 Web Station&#xff0c;可以部署 Wor…...

LlamaFactory介绍

目录 一、什么是LlamaFactory 1. 安装 LlamaFactory 2. 下载 LLaMA 模型 3. 运行 LLaMA 模型 4. 微调 LLaMA 模型 5. 优化本地运行 6. 推理加速 7. 硬件要求 二、总结 一、什么是LlamaFactory LlamaFactory 是一个用于训练和运行 LLaMA(Meta 的开源大型语言模型)模型…...

vue 项目使用 nginx 部署

前言 记录下使用element-admin-template 改造项目踩过的坑及打包部署过程 一、根据权限增加动态路由不生效 原因是Sidebar中路由取的 this.$router.options.routes,需要在计算路由 permission.js 增加如下代码 // generate accessible routes map based on roles const acce…...

<项目代码>YOLOv8 玉米地杂草识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…...

Wxml2Canvas小程序将dom转为图片,bug总结

1.显示文字 标签上面使用 data-type"text" 加上class名 <view data-type"text" class"my_draw_canvas"><text data-type"text" class"center my_draw_canvas" data-text"企业出游证明">企业出游证明…...

[ 网络安全介绍 3 ] 网络安全事件相关案例有哪些?

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…...

SpringMVC学习笔记(二)

五、Rest风格编程 &#xff08;一&#xff09;Rest风格URL规范介绍 1、什么是restful RESTful架构&#xff0c;就是目前最流行的一种互联网软件架构风格。它结构清晰、符合标准、易于理解、扩展方便&#xff0c;所以正得到越来越多网站的采用。REST这个词&#xff0c;是Roy T…...

51c嵌入式~单片机合集2

我自己的原文哦~ https://blog.51cto.com/whaosoft/12362395 一、不同的电平信号的MCU怎么通信&#xff1f; 下面这个“电平转换”电路&#xff0c;理解后令人心情愉快。电路设计其实也可以很有趣。 先说一说这个电路的用途&#xff1a;当两个MCU在不同的工作电压下工作&…...

JavaScript:浏览器对象模型BOM

BOM介绍 浏览器对象模型&#xff08;Brower Object Model&#xff0c;BOM&#xff09;提供了独立于内容而与浏览器窗口进行交互的对象&#xff0c;其核心对象是window BOM由一系列相关的对象构成&#xff0c;并且每个对象都提供了很多方法和属性。 BOM与DOM区别 DOM是文档对…...

Unity音频导入设置

参考&#xff1a;unity官方文档 导入设置 Force To Mono&#xff1a;强制单声道。启用后音频片段将降混为单声道声音。可以节省该资源所占据的空间。 Normalize&#xff1a;峰值归一化。降混过程通常会导致信号比原始信号更安静。峰值归一化的信号为音频源的音量属性提供了后…...

【数据分享】中国对外投资合作发展报告(2013-2023)

数据介绍 绪 论............................................................................................................................. 1 对外投资合作高质量发展迈出新步伐................................................................... 2 第一篇 发…...

java8之Stream流

文章目录 Stream流的定义和特性‌定义特性‌中间操作‌终结操作‌ 生成流forEachmapfilterlimitsorted并行&#xff08;parallel&#xff09;程序Collectors Stream流的定义和特性‌ 定义 Stream是Java 8 API添加的一个新的抽象&#xff0c;用于以声明性方式处理数据集合。它…...

pipx安装提示找不到包

执行&#xff1a; pipx install --include-deps --force "ansible6.*"WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by NewConnectionError(<pip._vendor.urllib3.connection.HTTPSConnection …...

Codeforces Round 987 (Div. 2)(前四道)

A. Penchick and Modern Monument 翻译&#xff1a; 在繁华大都市马尼拉的摩天大楼中&#xff0c;菲律宾最新的 Noiph 购物中心刚刚竣工&#xff01;建筑管理方 Penchick 订购了一座由 n 根支柱组成的先进纪念碑。 纪念碑支柱的高度可以用一个由 n 个正整数组成的数组 h 来表示…...

PCB+SMT线上报价系统+PCB生产ERP系统自动化拼板模块升级

PCB生产ERP系统的智能拼版技术&#xff0c;是基于PCB前端报价系统获取到的用户或市场人员已录入系统的板子尺寸及set参数等&#xff0c;按照最优原则或利用率最大化原则自动进行计算并输出拼版样式图和板材利用率&#xff0c;提高工程人员效率&#xff0c;减少板材的浪费。覆铜…...

微信小程序_小程序视图与逻辑_day3

一、目标 A. 能够知道如何实现页面之间的导航跳转 B. 能够知道如何实现下拉刷新效果 C. 能够知道如何实现上拉加载更多效果 D. 能够知道小程序中常用的生命周期 二、目录 A. 页面导航 B. 页面事件 C. 生命周期 D. WXS脚本 E. 案例-本地生活&#xff08;列表页面&#xff09;…...

kubesphere环境-本地Harbor仓库+k8s集群(单master 多master)+Prometheus监控平台部署

前言&#xff1a;半月前在公司生产环境上离线部署了k8s集群Victoria Metrics(二开版)自研版夜莺 监控平台的搭建&#xff0c;下面我租用3台华为云服务器演示部署kubesphere环境-本地Harbor仓库k8s集群&#xff08;单master节点 & 单master节点&#xff09;Prometheus监控部…...

【提高篇】3.3 GPIO(三,工作模式详解 上)

目录 一,工作模式介绍 二,输入浮空 2.1 输入浮空简介 2.2 输入浮空特点 2.3 按键检测示例 2.4 高阻态 三,输入上拉 3.1 输入上拉简介 3.2 输入上拉的特点 3.3 按键检测示例 四,输入下拉 4.1 输入下拉简介 4.2 输入下拉特点 4.3 按键检测示例 一,工作模式介绍…...

‘视’不可挡:OAK相机助力无人机智控飞行!

南京邮电大学通达学院的刘同学用我们的oak-d-lite实现精确打击无人机的避障和目标识别定位功能&#xff0c;取得了比赛冠军。我们盼望着更多的朋友们能够加入到我们OAK的队伍中来&#xff0c;参与到各式各样的比赛中去。我们相信&#xff0c;有了我们相机的助力&#xff0c;大家…...

javaScript交互补充(元素的三大系列)

1、元素的三大系列 1.1、offset系列 1.1.1、offset初相识 使用offset系列相关属性可以动态的得到该元素的位置&#xff08;偏移&#xff09;、大小等 获得元素距离带有定位祖先元素的位置获得元素自身的大小&#xff08;宽度高度&#xff09;注意&#xff1a;返回的数值都不…...

数据结构(基本概念及顺序表)

基本概念&#xff1a; 1、引入 程序数据结构算法 数据&#xff1a; 数值数据&#xff1a;能够直接参加运算的数据&#xff08;数值&#xff0c;字符&#xff09; 非数值数据&#xff1a;不能够直接参加运算的数据&#xff08;字符串、图片等&#xff09; 数据即是信息的载…...

【全面系统性介绍】虚拟机VM中CentOS 7 安装和网络配置指南

一、CentOS 7下载源 华为源&#xff1a;https://mirrors.huaweicloud.com/centos/7/isos/x86_64/ 阿里云源&#xff1a;centos-vault-7.9.2009-isos-x86_64安装包下载_开源镜像站-阿里云 百度网盘源&#xff1a;https://pan.baidu.com/s/1MjFPWS2P2pIRMLA2ioDlVg?pwdfudi &…...

html + css 自适应首页布局案例

文章目录 前言一、组成二、代码1. css 样式2. body 内容3.全部整体 三、效果 前言 一个自适应的html布局 一、组成 整体居中&#xff0c;宽度1200px&#xff0c;小屏幕宽度100% 二、代码 1. css 样式 代码如下&#xff08;示例&#xff09;&#xff1a; <style>* {…...

时钟之CSS+JS版

写在前面 此版本绘制的时钟基于CSSJS模式。 优点操作简单&#xff0c;缺点当然是不够灵活。下一篇会基于HTML5的canvas标签&#xff0c;使用JS绘制。会更灵活&#xff0c;元素更加丰富。 HTML代码 <div class"box"><article class"clock"><…...

ubuntu18.04 配置安卓编译环境

目前有个项目&#xff0c;验收时有个要求是在linux中进行编译打包生成apk文件。我平时都是在windows环境android studio中进行打包的&#xff0c;花了半天时间研究了一下&#xff0c;记录如下&#xff1a; 安装安卓sdk cd /opt wget https://dl.google.com/android/reposito…...

pycharm分支提交操作

一、Pycharm拉取Git远程仓库代码 1、点击VCS > Get from Version Control 2、输入git的url&#xff0c;选择自己的项目路径 3、点击Clone&#xff0c;就拉取成功了 默认签出分支为main 选择develop签出即可进行开发工作 二、创建分支&#xff08;非必要可以不使用&#xf…...

ESP32-C3 开发笔记 之 arduino 正常上传 串口乱码2024/11/15

ESP32-C3 开发笔记 之 arduino 正常上传 串口乱码 ESP32-C3 开发笔记 之 arduino 正常上传程序 但是打开串口,串口快速刷新 芯片一直处于重启状态 找了很久的原因没找到,用Mixly 上传就正常 最后看到这篇 文章https://blog.csdn.net/luooove/article/details/132351398修改了Fl…...

Ubuntu 的 ROS 操作系统 turtlebot3 SLAM仿真

引言 SLAM&#xff08;同步定位与地图构建&#xff09;在Gazebo仿真环境中的应用能够模拟真实机器人进行环境建图和导航。通过SLAM仿真&#xff0c;开发者可以在虚拟环境中测试算法&#xff0c;而不必依赖真实硬件&#xff0c;便于调试与优化。 Gazebo提供了多个虚拟环境&…...