当前位置: 首页 > news >正文

使用 Confluent Cloud 的 Elasticsearch Connector 部署 Elastic Agent

作者:来自 Elastic Nima Rezainia

Confluent Cloud 用户现在可以使用更新后的 Elasticsearch Sink Connector 与 Elastic Agent 和 Elastic Integrations 来实现完全托管且高度可扩展的数据提取架构。

Elastic 和 Confluent 是关键的技术合作伙伴,我们很高兴宣布对该合作伙伴关系进行新的投资。Confluent 的 Kafka 是许多企业摄取架构的关键组件,它确保客户能够保证将关键的可观察性和安全性数据传输到他们的 Elasticsearch 集群中。我们一直致力于改进我们产品的结合方式。借助 Elastic Agent 的新 Kafka 输出和 Confluent 新改进的 Elasticsearch Sink Connectors,无缝地从边缘收集数据、通过 Kafka 将其传输到 Elasticsearch 集群从未如此简单。

在这篇博客中,我们研究了一种将 Elastic Agent 与 Confluent Cloud 的 Kafka 产品集成的简单方法,以减轻摄取业务关键数据的运营负担。

Elastic Agent 和 Confluent Cloud 的优势

Elastic Agent 和 Confluent Cloud 的更新版 Elasticsearch Sink connector 结合使用时,可为各种规模的组织提供无数优势。这种组合解决方案可以灵活地以高效且有弹性的方式处理任何类型的数据采集工作负载。

完全托管

Elastic Cloud Serverless 和 Confluent Cloud 结合使用时,可为用户提供完全托管的服务。这使得部署和采集几乎无限量的数据变得毫不费力,而无需担心节点、集群或扩展。

完全支持 Elastic 集成

300 多个 Elastic 集成中的任何一个都完全支持通过 Kafka 发送数据。在这篇博文中,我们概述了如何在两个平台之间建立连接。这可确保你能从我们在内置警报、SLO、AI 助手等方面的投资中受益。

解耦架构

Kafka 充当数据源(如 Elastic Agent 和 Logstash)和 Elasticsearch 之间的弹性缓冲区,将数据生产者与消费者解耦。这可以显著降低总体拥有成本,因为你可以根据典型的数据摄取量(而不是最大摄取量)调整 Elasticsearch 集群的大小。它还可以确保系统在数据量激增时具有弹性。

完全控制你的数据

借助我们新的 “Output per Integration - 每个集成输出” 功能,客户现在可以使用同一代理将不同的数据发送到不同的目的地。客户可以轻松地将安全日志直接发送到 Confluent Cloud/Kafka,这可以提供交付保证,同时将不太重要的应用程序日志和系统指标直接发送到 Elasticsearch。

部署参考架构

在以下部分中,我们将引导你了解使用 Confluent Cloud 的 Elasticsearch Sink Connector 将 Confluent Kafka 与 Elastic Agent 和 Elasticsearch 集成的方法之一。与任何流式传输和数据收集技术一样,根据特定用例,可以通过多种方式配置管道。这篇博文将重点介绍一个简单的架构,可以将其用作更复杂部署的起点。

该架构的一些亮点包括:

  • Elastic Agents 上的动态 Kafka 主题选择
  • Elasticsearch Sink Connectors,用于从 Confluent Kafka 到 Elasticsearch 的完全托管传输
  • 利用 Elastic 的 300 多个集成处理数据


先决条件

在开始之前,请确保你在 Confluent Cloud 中部署了 Kafka 集群,在 Elastic Cloud 中部署了 Elasticsearch 集群或项目,并安装并注册了 Elastic Agent。

为 Elastic Agent 配置 Confluent Cloud Kafka 集群

导航到 Confluent Cloud 中的 Kafka 集群,然后选择 “Cluster Settings”。找到并记下 Bootstrap Server 地址,稍后在 Fleet 中创建 Kafka 输出时我们将需要此值。

导航到左侧导航菜单中的主题并创建两个主题:

  • 名为 logs 的主题
  • 名为 metrics 的主题

接下来,导航到左侧导航菜单中的 API Keys:

  1. 单击 + 添加 API Key
  2. 选择 Service Account  API key type
  3. 为此 API Key 提供一个有意义的名称
  4. 授予 metrics 和 logs 主题的密钥写入权限
  5. 创建密钥

请记录提供的 Key 和 Secret,我们稍后在 Fleet 中配置 Kafka 输出时会用到它们。

配置 Elasticsearch 和 Elastic Agent

