当前位置: 首页 > news >正文

如何通过 Logstash 将数据采集到 Elasticsearch

作者:来自 Elastic Andre Luiz

将 Logstash 与 Elasticsearch 集成以实现高效的数据提取、索引和搜索的分步指南。

什么是 Logstash?

Logstash 是一种广泛使用的 Elastic Stack 工具,用于实时处理大量日志数据。它充当高效的数据管道,将来自各种来源的信息集成到单一结构化流中。其主要功能是可靠地执行数据提取、转换和加载。

Logstash 具有多种优势,尤其是其在支持多种类型的输入、过滤器和输出方面的多功能性,可与各种来源和目的地集成。它实时处理数据,捕获和转换信息。它与 Elastic Stack(尤其是 Elasticsearch 和 Kibana)的原生集成有助于数据分析和可视化。此外,它还包括高级过滤器,可实现高效的数据规范化、丰富和转换。

Logstash 如何工作?

Logstash 由输入、过滤器和输出组成,它们构成了数据处理管道。这些组件在定义数据提取流程的 .config 文件中进行配置。

  • 输入(Inputs):从各种来源捕获数据。
  • 过滤器(Filters):处理和转换捕获的数据。
  • 输出(Outputs):将转换后的数据发送到定义的目的地。

每个组件最常见的类型如下所示:

输入类型

  • 文件:读取各种格式(文本、JSON、CSV 等)的日志文件。
  • 消息队列:Kafka、RabbitMQ。
  • API:Webhook 或其他数据收集 API。
  • 数据库:用于关系数据提取的 JDBC 连接。

过滤器类型

  • Grok:用于分析和提取文本模式。
  • Mutate:修改字段(重命名、转换类型、删除数据)。
  • Date:将日期和时间字符串转换为可读的日期格式。
  • GeoIP:使用地理数据丰富日志。
  • JSON:解析或生成 JSON 数据。

输出类型

  • Elasticsearch:最常见的目的地,Elasticsearch 是一个搜索和分析引擎,允许对 Logstash 索引的数据进行强大的搜索和可视化。
  • Files:将处理后的数据存储在本地。
  • 云服务:Logstash 可以将数据发送到各种云服务,例如 AWS S3、Google Cloud Storage、Azure Blob Storage,进行存储或分析。
  • 数据库:Logstash 可以通过特定的连接器将数据发送到各种其他数据库,例如 MySQL、PostgreSQL、MongoDB 等。

Elasticsearch 的数据提取

在此示例中,我们使用 Logstash 将数据提取到 Elasticsearch 中。此示例中配置的步骤将具有以下流程:

  1. Kafka 将用作数据源。
  2. Logstash 将使用数据,应用 grok、geoip 和 mutate 等过滤器来构造数据。
  3. 转换后的数据将发送到 Elasticsearch 中的索引。
  4. Kibana 将用于可视化索引数据。

先决条件

我们将使用 Docker Compose 创建一个具有必要服务的环境:Elasticsearch、Kibana、Logstash 和 Kafka。Logstash 配置文件名为 logstash.conf,将直接挂载到 Logstash 容器中。下面我们将详细介绍配置文件的配置。

这是 docker-compose.yml:

version: '3.8'
services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:8.16.2container_name: elasticsearch-8.16.2environment:- node.name=elasticsearch- xpack.security.enabled=false- discovery.type=single-node- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"ports:- 9200:9200networks:- shared_networkkibana:image: docker.elastic.co/kibana/kibana:8.16.2container_name: kibana-8.16.2restart: alwaysenvironment:- ELASTICSEARCH_URL=http://elasticsearch:9200ports:- 5601:5601depends_on:- elasticsearchnetworks:- shared_networklogstash:image: docker.elastic.co/logstash/logstash:8.16.2container_name: logstash-8.16.2volumes:- ./logstash.conf:/usr/share/logstash/pipeline/logstash.confports:- "5044:5044"depends_on:- elasticsearchnetworks:- shared_networkzookeeper:image: confluentinc/cp-zookeeper:latestcontainer_name: zookeeperenvironment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000ports:- 2181:2181networks:- shared_networkkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafkadepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- 9092:9092networks:- shared_networknetworks:shared_network:

如上所述,将定义 Logstash 管道,在此步骤中,我们将描述输入、过滤器和输出配置。

