当前位置: 首页 > news >正文

ClickHouse10-ClickHouse中Kafka表引擎

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。
在这里插入图片描述

Kafka大家肯定不陌生:

  • 它可以用于发布和订阅数据流,是常见的队列使用方式
  • 它可以组织容错存储,是常见的容错存储的使用方式
  • 它可以在流可用时对其进行处理,是常见的大数据处理的使用方式

全文概览:

  • 基本语法
  • 从 Kafka 写入到 ClickHouse
  • 从 ClickHouse 写入到 Kafka
    • 测试1:queue->ck->queue
    • 测试2:ck->queue

基本语法

分为定义表结构和定义Kafka的接入参数,Kafka的接入参数都是常见的字段

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [ALIAS expr1],name2 [type2] [ALIAS expr2],...
) ENGINE = Kafka()
SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0,][kafka_client_id = '',][kafka_poll_timeout_ms = 0,][kafka_poll_max_batch_size = 0,][kafka_flush_interval_ms = 0,][kafka_thread_per_consumer = 0,][kafka_handle_error_mode = 'default',][kafka_commit_on_select = false,][kafka_max_rows_per_message = 1];

示例:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

从 Kafka 写入到 ClickHouse

创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test_ck_sync1

创建同步表:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'CREATE TABLE IF NOT EXISTS test_ck_sync1_res
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(sys_time)
ORDER BY tuple()

创建物化视图,进行数据样式的转换:

CREATE MATERIALIZED VIEW test_ck_sync1_mv TO test_ck_sync1_res AS
SELECTsys_time,num
FROM test_ck_sync1

通过console写入数据:

[$ kafka_2.13-3.6.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_ck_sync1
>2024-01-01 00:00:01|89  

验证数据:

$ :) select * from test_ck_sync1_res;SELECT *
FROM test_ck_sync1_resQuery id: a666f893-5be9-4022-9327-3a1507aa5485┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:01 │  89 │
└─────────────────────┴─────┘
┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:00 │  88 │
└─────────────────────┴─────┘2 rows in set. Elapsed: 0.049 sec.

从 ClickHouse 写入到 Kafka

kafka_writers_reader --(view)--> kafka_writers_queue ---> 

创建一个队列:

bin/kafka-topics.sh --topic kafka_writers --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

创建同步表:

