当前位置: 首页 > news >正文

ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

文章目录

  • Kafka表集成引擎
    • 配置
      • Kerberos 支持
    • 虚拟列
  • 资料分享
  • 参考文章

Kafka表集成引擎

此引擎与Apache Kafka结合使用。

Kafka 特性:

  • 发布或者订阅数据流。
  • 容错存储机制。
  • 处理流数据。

老版Kafka集成表引擎参数格式:

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

新版Kafka集成表引擎参数格式:

Kafka SETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic1,topic2',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n',kafka_schema = '',kafka_num_consumers = 2

必要参数:

  • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow

可选参数:

  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

格式输入输出
[TabSeparated]
[TabSeparatedRaw]
[TabSeparatedWithNames]
[TabSeparatedWithNamesAndTypes]
[Template]
[TemplateIgnoreSpaces]
[CSV]
[CSVWithNames]
[CustomSeparated]
[Values]
[Vertical]
[JSON]
[JSONAsString]
[JSONStrings]
[JSONCompact]
[JSONCompactStrings]
[JSONEachRow]
[JSONEachRowWithProgress]
[JSONStringsEachRow]
[JSONStringsEachRowWithProgress]
[JSONCompactEachRow]
[JSONCompactEachRowWithNamesAndTypes]
[JSONCompactStringsEachRow]
[JSONCompactStringsEachRowWithNamesAndTypes]
[TSKV]
[Pretty]
[PrettyCompact]
[PrettyCompactMonoBlock]
[PrettyNoEscapes]
[PrettySpace]
[Protobuf]
[ProtobufSingle]
[Avro]
[AvroConfluent]
[Parquet]
[Arrow]
[ArrowStream]
[ORC]
[RowBinary]
[RowBinaryWithNamesAndTypes]
[Native]
[Null]
[XML]
[CapnProto]
[LineAsString]
[Regexp]
[RawBLOB]

示例:

  CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');SELECT * FROM queue LIMIT 5;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_num_consumers = 4;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1')SETTINGS kafka_format = 'JSONEachRow',kafka_num_consumers = 4;

消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。

SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

  1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
  2. 创建一个结构表。
  3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。

MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。

示例:

  CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');CREATE TABLE daily (day Date,level String,total UInt64) ENGINE = SummingMergeTree(day, (day, level), 8192);CREATE MATERIALIZED VIEW consumer TO dailyAS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as totalFROM queue GROUP BY day, level;SELECT level, sum(total) FROM daily GROUP BY level;

为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。

停止接收主题数据或更改转换逻辑,请 detach 物化视图:

  DETACH TABLE consumer;ATTACH TABLE consumer;

如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。

配置

GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

  <!-- Global configuration options for all tables of Kafka engine type --><kafka><debug>cgrp</debug><auto_offset_reset>smallest</auto_offset_reset></kafka><!-- Configuration specific for topic "logs" --><kafka_logs><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_logs>

ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 <check_crcs>true</check_crcs>

Kerberos 支持

对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。

示例:

  <!-- Kerberos-aware Kafka --><kafka><security_protocol>SASL_PLAINTEXT</security_protocol><sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab><sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal></kafka>

虚拟列

  • _topic – Kafka 主题。
  • _key – 信息的键。
  • _offset – 消息的偏移量。
  • _timestamp – 消息的时间戳。
  • _timestamp_ms – 消息的时间戳(毫秒)。
  • _partition – Kafka 主题的分区。

资料分享

ClickHouse经典中文文档分享

参考文章

  • ClickHouse(01)什么是ClickHouse,ClickHouse适用于什么场景
  • ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
  • ClickHouse(03)ClickHouse怎么安装和部署
  • ClickHouse(04)如何搭建ClickHouse集群
  • ClickHouse(05)ClickHouse数据类型详解
  • ClickHouse(06)ClickHouse建表语句DDL详细解析
  • ClickHouse(07)ClickHouse数据库引擎解析
  • ClickHouse(08)ClickHouse表引擎概况
  • ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
  • ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
  • ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
  • ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
  • ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
  • ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
  • ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
  • ClickHouse(16)ClickHouse日志引擎Log详细解析
  • ClickHouse(17)ClickHouse集成JDBC表引擎详细解析
  • ClickHouse(18)ClickHouse集成ODBC表引擎详细解析
  • ClickHouse(19)ClickHouse集成Hive表引擎详细解析
  • ClickHouse(20)ClickHouse集成PostgreSQL表引擎详细解析

