实时从TDengine数据库采集数据到Kafka Topic
实时从TDengine数据库采集数据到Kafka Topic
- 一、认识TDengine
- 二、TDengine Kafka Connector
- 三、什么是 Kafka Connect?
- 四、前置条件
- 五、安装 TDengine Connector 插件
- 六、启动 Kafka
- 七、验证 kafka Connect 是否启动成功
- 八、TDengine Source Connector 的使用
- 九、添加 Source Connector 配置文件
- 十、准备测试数据
- 十一、创建 Source Connector 实例
- 十二、查看 topic 数据
- 十三、unload 插件
- 十四、性能调优
- 十五、配置参考
- 通用配置
- TDengine Source Connector 特有的配置
- 十六、更多内容
一、认识TDengine
TDengine是一款高性能、高稳定性的开源时间序列数据库。它是由中国的PingCAP团队开发并开源的,旨在为大规模数据存储和实时分析提供解决方案。TDengine具有以下特点:
- 高性能:TDengine使用了多种优化技术,如数据压缩、索引优化和并行计算,以实现高性能的数据写入和查询。它能够处理大规模的数据,并且在毫秒级的响应时间内提供查询结果。
- 高稳定性:TDengine具有良好的容错和恢复机制,能够保证数据的持久性和可靠性。它支持数据的多副本备份和自动故障转移,以及数据一致性和完整性的检查。
- 时间序列支持:TDengine专注于时间序列数据的存储和分析,能够高效地处理时间序列数据的写入、查询和聚合操作。它支持多种数据类型和数据模型,如数字、文本、地理位置和时间等。
- 开源:TDengine是一个开源项目,遵循Apache 2.0许可证。用户可以自由地使用、修改和分发该软件,同时也可以参与到开发和改进过程中。
- 总之,TDengine是一款专注于时间序列数据存储和分析的高性能、高稳定性的开源数据库,适用于大规模数据存储和实时分析的场景。
二、TDengine Kafka Connector
- TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件,就可以将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine, 或将 TDengine 中指定数据库的数据(批量或实时)同步到 Kafka。
三、什么是 Kafka Connect?
- Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。
TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。
四、前置条件
运行本教程中示例的前提条件。
- Linux 操作系统
- 已安装 Java 8 和 Maven
- 已安装 Git、curl、vi
- 已安装并启动 TDengine。
五、安装 TDengine Connector 插件
编译插件
git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package -Dmaven.test.skip=true
unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 target/components/packages/ 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。上面的示例中使用了内置的插件安装路径: $KAFKA_HOME/components/。
配置插件
将 kafka-connect-tdengine 插件加入 $KAFKA_HOME/config/connect-distributed.properties 配置文件 plugin.path 中
plugin.path=/usr/share/java,/opt/kafka/components
六、启动 Kafka
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.propertieskafka-server-start.sh -daemon $KAFKA_HOME/config/server.propertiesconnect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
七、验证 kafka Connect 是否启动成功
输入命令:
curl http://localhost:8083/connectors
如果各组件都启动成功,会得到如下输出:
[]
八、TDengine Source Connector 的使用
TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是,先分批拉取历史数据,再用定时查询的策略同步增量数据。同时会监控表的变化,可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。
TDengine Source Connector 会将 TDengine 数据表中的数据转换成 InfluxDB Line 协议格式 或 OpenTSDB JSON 协议格式然后写入 Kafka。
下面的示例程序同步数据库 test 中的数据到主题 tdengine-test-meters。
九、添加 Source Connector 配置文件
vi source-demo.json
输入以下内容:
source-demo.json{"name":"TDengineSourceConnector","config":{"connector.class": "com.taosdata.kafka.connect.source.TDengineSourceConnector","tasks.max": 1,"subscription.group.id": "source-demo","connection.url": "jdbc:TAOS://127.0.0.1:6030","connection.user": "root","connection.password": "taosdata","connection.database": "test","connection.attempts": 3,"connection.backoff.ms": 5000,"topic.prefix": "tdengine","topic.delimiter": "-","poll.interval.ms": 1000,"fetch.max.rows": 100,"topic.per.stable": true,"topic.ignore.db": false,"out.format": "line","data.precision": "ms","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter"}
}
十、准备测试数据
准备生成测试数据的 SQL 文件。
prepare-source-data.sqlDROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;
CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) \d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) \d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) \d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) \d1003 USING meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) \d1003 USING meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) \d1004 USING meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) \d1004 USING meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000);
使用 TDengine CLI, 执行 SQL 文件。
taos -f prepare-source-data.sql
十一、创建 Source Connector 实例
curl -X POST -d @source-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
十二、查看 topic 数据
使用 kafka-console-consumer 命令行工具监控主题 tdengine-test-meters 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。 输出数据 InfluxDB line protocol 的格式。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tdengine-test-meters
输出:
......
meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
......
此时会显示所有历史数据。切换到 TDengine CLI, 插入两条新的数据:
USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
再切换回 kafka-console-consumer, 此时命令行窗口已经打印出刚插入的 2 条数据。
十三、unload 插件
测试完毕之后,用 unload 命令停止已加载的 connector。
查看当前活跃的 connector:
curl http://localhost:8083/connectors
如果按照前述操作,此时应有两个活跃的 connector。使用下面的命令 unload:
curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
十四、性能调优
如果在从 TDengine 同步数据到 Kafka 的过程中发现性能不达预期,可以尝试使用如下参数提升 Kafka 的写入吞吐量。
- 打开 KAFKA_HOME/config/producer.properties 配置文件。
- 参数说明及配置建议如下:
参数 | 参数说明 | 设置建议 |
---|---|---|
producer.type | 此参数用于设置消息的发送方式,默认值为 sync 表示同步发送,async 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async |
request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。 | 1 |
max.request.size | 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。 | 104857600 |
batch.size | 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。 | 524288 |
buffer.memory | 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。 | 1073741824 |
十五、配置参考
通用配置
以下配置项对 TDengine Sink Connector 和 TDengine Source Connector 均适用。
- name: connector 名称。
- connector.class: connector 的完整类名, 如: com.taosdata.kafka.connect.sink.TDengineSinkConnector。
- tasks.max: 最大任务数, 默认 1。
- topics: 需要同步的 topic 列表, 多个用逗号分隔, 如 topic1,topic2。
- connection.url: TDengine JDBC 连接字符串, 如 jdbc:TAOS://127.0.0.1:6030。
- connection.user: TDengine 用户名, 默认 root。
- connection.password :TDengine 用户密码, 默认 taosdata。
- connection.attempts :最大尝试连接次数。默认 3。
- connection.backoff.ms : 创建连接失败重试时间隔时间,单位为 ms。 默认 5000。
TDengine Source Connector 特有的配置
- connection.database: 源数据库名称,无缺省值。
- topic.prefix: 数据导入 kafka 时使用的 topic 名称的前缀。默认为空字符串 “”。
- timestamp.initial: 数据同步起始时间。格式为’yyyy-MM-dd HH:mm:ss’,若未指定则从指定 DB 中最早的一条记录开始。
- poll.interval.ms: 检查是否有新建或删除的表的时间间隔,单位为 ms。默认为 1000。
- fetch.max.rows : 检索数据库时最大检索条数。 默认为 100。
- query.interval.ms: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0,即获取到当前最新时间的所有数据。
- out.format : 结果集输出格式。line 表示输出格式为 InfluxDB Line 协议格式,json 表示输出格式是 json。默认为 line。
- data.precision: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
- ms : 表示毫秒,
- us : 表示微秒
- ns : 表示纳秒。
- topic.per.stable: 如果设置为 true,表示一个超级表对应一个 Kafka topic,topic的命名规则 <topic.prefix><topic.delimiter><connection.database><topic.delimiter>
<stable.name>;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 <topic.prefix><topic.delimiter><connection.database> - topic.ignore.db: topic 命名规则是否包含 database 名称,true 表示规则为 <topic.prefix><topic.delimiter><stable.name>,false 表示规则为 <topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>,默认 false。此配置项在 topic.per.stable 设置为 false 时不生效。
- topic.delimiter: topic 名称分割符,默认为 -。
- read.method: 从 TDengine 读取数据方式,query 或是 subscription。默认为 subscription。
- subscription.group.id: 指定 TDengine 数据订阅的组 id,当 read.method 为 subscription 时,此项为必填项。
- subscription.from: 指定 TDengine 数据订阅起始位置,latest 或是 earliest。默认为 latest。
十六、更多内容
更多内容请参阅官网:
- https://docs.taosdata.com/third-party/collection/kafka/
相关文章:

实时从TDengine数据库采集数据到Kafka Topic
实时从TDengine数据库采集数据到Kafka Topic 一、认识TDengine二、TDengine Kafka Connector三、什么是 Kafka Connect?四、前置条件五、安装 TDengine Connector 插件六、启动 Kafka七、验证 kafka Connect 是否启动成功八、TDengine Source Connector 的使用九、添…...

Linux -- 初识动静态库
目录 为什么要有库? 静态库 什么是静态库? 特点 优点 缺点 动态库 什么是动态库? 优点 缺点 编译器会选择哪个库? 为什么要有库? 库的存在是为了提高软件开发的效率、促进代码复用以及简化维护工作。通过使用…...

vite 打包前请求接口和打包后的不一致
在使用 Vite 进行项目打包时,如果发现打包前请求接口和打包后的行为不一致,这可能是由于多种原因导致的。以下是一些可能的原因和相应的解决方案: 1. 代理配置问题 开发环境:在开发环境中,Vite 通常使用 vite.config…...

fairseq 安装包python
背景: Collecting fairseq Using cached https://pypi.tuna.tsinghua.edu.cn/packages/d7/0f/b7043b451a97eb9b4cfb1b1e23e567b947d9d7bca542403228bd53b435fe/fairseq-0.12.1.tar.gz (9.6 MB) Installing build dependencies ... done Getting requirements…...

