PiflowX组件-ReadFromUpsertKafka
ReadFromUpsertKafka组件
组件说明
upsert方式从Kafka topic中读取数据。
计算引擎
flink
有界性
Unbounded
组件分组
kafka
端口
Inport:默认端口
outport:默认端口
组件属性
| 名称 | 展示名称 | 默认值 | 允许值 | 是否必填 | 描述 | 例子 |
|---|---|---|---|---|---|---|
| kafka_host | KAFKA_HOST | “” | 无 | 是 | 逗号分隔的Kafka broker列表。 | 127.0.0.1:9092 |
| topic | TOPIC | “” | 无 | 是 | 用于写入Kafka topic名称。 | topic-1 |
| tableDefinition | TableDefinition | “” | 无 | 是 | Flink table定义。 | |
| key_format | keyFormat | “” | Set(“json”, “csv”, “avro”) | 是 | 用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。 | json |
| value_format | ValueFormat | “” | Set(“json”, “csv”, “avro”) | 是 | 用于对Kafka消息中value部分反序列化的格式 | json |
| value_fields_include | ValueFieldsInclude | ALL | Set(“ALL”, “EXCEPT_KEY”) | 是 | 控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 | ALL |
| key_fields_prefix | KeyFieldsPrefix | “” | 无 | 否 | 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。 | |
| properties | PROPERTIES | “” | 无 | 否 | 该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。 |
ReadFromUpsertKafka示例配置
演示实时统计网页pv和uv的总量。
{"flow": {"name": "ReadFromUpsertKafkaTest","uuid": "1234","stops": [{"uuid": "5555","name": "ReadFromUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}","properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"}},{"uuid": "6666","name": "ShowChangeLogData1","bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData","properties": {"showNumber": "5000"}}],"paths": [{"from": "ReadFromUpsertKafka1","outport": "","inport": "","to": "ShowChangeLogData1"}]}
}
示例说明
-
通过
k.kafka.ReadFromUps从kafka的result_total_pv_uv_mintopic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据); -
通过
ShowChangeLogData组件将数据输出到控制台。
tableDefinition属性结构
{"ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"}, {"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"}, {"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"}, {"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"}, {"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null
}
演示DEMO

欢迎关注PiflowX公众号,谢谢支持!!!