相关文章:

ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

文章目录 Kafka表集成引擎配置Kerberos 支持 虚拟列 资料分享参考文章 Kafka表集成引擎 此引擎与Apache Kafka结合使用。 Kafka 特性&#xff1a; 发布或者订阅数据流。容错存储机制。处理流数据。 老版Kafka集成表引擎参数格式&#xff1a; Kafka(kafka_broker_list, kaf…...

JSP-概念

一、引子 很多读者可能听过JSP&#xff0c;并且知道这是一门过时的技术了。在Spring&#xff0c;SpringBoot已经成为主流的今天&#xff0c;笔者为什么还要介绍JSP的相关内容呢&#xff1f;笔者常常提到一个概念&#xff1a;理解一门技术&#xff0c;要理解这个技术为什么产生…...

sqlite插入语句id自增列问题

sqlite给主键id设置AUTOINCREMENT自增在插入数据的时候报错table has x columns but x-1 values were supplied 为什么自增列要显示不提供,sqlite需要提供自增列table ResTools has 7 columns but 6 values were supplied SQL Statement:insert into ResTools values(管理系统w…...

C#,字符串匹配(模式搜索)AC(Aho Corasick)算法的源代码

Aho-Corasick算法简称AC算法&#xff0c;也称为AC自动机(Aho-Corasick)算法&#xff0c;1975年产生于贝尔实验室&#xff08;The Bell Labs&#xff09;&#xff0c;是一种用于解决多模式字符串匹配的经典算法之一。 the Bell Lab 本文的运行效果&#xff1a; AC算法以模式树…...

【网络取证篇】Windows终端无法使用ping命令解决方法

【网络取证篇】Windows终端无法使用ping命令解决方法 以Ping命令为例&#xff0c;最近遇到ping命令无法使用的情况&#xff0c;很多情况都是操作系统"环境变量"被改变或没有正确配置导致—【蘇小沐】 目录 1、实验环境&#xff08;一&#xff09;无法ping命令 &a…...

electron+vue网页直接播放RTSP视频流?

目前大部分摄像头都支持RTSP协议&#xff0c;但是在浏览器限制&#xff0c;最新版的浏览器都不能直接播放RTSP协议&#xff0c;Electron 桌面应用是基于 Chromium 内核的&#xff0c;所以也不能直接播放RTSP&#xff0c;但是我们又有这个需求怎么办呢&#xff1f; 市场上的方案…...

【Delphi 基础知识 19】Assigned的用法

在Delphi中&#xff0c;Assigned 是一个用于检查指针是否已分配内存的函数。它通常用于检查对象或指针是否已经被分配内存&#xff0c;以避免在未分配内存的情况下引用或操作它。 以下是 Assigned 的一些用法示例&#xff1a; 检查对象是否已分配内存&#xff1a; varMyObject…...

多线程在编程中的重要性有什么?并以LabVIEW为例进行说明

多线程在编程中的重要性体现在以下几个方面&#xff1a; 并行处理&#xff1a; 多线程允许程序同时执行多个任务&#xff0c;这在现代多核心处理器上尤其重要。通过并行处理&#xff0c;可以显著提高程序的执行效率和响应速度。 资源利用最大化&#xff1a; 通过多线程&#x…...

K8S---kubectl top

一、简介 该命令类似于linux–top命令,用于显示node和pod的CPU和内存使用情况 二、命令行 1、help命令 k top --help Display resource (CPU/memory) usage. The top command allows you to see the resource consumption for nodes or pods. This command requires Metri…...

Linux部署前后端项目

部署SpringBoot项目 创建SpringBoot项目 先确保有一个可以运行的springboot项目&#xff0c;这里就记录创建项目的流程了&#xff0c;可以自行百度。 命令行启动 2.1、在linux中&#xff0c;我是在data目录下新创建的一个project目录&#xff08;此目录创建位置不限制&…...

一文搞懂系列——Linux C线程池技术

背景 最近在走读诊断项目代码时&#xff0c;发现其用到了线程池技术&#xff0c;感觉耳目一新。以前基本只是听过线程池&#xff0c;但是并没有实际应用。对它有一丝的好奇&#xff0c;于是趁这个机会深入了解一下线程池的实现原理。 线程池的优点 线程池出现的背景&#xf…...

stable diffusion代码学习笔记

前言&#xff1a;本文没有太多公式推理&#xff0c;只有一些简单的公式&#xff0c;以及公式和代码的对应关系。本文仅做个人学习笔记&#xff0c;如有理解错误的地方&#xff0c;请指出。 本文包含stable diffusion入门文献和不同版本的代码。 文献资源 本文学习的代码&…...

腾讯云服务器怎么买?两种购买方式更省钱

腾讯云服务器购买流程很简单&#xff0c;有两种购买方式&#xff0c;直接在官方活动上购买比较划算&#xff0c;在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵&#xff0c;但是自定义购买云服务器CPU内存带宽配置选择范围广&#xff0c;活动上购买只能选择固定的活动…...

基于SpringBoot自定义控制是否需要开启定时功能

在基于SpringBoot的开发过程中&#xff0c;有时候会在应用中使用定时任务&#xff0c;然后服务器上启动定时任务&#xff0c;本地就不需要开启定时任务&#xff0c;使用一个参数进行控制&#xff0c;通过查资料得知非常简单。 参数配置 在application-dev.yml中加入如下配置 …...

“确定要在不复制其属性的情况下复制此文件?”解决方案(将U盘格式由FAT格式转换为NTFS格式)

文章目录 1.问题描述2.问题分析3.问题解决3.1 方法一3.2 方法二3.3 方法三 1.问题描述 从电脑上复制文件到U盘里会出现“确定要在不复制其属性的情况下复制此文件&#xff1f;”提示。 2.问题分析 如果这个文件在NTFS分区上&#xff0c;且存在特殊的安全属性。那么把它从NT…...

视频监控系统EasyCVR如何通过调用API接口查询和下载设备录像?

智慧安防平台EasyCVR是基于各种IP流媒体协议传输的视频汇聚和融合管理平台。视频流媒体服务器EasyCVR采用了开放式的网络结构&#xff0c;支持高清视频的接入和传输、分发&#xff0c;平台提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联…...

15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条

15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条 progressBar2.setIndeterminate(true);//设置无限模式,运行查看动态效果 //创建并设置无限模式元素 ShapeElement element new ShapeElement(); element.setBounds(0,0,50,50); element.setRgbColor(new RgbColor(255,0,0)); …...

【FastAPI】路径参数

路径参数 from fastapi import FastAPIapp FastAPI()app.get("/items/{item_id}") async def read_item(item_id):return {"item_id": item_id}其中{item_id}就为路径参数 运行以上程序当访问 &#xff1a;http://127.0.0.1:8000/items/fastapi时候 将会…...

【docker笔记】DockerFile

DockerFile Docker镜像结构的分层 镜像不是一个单一的文件&#xff0c;而是有多层构成。 容器其实是在镜像的最上面加了一层读写层&#xff0c;在运行容器里做的任何文件改动&#xff0c;都会写到这个读写层。 如果删除了容器&#xff0c;也就是删除了其最上面的读写层&…...

React项目搭建流程

第一步 利用脚手架创建ts类型的react项目&#xff1a; 执行如下的命令&#xff1a;create-react-app myDemo --template typescript &#xff1b; 第二步 清理项目目录结构&#xff1a; src/ index.tsx, app.txs, react-app-env.d.ts public/index.ht…...

模型参数、模型存储精度、参数与显存

模型参数量衡量单位 M&#xff1a;百万&#xff08;Million&#xff09; B&#xff1a;十亿&#xff08;Billion&#xff09; 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的&#xff0c;但是一个参数所表示多少字节不一定&#xff0c;需要看这个参数以什么…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

C++:多态机制详解

目录 一. 多态的概念 1.静态多态&#xff08;编译时多态&#xff09; 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1&#xff09;.协变 2&#xff09;.析构函数的重写 5.override 和 final关键字 1&#…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

LabVIEW双光子成像系统技术

双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制&#xff0c;展现出显著的技术优势&#xff1a; 深层组织穿透能力&#xff1a;适用于活体组织深度成像 高分辨率观测性能&#xff1a;满足微观结构的精细研究需求 低光毒性特点&#xff1a;减少对样本的损伤…...