使用Mockaroo生成测试数据
使用Mockaroo生成测试数据 最近在学习【Spring Boot & React】Spring Boot和React教程视频的P51.Generating 1000 students一课中,看到了https://www.mockaroo.com/网站可以用来模拟生成测试数据,觉得还不错,特此记录一下。感觉每次看老…...

使用频率最高的 opencv 基础绘图操作 - python 实现
以下是 opencv-python 基本操作绘制示例,绘制: 1)圆,2)矩形,3)线段,4)文本。 安装 opencv-python pip install opencv-python 在图上绘制圆的操作,示例如…...

Python 在Excel中添加数据条
在Excel中添加数据条是一种数据可视化技巧,它通过条形图的形式在单元格内直观展示数值的大小,尤其适合比较同一列或行中各个单元格的数值。这种表示方式可以让大量的数字信息一目了然。本文将介绍如何使用Python在Excel中的指定单元格区域添加数据条。 …...

Unity中搜索不到XR Interaction Toolkit包解决方法
问题: 针对Unity版本2020.3在中PackageManager可能搜素不到XR Interaction Toolkit包 在Package Manager中未显示XR Interaction Toolkit包 解决方法: Package manager左上角,点加号,选择 Add package from git URL..,…...

【前端】JQ验证每个单选按钮是否已经选择
验证每个单选题是否都已经选择,其中每个input中不带name值,直接遍历input[type"radio"]验证 <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewpor…...

【无人机设计与控制】滑模控制、反步控制、传统PID四旋翼无人机轨迹跟踪控制仿真
摘要 本文基于滑模控制、反步控制和传统PID控制,设计了针对四旋翼无人机的轨迹跟踪控制系统。通过对比这三种控制策略在四旋翼无人机轨迹跟踪中的表现,分析了各自的优缺点和适用场景。仿真结果表明,滑模控制具有更强的鲁棒性,反步…...

MongoDB 介绍
一、MongoDB 介绍 MongoDB 是一个开源的、面向文档的数据库管理系统。它采用了灵活的数据模型,以类似 JSON 的文档形式存储数据,具有高可扩展性、高性能和丰富的功能。 主要特点包括: 灵活的数据模型:文档型数据库允许存储不同…...

计算机网络:物理层 —— 物理层概述
文章目录 物理层功能物理层接口特性常见特性 相关概念 物理层(Physical Layer)是OSI(Open Systems Interconnection)模型的第一层,负责提供原始比特流传输的服务。它定义了硬件接口的电气、机械、功能和过程特性&#…...