演示案例参考
实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客
相关文章:
PiflowX组件-ReadFromUpsertKafka
ReadFromUpsertKafka组件 组件说明 upsert方式从Kafka topic中读取数据。 计算引擎 flink 有界性 Unbounded 组件分组 kafka 端口 Inport:默认端口 outport:默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HO…...
keil 5 ARM CC编译错误和警告解释大全(3)序列号2000-3000
2001年:已声明虚拟参数,但从未使用过 2002年:虚拟参数重新定义为do变量 2003:无法优化:常量/表达式传递给可能修改的变量 2004:重新维度的数组作为参数传递 2005:重维度数组等价 2006&…...
CentOS 7 实战指南:文件或目录的权限操作命令详解
前言 这篇文章详细介绍了文件和目录的常用权限操作命令,并提供了全面的技术解析。通过本文,你将学习如何使用 chmod 和 chown 命令来管理文件和目录的权限,控制用户和用户组的访问权限。无论你是初学者还是有经验的系统管理员,这…...
我的第一个前端项目,vue项目从零开始创建和运行
入门前端,从基础做起,从零开始新建项目 背景:VUE脚手架项目是一个“单页面”应用,即整个项目中只有1个网页! 在VUE脚手架项目中,主要是设计各个“视图组件”,它们都是整个网页中某个部分&…...
【OJ】C++,Java,Python,Go,Rust
for循环语法 // cpp// java// python for i in range(集合): for i, val in enumerate(集合): for v1,v2,v3,... in zip(集合1,集合2,集合3,...):Pair // cpp pair<int, string> first second // java Pair<Integer, String> first() new Pair<>(firstVal…...
Flink 任务指标监控
目录 状态监控指标 JobManager 指标 TaskManager 指标 Job 指标 资源监控指标 数据流监控指标 任务监控指标 网络监控指标 容错监控指标 数据源监控指标 数据存储监控指标 当使用 Apache Flink 进行流处理任务时,可以根据不同的监控需求,监控…...
Go语言程序设计-第7章--接口
Go语言程序设计-第7章–接口 接口类型是对其他类型行为的概括与抽象。 Go 语言的接口的独特之处在于它是隐式实现。对于一个具体的类型,无须声明它实现了哪些接口,只要提供接口所必须实现的方法即可。 7.1 接口即约定 7.2 接口类型 package iotype …...
性能优化-OpenMP基础教程(二)
本文主要介绍OpenMP并行编程技术,编程模型、指令和函数的介绍、以及OpenMP实战的几个例子。希望给OpenMP并行编程者提供指导。 🎬个人简介:一个全栈工程师的升级之路! 📋个人专栏:高性能(HPC&am…...
让电脑变得更聪明——用python实现五子棋游戏
作为经典的棋类游戏,五子棋深受大众喜爱,但如果仅实现人与人的博弈,那程序很简单,如果要实现人机对战,教会计算机如何战胜人类,那就不是十分容易的事了。本文我们先从简单入手,完成五子棋游戏的…...
C#-接口
接口 (interface) 定义了一个可由类和结构实现的协定。接口可以包含方法、属性、事件和索引器。接口不提供它所定义的成员的实现 — 它仅指定实现该接口的类或结构必须提供的成员。 接口可支持多重继承。在下面的示例中,接口 IComboBox 同时从 ITextBox 和 IListBox 继承。 i…...
ASP.NET可视化流程设计器源码
源码介绍: ASP.NET可视化流程设计器源码已应用于众多大型企事业单位。拥有全浏览器兼容的可视化流程设计器、表单设计器、基于角色的权限管理等系统开发必须功能,大大为您节省开发时间,是您开发OA.CRM、HR等企事业各种应用管理系统和工作流系统的最佳基…...
景联文科技GPT教育题库:AI教育大模型的强大数据引擎
GPT-4发布后,美国奥数队总教练、卡耐基梅隆大学数学系教授罗博认为,这个几乎是用“刷题”方式喂大的AI教育大模型的到来,意味着人类的刷题时代即将退出历史舞台。 未来教育将更加注重学生的个性化需求和多元化发展,借助GPT和AI教育…...
PHP进阶-实现网站的QQ授权登录
授权登录是站点开发常见的应用场景,通过社交媒体一键授权可以跳过注册站点账户的繁琐操作。本文将讲解如何用PHP实现QQ授权登录。首先,我们需要申请QQ互联开发者账号获得APPID和密钥;接着,我们下载QQ官方SDK:PHP SDK v…...
字节跳动基础架构SRE-Copilot获得2023 CCF国际AIOps挑战赛冠军
近日,2023 CCF国际AIOps挑战赛决赛暨“大模型时代的AIOps”研讨会在北京成功举办,活动吸引了来自互联网、运营商、科研院所、高校、软硬件厂商等领域多名专家学者参与,为智能运维的前沿学术研究、落地生产实践打开了新思路。决赛中࿰…...
python moviepy 图文批量合成带字幕口播视频
最近在研究将图片和文本批量合成为带字幕口播视频 主要是基于python的moviepy库 from generator import audio, pics, subs, videodef main():texts_input examplepics_input example# 图片分辨率预处理pics.adjust(pics_input)# 文字转语音audio.text_to_audio(texts_inpu…...
【代码片段】Linux C++打印当前函数调用堆栈
在开发大型项目时,尤其是多线程情况下,一般无法使用断点调试,这时候将当前函数的调用堆栈打印出来是非常有必要和有效的问题排查手段。 这里记录一段Linux环境下,打印函数堆栈的代码。 void get_native_callstack(std::string &a…...
Linux程序、进程以及计划任务(第一部分)
目录 一、程序和进程 1、什么是程序? 2、什么是进程? 3、线程是什么? 4、如何查看是多线程还是单线程 5、进程结束的两种情况: 6、进程的状态 二、查看进程信息的相关命令 1、ps:查看静态的进程统计信息 2、…...
Oracle database 12cRAC异地恢复至单机
环境 rac 环境 byoradbrac Oracle12.1.0.2 系统版本:Red Hat Enterprise Linux Server release 6.5 软件版本:Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit byoradb1:172.17.38.44 byoradb2:172.17.38.4…...
【docker】linux部署docker
简介 首先我需要声明的是,我的系统是centos7,下载工具使用的是yum;在linux上部署docker,之前一直看的是这篇文章Linux之Docker部署,基本上功能方面也都可以使用,部署起来也是比较的简单。首先我先讲述这篇…...
【K8S 云原生】Pod资源限制、Pod容器健康检查(探针)
目录 一、docker的重启方式和K8S重启方式 1、Pod的重启方式: 2、docker的重启策略: 二、yaml文件快速生成: 三、pod的状态: 四、Pod的资源限制 1、限制的方式和种类 2、CPU的限制的格式: 五、K8S拉取镜像的策…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
android RelativeLayout布局
<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...
破解路内监管盲区:免布线低位视频桩重塑停车管理新标准
城市路内停车管理常因行道树遮挡、高位设备盲区等问题,导致车牌识别率低、逃费率高,传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法,正成为破局关键。该设备安装于车位侧方0.5-0.7米高度,直接规避树枝遮…...
前端中slice和splic的区别
1. slice slice 用于从数组中提取一部分元素,返回一个新的数组。 特点: 不修改原数组:slice 不会改变原数组,而是返回一个新的数组。提取数组的部分:slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...
