FlinkSql读取kafka数据流的方法(scala)
我的scala版本为2.12
<scala.binary.version>2.12</scala.binary.version>
我的Flink版本为1.13.6
<flink.version>1.13.6</flink.version>
FlinkSql读取kafka数据流需要如下依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
我们首先建一个kafka主题person_test。然后建立一个scala类作为kafka的生产者,示例内容如下:
package cha01import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import java.util.Properties
import Util._import scala.util.Randomobject FlinkSqlKafkaConnectorProducer {def main(args: Array[String]): Unit = {val producerConf = new Properties()producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092")producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer")producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val topic = "person_test"val producer:KafkaProducer[Integer,String] = new KafkaProducer(producerConf);val rand = new Random()for(i <- 1 to 2000){val line: String = s"$i,Origami$i,${rand.nextInt(30)+18},${if (rand.nextInt(10) >=8) "Male" else "Female"}"val record: ProducerRecord[Integer, String] =new ProducerRecord[Integer, String](topic, 0, System.currentTimeMillis(), i, line)producer.send(record)Thread.sleep(50+rand.nextInt(500))}producer.flush()producer.close()}
}
此kafka生产者会随机生产出一系列类似以下内容的数据,类型为csv:
id,name,age,gender
1,Origami1,25,Female
2,Origami2,30,Male
3,Origami3,22,Female
随后再在工程中建立一个scala类,内容示例如下:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject CSVKafkaSystem {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|id INT,|name STRING,|age INT,|gender STRING|) WITH (|'connector' = 'kafka',|'topic'= 'person_test',|'properties.bootstrap.servers' = 'single01:9092',|'properties.group.id' = 'person_test_group',|'scan.startup.mode' = 'earliest-offset',|'format' = 'csv',|'csv.ignore-parse-errors' = 'true',|'csv.field-delimiter' = ','|)|""".stripMargin)tabEnv.sqlQuery("SELECT * FROM person").execute().print()}
}
其中的一些参数解释如下:'
connector' = 'kafka'
指定连接器类型为kafka
'topic'= 'person_test'
指定要读取的kafka主题为person_test
'properties.bootstrap.servers' = 'single01:9092'
指定kafka所在的服务器的ip地址以及端口号
'properties.group.id' = 'person_test_group'
指定 Kafka 消费者组的 ID为person_test_group
'scan.startup.mode' = 'earliest-offset'
指定了控制 Flink 从 Kafka 中读取数据时的起始位置
earliest-offset表示从 Kafka 中每个分区的最早消息开始读取。latest-offset表示从 Kafka 中每个分区的最新消息开始读取。group-offsets表示使用 Kafka 消费者组的偏移量来恢复上次消费的位置
'format' = 'csv'
指定了 kafka 消息的格式为csv
'csv.ignore-parse-errors' = 'true'
指定了忽略解析异常的信息
'csv.field-delimiter' = '
指定 CSV 数据中字段的分隔符为,
可以看到最终结果如下,数据在源源不断的生成,flinkSQL也在源源不断的读取表内容

相关文章:
FlinkSql读取kafka数据流的方法(scala)
我的scala版本为2.12 <scala.binary.version>2.12</scala.binary.version> 我的Flink版本为1.13.6 <flink.version>1.13.6</flink.version> FlinkSql读取kafka数据流需要如下依赖: <dependency><groupId>org.apache.flink&…...
.NET 9 中 IFormFile 的详细使用讲解
在.NET应用程序中,处理文件上传是一个常见的需求。.NET 9 提供了 IFormFile 接口,它可以帮助我们轻松地处理来自客户端的文件上传。以下是 IFormFile 的详细使用讲解。 IFormFile 接口简介 IFormFile 是一个表示上传文件的接口,它提供了以下…...
使用阿里云远程访问 Synology Web Station 的指南
使用阿里云远程访问 Synology Web Station 的指南 本文将指导如何通过阿里云服务器配置 Nginx 和 FRP,远程访问部署在 Synology NAS 上的 Web Station 服务,同时支持 HTTPS 安全访问。 背景 通过 Synology NAS 的 Web Station,可以部署 Wor…...
LlamaFactory介绍
目录 一、什么是LlamaFactory 1. 安装 LlamaFactory 2. 下载 LLaMA 模型 3. 运行 LLaMA 模型 4. 微调 LLaMA 模型 5. 优化本地运行 6. 推理加速 7. 硬件要求 二、总结 一、什么是LlamaFactory LlamaFactory 是一个用于训练和运行 LLaMA(Meta 的开源大型语言模型)模型…...
vue 项目使用 nginx 部署
前言 记录下使用element-admin-template 改造项目踩过的坑及打包部署过程 一、根据权限增加动态路由不生效 原因是Sidebar中路由取的 this.$router.options.routes,需要在计算路由 permission.js 增加如下代码 // generate accessible routes map based on roles const acce…...
<项目代码>YOLOv8 玉米地杂草识别<目标检测>
YOLOv8是一种单阶段(one-stage)检测算法,它将目标检测问题转化为一个回归问题,能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法(如Faster R-CNN),YOLOv8具有更高的…...
Wxml2Canvas小程序将dom转为图片,bug总结
1.显示文字 标签上面使用 data-type"text" 加上class名 <view data-type"text" class"my_draw_canvas"><text data-type"text" class"center my_draw_canvas" data-text"企业出游证明">企业出游证明…...
[ 网络安全介绍 3 ] 网络安全事件相关案例有哪些?
🍬 博主介绍 👨🎓 博主介绍:大家好,我是 _PowerShell ,很高兴认识大家~ ✨主攻领域:【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 🎉点赞➕评论➕收藏 养成习…...
SpringMVC学习笔记(二)
五、Rest风格编程 (一)Rest风格URL规范介绍 1、什么是restful RESTful架构,就是目前最流行的一种互联网软件架构风格。它结构清晰、符合标准、易于理解、扩展方便,所以正得到越来越多网站的采用。REST这个词,是Roy T…...
51c嵌入式~单片机合集2
我自己的原文哦~ https://blog.51cto.com/whaosoft/12362395 一、不同的电平信号的MCU怎么通信? 下面这个“电平转换”电路,理解后令人心情愉快。电路设计其实也可以很有趣。 先说一说这个电路的用途:当两个MCU在不同的工作电压下工作&…...
JavaScript:浏览器对象模型BOM
BOM介绍 浏览器对象模型(Brower Object Model,BOM)提供了独立于内容而与浏览器窗口进行交互的对象,其核心对象是window BOM由一系列相关的对象构成,并且每个对象都提供了很多方法和属性。 BOM与DOM区别 DOM是文档对…...
Unity音频导入设置
参考:unity官方文档 导入设置 Force To Mono:强制单声道。启用后音频片段将降混为单声道声音。可以节省该资源所占据的空间。 Normalize:峰值归一化。降混过程通常会导致信号比原始信号更安静。峰值归一化的信号为音频源的音量属性提供了后…...
【数据分享】中国对外投资合作发展报告(2013-2023)
数据介绍 绪 论............................................................................................................................. 1 对外投资合作高质量发展迈出新步伐................................................................... 2 第一篇 发…...
java8之Stream流
文章目录 Stream流的定义和特性定义特性中间操作终结操作 生成流forEachmapfilterlimitsorted并行(parallel)程序Collectors Stream流的定义和特性 定义 Stream是Java 8 API添加的一个新的抽象,用于以声明性方式处理数据集合。它…...
pipx安装提示找不到包
执行: pipx install --include-deps --force "ansible6.*"WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by NewConnectionError(<pip._vendor.urllib3.connection.HTTPSConnection …...
Codeforces Round 987 (Div. 2)(前四道)
A. Penchick and Modern Monument 翻译: 在繁华大都市马尼拉的摩天大楼中,菲律宾最新的 Noiph 购物中心刚刚竣工!建筑管理方 Penchick 订购了一座由 n 根支柱组成的先进纪念碑。 纪念碑支柱的高度可以用一个由 n 个正整数组成的数组 h 来表示…...
PCB+SMT线上报价系统+PCB生产ERP系统自动化拼板模块升级
PCB生产ERP系统的智能拼版技术,是基于PCB前端报价系统获取到的用户或市场人员已录入系统的板子尺寸及set参数等,按照最优原则或利用率最大化原则自动进行计算并输出拼版样式图和板材利用率,提高工程人员效率,减少板材的浪费。覆铜…...
微信小程序_小程序视图与逻辑_day3
一、目标 A. 能够知道如何实现页面之间的导航跳转 B. 能够知道如何实现下拉刷新效果 C. 能够知道如何实现上拉加载更多效果 D. 能够知道小程序中常用的生命周期 二、目录 A. 页面导航 B. 页面事件 C. 生命周期 D. WXS脚本 E. 案例-本地生活(列表页面)…...
kubesphere环境-本地Harbor仓库+k8s集群(单master 多master)+Prometheus监控平台部署
前言:半月前在公司生产环境上离线部署了k8s集群Victoria Metrics(二开版)自研版夜莺 监控平台的搭建,下面我租用3台华为云服务器演示部署kubesphere环境-本地Harbor仓库k8s集群(单master节点 & 单master节点)Prometheus监控部…...
【提高篇】3.3 GPIO(三,工作模式详解 上)
目录 一,工作模式介绍 二,输入浮空 2.1 输入浮空简介 2.2 输入浮空特点 2.3 按键检测示例 2.4 高阻态 三,输入上拉 3.1 输入上拉简介 3.2 输入上拉的特点 3.3 按键检测示例 四,输入下拉 4.1 输入下拉简介 4.2 输入下拉特点 4.3 按键检测示例 一,工作模式介绍…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...
日常一水C
多态 言简意赅:就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过,当子类和父类的函数名相同时,会隐藏父类的同名函数转而调用子类的同名函数,如果要调用父类的同名函数,那么就需要对父类进行引用&#…...
wpf在image控件上快速显示内存图像
wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像(比如分辨率3000*3000的图像)的办法,尤其是想把内存中的裸数据(只有图像的数据,不包…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能
指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...
C# winform教程(二)----checkbox
一、作用 提供一个用户选择或者不选的状态,这是一个可以多选的控件。 二、属性 其实功能大差不差,除了特殊的几个外,与button基本相同,所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...