HTTP的工作原理
HTTP(Hypertext Transfer Protocol)是一种用于在计算机网络上传输超文本数据的应用层协议。它是构成万维网的基础之一,被广泛用于万维网上的数据通信。(超文本(Hypertext)是用超链接的方法,将各种不同空间的文字信息组…...

缓存数据减轻服务器压力
问题:不是所有的数据都需要请求后端的 不是所有的数据都需要请求后端的,有些数据是重复的、可以复用的解决方案:缓存 实现思路:每一个分类为一个key,一个可以下面可以有很多菜品 前端是按照分类查询的,所以我们需要通过分类来缓存缓存代码 /*** 根据分类id查询菜品** @pa…...

【自动驾驶】控制算法(十二)横纵向综合控制 | 从理论到实战全面解析
写在前面: 🌟 欢迎光临 清流君 的博客小天地,这里是我分享技术与心得的温馨角落。📝 个人主页:清流君_CSDN博客,期待与您一同探索 移动机器人 领域的无限可能。 🔍 本文系 清流君 原创之作&…...

Python基础之List列表用法
1、创建列表 names ["张三","李四","王五","Mary"] 2、列表分片 names[1]:获取数组的第2个元素。 names[1:3]:获取数组的第2、第3个元素。包含左侧,不包含右侧。 names[:3]等同于names[0:3]&…...

视觉检测开源库-功能包框架搭建
chapt9/chapt9_ws/src,接着在目录下新建 yolov5_ros2 功能包,并添加相关依赖,完整命令如下: ros2 pkg create yolov5_ros2 --build-type ament_python --dependencies rclpy yolov5 cv_bridge sensor_msgs vision_msgs cv2 --lic…...

pytest的基础入门
pytest判断用例的成功或者失败 pytest识别用例失败时会报AssertionError或者xxxError错误,当捕获异常时pytest无法识别到失败的用例 pytest的fixture夹具 pytest的参数化 #coding:utf-8 import pytestfrom PythonProject.pytest_test.funcs.guess_point import ge…...

(31)非零均值信号的时域分析:均值、方差、与功率
文章目录 前言一、使用MATLAB生成余弦波并画图二、计算信号的均值、方差、与功率三、结果分析 前言 本文对叠加了直流分量的一段整周期余弦信号进行时域分析,使用MATLAB进行信号生成,并计算其均值、方差、与功率。最后给出对计算结果的分析,…...

架设传奇SF时提示此服务器满员,GEE引擎点开始游戏弹出服务器满员的解决方法
昨天一个朋友在架设GEE的传奇服务端时遇到一个奇怪的问题,就是在服务器外网架设时,建好角色点开始游戏提示此服务器满员,这个问题一般比较少见,而且出现的话一般都是GEE引擎的版本。 他折腾了半天,一直没进游戏&#x…...

QT day06
在QT使用数据库实现学生管理系统 头文件: #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QSqlDatabase> #include <QSqlQuery> #include <QSqlRecord> QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAME…...

微信小程序-npm支持-如何使用npm包
文章目录 1、在内建终端中打开2、npm init -y3、Vant Weapp4、通过 npm 安装5、构建 npm 1、在内建终端中打开 Windows PowerShell 版权所有 (C) Microsoft Corporation。保留所有权利。尝试新的跨平台 PowerShell https://aka.ms/pscore6PS C:\Users\dgq\WeChatProjects\minip…...

Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.xkafka 3.8整合,文末有完整项目链接 前言一、如何看官方文档(有深入了解需求的人)二、kafka的安装tar包安装docker安装 三、代码中集成创建一个测试topic:testproducer代码producer配置(配置的格式,上篇文章…...

JavaScript中的数组
1.数组的概念 数组可以把一组相关的数据一起存放,并提供方便的访问/获取方式数组是指一组数据的集合,其中每个数据称之为元素(element),在数组中可以存放任意类型的元素,数组是一种将一组数据存储在单个变量名下的优雅方式。 2.…...

UE5运行时动态加载场景角色动画任意搭配-场景角色相机动画音乐加载方法(三)
1、将场景打包为Pak并加载 1、参考这篇文章将场景打包为pak,UE4打包并加载Pak-Windows/iOS/Android不同平台Editor/Runtime不同运行模式兼容 2、在Mount Pak后直接打开Map即可 void UMapManager::OpenMap(FString Path) {UWorld* World = UGlobalManager::GetInstance()->…...

c# 中 中文、英文、数字、空格、标点符号占的字符大小
在C#中,中文、英文、数字、空格和标点符号在不同编码下所占的字节大小是不一样的。常见的编码有UTF-8、UTF-16、GB2312等。以下是在不同编码下各种字符类型所占的字节大小: UTF-8: 中文字符:3个字节 英文字符:1个字…...

前端_003_js扫盲
文章目录 var,let,const严格模式数据类型运算符事件常用对象函数绑定call() ,apply(),bind() 闭包浏览器中事件循环回调和异步Promiseasync和await DOMBOMAjax var,let,const let是var的升级版本,对于块作用域,var无法进行限制,let不会存在该…...

ValueError: You cannot perform fine-tuning on purely quantized models.
在使用peft 微调8bit 或者4bit 模型的时候,可能会报错: You cannot perform fine-tuning on purely quantized models. Please attach trainable adapters on top of the quantized model to correctly perform fine-tuning. Please see: https://huggi…...

DELL R720服务器阵列数据恢复,磁盘状态为Foreign
服务器无法正常进入系统,物理磁盘状态变成了Foreign 虚拟磁盘状态变成了Failed 阵列已经丢失了,需要手工强制导入外部配置 单击 Main Menu 屏幕上的 Configuration Management。单击 Manage Foreign Configuration 单击 Preview Foreign Configurati…...

VMDK 0X80BB0005 VirtualBOX虚拟机错误处理-数据恢复——未来之窗数据恢复
打开虚拟盘文件in7.vmdk 失败. Could not get the storage format of the medium 7\win7.vmdk (VERR_NOT_SUPPORTED). 返回 代码:VBOX_E_IPRT_ERROR (0X80BB0005) 组件:MediumWrap 界面:IMedium {a a3f2dfb1} 被召者:IVirtualBox {768 cd607} 被召者 RC:VBOX_E_OBJECT_NOT_F…...