当前位置: 首页 > news >正文

高效数据传输:轻松上手将Kafka实时数据接入CnosDB

本篇我们将主要介绍如何在 Ubuntu 22.04.2 LTS 环境下,实现一个Kafka+Telegraf+CnosDB 同步实时获取流数据并存储的方案。在本次操作中,CnosDB 版本是2.3.0,Kafka 版本是2.5.1,Telegraf 版本是1.27.1 

随着越来越多的应用程序架构转向微服务或无服务器结构,应用程序和服务的数量每天都在增加。用户既可以通过实时聚合,也可以通过输出为测量或指标的计算,来处理数量不断增加的时间序列数据。面对产生的海量数据,用户可以通过多种方式来捕获和观察系统中数据的变化,在云原生环境中,最流行的一种是使用事件。

Apache Kafka是一个耐用、高性能的消息系统,也被认为是分布式流处理平台。它可应用于许多用例,包括消息传递、数据集成、日志聚合和指标。而就指标而言,仅有消息主干或代理是不够的。虽然 Apache Kafka 很耐用,但它并不是为运行指标和监控查询而设计的。这恰恰正是 CnosDB 的长处。

架构方案

通过将这Kafka、Telegraf和CnosDB 三者结合起来,可以实现数据的完整流程:

  1. 数据生成:使用传感器、设备或其他数据源产生数据,并将其发送到Kafka主题。
  2. Kafka 消息队列:Kafka 接收并存储数据流,确保数据安全和可靠性。
  3. Telegraf 消费者:Telegraf 作为 Kafka 的消费者,订阅 Kafka 主题并获取数据流。
  4. CnosDB 数据存储:经过预处理的数据由 Telegraf 发送到 CnosDB 中进行时序数据的存储。

整体的应用程序架构如图所示:

图片

Kafka

Apache Kafka 是一个开源分布式流处理平台,它被设计用于处理实时数据流,具有高可靠性、高吞吐量和低延迟的特点,目前已经被大多数公司使用。它的使用方式非常多样化,包括:

  • 流处理:它通过存储实时事件以进行聚合、丰富和处理来提供事件主干。
  • 指标:Apache Kafka 成为许多分布式组件或应用程序(例如微服务)的集中聚合点。这些应用程序可以发送实时指标以供其他平台使用,包括 CnosDB。
  • 数据集成:可以捕获数据和事件更改并将其发送到 Apache Kafka,任何需要对这些更改采取行动的应用程序都可以使用它们。
  • 日志聚合:Apache Kafka 可以充当日志流平台的消息主干,将日志块转换为数据流。

几个核心概念

  1. 实例(Broker):Kafka的Broker是Kafka集群中的服务器节点,负责存储和转发消息,提供高可用性、容错性和可靠性。
  2. 主题(Topic):Apache Kafka 中的 topic ,是逻辑存储单元,就像关系数据库的表一样。主题通过分区通过代理进行分发,提供可扩展性和弹性。
  3. 生产者(Producer):生产者将消息发布到Kafka的指定主题。生产者可以选择将消息发送到特定的分区,也可以让Kafka自动决定分配策略。
  4. 消费者(Consumer):消费者从指定主题的一个或多个分区中读取消息。消费者可以以不同的方式进行组织,如单播、多播、消费者组等。
  5. 发布-订阅模式:是指生产者将消息发布到一个或多个主题,而消费者可以订阅一个或多个主题,从中接收并处理消息。

简单来说就是,当客户端将数据发送到 Apache Kafka 集群实例时,它必须将其发送到某个主题。

此外,当客户端从 Apache Kafka 集群读取数据时,它必须从主题中读取。向 Apache Kafka 发送数据的客户端成为生产者,而从 Kafka 集群读取数据的客户端则成为消费者。数据流向示意图如下:

图片

注:这里没有引入更复杂的概念,如topic分区、偏移量、消费者组等,用户可自行参考官方指导文档学习:

Kafka:【https://kafka.apache.org/documentation/#gettingStarted】

