ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
文章目录
- Kafka表集成引擎
- 配置
- Kerberos 支持
- 虚拟列
- 资料分享
- 参考文章
Kafka表集成引擎
此引擎与Apache Kafka结合使用。
Kafka 特性:
- 发布或者订阅数据流。
- 容错存储机制。
- 处理流数据。
老版Kafka集成表引擎参数格式:
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
新版Kafka集成表引擎参数格式:
Kafka SETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic1,topic2',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n',kafka_schema = '',kafka_num_consumers = 2
必要参数:
kafka_broker_list– 以逗号分隔的 brokers 列表 (localhost:9092)。kafka_topic_list– topic 列表 (my_topic)。kafka_group_name– Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。kafka_format– 消息体格式。使用与 SQL 部分的FORMAT函数相同表示方法,例如JSONEachRow。
可选参数:
kafka_row_delimiter- 每个消息体(记录)之间的分隔符。kafka_schema– 如果解析格式需要一个 schema 时,此参数必填。kafka_num_consumers– 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。
ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
| 格式 | 输入 | 输出 |
|---|---|---|
| [TabSeparated] | ✔ | ✔ |
| [TabSeparatedRaw] | ✔ | ✔ |
| [TabSeparatedWithNames] | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes] | ✔ | ✔ |
| [Template] | ✔ | ✔ |
| [TemplateIgnoreSpaces] | ✔ | ✗ |
| [CSV] | ✔ | ✔ |
| [CSVWithNames] | ✔ | ✔ |
| [CustomSeparated] | ✔ | ✔ |
| [Values] | ✔ | ✔ |
| [Vertical] | ✗ | ✔ |
| [JSON] | ✗ | ✔ |
| [JSONAsString] | ✔ | ✗ |
| [JSONStrings] | ✗ | ✔ |
| [JSONCompact] | ✗ | ✔ |
| [JSONCompactStrings] | ✗ | ✔ |
| [JSONEachRow] | ✔ | ✔ |
| [JSONEachRowWithProgress] | ✗ | ✔ |
| [JSONStringsEachRow] | ✔ | ✔ |
| [JSONStringsEachRowWithProgress] | ✗ | ✔ |
| [JSONCompactEachRow] | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes] | ✔ | ✔ |
| [JSONCompactStringsEachRow] | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes] | ✔ | ✔ |
| [TSKV] | ✔ | ✔ |
| [Pretty] | ✗ | ✔ |
| [PrettyCompact] | ✗ | ✔ |
| [PrettyCompactMonoBlock] | ✗ | ✔ |
| [PrettyNoEscapes] | ✗ | ✔ |
| [PrettySpace] | ✗ | ✔ |
| [Protobuf] | ✔ | ✔ |
| [ProtobufSingle] | ✔ | ✔ |
| [Avro] | ✔ | ✔ |
| [AvroConfluent] | ✔ | ✗ |
| [Parquet] | ✔ | ✔ |
| [Arrow] | ✔ | ✔ |
| [ArrowStream] | ✔ | ✔ |
| [ORC] | ✔ | ✔ |
| [RowBinary] | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes] | ✔ | ✔ |
| [Native] | ✔ | ✔ |
| [Null] | ✗ | ✔ |
| [XML] | ✗ | ✔ |
| [CapnProto] | ✔ | ✗ |
| [LineAsString] | ✔ | ✗ |
| [Regexp] | ✔ | ✗ |
| [RawBLOB] | ✔ | ✔ |
示例:
CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');SELECT * FROM queue LIMIT 5;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_num_consumers = 4;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1')SETTINGS kafka_format = 'JSONEachRow',kafka_num_consumers = 4;
消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。
消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。
SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:
- 使用引擎创建一个 Kafka 消费者并作为一条数据流。
- 创建一个结构表。
- 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。
当 MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。
示例:
CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');CREATE TABLE daily (day Date,level String,total UInt64) ENGINE = SummingMergeTree(day, (day, level), 8192);CREATE MATERIALIZED VIEW consumer TO dailyAS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as totalFROM queue GROUP BY day, level;SELECT level, sum(total) FROM daily GROUP BY level;
为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。
停止接收主题数据或更改转换逻辑,请 detach 物化视图:
DETACH TABLE consumer;ATTACH TABLE consumer;
如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。
配置
与 GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。
<!-- Global configuration options for all tables of Kafka engine type --><kafka><debug>cgrp</debug><auto_offset_reset>smallest</auto_offset_reset></kafka><!-- Configuration specific for topic "logs" --><kafka_logs><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_logs>
在ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 <check_crcs>true</check_crcs>。
Kerberos 支持
对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。
示例:
<!-- Kerberos-aware Kafka --><kafka><security_protocol>SASL_PLAINTEXT</security_protocol><sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab><sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal></kafka>
虚拟列
_topic– Kafka 主题。_key– 信息的键。_offset– 消息的偏移量。_timestamp– 消息的时间戳。_timestamp_ms– 消息的时间戳(毫秒)。_partition– Kafka 主题的分区。
资料分享
ClickHouse经典中文文档分享
参考文章
- ClickHouse(01)什么是ClickHouse,ClickHouse适用于什么场景
- ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
- ClickHouse(03)ClickHouse怎么安装和部署
- ClickHouse(04)如何搭建ClickHouse集群
- ClickHouse(05)ClickHouse数据类型详解
- ClickHouse(06)ClickHouse建表语句DDL详细解析
- ClickHouse(07)ClickHouse数据库引擎解析
- ClickHouse(08)ClickHouse表引擎概况
- ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
- ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
- ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
- ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
- ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
- ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
- ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
- ClickHouse(16)ClickHouse日志引擎Log详细解析
- ClickHouse(17)ClickHouse集成JDBC表引擎详细解析
- ClickHouse(18)ClickHouse集成ODBC表引擎详细解析
- ClickHouse(19)ClickHouse集成Hive表引擎详细解析
- ClickHouse(20)ClickHouse集成PostgreSQL表引擎详细解析
相关文章:
ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
文章目录 Kafka表集成引擎配置Kerberos 支持 虚拟列 资料分享参考文章 Kafka表集成引擎 此引擎与Apache Kafka结合使用。 Kafka 特性: 发布或者订阅数据流。容错存储机制。处理流数据。 老版Kafka集成表引擎参数格式: Kafka(kafka_broker_list, kaf…...
JSP-概念
一、引子 很多读者可能听过JSP,并且知道这是一门过时的技术了。在Spring,SpringBoot已经成为主流的今天,笔者为什么还要介绍JSP的相关内容呢?笔者常常提到一个概念:理解一门技术,要理解这个技术为什么产生…...
sqlite插入语句id自增列问题
sqlite给主键id设置AUTOINCREMENT自增在插入数据的时候报错table has x columns but x-1 values were supplied 为什么自增列要显示不提供,sqlite需要提供自增列table ResTools has 7 columns but 6 values were supplied SQL Statement:insert into ResTools values(管理系统w…...
C#,字符串匹配(模式搜索)AC(Aho Corasick)算法的源代码
Aho-Corasick算法简称AC算法,也称为AC自动机(Aho-Corasick)算法,1975年产生于贝尔实验室(The Bell Labs),是一种用于解决多模式字符串匹配的经典算法之一。 the Bell Lab 本文的运行效果: AC算法以模式树…...
【网络取证篇】Windows终端无法使用ping命令解决方法
【网络取证篇】Windows终端无法使用ping命令解决方法 以Ping命令为例,最近遇到ping命令无法使用的情况,很多情况都是操作系统"环境变量"被改变或没有正确配置导致—【蘇小沐】 目录 1、实验环境(一)无法ping命令 &a…...
electron+vue网页直接播放RTSP视频流?
目前大部分摄像头都支持RTSP协议,但是在浏览器限制,最新版的浏览器都不能直接播放RTSP协议,Electron 桌面应用是基于 Chromium 内核的,所以也不能直接播放RTSP,但是我们又有这个需求怎么办呢? 市场上的方案…...
【Delphi 基础知识 19】Assigned的用法
在Delphi中,Assigned 是一个用于检查指针是否已分配内存的函数。它通常用于检查对象或指针是否已经被分配内存,以避免在未分配内存的情况下引用或操作它。 以下是 Assigned 的一些用法示例: 检查对象是否已分配内存: varMyObject…...
多线程在编程中的重要性有什么?并以LabVIEW为例进行说明
多线程在编程中的重要性体现在以下几个方面: 并行处理: 多线程允许程序同时执行多个任务,这在现代多核心处理器上尤其重要。通过并行处理,可以显著提高程序的执行效率和响应速度。 资源利用最大化: 通过多线程&#x…...
K8S---kubectl top
一、简介 该命令类似于linux–top命令,用于显示node和pod的CPU和内存使用情况 二、命令行 1、help命令 k top --help Display resource (CPU/memory) usage. The top command allows you to see the resource consumption for nodes or pods. This command requires Metri…...
Linux部署前后端项目
部署SpringBoot项目 创建SpringBoot项目 先确保有一个可以运行的springboot项目,这里就记录创建项目的流程了,可以自行百度。 命令行启动 2.1、在linux中,我是在data目录下新创建的一个project目录(此目录创建位置不限制&…...
一文搞懂系列——Linux C线程池技术
背景 最近在走读诊断项目代码时,发现其用到了线程池技术,感觉耳目一新。以前基本只是听过线程池,但是并没有实际应用。对它有一丝的好奇,于是趁这个机会深入了解一下线程池的实现原理。 线程池的优点 线程池出现的背景…...
stable diffusion代码学习笔记
前言:本文没有太多公式推理,只有一些简单的公式,以及公式和代码的对应关系。本文仅做个人学习笔记,如有理解错误的地方,请指出。 本文包含stable diffusion入门文献和不同版本的代码。 文献资源 本文学习的代码&…...
腾讯云服务器怎么买?两种购买方式更省钱
腾讯云服务器购买流程很简单,有两种购买方式,直接在官方活动上购买比较划算,在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵,但是自定义购买云服务器CPU内存带宽配置选择范围广,活动上购买只能选择固定的活动…...
基于SpringBoot自定义控制是否需要开启定时功能
在基于SpringBoot的开发过程中,有时候会在应用中使用定时任务,然后服务器上启动定时任务,本地就不需要开启定时任务,使用一个参数进行控制,通过查资料得知非常简单。 参数配置 在application-dev.yml中加入如下配置 …...
“确定要在不复制其属性的情况下复制此文件?”解决方案(将U盘格式由FAT格式转换为NTFS格式)
文章目录 1.问题描述2.问题分析3.问题解决3.1 方法一3.2 方法二3.3 方法三 1.问题描述 从电脑上复制文件到U盘里会出现“确定要在不复制其属性的情况下复制此文件?”提示。 2.问题分析 如果这个文件在NTFS分区上,且存在特殊的安全属性。那么把它从NT…...
视频监控系统EasyCVR如何通过调用API接口查询和下载设备录像?
智慧安防平台EasyCVR是基于各种IP流媒体协议传输的视频汇聚和融合管理平台。视频流媒体服务器EasyCVR采用了开放式的网络结构,支持高清视频的接入和传输、分发,平台提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联…...
15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条
15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条 progressBar2.setIndeterminate(true);//设置无限模式,运行查看动态效果 //创建并设置无限模式元素 ShapeElement element new ShapeElement(); element.setBounds(0,0,50,50); element.setRgbColor(new RgbColor(255,0,0)); …...
【FastAPI】路径参数
路径参数 from fastapi import FastAPIapp FastAPI()app.get("/items/{item_id}") async def read_item(item_id):return {"item_id": item_id}其中{item_id}就为路径参数 运行以上程序当访问 :http://127.0.0.1:8000/items/fastapi时候 将会…...
【docker笔记】DockerFile
DockerFile Docker镜像结构的分层 镜像不是一个单一的文件,而是有多层构成。 容器其实是在镜像的最上面加了一层读写层,在运行容器里做的任何文件改动,都会写到这个读写层。 如果删除了容器,也就是删除了其最上面的读写层&…...
React项目搭建流程
第一步 利用脚手架创建ts类型的react项目: 执行如下的命令:create-react-app myDemo --template typescript ; 第二步 清理项目目录结构: src/ index.tsx, app.txs, react-app-env.d.ts public/index.ht…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践
6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
comfyui 工作流中 图生视频 如何增加视频的长度到5秒
comfyUI 工作流怎么可以生成更长的视频。除了硬件显存要求之外还有别的方法吗? 在ComfyUI中实现图生视频并延长到5秒,需要结合多个扩展和技巧。以下是完整解决方案: 核心工作流配置(24fps下5秒120帧) #mermaid-svg-yP…...
【WebSocket】SpringBoot项目中使用WebSocket
1. 导入坐标 如果springboot父工程没有加入websocket的起步依赖,添加它的坐标的时候需要带上版本号。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dep…...
