当前位置: 首页 > news >正文

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "ReadFromUpsertKafkaTest","uuid": "1234","stops": [{"uuid": "5555","name": "ReadFromUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}","properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"}},{"uuid": "6666","name": "ShowChangeLogData1","bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData","properties": {"showNumber": "5000"}}],"paths": [{"from": "ReadFromUpsertKafka1","outport": "","inport": "","to": "ShowChangeLogData1"}]}
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{"ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"}, {"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"}, {"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"}, {"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"}, {"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述
欢迎关注PiflowX公众号,谢谢支持!!!

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

相关文章:

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件 组件说明 upsert方式从Kafka topic中读取数据。 计算引擎 flink 有界性 Unbounded 组件分组 kafka 端口 Inport:默认端口 outport:默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HO…...

keil 5 ARM CC编译错误和警告解释大全(3)序列号2000-3000

2001年:已声明虚拟参数,但从未使用过 2002年:虚拟参数重新定义为do变量 2003:无法优化:常量/表达式传递给可能修改的变量 2004:重新维度的数组作为参数传递 2005:重维度数组等价 2006&…...

CentOS 7 实战指南:文件或目录的权限操作命令详解

前言 这篇文章详细介绍了文件和目录的常用权限操作命令,并提供了全面的技术解析。通过本文,你将学习如何使用 chmod 和 chown 命令来管理文件和目录的权限,控制用户和用户组的访问权限。无论你是初学者还是有经验的系统管理员,这…...

我的第一个前端项目,vue项目从零开始创建和运行

​入门前端,从基础做起,从零开始新建项目 背景:VUE脚手架项目是一个“单页面”应用,即整个项目中只有1个网页! 在VUE脚手架项目中,主要是设计各个“视图组件”,它们都是整个网页中某个部分&…...

【OJ】C++,Java,Python,Go,Rust

for循环语法 // cpp// java// python for i in range(集合): for i, val in enumerate(集合): for v1,v2,v3,... in zip(集合1,集合2,集合3,...):Pair // cpp pair<int, string> first second // java Pair<Integer, String> first() new Pair<>(firstVal…...

Flink 任务指标监控

目录 状态监控指标 JobManager 指标 TaskManager 指标 Job 指标 资源监控指标 数据流监控指标 任务监控指标 网络监控指标 容错监控指标 数据源监控指标 数据存储监控指标 当使用 Apache Flink 进行流处理任务时&#xff0c;可以根据不同的监控需求&#xff0c;监控…...

Go语言程序设计-第7章--接口

Go语言程序设计-第7章–接口 接口类型是对其他类型行为的概括与抽象。 Go 语言的接口的独特之处在于它是隐式实现。对于一个具体的类型&#xff0c;无须声明它实现了哪些接口&#xff0c;只要提供接口所必须实现的方法即可。 7.1 接口即约定 7.2 接口类型 package iotype …...

性能优化-OpenMP基础教程(二)

本文主要介绍OpenMP并行编程技术&#xff0c;编程模型、指令和函数的介绍、以及OpenMP实战的几个例子。希望给OpenMP并行编程者提供指导。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&am…...

让电脑变得更聪明——用python实现五子棋游戏

作为经典的棋类游戏&#xff0c;五子棋深受大众喜爱&#xff0c;但如果仅实现人与人的博弈&#xff0c;那程序很简单&#xff0c;如果要实现人机对战&#xff0c;教会计算机如何战胜人类&#xff0c;那就不是十分容易的事了。本文我们先从简单入手&#xff0c;完成五子棋游戏的…...

C#-接口

接口 (interface) 定义了一个可由类和结构实现的协定。接口可以包含方法、属性、事件和索引器。接口不提供它所定义的成员的实现 — 它仅指定实现该接口的类或结构必须提供的成员。 接口可支持多重继承。在下面的示例中,接口 IComboBox 同时从 ITextBox 和 IListBox 继承。 i…...

ASP.NET可视化流程设计器源码

源码介绍: ASP.NET可视化流程设计器源码已应用于众多大型企事业单位。拥有全浏览器兼容的可视化流程设计器、表单设计器、基于角色的权限管理等系统开发必须功能&#xff0c;大大为您节省开发时间&#xff0c;是您开发OA.CRM、HR等企事业各种应用管理系统和工作流系统的最佳基…...

景联文科技GPT教育题库:AI教育大模型的强大数据引擎

GPT-4发布后&#xff0c;美国奥数队总教练、卡耐基梅隆大学数学系教授罗博认为&#xff0c;这个几乎是用“刷题”方式喂大的AI教育大模型的到来&#xff0c;意味着人类的刷题时代即将退出历史舞台。 未来教育将更加注重学生的个性化需求和多元化发展&#xff0c;借助GPT和AI教育…...

PHP进阶-实现网站的QQ授权登录

授权登录是站点开发常见的应用场景&#xff0c;通过社交媒体一键授权可以跳过注册站点账户的繁琐操作。本文将讲解如何用PHP实现QQ授权登录。首先&#xff0c;我们需要申请QQ互联开发者账号获得APPID和密钥&#xff1b;接着&#xff0c;我们下载QQ官方SDK&#xff1a;PHP SDK v…...

字节跳动基础架构SRE-Copilot获得2023 CCF国际AIOps挑战赛冠军

近日&#xff0c;2023 CCF国际AIOps挑战赛决赛暨“大模型时代的AIOps”研讨会在北京成功举办&#xff0c;活动吸引了来自互联网、运营商、科研院所、高校、软硬件厂商等领域多名专家学者参与&#xff0c;为智能运维的前沿学术研究、落地生产实践打开了新思路。决赛中&#xff0…...

python moviepy 图文批量合成带字幕口播视频

最近在研究将图片和文本批量合成为带字幕口播视频 主要是基于python的moviepy库 from generator import audio, pics, subs, videodef main():texts_input examplepics_input example# 图片分辨率预处理pics.adjust(pics_input)# 文字转语音audio.text_to_audio(texts_inpu…...

【代码片段】Linux C++打印当前函数调用堆栈

在开发大型项目时&#xff0c;尤其是多线程情况下&#xff0c;一般无法使用断点调试&#xff0c;这时候将当前函数的调用堆栈打印出来是非常有必要和有效的问题排查手段。 这里记录一段Linux环境下&#xff0c;打印函数堆栈的代码。 void get_native_callstack(std::string &a…...

Linux程序、进程以及计划任务(第一部分)

目录 一、程序和进程 1、什么是程序&#xff1f; 2、什么是进程&#xff1f; 3、线程是什么&#xff1f; 4、如何查看是多线程还是单线程 5、进程结束的两种情况&#xff1a; 6、进程的状态 二、查看进程信息的相关命令 1、ps&#xff1a;查看静态的进程统计信息 2、…...

Oracle database 12cRAC异地恢复至单机

环境 rac 环境 byoradbrac Oracle12.1.0.2 系统版本&#xff1a;Red Hat Enterprise Linux Server release 6.5 软件版本&#xff1a;Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit byoradb1&#xff1a;172.17.38.44 byoradb2&#xff1a;172.17.38.4…...

【docker】linux部署docker

简介 首先我需要声明的是&#xff0c;我的系统是centos7&#xff0c;下载工具使用的是yum&#xff1b;在linux上部署docker&#xff0c;之前一直看的是这篇文章Linux之Docker部署&#xff0c;基本上功能方面也都可以使用&#xff0c;部署起来也是比较的简单。首先我先讲述这篇…...

【K8S 云原生】Pod资源限制、Pod容器健康检查(探针)

目录 一、docker的重启方式和K8S重启方式 1、Pod的重启方式&#xff1a; 2、docker的重启策略&#xff1a; 二、yaml文件快速生成&#xff1a; 三、pod的状态&#xff1a; 四、Pod的资源限制 1、限制的方式和种类 2、CPU的限制的格式&#xff1a; 五、K8S拉取镜像的策…...

Cursor AI助手Pro功能破解技术深度解析:三重防护机制与实战指南

Cursor AI助手Pro功能破解技术深度解析&#xff1a;三重防护机制与实战指南 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve reached…...

独立开发者生存指南:一个人搞定产品、开发、运营

一、从测试视角洞察独立开发的核心逻辑软件测试从业者转型独立开发者&#xff0c;最大的优势在于对产品质量的天然敏感度和用户视角的深度理解。在大厂分工体系中&#xff0c;测试人员是距离用户反馈最近的角色之一&#xff0c;每天都在与产品的bug、用户的抱怨打交道&#xff…...

实战指南:VRM-Addon-for-Blender 终极VRM格式导入导出解决方案

实战指南&#xff1a;VRM-Addon-for-Blender 终极VRM格式导入导出解决方案 【免费下载链接】VRM-Addon-for-Blender VRM Importer, Exporter and Utilities for Blender 2.93 to 5.1 项目地址: https://gitcode.com/gh_mirrors/vr/VRM-Addon-for-Blender VRM&#xff08…...

别再死磕ViT了!用Swin-Transformer搞定高分辨率图像识别,保姆级原理拆解

高分辨率图像识别新范式&#xff1a;Swin-Transformer实战指南 当计算机视觉工程师面对4K医学影像或卫星地图时&#xff0c;传统ViT模型往往会遭遇显存爆炸的尴尬。我曾在一个遥感项目中发现&#xff0c;直接将ViT应用于20482048像素的图像&#xff0c;单次前向传播就消耗了32G…...

Hummingbot自动化交易框架:从原理到实战的量化交易指南

1. 项目概述&#xff1a;一个为专业交易者打造的自动化交易框架如果你在加密货币交易领域摸爬滚打过一段时间&#xff0c;一定会对“手动盯盘”的疲惫和“情绪化操作”的代价深有体会。市场24/7运转&#xff0c;机会转瞬即逝&#xff0c;而人的精力终究有限。这正是我最初接触并…...

[技术解析] 边缘结构模型MSM:破解时依性混杂的因果推断利器

1. 边缘结构模型MSM&#xff1a;因果推断的"时光机" 想象你是一名医生&#xff0c;正在研究某种降压药的长期疗效。患者A连续服药3个月后血压稳定&#xff0c;患者B服药1个月后自行停药导致血压反弹。传统统计方法会简单对比两组结果&#xff0c;但忽略了一个关键问…...

SpringBoot生产级监控与异常日志运维实战,线上项目稳定排查不慌

SpringBoot项目本地开发调试正常&#xff0c;部署到生产环境后频繁出现接口报错、服务卡顿、内存溢出、接口响应缓慢、数据库连接耗尽等线上问题&#xff0c;开发者无法实时查看项目运行状态&#xff0c;报错无精准日志定位&#xff0c;排查问题耗时费力&#xff0c;严重影响业…...

ARM GICv3虚拟中断处理:GICV_IAR寄存器详解

1. GICV_IAR寄存器概述GICV_IAR&#xff08;Virtual Machine Interrupt Acknowledge Register&#xff09;是ARM GICv3架构中虚拟CPU接口的关键寄存器&#xff0c;主要用于虚拟机环境下的中断确认机制。当虚拟中断信号到达处理器时&#xff0c;通过读取该寄存器可以获取当前最高…...

从零构建本地AI应用:基于DeepSeek-R1的RAG与智能体实战指南

1. 项目概述&#xff1a;一个本地化AI应用的全栈学习与实践仓库最近在折腾本地大语言模型&#xff0c;特别是DeepSeek-R1&#xff0c;发现网上资料虽然多&#xff0c;但要么太零散&#xff0c;要么就是纯理论&#xff0c;真正能让你从零开始、一步步把模型跑起来&#xff0c;再…...

WarcraftHelper完整指南:5分钟让魔兽争霸3在现代电脑上完美运行

WarcraftHelper完整指南&#xff1a;5分钟让魔兽争霸3在现代电脑上完美运行 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为魔兽争霸3在现代Win…...