部署 Kafka

下载并安装Kafka【https://kafka.apache.org/】

1.前提:需确保有 JDK 环境和 Zookeeper 环境,如果没有可以使用下面的命令进行安装:

sudo apt install openjdk-8-jdk
sudo apt install zookeeper

2.下载 Kafka 安装包并解压

wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz
tar -zxvf kafka_2.12-2.5.1.tgz

3.进入解压后的 Kafka 目录

cd  kafka_2.12-2.5.1

4.修改$KAFKA_HOME/config/server.properties的配置文件(可按需修改端口、日志路径等配置信息)

5.保存并关闭编辑器。运行下面的命令来启动Kafka:

bin/kafka-server-start.sh config/server.properties

Kafka 将在后台运行,并通过默认的 9092 端口监听连接。

Telegraf

Telegraf 是一个开源的服务器代理程序,用于收集、处理和传输系统和应用程序的指标数据。Telegraf 支持多种输入插件和输出插件,并且能够与各种不同类型的系统和服务进行集成。它可以从系统统计、日志文件、API 接口、消息队列等多个来源采集指标数据,并将其发送到各种目标,如 CnosDB 、Elasticsearch、Kafka、Prometheus 等。这使得 Telegraf 非常灵活,可适应不同的监控和数据处理场景。

  • 轻量级:Telegraf被设计为一个轻量级的代理程序,对系统资源的占用相对较小,可以高效运行在各种环境中。
  • 插件驱动:Telegraf使用插件来支持各种输入和输出功能。它提供了丰富的插件生态系统,涵盖了众多的系统和服务。用户可以根据自己的需求选择合适的插件来进行指标数据的采集和传输。
  • 数据处理和转换:Telegraf具有灵活的数据处理和转换功能,可以通过插件链(Plugin Chain)来对采集到的指标数据进行过滤、处理、转换和聚合,从而提供更加精确和高级的数据分析。

部署 Telegraf

1.安装 Telegraf

sudo apt-get update && sudo apt-get install telegraf

2.切换到 Telegraf 的默认配置文件所处目录 /etc/telegraf 下

3.在配置文件 telegraf.config 中添加目标 OUTPUT PLUGIN

[[outputs.http]]url = "http://127.0.0.1:8902/api/v1/write?db=telegraf"timeout = "5s"method = "POST"username = "root"password = ""data_format = "influx"use_batch_format = truecontent_encoding = "identity"idle_conn_timeout = 10

按需修改的参数:

url:CnosDB 地址和端口
username:连接 CnosDB 的用户名
password:连接 CnosDB 的用户名对应的密码

注:其余参数可与上述配置示例中保持一致

4.在配置文件中将下面的配置注释放开,可按需修改

[[inputs.kafka_consumer]]
brokers = ["127.0.0.1:9092"]
topics = ["oceanic"]
data_format = "json"

参数:

brokers:Kafka 的 broker list 
topics:指定写入 Kafka 目标的 topic
data_format:写入数据的格式

注:其余参数可与上述配置示例中保持一致

5.启动 Telegraf

telegraf -config /etc/telegraf/telegraf.conf

CnosDB

部署 CnosDB

详细操作请参考: CnosDB 安装

【https://docs.cnosdb.com/zh/latest/start/install.html】

整合

Kafka创建topic

1.进入 kafka 的 bin 文件夹下

2.执行命令,创建 topic

./kafka-topics.sh --create --topic oceanic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Python 模拟写入数据到Kakfa

1.编写代码:

import time
import json
import randomfrom kafka import KafkaProducerdef random_pressure():return round(random.uniform(0, 10), 1)def random_tempreture():return round(random.uniform(0, 100), 1)def random_visibility():return round(random.uniform(0, 100), 1)def get_json_data():data = {}data["pressure"] = random_pressure()data["temperature"] = random_temp_cels()data["visibility"] = random_visibility()return json.dumps(data) def main():producer = KafkaProducer(bootstrap_servers=['ip:9092'])for _ in rang(2000):json_data = get_json_data()producer.send('oceanic', bytes(f'{json_data}','UTF-8'))print(f"Sensor data is sent: {json_data}")time.sleep(5)if __name__ == "__main__":main()

