当前位置: 首页 > news >正文

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "ReadFromUpsertKafkaTest","uuid": "1234","stops": [{"uuid": "5555","name": "ReadFromUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}","properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"}},{"uuid": "6666","name": "ShowChangeLogData1","bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData","properties": {"showNumber": "5000"}}],"paths": [{"from": "ReadFromUpsertKafka1","outport": "","inport": "","to": "ShowChangeLogData1"}]}
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{"ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"}, {"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"}, {"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"}, {"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"}, {"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述
欢迎关注PiflowX公众号,谢谢支持!!!

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

相关文章:

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件 组件说明 upsert方式从Kafka topic中读取数据。 计算引擎 flink 有界性 Unbounded 组件分组 kafka 端口 Inport:默认端口 outport:默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HO…...

keil 5 ARM CC编译错误和警告解释大全(3)序列号2000-3000

2001年:已声明虚拟参数,但从未使用过 2002年:虚拟参数重新定义为do变量 2003:无法优化:常量/表达式传递给可能修改的变量 2004:重新维度的数组作为参数传递 2005:重维度数组等价 2006&…...

CentOS 7 实战指南:文件或目录的权限操作命令详解

前言 这篇文章详细介绍了文件和目录的常用权限操作命令,并提供了全面的技术解析。通过本文,你将学习如何使用 chmod 和 chown 命令来管理文件和目录的权限,控制用户和用户组的访问权限。无论你是初学者还是有经验的系统管理员,这…...

我的第一个前端项目,vue项目从零开始创建和运行

​入门前端,从基础做起,从零开始新建项目 背景:VUE脚手架项目是一个“单页面”应用,即整个项目中只有1个网页! 在VUE脚手架项目中,主要是设计各个“视图组件”,它们都是整个网页中某个部分&…...

【OJ】C++,Java,Python,Go,Rust

for循环语法 // cpp// java// python for i in range(集合): for i, val in enumerate(集合): for v1,v2,v3,... in zip(集合1,集合2,集合3,...):Pair // cpp pair<int, string> first second // java Pair<Integer, String> first() new Pair<>(firstVal…...

Flink 任务指标监控

目录 状态监控指标 JobManager 指标 TaskManager 指标 Job 指标 资源监控指标 数据流监控指标 任务监控指标 网络监控指标 容错监控指标 数据源监控指标 数据存储监控指标 当使用 Apache Flink 进行流处理任务时&#xff0c;可以根据不同的监控需求&#xff0c;监控…...

Go语言程序设计-第7章--接口

Go语言程序设计-第7章–接口 接口类型是对其他类型行为的概括与抽象。 Go 语言的接口的独特之处在于它是隐式实现。对于一个具体的类型&#xff0c;无须声明它实现了哪些接口&#xff0c;只要提供接口所必须实现的方法即可。 7.1 接口即约定 7.2 接口类型 package iotype …...

性能优化-OpenMP基础教程(二)

本文主要介绍OpenMP并行编程技术&#xff0c;编程模型、指令和函数的介绍、以及OpenMP实战的几个例子。希望给OpenMP并行编程者提供指导。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&am…...

让电脑变得更聪明——用python实现五子棋游戏

作为经典的棋类游戏&#xff0c;五子棋深受大众喜爱&#xff0c;但如果仅实现人与人的博弈&#xff0c;那程序很简单&#xff0c;如果要实现人机对战&#xff0c;教会计算机如何战胜人类&#xff0c;那就不是十分容易的事了。本文我们先从简单入手&#xff0c;完成五子棋游戏的…...

C#-接口

接口 (interface) 定义了一个可由类和结构实现的协定。接口可以包含方法、属性、事件和索引器。接口不提供它所定义的成员的实现 — 它仅指定实现该接口的类或结构必须提供的成员。 接口可支持多重继承。在下面的示例中,接口 IComboBox 同时从 ITextBox 和 IListBox 继承。 i…...

ASP.NET可视化流程设计器源码

源码介绍: ASP.NET可视化流程设计器源码已应用于众多大型企事业单位。拥有全浏览器兼容的可视化流程设计器、表单设计器、基于角色的权限管理等系统开发必须功能&#xff0c;大大为您节省开发时间&#xff0c;是您开发OA.CRM、HR等企事业各种应用管理系统和工作流系统的最佳基…...

景联文科技GPT教育题库:AI教育大模型的强大数据引擎

GPT-4发布后&#xff0c;美国奥数队总教练、卡耐基梅隆大学数学系教授罗博认为&#xff0c;这个几乎是用“刷题”方式喂大的AI教育大模型的到来&#xff0c;意味着人类的刷题时代即将退出历史舞台。 未来教育将更加注重学生的个性化需求和多元化发展&#xff0c;借助GPT和AI教育…...

PHP进阶-实现网站的QQ授权登录

授权登录是站点开发常见的应用场景&#xff0c;通过社交媒体一键授权可以跳过注册站点账户的繁琐操作。本文将讲解如何用PHP实现QQ授权登录。首先&#xff0c;我们需要申请QQ互联开发者账号获得APPID和密钥&#xff1b;接着&#xff0c;我们下载QQ官方SDK&#xff1a;PHP SDK v…...

字节跳动基础架构SRE-Copilot获得2023 CCF国际AIOps挑战赛冠军

近日&#xff0c;2023 CCF国际AIOps挑战赛决赛暨“大模型时代的AIOps”研讨会在北京成功举办&#xff0c;活动吸引了来自互联网、运营商、科研院所、高校、软硬件厂商等领域多名专家学者参与&#xff0c;为智能运维的前沿学术研究、落地生产实践打开了新思路。决赛中&#xff0…...

python moviepy 图文批量合成带字幕口播视频

最近在研究将图片和文本批量合成为带字幕口播视频 主要是基于python的moviepy库 from generator import audio, pics, subs, videodef main():texts_input examplepics_input example# 图片分辨率预处理pics.adjust(pics_input)# 文字转语音audio.text_to_audio(texts_inpu…...

【代码片段】Linux C++打印当前函数调用堆栈

在开发大型项目时&#xff0c;尤其是多线程情况下&#xff0c;一般无法使用断点调试&#xff0c;这时候将当前函数的调用堆栈打印出来是非常有必要和有效的问题排查手段。 这里记录一段Linux环境下&#xff0c;打印函数堆栈的代码。 void get_native_callstack(std::string &a…...

Linux程序、进程以及计划任务(第一部分)

目录 一、程序和进程 1、什么是程序&#xff1f; 2、什么是进程&#xff1f; 3、线程是什么&#xff1f; 4、如何查看是多线程还是单线程 5、进程结束的两种情况&#xff1a; 6、进程的状态 二、查看进程信息的相关命令 1、ps&#xff1a;查看静态的进程统计信息 2、…...

Oracle database 12cRAC异地恢复至单机

环境 rac 环境 byoradbrac Oracle12.1.0.2 系统版本&#xff1a;Red Hat Enterprise Linux Server release 6.5 软件版本&#xff1a;Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit byoradb1&#xff1a;172.17.38.44 byoradb2&#xff1a;172.17.38.4…...

【docker】linux部署docker

简介 首先我需要声明的是&#xff0c;我的系统是centos7&#xff0c;下载工具使用的是yum&#xff1b;在linux上部署docker&#xff0c;之前一直看的是这篇文章Linux之Docker部署&#xff0c;基本上功能方面也都可以使用&#xff0c;部署起来也是比较的简单。首先我先讲述这篇…...

【K8S 云原生】Pod资源限制、Pod容器健康检查(探针)

目录 一、docker的重启方式和K8S重启方式 1、Pod的重启方式&#xff1a; 2、docker的重启策略&#xff1a; 二、yaml文件快速生成&#xff1a; 三、pod的状态&#xff1a; 四、Pod的资源限制 1、限制的方式和种类 2、CPU的限制的格式&#xff1a; 五、K8S拉取镜像的策…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0&#xff1a;开发环境同步测试 cookie 至 localhost&#xff0c;便于本地请求服务携带 cookie 参考地址&#xff1a;https://juejin.cn/post/7139354571712757767 里面有源码下载下来&#xff0c;加在到扩展即可使用FeHelp…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

Spring Security 认证流程——补充

一、认证流程概述 Spring Security 的认证流程基于 过滤器链&#xff08;Filter Chain&#xff09;&#xff0c;核心组件包括 UsernamePasswordAuthenticationFilter、AuthenticationManager、UserDetailsService 等。整个流程可分为以下步骤&#xff1a; 用户提交登录请求拦…...

嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)

目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 ​编辑​编辑 UDP的特征 socke函数 bind函数 recvfrom函数&#xff08;接收函数&#xff09; sendto函数&#xff08;发送函数&#xff09; 五、网络编程之 UDP 用…...

QT开发技术【ffmpeg + QAudioOutput】音乐播放器

一、 介绍 使用ffmpeg 4.2.2 在数字化浪潮席卷全球的当下&#xff0c;音视频内容犹如璀璨繁星&#xff0c;点亮了人们的生活与工作。从短视频平台上令人捧腹的搞笑视频&#xff0c;到在线课堂中知识渊博的专家授课&#xff0c;再到影视平台上扣人心弦的高清大片&#xff0c;音…...