探索ClickHouse——连接Kafka和Clickhouse
安装Kafka
新增用户
sudo adduser kafka
sudo adduser kafka sudo
su -l kafka
安装JDK
sudo apt-get install openjdk-8-jre
下载解压kafka
可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否则会报“Classpath is empty”之类的错误。
mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1
配置
配置kafka
vim ~/kafka/config/server.properties
将下面这行加入文件的末尾
# ~/kafka/config/server.properties
delete.topic.enable=true
同时修改log的路径
# ~/kafka/config/server.properties
log.dirs=/home/kafka/logs
创建zookeeper service
sudo vim /etc/systemd/system/zookeeper.service
将下面内容填入上述文件中
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target
创建kafka service
sudo vim /etc/systemd/system/kafka.service
将下面内容填入上述文件中
[Unit]
Requires=zookeeper.service
After=zookeeper.service[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target
启动kafka#
启动服务
sudo systemctl start kafka
查看状态
sudo systemctl status kafka
● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-09-28 03:09:39 UTC; 4s ago
Main PID: 3561758 (sh)
Tasks: 42 (limit: 2143)
Memory: 292.4M
CPU: 2.768s
CGroup: /system.slice/kafka.service
├─3561758 /bin/sh -c “/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1”
└─3561760 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/>
Sep 28 03:09:39 ubuntua systemd[1]: Started kafka.service.
可以看到kafka已经处于running状态。
测试
创建Topic
~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
发送消息
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
订阅Topic
新启动一个界面,执行下面命令
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
它会收到上面发的消息
Hello, World
连接
创建表
使用kafka engine将kafka中的流映射到一个表中。我们以《探索ClickHouse——使用Projection加速查询》中的数据为例。
clickhouse-client --stream_like_engine_allow_direct_select 1
CREATE TABLE uk_price_paid_from_kafka (uuid_string String, price_string String, time String, postcode String, a String, b String, c String, addr1 String, addr2 String, street String, locality String, town String, district String, county String, d String, e String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list='TutorialTopic', kafka_group_name='clickhouse', kafka_format='CSV', kafka_skip_broken_messages=1, kafka_num_consumers=1;
CREATE TABLE uk_price_paid_from_kafka
(
uuid_stringString,
price_stringString,
timeString,
postcodeString,
aString,
bString,
cString,
addr1String,
addr2String,
streetString,
localityString,
townString,
districtString,
countyString,
dString,
eString
)
ENGINE = Kafka
SETTINGS kafka_broker_list = ‘localhost:9092’, kafka_topic_list = ‘TutorialTopic’, kafka_group_name = ‘clickhouse’, kafka_format = ‘CSV’, kafka_skip_broken_messages = 1, kafka_num_consumers = 1
Query id: 07a063e9-6a61-42c0-8fec-1fe2f119ee28
Ok.
0 rows in set. Elapsed: 0.008 sec.
给kafka发送消息
~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic
进入消息输入模式,发送下面两个消息
"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"
"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"

Clickhouse收到消息
在clickhouse-client交互终端中执行下面指令:
select * from uk_price_paid_from_kafka;

可以看到之前发送给kafka Topic的内容在Clickhouse中被收到了。
问题
后面我再在clickhouse-client交互终端中查询不到数据了。即使我们给kafka该主题发消息,也查询不到。后面我们再将《探索ClickHouse——使用MaterializedView存储kafka传递的数据》中讲解使用MaterializedView清洗和固化kafka的数据。
参考资料
- https://openjdk.org/install/
- https://kafka.apache.org/quickstart
- https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04#step-2-mdash-downloading-and-extracting-the-kafka-binaries
- https://cloud.tencent.com/developer/article/1892086
- https://sineyuan.github.io/post/clickhouse-kafka/
相关文章:
探索ClickHouse——连接Kafka和Clickhouse
安装Kafka 新增用户 sudo adduser kafka sudo adduser kafka sudo su -l kafka安装JDK sudo apt-get install openjdk-8-jre下载解压kafka 可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否…...
基于监督学习的多模态MRI脑肿瘤分割,使用来自超体素的纹理特征(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
【RocketMQ】(八)Rebalance负载均衡
消费者负载均衡,是指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息,这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不…...
线性筛和埃氏筛
线性筛: #define _CRT_SECURE_NO_WARNINGS #include<iostream> #include<cstdio> #include<cstdlib> #include<string> #include<cstring> #include<cmath> #include<ctime> #include<algorithm> #include<ut…...
【Java 进阶篇】JDBC ResultSet 类详解
在Java应用程序中,与数据库交互通常涉及执行SQL查询以检索数据。一旦执行查询,您将获得一个ResultSet对象,该对象包含查询结果的数据。本文将深入介绍ResultSet类,它是Java JDBC编程中的一个核心类,用于处理查询结果。…...
Centos7常用服务脚本(.service)
Centos7常用服务脚本(.service) 注意:[Service]中配置路径必须使用绝对路径。 启停: systemctl { start | stop | restart | reload } xxx.service 自启动: systemctl { enable | disable } xxx.service nginx.se…...
MySQL 视图View的SQL语法和更新(视图篇 二)
视图语法基本操作 创建 -- [ ]表示可选 create [or replace] view 视图名称[(列名列表)] as select语句 [ with [cascaded | local ] check option ]; 添加(虽然视图是虚拟表,但是向视图操作的数据实际上会影响到实际关联的表数据) -- 视图添…...
38 翻转二叉树
翻转二叉树 理解题意,翻转即每个结点的左右子树翻转/对调题解1 递归——自下而上题解2 迭代——自上而下 给你一棵二叉树的根节点 root ,翻转这棵二叉树,并返回其根节点。 提示: 树中节点数目范围在 [0, 100] 内-100 < Node.…...
数据结构-快速排序-C语言实现
引言:快速排序作为一种非常经典且高效的排序算法,无论是工作还是面试中广泛用到,作为一种分治思想,需要熟悉递归思想。下面来讲讲快速排序的实现和改进。 老规矩,先用图解来理解一下:(这里使用快…...
玩客云Armbian_23.08.0-trunk_Onecloud_bookworm_edge_6.4.14.burn配置
固定IP # interface file auto-generated by buildrootauto lo iface lo inet loopback// 上面是默认的内容,下面是新增的内容,上下之间需要一个空行隔开 // 接口顶格写,属性的前面有一个tab的缩进 # The primary network interfaceauto eth0 iface eth0 inet staticaddress 1…...
Nginx查找耗时的接口
Nginx查找耗时的接口 # grep 是筛选的域名 awk中的$5是判断的状态码 sort中的15是指的upstream_response_time 当然也可以统计request_time的时间cat access.log | grep zhhll.icu | awk $5 200{print $0} | sort -k 15 -n -r | head -10 https://zhhll.icu/2021/linux/实…...
C++ Primer 一 变量和基本类型
本章讲解C内置的数据类型(如:字符、整型、浮点数等)和自定义数据类型的机制。下一章讲解C标准库里面定义的更加复杂的数据类型,比如可变长字符串和向量等。 1.基本内置类型 C内置的基本类型包括:算术类型和空类型。算…...
实体行业数字化转型怎么做?线上线下相结合的新零售体系怎么做?
如今,实体行业想要取得收入增长,只做线下业务或者只做线上业务,在当前的市场环境中是难以长久生存的,因此一定要线上线下相结合,将流量运作与线下转化进行充分结合,才能更好地发挥实体优势,带来…...
JAVA面经整理(5)
创建线程池不是说现用先创建,而是要是可以复用线程池中的线程,就很好地避免了大量用户态和内核态的交互,不需要频繁的创建和销毁线程 一)什么是池化技术?什么是线程池? 1)池化技术是提前准备好一些资源,在…...
【牛客网-面试必刷TOP101】二分查找题目
目录 二维数组中的查找_牛客题霸_牛客网 (nowcoder.com) 寻找峰值_牛客题霸_牛客网 (nowcoder.com) 数组中的逆序对_牛客题霸_牛客网 (nowcoder.com) 旋转数组的最小数字_牛客题霸_牛客网 (nowcoder.com) 二维数组中的查找_牛客题霸_牛客网 (nowcoder.com) 题意:…...
【QT】自定义组件ui类添加到主ui界面方法
1.添加自定义组件到项目中 add new选择如下 写好类方法,确定即可 2.将新创建的ui类加入到主ui界面 选中新创建ui类的父类空块,右键选择提升为 选择并添加新创建的类...
FFmpeg 多图片合成视频带字幕和音乐+特效(淡入淡出,圆圈黑色淡出)
FFmpeg 多图片合成视频带字幕和音乐+特效(淡入淡出,圆圈黑色淡出) 效果图1. 报错及解决2. xfade、xfade_opeccl 特效切换3. ffmpeg命令行详解4. 源码4.1 auto.bash4.2 geneFade.py4.3 python moviepy合并视频及音频按照(视频长度截取对应的音频在合并)4.4 命令行记录参考这…...
上网Tips: Linux截取动态效果图工具_byzanz
链接1 链接2 安装: sudo apt-get install byzanz 查看指令 说明 byzanz-record --help日常操作 xwininfo点击 待录制窗口 左上角 byzanz-record -x 72 -y 64 -w 1848 -h 893 -d 10 --delay5 -c /home/xixi/myGIF/test.gif小工具 获取鼠标坐标 xdotool getm…...
下载盗版网站视频并将.ts视频文件合并
. 1.分析视频请求123 2.数据获取和拼接 1.分析视频请求 1 通过抓包观察我们发现视频是由.ts文件拼接成的每一个.ts文件代表一小段2 通过观察0.ts和1.ts的url我们发现他们只有最后一段不同我们网上找到url获取的包3 我们发现index.m3u8中储存着所有的.ts文件名在拼接上前面固定…...
ElasticSearch - 基于 拼音分词器 和 IK分词器 模拟实现“百度”搜索框自动补全功能
目录 一、自动补全 1.1、效果说明 1.2、安装拼音分词器 1.3、自定义分词器 1.3.1、为什么要自定义分词器 1.3.2、分词器的构成 1.3.3、自定义分词器 1.3.4、面临的问题和解决办法 问题 解决方案 1.4、completion suggester 查询 1.4.1、基本概念和语法 1.4.2、示例…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...
Git常用命令完全指南:从入门到精通
Git常用命令完全指南:从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...
MyBatis中关于缓存的理解
MyBatis缓存 MyBatis系统当中默认定义两级缓存:一级缓存、二级缓存 默认情况下,只有一级缓存开启(sqlSession级别的缓存)二级缓存需要手动开启配置,需要局域namespace级别的缓存 一级缓存(本地缓存&#…...
【UE5 C++】通过文件对话框获取选择文件的路径
目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 ,这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器,右键点击 .uproject 文件,选择 "Generate Visual Studio project files",重…...
深入解析光敏传感技术:嵌入式仿真平台如何重塑电子工程教学
一、光敏传感技术的物理本质与系统级实现挑战 光敏电阻作为经典的光电传感器件,其工作原理根植于半导体材料的光电导效应。当入射光子能量超过材料带隙宽度时,价带电子受激发跃迁至导带,形成电子-空穴对,导致材料电导率显著提升。…...