CREATE TABLE kafka_writers_reader (     `id` Int,     `platForm` String,     `appname` String,     `time` DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';CREATE TABLE kafka_writers_queue (     id Int,     platForm String,     appname String,     time DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',        kafka_topic_list = 'kafka_writers',        kafka_group_name = 'kafka_writers_group',        kafka_format = 'CSV',       kafka_max_block_size = 1048576;

测试1:queue->ck->queue

通过写入队列kafka_writers_reader,借助ClickHouse写入队列kafka_writers

bin/kafka-topics.sh --topic kafka_writers_reader --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka_writers_readerbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers

测试2:ck->queue

通过写入表kafka_writers_reader,写入队列kafka_writers

$ :) INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (8,'Data','Test','2020-12-23 14:45:31'), 
(9,'Plan','Test1','2020-12-23 14:47:32'), 
(10,'Plan','Test2','2020-12-23 14:52:15'), 
(11,'Data','Test3','2020-12-23 14:54:39');INSERT INTO kafka_writers_reader (id, platForm, appname, time) FORMAT ValuesQuery id: 223a63ab-97fa-488d-8ea7-c2e194155d26Ok.4 rows in set. Elapsed: 1.054 sec. 
[$ kafka_2.13-3.6.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers
8,"Data","Test","1970-01-01 08:00:00"9,"Plan","Test1","1970-01-01 08:00:00"10,"Plan","Test2","1970-01-01 08:00:00"11,"Data","Test3","1970-01-01 08:00:00"

如果喜欢我的文章的话,可以去GitHub上给一个免费的关注吗?

相关文章:

ClickHouse10-ClickHouse中Kafka表引擎

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。 Kafka大家…...

Encoding类

Encoding System.Text.Encoding 是 C# 中用于处理字符编码和字符串与字节之间转换的类。它提供了各种静态方法和属性,**用于在不同字符编码之间进行转换,**以及将字符串转换为字节数组或反之。 在处理多语言文本、文件、网络通信以及其他字符数据的场景…...

标定系列——预备知识-OpenCV中实现Rodrigues变换的函数(二)

标定系列——预备知识-OpenCV中实现Rodrigues变换的函数(二) 说明记录 说明 简单介绍罗德里格斯变换以及OpenCV中的实现函数 记录...

2014年认证杯SPSSPRO杯数学建模C题(第一阶段)土地储备方案的风险评估全过程文档及程序

2014年认证杯SPSSPRO杯数学建模 C题 土地储备方案的风险评估 原题再现: 土地储备,是指市、县人民政府国土资源管理部门为实现调控土地市场、促进土地资源合理利用目标,依法取得土地,进行前期开发、储存以备供应土地的行为。土地…...

我的编程之路:从非计算机专业到Java开发工程师的成长之路 | 学习路线 | Java | 零基础 | 学习资源 | 自学

小伙伴们好,我是「 行走的程序喵」,感谢您阅读本文,欢迎三连~ 😻 【Java基础】专栏,Java基础知识全面详解:👉点击直达 🐱 【Mybatis框架】专栏,入门到基于XML的配置、以…...

Django Cookie和Session

Django Cookie和Session 【一】介绍 【1】起因 HTTP协议四大特性 基于请求响应模式:客户端发送请求,服务端返回响应基于TCP/IP之上:作用于应用层之上的协议无状态:HTTP协议本身不保存客户端信息短链接:1.0默认使用短…...

【算法刷题 | 二叉树 04】3.27(翻转二叉树、对称二叉树、完全二叉树的节点个数、平衡二叉树、完全二叉树的所有路径)

文章目录 6.翻转二叉树6.1问题6.2解法一:递归6.2.1递归思路(1)确定递归函数的参数和返回值(2)确定终止条件(3)确定单层递归的逻辑 6.2.2全部代码 6.3解法二:层序遍历 7.对称二叉树7.…...

【uniapp】uniapp实现免密登录

文章目录 一、概要二、整体架构流程三、技术名词解释四 、技术细节1.存取token有效期?2.使用setStorageSync而不使用setStorage?3.使用onLaunch而不使用全局路由? 一、概要 打开一个网页或小程序的时候,我们有时候会自动进入主页…...

2024-03-27 问AI: 介绍一下深度学习中的 Darknet

文心一言 在深度学习中,Darknet是一个开源的神经网络框架,由Joseph Redmon开发,主要用于实现深度学习算法。它以其高效的实现和速度受到广泛关注,并在许多计算机视觉竞赛中取得了优异的结果。 Darknet的特点包括: 轻…...

POJ3037 + HDU-6714

两道最短路好题 POJ3037 手玩一下 发现每一点的速度可以直接搞出来&#xff0c;就是pow(2,h[1][1]-h[i][j])*V 那么从这个点出发到达别的点的耗费的时间都是上面这个数的倒数&#xff0c;然后直接跑最短路就好了 #include<iostream> #include<vector> #include<…...

Ubuntu搭建环境Cmake-Libtorch-Torchvision-PCL-VTK-OpenCV

Ubuntu搭建环境Cmake-Libtorch-Torchvision-PCL-VTK-OpenCV 安装Cmake安装libtorch安装torchvision安装PCL安装VTK安装OpenCV设置环境变量 仅供本人记录查阅使用 安装Cmake Cmake下载地址 解压 进入目录会看到只有 bin doc man share三个文件夹&#xff0c;没有 bootstrap文…...

分享多种mfc100u.dll丢失的解决方法(一键修复DLL丢失的方法)

在使用电脑过程中&#xff0c;我们经常会遇到一些陌生的DLL文件&#xff0c;例如mfc100u.dll。这些DLL文件是动态链接库&#xff08;Dynamic Link Libraries&#xff09;的缩写&#xff0c;它们包含了可以被多个程序共享的代码和数据。今天&#xff0c;我们将深入探讨mfc100u.d…...

Redis是单线程还是多线程?(面试题)

1、Redis5及之前是单线程版本 2、Redis6开始引入多线程版本&#xff08;实际上是 单线程多线程 版本&#xff09; Redis6及之前版本&#xff08;单线程&#xff09; Redis5及之前的版本使用的是 单线程&#xff0c;也就是说只有一个 worker队列&#xff0c;所有的读写操作都要…...

动态菜单设计

需求&#xff1a; 登录不同用户 显示不同的菜单 思路&#xff1a;根据用户id 左关联表 查询出对应的菜单选项 查询SQL select distinct-- 菜单表 去除重复记录sys_menu.id,sys_menu.parentId, sys_menu.name from -- 权限表sys_menu-- 角色与权限表 菜单表id 角色菜…...

Haproxy负载均衡介绍即部署

haproxy的原理&#xff1a; 提供高可用、负载均衡以及基于TCP&#xff08;四层&#xff09;和HTTP&#xff08;七层&#xff09;应用的代理&#xff0c;支持虚拟主机&#xff0c;开源可靠的一款软件。 适用于哪些负载特别大的web站点&#xff0c;这些站点通常又需要回话保持和七…...

基于大语言模型的云故障根因分析|顶会EuroSys24论文

*马明华 微软主管研究员 2021年CCF国际AIOps挑战赛程序委员会主席&#xff08;第四届&#xff09; 2021年博士毕业于清华大学&#xff0c;2020年在佐治亚理工学院做访问学者。主要研究方向是智能运维&#xff08;AIOps&#xff09;、软件可靠性。近年来在ICSE、FSE、ATC、EuroS…...

Windows直接运行python程序

Windows直接运行python程序 一、新建bat脚本二、新建vbs脚本 一、新建bat脚本 新建bat批处理脚本&#xff0c;写入以下内容 echo off call conda activate pytorch python app.pyecho off&#xff1a;在此语句后所有运行的命令都不显示命令行本身&#xff0c;但是本身的指令是…...

经典应用丨光伏行业扫码追溯新标杆,海康机器人AI智能读码器!

去年&#xff0c;光伏发电行业持续高速发展&#xff0c;我国仅在前九个月累计装机521.08GW&#xff0c;同比增长达到45.3%&#xff0c;已成为第二大电源类型超过水电。根据《2023中国与全球光伏发展白皮书》预测&#xff0c;到2030年&#xff0c;中国能够实现国家规划的风电和光…...

逆流而上的选择-积极生活,逆流而上

首先请大家看一个故事 李明坐在公司的开放式办公区&#xff0c;耳边是键盘敲击声的交响乐&#xff0c;眼前是一行行跳跃的代码。他的眼神有些恍惚&#xff0c;显示器的蓝光在他眼镜上反射出时代的光芒&#xff0c;这光芒既耀眼又刺眼。他即将35岁&#xff0c;在这个年纪&#x…...

SpringMVC基础Controller

文章目录 Controller 的编写和配置1. Controller 注解类型2. RequestMapping 注解类型3. 编写请求方法4. 请求参数和路径变量 Controller 的编写和配置 Controller 注解和 RequestMapping 注解是 Spring MVC 最重要的两个注解。 使用基于注解的控制器的优点如下&#xff1a; …...

苹果M1/M2芯片跑自监督学习:统一内存与Metal后端实战指南

1. 项目概述&#xff1a;为什么苹果自研芯片正在悄悄改写AI训练的底层逻辑最近三个月&#xff0c;我陆续在三台不同配置的Mac上跑通了SimCLR、BYOL和MoCo v3这三套主流自监督学习&#xff08;SSL&#xff09;模型的完整训练流程——不是跑个demo&#xff0c;而是用ImageNet-1K子…...

iOS自动化测试环境搭建:Appium+Python真机与模拟器全链路通关指南

1. 为什么iOS自动化测试环境搭建总让人卡在第一步&#xff1f;“AppiumPython实现iOS自动化测试~环境搭建”——这个标题里藏着太多新手看不见的暗礁。我带过三届测试团队&#xff0c;每年都有至少7个人卡在“连不上真机”“Xcode报错找不到WebDriverAgent”“模拟器启动后白屏…...

SpinalHDL流水线设计:从时序抽象到工程实践

1. 项目概述&#xff1a;从Verilog的“线”到SpinalHDL的“流”在数字电路设计里&#xff0c;时序逻辑的流水线&#xff08;Pipeline&#xff09;是个老生常谈但又至关重要的概念。无论是为了提升系统主频&#xff0c;还是为了平衡组合逻辑路径的延迟&#xff0c;我们总免不了要…...

Token聚合平台 vs 传统云 vs AI原生云,AI推理应用怎么选?

在大模型能力深度融入生产环境的当下&#xff0c;后端 AI 架构的选择往往决定了应用的生死。从早期的“调用一个接口”到如今复杂的智能体&#xff08;Agent&#xff09;工作流&#xff0c;开发团队在底座选型上面临着两条截然不同的演进路径&#xff1a;一条是追求便利与极致轻…...

智慧防疫终端实战:从数字哨兵系统设计到落地运维全解析

1. 项目背景与核心痛点&#xff1a;为什么“数字哨兵”成了刚需&#xff1f;去年下半年&#xff0c;我参与了一个在无锡落地的智慧防疫项目&#xff0c;核心就是部署一批“数字哨兵”智能核验终端。去现场之前&#xff0c;我和很多人想的一样&#xff1a;不就是个扫健康码的机器…...

Spring AI Alibaba 1.x 系列【55】Interrupts 中断机制:静态中断源码分析

文章目录 1. interruptBefore 模式1.1 中断判断逻辑1.2 构建中断元数据1.3 返回中断响应1.4 初始化【中断执行】上下文1.5 合并状态&#xff08;BUG&#xff09;1.6 执行结束 2. interruptsAfter 模式2.1 设置 INTERRUPT_AFTER 标记2.2 动态计算下一个节点 3. 中断时机对比 1. …...

6.解决 99% 刷机故障|GPT 分区修复 + SEP 兼容检测 + 全分区备份,工程师实战手册

摘要 本文面向具备基础Linux命令行操作能力的维修工程师与高级发烧友,系统阐述主流品牌手机刷机与维修的底层逻辑与标准化操作流程。内容覆盖高通、联发科、苹果A系列三大芯片平台的刷机协议差异,提供完整的刷机工具链搭建脚本、分区备份恢复脚本、以及底层驱动级故障诊断代…...

为什么你的“cashmere sweater”总像塑料?Midjourney布料质感模拟的4个致命认知误区(附NASA纺织材料数据库对照表)

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;为什么你的“cashmere sweater”总像塑料&#xff1f;——Midjourney布料质感失真的本质悖论 当输入 cashmere sweater, soft knit, macro detail, studio lighting, photorealistic&#xff0c;Midjourney …...

基于i.MX8M Plus与5G的高性能AI边缘计算网关设计与实践

1. 项目概述&#xff1a;为什么我们需要一个“会思考”的边缘网关&#xff1f;在工业现场待久了&#xff0c;你一定会对几个场景深有感触&#xff1a;产线上几十台PLC和传感器&#xff0c;协议五花八门&#xff0c;Modbus、Profibus、CANopen&#xff0c;想统一采集数据得接一堆…...

WT32-S3-DK开发板全解析:从硬件设计到物联网项目实战

1. 项目概述&#xff1a;一块“小而全”的物联网开发板最近在捣鼓一个智能家居的传感器节点项目&#xff0c;需要一块性能足够、接口丰富、最好还带屏幕的开发板。市面上ESP32-S3的方案很多&#xff0c;但要么是核心板&#xff0c;需要自己配底板和屏幕&#xff0c;要么就是功能…...