2.运行Python脚本

python3 test.py

查看 kafka topic 中的数据

1.执行下面查看指定 topic 数据的命令

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic oceanic --from-beginning

图片

查看同步到 CnosDB 中的数据

1.使用工具连接到CnosDB

cnosdb-cli

2.切换到指定库

\c public

3.查看数据

select * from kafka_consumer;

图片

补充阅读

1.使用 Telegraf 采集数据并写入 CnosDB:

https://docs.cnosdb.com/zh/latest/versatility/collect/telegraf.html

2.Python 连接器:

https://docs.cnosdb.com/zh/latest/reference/connector/python.html

3.CnosDB 快速开始:

https://docs.cnosdb.com/zh/latest/start/quick_start.html

相关文章:

高效数据传输:轻松上手将Kafka实时数据接入CnosDB

本篇我们将主要介绍如何在 Ubuntu 22.04.2 LTS 环境下,实现一个KafkaTelegrafCnosDB 同步实时获取流数据并存储的方案。在本次操作中,CnosDB 版本是2.3.0,Kafka 版本是2.5.1,Telegraf 版本是1.27.1 随着越来越多的应用程序架构转…...

【探索Linux】—— 强大的命令行工具 P.3(Linux开发工具 vim)

阅读导航 前言vim简介概念特点 vim的相关指令vim命令模式(Normal mode)相关指令插入模式(Insert mode)相关指令末行模式(last line mode)相关指令 简单vim配置(附配置链接)温馨提示 前言 前面我们讲了C语言的基础知识,也了解了一些数据结构&…...

AgentBench::AI智能体发展的潜在问题一

从历史上看,几乎每一种新技术的广泛应用都会在带来新机遇的同时引发很多新问题,AI智能体也不例外。从目前的发展看,AI智能体的发展可能带来的新问题可能包括如下方面: 第一是它可能带来涉及个人数据、隐私,以及知识产权的法律纠纷的大幅增长。要产生一个优秀的AI智能体,除…...

【2023年11月第四版教材】《第5章-信息系统工程之软件工程(第二部分)》

《第5章-信息系统工程之软件工程(第二部分)》 1.3 软件设计1.4 软件实现[补充第三版教材内容] 1.5 部署交付 1.3 软件设计 1、结构化设计SD是一种面向数据流的方法,它以SRS和SA阶段所产生的DFD和数据字 典等文档为基础…...

OpenCV(二)——图像基本处理(二)

目录 2.图像的几何变换 2.1 图像平移 2.2 图像缩放 2.3 图像旋转 2.4 仿射变换 2.5 透视变换...

Redis—缓存

目录标题 缓存雪崩发生场景解决方案针对Redis宕机的缓存雪崩解决方案 缓存击穿发生场景解决方案 缓存穿透发生场景解决方案布隆过滤器 数据库和缓存数据一致性 缓存雪崩 大量缓存数据在同一时间过期(失效)或者 Redis 故障宕机时,如果此时有大…...

第三章 图论 No.10无向图的双连通分量

文章目录 定义Tarjan求e-DCCTarjan求v-DCC395. 冗余路径1183. 电力396. 矿场搭建 定义 无向图有两种双连通分量 边双连通分量,e-DCC点双连通分量,v-DCC 桥:删除这条无向边后,图变得不连通,这条边被称为桥 边双连通分…...

Java学习手册——第二篇面向对象程序设计

Java学习手册——第二篇面向对象 1. 结构化程序设计2. 面向对象 第一章我们已经介绍了Java语言的基础知识,也知道他能干什么了, 那我们就从他的设计思想开始入手吧。 接触一个语言之前首先要知道他的大方向,设计思想是什么样的, 这…...

Redis实战:Redis的安装及简单使用