在本节中,我们将配置 Elastic Agent 以将数据发送到 Confluent Cloud 的 Kafka 集群,并将配置 Elasticsearch 以便它可以从 Confluent Cloud Elasticsearch Sink Connector 接收数据。

配置 Elastic Agent 以将数据发送到 Confluent Cloud

Elastic Fleet 简化了向 Kafka 和 Confluent Cloud 发送数据的过程。使用 Elastic Agent,可以轻松将 Kafka “输出” 附加到来自代理的所有数据,也可以将其仅应用于来自特定数据源的数据。

在左侧导航中找到 Fleet,单击 “Settings” 选项卡。在 “Settings” 选项卡上,找到 “Output” 部分,然后单击 “dd Output”。

执行以下步骤来配置新的 Kafka output:

  • 为 output 提供 Name
  • 将 Type 设置为 Kafka
  • 使用我们之前记下的 Bootstrap Server 地址填充主机字段。
  • 在身份验证下,使用 API 密钥填充用户名,使用我们之前记下的密码填充密码

  • 在 Topics 下,选择 Dynamic Topic,并将 Topic from field 设置为 data_stream.type

  • 单击 “Save and apply settings”

接下来,我们将导航到 Fleet 中的 “Agent Policies” 选项卡,然后单击以编辑我们要将 Kafka 输出附加到的代理策略。打开代理策略后,单击 “Settings” 选项卡,然后将 Output for integrations 和 Output for agent monitoring 更改为我们刚刚创建的 Kafka 输出。

为每个 Elastic 集成选择一个输出:要设置用于特定数据源的 Kafka 输出,请参阅集成级输出文档。

关于主题选择的说明:data_stream.type 字段是一个保留字段,如果我们发送的数据是日志,Elastic Agent 会自动将其设置为 logs,如果我们发送的数据是指标,则设置为 metrics。使用 data_stream.type 启用 Dynamic Topic 选择将导致 Elastic Agent 自动将指标路由到 metrics 主题,并将日志路由到 logs 主题。有关主题选择的信息,请参阅 Kafka 输出的主题设置文档。

在 Elasticsearch 中配置发布端点

接下来,我们将为 Confluent Cloud Sink Connector 设置两个发布端点(数据流),以便在将文档发布到 Elasticsearch 时使用:

  • 我们将创建一个数据流 logs-kafka.reroute-default 用于处理 logs
  • 我们将创建一个数据流 metrics-kafka.reroute-default 用于处理 metrics

如果我们让这些数据流中的数据保持原样,数据虽然可用,但我们会发现数据未经解析且缺乏重要的丰富内容。因此,我们还将创建两个索引模板和两个摄取管道,以确保数据由我们的 Elastic Integrations 处理。

创建 Elasticsearch 索引模板和摄取管道

以下步骤使用 Kibana 中的 Dev Tools,但所有这些步骤都可以通过 REST API 或使用 Stack Management 中的相关用户界面完成。

首先,我们将创建用于处理 logs 的索引模板和摄取管道:

PUT _index_template/logs-kafka.reroute
{"template": {"settings": {"index.default_pipeline": "logs-kafka.reroute"}},"index_patterns": ["logs-kafka.reroute-default"],"data_stream": {}
}
PUT _ingest/pipeline/logs-kafka.reroute
{"processors": [{"reroute": {"dataset": ["{{data_stream.dataset}}"],"namespace": ["{{data_stream.namespace}}"]}}]
}

接下来,我们将创建用于处理 metrics 的索引模板和提取管道:

PUT _index_template/metrics-kafka.reroute
{"template": {"settings": {"index.default_pipeline": "metrics-kafka.reroute"}},"index_patterns": ["metrics-kafka.reroute-default"],"data_stream": {}
}
PUT _ingest/pipeline/metrics-kafka.reroute
{"processors": [{"reroute": {"dataset": ["{{data_stream.dataset}}"],"namespace": ["{{data_stream.namespace}}"]}}]
}

关于 rerouting 的说明:下面是一个实际示例,其中与 Linux 网络指标相关的文档将首先进入 metrics-kafka.reroute-default,然后此 Ingest Pipeline 将检查该文档并发现 data_stream.dataset 设置为 system.network,data_stream.namespace 设置为 default。它将使用这些值将文档从 metrics-kafka.reroute-default 重新路由到 metrics-system.network-default,然后由系统集成进行处理。

配置 Confluent Cloud Elasticsearch Sink 连接器

现在是时候配置 Confluent Cloud Elasticsearch Sink 连接器了。我们将执行以下步骤两次并创建两个单独的连接器,一个用于 logs 的连接器,一个用于 metrics 的连接器。如果所需的设置不同,我们将突出显示正确的值。