将在当前目录(docker-compose.yml 所在的位置)中创建 logstash.conf 文件。在 docker-compose.yml 中,本地文件系统上的 logstash.conf 文件将安装在容器内的路径 /usr/share/logstash/pipeline/logstash.conf 中。

Logstash 管道配置

Logstash 管道分为三个部分:输入、过滤器和输出。

  • 输入:定义数据的使用位置(在本例中为 Kafka)。
  • 过滤器:对原始数据进行转换和结构化。
  • 输出:指定处理后的数据发送到的位置(在本例中为 Elasticsearch)。

接下来,我们将详细配置每个步骤。

输入配置

数据源是 Kafka 主题,要使用该主题的数据,需要配置 Kafka 输入插件。以下是 Logstash 中 Kafka 插件的配置,我们定义:

  • bootstrap_servers:Kafka 服务器的地址。
  • topics:要使用的主题的名称。
  • group_id:消费者组标识符。
input {kafka {bootstrap_servers => "kafka:9092"topics => ["logs"]group_id => "logstash-consumer"}
}

这样,我们就可以接收数据了。

过滤器配置

过滤器负责转换和构造数据。让我们配置以下过滤器:

Grok 过滤器

从非结构化数据中提取结构化信息。在本例中,它提取时间戳、日志级别、客户端 IP、URI、状态和 JSON 负载。

grok {match => {"message" => "%{TIMESTAMP_ISO8601:timestamp},%{WORD:log_level},%{IP:client_ip},%{URIPATH:uri},%{NUMBER:status}"}
}

示例日志:

2025-01-05 16:30:15,INFO,69.162.81.155,/api/products,200,{"user_id":123,"region":"US"}

提取字段

  • timestamp:提取日期和时间(例如:2025-01-05T16:30:15)。
  • log_level:捕获日志级别(例如:INFO、ERROR)。
  • client_ip:捕获客户端 IP 地址(例如:69.162.81.155)。
  • uri:捕获 URI 路径(例如:/api/products)。
  • status:捕获 HTTP 状态码(例如:200)。

日期过滤器

将时间戳字段转换为 Elasticsearch 可读的格式并将其存储在 @timestamp 中。

date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]target => "@timestamp"}

GeoIP 过滤器

接下来,我们将使用 geoip 过滤器根据 client_ip 字段的值检索地理信息,例如国家、地区、城市和坐标。

  geoip {source => "client_ip"target => "geoip" }

Mutate 过滤器

变异过滤器允许对字段进行转换。在本例中,我们将使用它的两个属性:

  • remove_field:删除时间戳和消息字段,因为它们不再需要。
  • convert:将状态字段从字符串转换为整数。

输出配置

输出定义转换后的数据将发送到何处。在本例中,我们将使用 Elasticsearch。

output {elasticsearch {hosts => ["http://172.21.0.1:9200"]index => "webapp_logs"}
}

现在我们已经定义了配置文件。以下是完整文件:

input {kafka {bootstrap_servers => "kafka:9092"topics => ["logs"]group_id => "logstash-consumer"}
}filter {grok {match => {"message" => "%{TIMESTAMP_ISO8601:timestamp},%{WORD:log_level},%{IP:client_ip},%{URIPATH:uri},%{NUMBER:status}"}}date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]target => "@timestamp"}geoip {source => "client_ip"target => "geoip"}mutate {remove_field => ["timestamp", "message"]convert => { "status" => "integer" }}
}output {elasticsearch {hosts => ["http://172.21.0.1:9200"]index => "webapp_logs"}
}

发送和提取数据

容器运行时,我们可以开始向主题发送消息并等待数据被索引。首先,如果尚未创建主题(topic),请创建主题。

docker exec -it kafka kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1

要发送消息,请在终端中执行以下命令:

docker exec -it kafka kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1

要发送的消息:

2025-01-05 16:30:15,INFO,69.162.81.155,/api/products,200,{"user_id":123,"region":"US"}
2025-01-05 16:31:02,ERROR,104.101.21.255,/api/orders,500,{"user_id":124,"region":"BR"}
2025-01-05 16:32:45,INFO,103.244.145.255,/api/cart,404,{"user_id":125,"region":"DE"}

要查看索引数据,请转到 Kibana:

索引成功完成后,我们可以在 Kibana 中查看和分析数据。映射和索引过程可确保字段根据 Logstash 中定义的配置进行结构化。

结论

通过提供的配置,我们使用 Logstash 创建了一个管道,用于在具有 Elasticsearch 和 Kafka 的容器化环境中索引日志。我们探索了 Logstash 使用 grok、date、geoip 和 mutate 等过滤器处理消息的灵活性,从而构建了数据以供在 Kibana 中进行分析。此外,我们还演示了如何配置与 Kafka 的集成以使用消息并使用它们来处理和索引数据。

参考

  • Logstash
    • https://www.elastic.co/guide/en/logstash/current/index.html
  • Logstash Docker
    • https://www.elastic.co/guide/en/logstash/current/docker.html
  • GeoIp Plugin

    • https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
  • Mutate Plugin
    • https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
  • Grok Plugin
    • https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
  • Kafka Plugin
    • https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时举行!

Elasticsearch 包含许多新功能,可帮助你针对自己的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Logstash - Elasticsearch Labs

相关文章:

如何通过 Logstash 将数据采集到 Elasticsearch

作者:来自 Elastic Andre Luiz 将 Logstash 与 Elasticsearch 集成以实现高效的数据提取、索引和搜索的分步指南。 什么是 Logstash? Logstash 是一种广泛使用的 Elastic Stack 工具,用于实时处理大量日志数据。它充当高效的数据管道&#x…...

mysql的cpu使用率100%问题排查

背景 线上mysql服务器经常性出现cpu使用率100%的告警, 因此整理一下排查该问题的常规流程。 1. 确认CPU占用来源 检查系统进程 使用 top 或 htop 命令,确认是否是 mysqld 进程导致CPU满载:top -c -p $(pgrep mysqld)2. 实时分析MySQL活动 …...

centos虚拟机迁移没有ip的问题

故事背景,我们的centos虚拟机本来是好好的,但是拷贝到其他电脑上就不能分配ip,我个人觉得这个vmware他们软件应该搞定这个啊,因为这个问题是每次都会出现的。 网络选桥接 网络启动失败 service network restart Restarting netw…...

接入 deepseek 实现AI智能问诊

1. 准备工作 注册 DeepSeek 账号 前往 DeepSeek 官网 注册账号并获取 API Key。 创建 UniApp 项目 使用 HBuilderX 创建一个新的 UniApp 项目(选择 Vue3 或 Vue2 模板)。 安装依赖 如果需要在 UniApp 中使用 HTTP 请求,推荐使用 uni.requ…...

用AVFrame + AVPacket 完成accede编码和直接用ffmpeg命令行实现acc编码的对比

在使用 FFmpeg 进行 AAC 音频编码时,可以选择两种方式:通过编程接口(如 AVFrame 和 AVPacket)实现 AAC 编码,或者直接使用 FFmpeg 命令行工具。这两种方式各有特点,适用于不同的场景。以下是对两种方法的详细分析,包括它们的区别、优缺点以及适用场景。 一、通过 AVFram…...

计算机网络笔记再战——理解几个经典的协议6——TCP与UDP