本片将介绍 Redis 的安装及简单使用 文章目录 1、Redis安装1.1、Windows下Redis的安装1.2、Linux下Redis的安装1.3、Mac下Redis的安装(使用Homebrew) 2、Redis使用2.1、启动服务端客户端2.2、Redis简单命令 3、Redis命令大全 1、Redis安装 1.1、Windows…...

Linux学习之初识Linux

目录 一.Linux的发展历史及概念 1.什么是Linux UNIX发展的历史: Linux发展历史: 2. 开源 商业化发行版本 二. 如何搭建Linux环境 Linux 环境的搭建方式主要有三种: 1. 直接安装在物理机上 2. 使用虚拟机软件 3. 使用云服务器 三. …...

神经网络基础-神经网络补充概念-29-为什么使用深层表示

概念 深层表示(Deep Representation)是指在深度神经网络的多个隐藏层中逐层提取和学习数据的特征表示。 使用深层表示的原因 高维特征提取:深层神经网络可以从原始数据中自动学习高维抽象特征。每个隐藏层都对数据进行一些变换&#xff0c…...

2023最新水果编曲软件FL Studio 21.1.0.3267音频工作站电脑参考配置单及系统配置要求

音乐在人们心中的地位日益增高,近几年音乐选秀的节目更是层出不穷,喜爱音乐,创作音乐的朋友们也是越来越多,音乐的类型有很多,好比古典,流行,摇滚等等。对新手友好程度基本上在首位,…...

边缘计算:下一代计算模式的突破

章节一:引言 随着物联网、人工智能和大数据等技术的不断发展,计算需求变得越来越复杂,传统的云计算模式已经难以满足快速增长的数据处理需求。在这样的背景下,边缘计算作为一种全新的计算模式崭露头角,为我们带来了更加…...

连接不上手机,adb devices为空:

首先说明一下,我是已经安装了android studio,也配置了环境变量,但是还是连接不上手机 解决方案: 1.打开开发者模式 https://product.pconline.com.cn/itbk/sjtx/sjwt/1424/14246015.html 2.开启usb调试 https://baiyunju.cc/10770 最后成功…...

vuex学习总结

一、vuex工作原理 工作流程:需求:改变组件count的sun变量的值,先调用dispatch函数传入jia函数和要改变的值给actions(这个actions里面必须有jia这个函数);actions收到后调用commit函数将jia方法和值传给mut…...

11. Docker Swarm(二)

1、前言 上一篇中我们利用Docker Swarm搭建了基础的集群环境。那么今天我们就来验证以下该集群的可用性。上一篇的示例中,我创建了3个实例副本,并且通过访问http://192.168.74.132:8080得到我们的页面。 2、验证高可用 1)我们可以通过以下命…...

注册中心Eureka和Nacos,以及负载均衡Ribbon

1.初识微服务 1.1.什么是微服务 微服务,就是把服务拆分成为若干个服务,降低服务之间的耦合度,提供服务的独立性和灵活性。做到高内聚,低耦合。 1.2.单体架构和微服务架构的区别: 单体架构:简单方便&#…...

php+tcpdf生成pdf:中文乱码

亲测成功,感谢分享! 查看原文 TCPDF是一个生成PDF的不错的库,可惜,官方对包括中文在内的东亚字体支持不怎么样的。 场景:某项目需要根据数据库信息生成pdf格式的发票,考虑采用稳定的tcpdf,虽然…...

【AI实战】BERT 文本分类模型自动化部署之 dockerfile

【AI实战】BERT 文本分类模型自动化部署之 dockerfile BERTBERT 文本分类模型基于中文预训练bert的文本分类模型针对多分类模型的loss函数样本不均衡时多标签分类时 dockerfile编写 dockerfilebuild镜像运行docker测试服务 参考 本文主要介绍: 基于BERT的文本分类模…...

深入理解 Flutter 图片加载原理 | 京东云技术团队