导航到 Confluent Cloud 中的 Kafka 集群,然后从左侧导航菜单中选择 Connectors。在 Connectors 页面上,从可用的连接器目录中选择 Elasticsearch Service Sink。

Confluent Cloud 为用户提供了配置连接器的简化工作流程。这里我们将逐步介绍该过程的每个步骤:

步骤 1:主题选择

首先,我们将根据要部署的连接器选择连接器将从中消费数据的主题:

  • 部署用于 logs 的 Elasticsearch Sink 连接器时,请选择 logs 主题。
  • 部署用于 metrics 的 Elasticsearch Sink 连接器时,请选择 metrics 主题。

步骤 2:Kafka 凭据

选择 KAFKA_API_KEY 作为集群身份验证模式。提供前面提到的 API Key 和 Secret

我们收集所需的 Confluent Cloud Cluster 信息。

步骤 3:身份验证

提供我们的 Elasticsearch 集群的 Elasticsearch Endpoint 地址作为连接 URI。连接用户和连接密码是 Elasticsearch 中帐户的身份验证信息,Elasticsearch Sink Connector 将使用这些信息将数据写入 Elasticsearch。

步骤 4:配置

在此步骤中,我们将 Input Kafka record value format 设置为 JSON。接下来,展开 Advanced Configuration。

  1. 我们将 Data Stream Dataset 设置为 kafka.reroute
  2. 我们将根据要部署的连接器设置 Data Stream Type:
    • 在为 logs 部署 Elasticsearch Sink 连接器时,我们将 Data Stream Type 设置为 logs
    • 在为 metrics 部署 Elasticsearch Sink 连接器时,我们将Data Stream Type 设置为 metrics
  3. 其他设置的正确值将取决于具体环境。

步骤 5:调整大小

在此步骤中,请注意 Confluent Cloud 为我们的部署提供了建议的最少任务数。遵循此处的建议是大多数部署的良好起点。

步骤 6:检查和启动

查看 Connector configuration 和 Connector pricing 部分,如果一切正常,则是时候单击 continue 并启动连接器了!连接器可能会报告为配置,但很快就会开始使用来自 Kafka 主题的数据并将其写入 Elasticsearch 集群。

你现在可以导航到 Kibana 中的发现并找到流入 Elasticsearch 的日志!还请查看 Confluent Cloud 为你的新 Elasticsearch Sink Connector 部署提供的实时指标。

如果你只部署了第一个 logs 接收器连接器,你现在可以重复上述步骤来部署第二个 metrics 接收器连接器。

享受完全托管的数据采集架构

如果你遵循上述步骤,那么恭喜你。你已成功:

  1. 配置 Elastic Agent 以将日志和指标发送到 Kafka 中的专用主题
  2. 在 Elasticsearch 中创建发布端点(数据流),专用于处理来自 Elasticsearch Sink Connector 的数据
  3. 配置托管 Elasticsearch Sink Connector 以使用来自多个主题的数据并将该数据发布到 Elasticsearch

接下来,你应该启用其他集成,部署更多 Elastic Agent,在 Kibana 中探索你的数据,并享受 Elastic Serverless 和 Confluent Cloud 完全托管的数据采集架构带来的好处!

原文:Deploying Elastic Agent with Confluent Cloud's Elasticsearch Connector — Elastic Observability Labs

相关文章:

使用 Confluent Cloud 的 Elasticsearch Connector 部署 Elastic Agent

作者:来自 Elastic Nima Rezainia Confluent Cloud 用户现在可以使用更新后的 Elasticsearch Sink Connector 与 Elastic Agent 和 Elastic Integrations 来实现完全托管且高度可扩展的数据提取架构。 Elastic 和 Confluent 是关键的技术合作伙伴,我们很…...

嵌入式知识点总结 Linux驱动 (三)-文件系统

针对于嵌入式软件杂乱的知识点总结起来,提供给读者学习复习对下述内容的强化。 目录 1.什么是文件系统? 2.根文件系统为什么这么重要?​编辑 3.可执行映像文件通常由几部分构成,他们有什么特点? 1.什么是文件系统&a…...

【知识】可视化理解git中的cherry-pick、merge、rebase

转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~ 这三个确实非常像,以至于对于初学者来说比较难理解。 总结对比 先给出对比: 特性git mergegit rebasegit cherry-pick功能合并…...

【deepseek】deepseek-r1本地部署-第二步:huggingface.co替换为hf-mirror.com国内镜像

