当前位置: 首页 > news >正文

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "ReadFromUpsertKafkaTest","uuid": "1234","stops": [{"uuid": "5555","name": "ReadFromUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}","properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"}},{"uuid": "6666","name": "ShowChangeLogData1","bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData","properties": {"showNumber": "5000"}}],"paths": [{"from": "ReadFromUpsertKafka1","outport": "","inport": "","to": "ShowChangeLogData1"}]}
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{"ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"}, {"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"}, {"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"}, {"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"}, {"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述
欢迎关注PiflowX公众号,谢谢支持!!!

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

相关文章:

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件 组件说明 upsert方式从Kafka topic中读取数据。 计算引擎 flink 有界性 Unbounded 组件分组 kafka 端口 Inport:默认端口 outport:默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HO…...

keil 5 ARM CC编译错误和警告解释大全(3)序列号2000-3000

2001年:已声明虚拟参数,但从未使用过 2002年:虚拟参数重新定义为do变量 2003:无法优化:常量/表达式传递给可能修改的变量 2004:重新维度的数组作为参数传递 2005:重维度数组等价 2006&…...

CentOS 7 实战指南:文件或目录的权限操作命令详解

前言 这篇文章详细介绍了文件和目录的常用权限操作命令,并提供了全面的技术解析。通过本文,你将学习如何使用 chmod 和 chown 命令来管理文件和目录的权限,控制用户和用户组的访问权限。无论你是初学者还是有经验的系统管理员,这…...

我的第一个前端项目,vue项目从零开始创建和运行

​入门前端,从基础做起,从零开始新建项目 背景:VUE脚手架项目是一个“单页面”应用,即整个项目中只有1个网页! 在VUE脚手架项目中,主要是设计各个“视图组件”,它们都是整个网页中某个部分&…...

【OJ】C++,Java,Python,Go,Rust

for循环语法 // cpp// java// python for i in range(集合): for i, val in enumerate(集合): for v1,v2,v3,... in zip(集合1,集合2,集合3,...):Pair // cpp pair<int, string> first second // java Pair<Integer, String> first() new Pair<>(firstVal…...

Flink 任务指标监控

目录 状态监控指标 JobManager 指标 TaskManager 指标 Job 指标 资源监控指标 数据流监控指标 任务监控指标 网络监控指标 容错监控指标 数据源监控指标 数据存储监控指标 当使用 Apache Flink 进行流处理任务时&#xff0c;可以根据不同的监控需求&#xff0c;监控…...

Go语言程序设计-第7章--接口

Go语言程序设计-第7章–接口 接口类型是对其他类型行为的概括与抽象。 Go 语言的接口的独特之处在于它是隐式实现。对于一个具体的类型&#xff0c;无须声明它实现了哪些接口&#xff0c;只要提供接口所必须实现的方法即可。 7.1 接口即约定 7.2 接口类型 package iotype …...

性能优化-OpenMP基础教程(二)

本文主要介绍OpenMP并行编程技术&#xff0c;编程模型、指令和函数的介绍、以及OpenMP实战的几个例子。希望给OpenMP并行编程者提供指导。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&am…...

让电脑变得更聪明——用python实现五子棋游戏

作为经典的棋类游戏&#xff0c;五子棋深受大众喜爱&#xff0c;但如果仅实现人与人的博弈&#xff0c;那程序很简单&#xff0c;如果要实现人机对战&#xff0c;教会计算机如何战胜人类&#xff0c;那就不是十分容易的事了。本文我们先从简单入手&#xff0c;完成五子棋游戏的…...

C#-接口

接口 (interface) 定义了一个可由类和结构实现的协定。接口可以包含方法、属性、事件和索引器。接口不提供它所定义的成员的实现 — 它仅指定实现该接口的类或结构必须提供的成员。 接口可支持多重继承。在下面的示例中,接口 IComboBox 同时从 ITextBox 和 IListBox 继承。 i…...

ASP.NET可视化流程设计器源码

源码介绍: ASP.NET可视化流程设计器源码已应用于众多大型企事业单位。拥有全浏览器兼容的可视化流程设计器、表单设计器、基于角色的权限管理等系统开发必须功能&#xff0c;大大为您节省开发时间&#xff0c;是您开发OA.CRM、HR等企事业各种应用管理系统和工作流系统的最佳基…...

景联文科技GPT教育题库:AI教育大模型的强大数据引擎

GPT-4发布后&#xff0c;美国奥数队总教练、卡耐基梅隆大学数学系教授罗博认为&#xff0c;这个几乎是用“刷题”方式喂大的AI教育大模型的到来&#xff0c;意味着人类的刷题时代即将退出历史舞台。 未来教育将更加注重学生的个性化需求和多元化发展&#xff0c;借助GPT和AI教育…...

PHP进阶-实现网站的QQ授权登录

授权登录是站点开发常见的应用场景&#xff0c;通过社交媒体一键授权可以跳过注册站点账户的繁琐操作。本文将讲解如何用PHP实现QQ授权登录。首先&#xff0c;我们需要申请QQ互联开发者账号获得APPID和密钥&#xff1b;接着&#xff0c;我们下载QQ官方SDK&#xff1a;PHP SDK v…...

字节跳动基础架构SRE-Copilot获得2023 CCF国际AIOps挑战赛冠军

近日&#xff0c;2023 CCF国际AIOps挑战赛决赛暨“大模型时代的AIOps”研讨会在北京成功举办&#xff0c;活动吸引了来自互联网、运营商、科研院所、高校、软硬件厂商等领域多名专家学者参与&#xff0c;为智能运维的前沿学术研究、落地生产实践打开了新思路。决赛中&#xff0…...

python moviepy 图文批量合成带字幕口播视频

最近在研究将图片和文本批量合成为带字幕口播视频 主要是基于python的moviepy库 from generator import audio, pics, subs, videodef main():texts_input examplepics_input example# 图片分辨率预处理pics.adjust(pics_input)# 文字转语音audio.text_to_audio(texts_inpu…...

【代码片段】Linux C++打印当前函数调用堆栈

在开发大型项目时&#xff0c;尤其是多线程情况下&#xff0c;一般无法使用断点调试&#xff0c;这时候将当前函数的调用堆栈打印出来是非常有必要和有效的问题排查手段。 这里记录一段Linux环境下&#xff0c;打印函数堆栈的代码。 void get_native_callstack(std::string &a…...

Linux程序、进程以及计划任务(第一部分)

目录 一、程序和进程 1、什么是程序&#xff1f; 2、什么是进程&#xff1f; 3、线程是什么&#xff1f; 4、如何查看是多线程还是单线程 5、进程结束的两种情况&#xff1a; 6、进程的状态 二、查看进程信息的相关命令 1、ps&#xff1a;查看静态的进程统计信息 2、…...

Oracle database 12cRAC异地恢复至单机

环境 rac 环境 byoradbrac Oracle12.1.0.2 系统版本&#xff1a;Red Hat Enterprise Linux Server release 6.5 软件版本&#xff1a;Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit byoradb1&#xff1a;172.17.38.44 byoradb2&#xff1a;172.17.38.4…...

【docker】linux部署docker

简介 首先我需要声明的是&#xff0c;我的系统是centos7&#xff0c;下载工具使用的是yum&#xff1b;在linux上部署docker&#xff0c;之前一直看的是这篇文章Linux之Docker部署&#xff0c;基本上功能方面也都可以使用&#xff0c;部署起来也是比较的简单。首先我先讲述这篇…...

【K8S 云原生】Pod资源限制、Pod容器健康检查(探针)

目录 一、docker的重启方式和K8S重启方式 1、Pod的重启方式&#xff1a; 2、docker的重启策略&#xff1a; 二、yaml文件快速生成&#xff1a; 三、pod的状态&#xff1a; 四、Pod的资源限制 1、限制的方式和种类 2、CPU的限制的格式&#xff1a; 五、K8S拉取镜像的策…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...

在 Spring Boot 项目里,MYSQL中json类型字段使用

前言&#xff1a; 因为程序特殊需求导致&#xff0c;需要mysql数据库存储json类型数据&#xff0c;因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...