前言 随着Flutter稳定版本逐步迭代更新,京东APP内部的Flutter业务也日益增多,Flutter开发为我们提供了高效的开发环境、优秀的跨平台适配、丰富的功能组件及动画、接近原生的交互体验,但随之也带来了一些OOM问题,通过线上监控信息…...

Spring Boot 支持多种环境,包括开发环境、测试环境、预发布环境和生产环境。

Spring Boot 支持多种环境,包括开发环境、测试环境、预发布环境和生产环境。不同的环境具有不同的配置,可以在不同的环境中对应用程序进行测试、验证和部署。以下是每种环境的用途和相应的代码案例。 开发环境 开发环境是开发人员在本地进行开发的环境&…...

Ctfshow web入门 命令执行RCE篇 web29-web77 与 web118-web124 详细题解 持续更新中(预计8.18完成)~

Ctfshow 命令执行 web29 pregmatch是正则匹配函数,匹配是否包含flag,if(!preg_match("/flag/i", $c)),/i忽略大小写 可以利用system来间接执行系统命令 flag采用f*绕过,或者mv fl?g.php 1.txt修改文件名&#xff0c…...

合宙Air724UG LuatOS-Air script lib API--wifiRil

wifiRil Table of Contents wifiRil wifiRil.regRsp(head, fnc, typ, formt) wifiRil.regUrc(prefix, handler) wifiRil.deRegUrc(prefix) wifiRil.request(cmd, arg, onrsp, delay, param) wifiRil 模块功能:esp8266 wifi模块AT命令交互管理 wifiRil.regRsp(head,…...

python读取word/pdf文档,指定文字内容和图片

读编号转文件夹目录然后放图片进去那个 一 先将word转为PDF pdf 读起来比较方便, 按页码读取文件: import pdfplumber from PIL import Image import cv2 import numpy as np import re import os import logging import iodef create_folder(folder_name):if not…...

零售行业供应链管理核心KPI指标(二) – 线上订单履行周期

一般品牌零售商有一个大的渠道就是全国连锁的商超、大卖场,非常重要的渠道,要去铺货。同类型的产品都在竞争这个大渠道,但商超、大卖场在这类产品的容量是有限的,所以各个品牌就要去争夺整个容量,看谁在有限的容量里占…...

VGG分类实战:猫狗分类

关于数据集 数据集选择的是Kaggle上的Cat and Dog,猫狗图片数量上达到了上万张。你可以通过这里进入Kaggle下载数据集Cat and Dog | Kaggle。 在我的Github仓库当中也放了猫狗图片各666张。 VGG网络 VGG的主要特点是使用了一系列具有相同尺寸 3x3 大小的卷积核进…...

C++11并发与多线程笔记(3)线程传参详解,detach()大坑,成员函数做线程函数

C11并发与多线程笔记(3)线程传参详解,detach 大坑,成员函数做线程函数 1、传递临时对象作为线程参数1.1 要避免的陷阱11.2 要避免的陷阱21.3 总结 2、临时对象作为线程参数2.1 线程id概念2.2 临时对象构造时机抓捕 3、传递类对象…...

说几个常见的语法糖

目录 面试回答 知识扩展 如何解语法糖? 糖块一、swith 支持 String 与枚举 糖块二、泛型 糖块三、自动装箱与拆箱 糖块四、枚举 糖块五、条件编译 糖块六、断言 糖块七、数值字面量 糖块八、for-each 糖块九、try-with-resource 可能遇到的坑 泛型 自…...

Python文件操作与输入输出:从基础到高级应用

文章目录 🍀引言🍀文件操作基础🍀上下文管理器与文件自动关闭🍀文件的迭代与逐行读取🍀文件的其他常见操作🍀输入输出基础🍀 文件输入输出🍀格式化输出🍀高级文件操作&am…...

leetcode算法题--找出最安全路径

原题链接:https://leetcode.cn/problems/find-the-safest-path-in-a-grid/description/ func maximumSafenessFactor(grid [][]int) int {n : len(grid)type pair struct {x inty int}p : make([]pair, 0)dis : make([][]int, n)for i : range dis {dis[i] make([…...