一、背景 由于国际镜像国内无法直接访问,会导致搜索模型时加载失败,如下: 因此需将国际地址替换为国内镜像地址。 二、操作 1、使用vscode打开下载路径 2、全局地址替换 关键字 huggingface.co 替换为 hf-mirror.com 注意:务…...

新站如何快速获得搜索引擎收录?

本文来自:百万收录网 原文链接:https://www.baiwanshoulu.com/8.html 新站想要快速获得搜索引擎收录,需要采取一系列有针对性的策略。以下是一些具体的建议: 一、网站内容优化 高质量原创内容: 确保网站内容原创、…...

如何使用tushare pro获取股票数据——附爬虫代码以及tushare积分获取方式

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据 总结 一、Tushare 介绍 Tushare 是一个提供中国股市数据的API接口服务,它允许用户…...

解决vsocde ssh远程连接同一ip,不同端口情况下,无法区分的问题

一般服务器会通过镜像分身或者容器的方式,一个ip分出多个端口给多人使用,但如果碰到需要连接同一user,同一个ip,不同端口的情况,vscode就无法识别,如下图所示,vscode无法区分该ip下不同端口的连接&#xff…...

Elasticsearch 自定义分成器 拼音搜索 搜索自动补全 Java对接

介绍 通常用于将文档中的文本数据拆分成易于索引的词项(tokens)。有时,默认的分词器无法满足特定应用需求,这时就可以创建 自定义分词器 来实现定制化的文本分析。 自定义分词器组成 Char Filters(字符过滤器&#x…...

基于物联网设计的疫苗冷链物流监测系统

一、前言 1.1 项目开发背景 随着全球经济的发展和物流行业的不断创新,疫苗和生物制品的运输要求变得越来越高。尤其是疫苗的冷链物流,温度、湿度等环境因素的控制直接关系到疫苗的质量和效力,因此高效、可靠的冷链监控系统显得尤为重要。冷…...

RocketMQ消息是如何存储的?

大家好,我是锋哥。今天分享关于【RocketMQ消息是如何存储的?】面试题。希望对大家有帮助; RocketMQ消息是如何存储的? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 RocketMQ 使用了一个高性能、分布式的消息存储架构…...

Ubuntu 16.04安装Lua

个人博客地址:Ubuntu 16.04安装Lua | 一张假钞的真实世界 在Linux系统上使用以下命令编译安装Lua: curl -R -O http://www.lua.org/ftp/lua-5.3.3.tar.gz tar zxf lua-5.3.3.tar.gz cd lua-5.3.3 make linux test 安装make 编译过程如果提示以下信息…...

【JavaSE】String类常用字符串方法总结

目录 1. length() 求字符串长度 2. isEmpty() 判断字符串是否为空 3. String对象的比较 3.1 equals() 判断字符串是否相同 3.2 compareTo() 比较字符串大小 3.3 compareToIgnoreCase 忽略大小写比较 4. 字符串查找 4.1 charAt() 返回指定索引处的字符 4.2 indexOf() 4…...

python3+TensorFlow 2.x(二) 回归模型

目录 回归算法 1、线性回归 (Linear Regression) 一元线性回归举例 2、非线性回归 3、回归分类 回归算法 回归算法用于预测连续的数值输出。回归分析的目标是建立一个模型,以便根据输入特征预测目标变量,在使用 TensorFlow 2.x 实现线性回归模型时&…...

机器人抓取与操作概述(深蓝)——1

工业机器人:① “臂”的形态 ② “手”的形态 ③ 视觉,力和触觉 1 机器人的不同形态 “臂”的形态 “手”的形态 2 常见的操作任务 操作:插入、推和滑 抓取:两指(平行夹爪)抓取、灵巧手抓取 落地-产…...

简单聊聊“DeepSeek”

目录 DeepSeek一夜火爆并受到广泛关注的优势 技术实力与创新 低成本与高效率 开源与免费 市场策略与应用领域 团队与资金优势 行业认可与媒体关注 DeepSeek在推理效率上的特别之处 多头潜在注意力(MLA) 多词元预测(MTP)…...

使用 Docker + Nginx + Certbot 实现自动化管理 SSL 证书

使用 Docker Nginx Certbot 实现自动化管理 SSL 证书 在互联网安全环境日益重要的今天,为站点或应用部署 HTTPS 已经成为一种常态。然而,手动申请并续期证书既繁琐又容易出错。本文将以 Nginx Certbot 为示例,基于 Docker 容器来搭建一个…...

粒子群算法 笔记 数学建模

引入: 如何找到全局最大值:如果只是贪心的话,容易被局部最大解锁定 方法有:盲目搜索,启发式搜索 盲目搜索:枚举法和蒙特卡洛模拟,但是样例太多花费巨量时间 所以启发式算法就来了,通过经验和规…...

【C语言】结构体与共用体深入解析

在C语言中,结构体(struct)和共用体(union)都是用来存储不同类型数据的复合数据类型,它们在程序设计中具有重要的作用。 推荐阅读:操作符详细解说,让你的编程技能更上一层楼 1. 结构体…...

es6.7.1分词器ik插件安装-和head插件连接es特殊配置

es6.7.1分词器ik插件安装-和head插件连接es特殊配置 如果对运维课程感兴趣,可以在b站上、A站或csdn上搜索我的账号: 运维实战课程,可以关注我,学习更多免费的运维实战技术视频 1.查看es6.7.1和es-head安装位置和es插件路径 [ro…...

java求职学习day18

常用的设计原则和设计模式 1 常用的设计原则(记住) 1.1 软件开发的流程 需求分析文档、概要设计文档、详细设计文档、编码和测试、安装和调试、维护和升级 1.2 常用的设计原则 (1)开闭原则(Open Close Principle…...

单链表专题(上)

链表的定义与创建 线性表: 1. 物理结构上不一定是线性的 2. 逻辑结构上一定是线性的 链表是一种物理存储结构上非连续,非顺序的存储结构 链表也是线性表的一种,但是在物理结构上不是连续的 链表是由一个一个的节点组成,需要数…...

【stm32学习】STM32F103相关特性

| 名称 | 缩写 | 频率 | 外部连接 | 功能 | 用途 | 特性 | |--------------------|------|----------------|---------------|------------|--------------|----------------| | 外部高速晶体振荡器 | HSE | 4~16MHz …...

PostGIS笔记:PostgreSQL中表、键和索引的基础操作

创建、查看与删除表 在数据库中创建一个表,使用如下代码: create table streets (id serial not null primary key, name varchar(50));这里的表名是streets,id是主键所以非空,采用serial数据类型,这个数据类型会自动…...

蓝桥杯python语言基础(3)——循环结构

一、for语句 理解range函数 range(start, stop, step) start: 序列开始的数字(默认为0)。stop: 序列结束的数字(不包含stop)。step: 步长(默认为1)。 练习 输出在 l 和 r 之间的所有偶数: pri…...

微服务网关鉴权之sa-token

目录 前言 项目描述 使用技术 项目结构 要点 实现 前期准备 依赖准备 统一依赖版本 模块依赖 配置文件准备 登录准备 网关配置token解析拦截器 网关集成sa-token 配置sa-token接口鉴权 配置satoken权限、角色获取 通用模块配置用户拦截器 api模块配置feign…...

23【进制的理解】

很多人可能听过计算机的最底层是2进制执行,但是原理并不知道,我们今天先不讨论那么复杂的问题,先讨论什么是进制 1910,10并不是1个字符,而是2个字符,也就是说在10进制里面没有“10”这个字符,1…...

jemalloc 5.3.0的tsd模块的源码分析

一、背景 在主流的内存库里,jemalloc作为android 5.0-android 10.0的默认分配器肯定占用了非常重要的一席之地。jemalloc的低版本和高版本之间的差异特别大,低版本的诸多网上整理的总结,无论是在概念上和还是在结构体命名上在新版本中很多都…...

【Convex Optimization Stanford】Lec3 Function

【Convex Optimization Stanford】Lec3 Function 前言凸函数的定义对凸函数在一条线上的限制增值扩充? 一阶条件二阶条件一些一阶/二阶条件的例子象集和sublevel set关于函数凸性的扩展(Jesen Inequality)保持函数凸性的操作非负加权和 & 仿射函数的…...

深入 Rollup:从入门到精通(三)Rollup CLI命令行实战

准备阶段:初始化项目 初始化项目,这里使用的是pnpm,也可以使用yarn或者npm # npm npm init -y # yarn yarn init -y # pnpm pnpm init安装rollup # npm npm install rollup -D # yarn yarn add rollup -D # pnpm pnpm install rollup -D在…...

wangEditor富文本编辑器,Laravel上传图片配置和使用

文章目录 前言步骤1. 构造好前端模版2. 搭建后端存储3. 调试 前言 由于最近写项目需要使用富文本编辑器,使用的是VUE3.0版本所以很多不兼容,实际测试以后推荐使用wangEditor 步骤 构造好前端模版搭建后端存储调试 1. 构造好前端模版 安装模版 模版安…...