目录 先说端口号 TCP 使用序号保证顺序性和应答来保证有效性 超时重传机制 TCP窗口机制 UDP 路由协议 协议分类:IGP和EGP 几个经典的路由算法 RIP OSPF 链路状态数据库(LSDB) LSA(Link State Advertisement&#xff0…...

【AI】在Ubuntu中使用docker对DeepSeek的部署与使用

这篇文章前言是我基于部署好的deepseek-r1:8b模型跑出来的 关于部署DeepSeek的前言与介绍 在当今快速发展的技术环境中,有效地利用机器学习工具来解决问题变得越来越重要。今天,我将引入一个名为DeepSeek 的工具,它作为一种强大的搜索引擎&a…...

openssl使用

openssl使用 提取密钥对 数字证书pfx包含公钥和私钥,而cer证书只包含公钥。提取需输入证书保护密码 openssl pkcs12 -in xxx.pfx -nocerts -nodes -out pare.key提取私钥 openssl rsa -in pare.key -out pri.key提取公钥 openssl rsa -in pare.key -pubout -ou…...

《语义捕捉全解析:从“我爱自然语言处理”到嵌入向量的全过程》

首先讲在前面,介绍一些背景 RAG(Retrieval-Augmented Generation,检索增强生成) 是一种结合了信息检索与语言生成模型的技术,通过从外部知识库中检索相关信息,并将其作为提示输入给大型语言模型&#xff…...

HIVE如何注册UDF函数

如果注册UDF函数的时候报了上面的错误,说明hdfs上传的路径不正确, 一定要用下面的命令 hadoop fs -put /tmp/hive/111.jar /user/hive/warehouse 一定要上传到上面路径,这样在创建函数时,引用下面的地址就可以创建成功...

VsCode创建VUE项目

1. 首先安装Node.js和npm 通过网盘分享的文件:vsCode和Node(本人电脑Win11安装) 链接: https://pan.baidu.com/s/151gBWTFZh9qIDS9XWMJVUA 提取码: 1234 它们是运行和构建Vue.js应用程序所必需的。 1.1 Node安装,点击下一步即可 …...

x64、aarch64、arm与RISC-V64:详解四种处理器架构

x64、aarch64、arm与RISC-V64:详解四种处理器架构 x64架构aarch64架构ARM架构RISC-V64架构总结与展望在计算机科学领域,处理器架构是构建计算机系统的基石,它决定了计算机如何执行指令、管理内存和处理数据。x64、aarch64、arm与RISC-V64是当前主流的四种处理器架构,它们在…...

如何使用iframe来渲染ThingsBoard仪表盘

1、概述 当我们在使用ThingsBoard的时候,有时候需要再自己的前端项目中展示大屏,thingsboard的仪表盘是可以来做大屏的,虽然界面达不到非常的美观,但是对比之前的版本,现在的版本仪表盘做了很多的优化了。可以实现将thingsboard的仪表板嵌入到自己的vue界面中作为大屏显示…...

退格法记单词(类似甘特图)

退格法记单词,根据记忆次数或熟练程度退格,以示区分,该方法用于短时高频大量记单词: explosion爆炸,激增 mosquito蚊子granary粮仓,谷仓 offhand漫不经心的 transient短暂的slob懒惰而邋遢的…...

计算 MySQL 表行的成本是多少?

当计算表中的所有行时,将使用什么索引?好吧,MySQL文档文档对此提供了一个直接的答案,引用: InnoDB 通过遍历最小的可用二级索引来处理 SELECT COUNT(*) 语句除非索引或优化器提示指示优化器使用…...

Pygame介绍与游戏开发

提供pygame功能介绍的文档:Pygame Front Page — pygame v2.6.0 documentation 基础语法和实现逻辑 与CLI不同,pygame提供了图形化使用界面GUI(graphical user interface)基于图像的界面可以创建一个有图像和颜色的窗口 要让py…...

webpack配置方式

1. 基本配置文件 (webpack.config.js)(导出一个对象) 最常见的方式是通过 webpack.config.js 文件来配置 Webpack,导出一个对象。你可以在这个文件中导出一个配置对象,指定入口、输出、加载器、插件等。 // webpack.config.js m…...

10. k8s二进制集群之Kube Scheduler部署

在开始之前需要准备什么?创建kube-scheduler证书请求文件【即证书的生成⓵】根据上面证书配置文件生成kube-scheduler证书【即证书的生成⓶】创建与关联kube-scheduler配置文件(为后面生成系统服务做准备)创建kube-scheduler服务配置文件【准备系统服务⓵】创建kube-schedul…...

java实现8583报文解析技术详解

文章目录 概要整体架构流程技术名词解释技术细节小结概要 ISO 8583协议是金融交易系统中广泛使用的通信协议,用于规范报文的格式和数据交换。解析8583报文是实现金融交易系统的关键技术之一。本文将详细介绍8583报文解析的核心实现,重点关注解析算法和关键代码逻辑。 8583报…...

k8s服务发现有哪些方式?

在 Kubernetes 中,服务发现是指如何让应用程序在集群内互相找到并通信。Kubernetes 提供了多种服务发现的方式,适应不同的使用场景。以下是 Kubernetes 中常见的服务发现方式: 1. 环境变量(Environment Variables) 概…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...

工程地质软件市场:发展现状、趋势与策略建议

一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

反射获取方法和属性

Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...