如何通过 Logstash 将数据采集到 Elasticsearch
作者:来自 Elastic Andre Luiz
将 Logstash 与 Elasticsearch 集成以实现高效的数据提取、索引和搜索的分步指南。
什么是 Logstash?
Logstash 是一种广泛使用的 Elastic Stack 工具,用于实时处理大量日志数据。它充当高效的数据管道,将来自各种来源的信息集成到单一结构化流中。其主要功能是可靠地执行数据提取、转换和加载。
Logstash 具有多种优势,尤其是其在支持多种类型的输入、过滤器和输出方面的多功能性,可与各种来源和目的地集成。它实时处理数据,捕获和转换信息。它与 Elastic Stack(尤其是 Elasticsearch 和 Kibana)的原生集成有助于数据分析和可视化。此外,它还包括高级过滤器,可实现高效的数据规范化、丰富和转换。
Logstash 如何工作?
Logstash 由输入、过滤器和输出组成,它们构成了数据处理管道。这些组件在定义数据提取流程的 .config 文件中进行配置。
- 输入(Inputs):从各种来源捕获数据。
- 过滤器(Filters):处理和转换捕获的数据。
- 输出(Outputs):将转换后的数据发送到定义的目的地。
每个组件最常见的类型如下所示:
输入类型:
- 文件:读取各种格式(文本、JSON、CSV 等)的日志文件。
- 消息队列:Kafka、RabbitMQ。
- API:Webhook 或其他数据收集 API。
- 数据库:用于关系数据提取的 JDBC 连接。
过滤器类型:
- Grok:用于分析和提取文本模式。
- Mutate:修改字段(重命名、转换类型、删除数据)。
- Date:将日期和时间字符串转换为可读的日期格式。
- GeoIP:使用地理数据丰富日志。
- JSON:解析或生成 JSON 数据。
输出类型:
- Elasticsearch:最常见的目的地,Elasticsearch 是一个搜索和分析引擎,允许对 Logstash 索引的数据进行强大的搜索和可视化。
- Files:将处理后的数据存储在本地。
- 云服务:Logstash 可以将数据发送到各种云服务,例如 AWS S3、Google Cloud Storage、Azure Blob Storage,进行存储或分析。
- 数据库:Logstash 可以通过特定的连接器将数据发送到各种其他数据库,例如 MySQL、PostgreSQL、MongoDB 等。
Elasticsearch 的数据提取
在此示例中,我们使用 Logstash 将数据提取到 Elasticsearch 中。此示例中配置的步骤将具有以下流程:
- Kafka 将用作数据源。
- Logstash 将使用数据,应用 grok、geoip 和 mutate 等过滤器来构造数据。
- 转换后的数据将发送到 Elasticsearch 中的索引。
- Kibana 将用于可视化索引数据。
先决条件
我们将使用 Docker Compose 创建一个具有必要服务的环境:Elasticsearch、Kibana、Logstash 和 Kafka。Logstash 配置文件名为 logstash.conf,将直接挂载到 Logstash 容器中。下面我们将详细介绍配置文件的配置。
这是 docker-compose.yml:
version: '3.8'
services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:8.16.2container_name: elasticsearch-8.16.2environment:- node.name=elasticsearch- xpack.security.enabled=false- discovery.type=single-node- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"ports:- 9200:9200networks:- shared_networkkibana:image: docker.elastic.co/kibana/kibana:8.16.2container_name: kibana-8.16.2restart: alwaysenvironment:- ELASTICSEARCH_URL=http://elasticsearch:9200ports:- 5601:5601depends_on:- elasticsearchnetworks:- shared_networklogstash:image: docker.elastic.co/logstash/logstash:8.16.2container_name: logstash-8.16.2volumes:- ./logstash.conf:/usr/share/logstash/pipeline/logstash.confports:- "5044:5044"depends_on:- elasticsearchnetworks:- shared_networkzookeeper:image: confluentinc/cp-zookeeper:latestcontainer_name: zookeeperenvironment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000ports:- 2181:2181networks:- shared_networkkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafkadepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- 9092:9092networks:- shared_networknetworks:shared_network:
如上所述,将定义 Logstash 管道,在此步骤中,我们将描述输入、过滤器和输出配置。
将在当前目录(docker-compose.yml 所在的位置)中创建 logstash.conf 文件。在 docker-compose.yml 中,本地文件系统上的 logstash.conf 文件将安装在容器内的路径 /usr/share/logstash/pipeline/logstash.conf 中。
Logstash 管道配置
Logstash 管道分为三个部分:输入、过滤器和输出。
- 输入:定义数据的使用位置(在本例中为 Kafka)。
- 过滤器:对原始数据进行转换和结构化。
- 输出:指定处理后的数据发送到的位置(在本例中为 Elasticsearch)。
接下来,我们将详细配置每个步骤。
输入配置
数据源是 Kafka 主题,要使用该主题的数据,需要配置 Kafka 输入插件。以下是 Logstash 中 Kafka 插件的配置,我们定义:
- bootstrap_servers:Kafka 服务器的地址。
- topics:要使用的主题的名称。
- group_id:消费者组标识符。
input {kafka {bootstrap_servers => "kafka:9092"topics => ["logs"]group_id => "logstash-consumer"}
}
这样,我们就可以接收数据了。
过滤器配置
过滤器负责转换和构造数据。让我们配置以下过滤器:
Grok 过滤器
从非结构化数据中提取结构化信息。在本例中,它提取时间戳、日志级别、客户端 IP、URI、状态和 JSON 负载。
grok {match => {"message" => "%{TIMESTAMP_ISO8601:timestamp},%{WORD:log_level},%{IP:client_ip},%{URIPATH:uri},%{NUMBER:status}"}
}
示例日志:
2025-01-05 16:30:15,INFO,69.162.81.155,/api/products,200,{"user_id":123,"region":"US"}
提取字段:
- timestamp:提取日期和时间(例如:2025-01-05T16:30:15)。
- log_level:捕获日志级别(例如:INFO、ERROR)。
- client_ip:捕获客户端 IP 地址(例如:69.162.81.155)。
- uri:捕获 URI 路径(例如:/api/products)。
- status:捕获 HTTP 状态码(例如:200)。
日期过滤器
将时间戳字段转换为 Elasticsearch 可读的格式并将其存储在 @timestamp 中。
date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]target => "@timestamp"}
GeoIP 过滤器
接下来,我们将使用 geoip 过滤器根据 client_ip 字段的值检索地理信息,例如国家、地区、城市和坐标。
geoip {source => "client_ip"target => "geoip" }
Mutate 过滤器
变异过滤器允许对字段进行转换。在本例中,我们将使用它的两个属性:
- remove_field:删除时间戳和消息字段,因为它们不再需要。
- convert:将状态字段从字符串转换为整数。
输出配置
输出定义转换后的数据将发送到何处。在本例中,我们将使用 Elasticsearch。
output {elasticsearch {hosts => ["http://172.21.0.1:9200"]index => "webapp_logs"}
}
现在我们已经定义了配置文件。以下是完整文件:
input {kafka {bootstrap_servers => "kafka:9092"topics => ["logs"]group_id => "logstash-consumer"}
}filter {grok {match => {"message" => "%{TIMESTAMP_ISO8601:timestamp},%{WORD:log_level},%{IP:client_ip},%{URIPATH:uri},%{NUMBER:status}"}}date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]target => "@timestamp"}geoip {source => "client_ip"target => "geoip"}mutate {remove_field => ["timestamp", "message"]convert => { "status" => "integer" }}
}output {elasticsearch {hosts => ["http://172.21.0.1:9200"]index => "webapp_logs"}
}
发送和提取数据
容器运行时,我们可以开始向主题发送消息并等待数据被索引。首先,如果尚未创建主题(topic),请创建主题。
docker exec -it kafka kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1
要发送消息,请在终端中执行以下命令:
docker exec -it kafka kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1
要发送的消息:
2025-01-05 16:30:15,INFO,69.162.81.155,/api/products,200,{"user_id":123,"region":"US"}
2025-01-05 16:31:02,ERROR,104.101.21.255,/api/orders,500,{"user_id":124,"region":"BR"}
2025-01-05 16:32:45,INFO,103.244.145.255,/api/cart,404,{"user_id":125,"region":"DE"}
要查看索引数据,请转到 Kibana:
索引成功完成后,我们可以在 Kibana 中查看和分析数据。映射和索引过程可确保字段根据 Logstash 中定义的配置进行结构化。
结论
通过提供的配置,我们使用 Logstash 创建了一个管道,用于在具有 Elasticsearch 和 Kafka 的容器化环境中索引日志。我们探索了 Logstash 使用 grok、date、geoip 和 mutate 等过滤器处理消息的灵活性,从而构建了数据以供在 Kibana 中进行分析。此外,我们还演示了如何配置与 Kafka 的集成以使用消息并使用它们来处理和索引数据。
参考
- Logstash
- https://www.elastic.co/guide/en/logstash/current/index.html
- Logstash Docker
- https://www.elastic.co/guide/en/logstash/current/docker.html
-
GeoIp Plugin
- https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
- Mutate Plugin
- https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
- Grok Plugin
- https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
- Kafka Plugin
- https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时举行!
Elasticsearch 包含许多新功能,可帮助你针对自己的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在本地机器上试用 Elastic。
原文:How to ingest data to Elasticsearch through Logstash - Elasticsearch Labs
相关文章:

如何通过 Logstash 将数据采集到 Elasticsearch
作者:来自 Elastic Andre Luiz 将 Logstash 与 Elasticsearch 集成以实现高效的数据提取、索引和搜索的分步指南。 什么是 Logstash? Logstash 是一种广泛使用的 Elastic Stack 工具,用于实时处理大量日志数据。它充当高效的数据管道&#x…...

mysql的cpu使用率100%问题排查
背景 线上mysql服务器经常性出现cpu使用率100%的告警, 因此整理一下排查该问题的常规流程。 1. 确认CPU占用来源 检查系统进程 使用 top 或 htop 命令,确认是否是 mysqld 进程导致CPU满载:top -c -p $(pgrep mysqld)2. 实时分析MySQL活动 …...

centos虚拟机迁移没有ip的问题
故事背景,我们的centos虚拟机本来是好好的,但是拷贝到其他电脑上就不能分配ip,我个人觉得这个vmware他们软件应该搞定这个啊,因为这个问题是每次都会出现的。 网络选桥接 网络启动失败 service network restart Restarting netw…...

接入 deepseek 实现AI智能问诊
1. 准备工作 注册 DeepSeek 账号 前往 DeepSeek 官网 注册账号并获取 API Key。 创建 UniApp 项目 使用 HBuilderX 创建一个新的 UniApp 项目(选择 Vue3 或 Vue2 模板)。 安装依赖 如果需要在 UniApp 中使用 HTTP 请求,推荐使用 uni.requ…...

用AVFrame + AVPacket 完成accede编码和直接用ffmpeg命令行实现acc编码的对比
在使用 FFmpeg 进行 AAC 音频编码时,可以选择两种方式:通过编程接口(如 AVFrame 和 AVPacket)实现 AAC 编码,或者直接使用 FFmpeg 命令行工具。这两种方式各有特点,适用于不同的场景。以下是对两种方法的详细分析,包括它们的区别、优缺点以及适用场景。 一、通过 AVFram…...

计算机网络笔记再战——理解几个经典的协议6——TCP与UDP
目录 先说端口号 TCP 使用序号保证顺序性和应答来保证有效性 超时重传机制 TCP窗口机制 UDP 路由协议 协议分类:IGP和EGP 几个经典的路由算法 RIP OSPF 链路状态数据库(LSDB) LSA(Link State Advertisement࿰…...

【AI】在Ubuntu中使用docker对DeepSeek的部署与使用
这篇文章前言是我基于部署好的deepseek-r1:8b模型跑出来的 关于部署DeepSeek的前言与介绍 在当今快速发展的技术环境中,有效地利用机器学习工具来解决问题变得越来越重要。今天,我将引入一个名为DeepSeek 的工具,它作为一种强大的搜索引擎&a…...

openssl使用
openssl使用 提取密钥对 数字证书pfx包含公钥和私钥,而cer证书只包含公钥。提取需输入证书保护密码 openssl pkcs12 -in xxx.pfx -nocerts -nodes -out pare.key提取私钥 openssl rsa -in pare.key -out pri.key提取公钥 openssl rsa -in pare.key -pubout -ou…...

《语义捕捉全解析:从“我爱自然语言处理”到嵌入向量的全过程》
首先讲在前面,介绍一些背景 RAG(Retrieval-Augmented Generation,检索增强生成) 是一种结合了信息检索与语言生成模型的技术,通过从外部知识库中检索相关信息,并将其作为提示输入给大型语言模型ÿ…...

HIVE如何注册UDF函数
如果注册UDF函数的时候报了上面的错误,说明hdfs上传的路径不正确, 一定要用下面的命令 hadoop fs -put /tmp/hive/111.jar /user/hive/warehouse 一定要上传到上面路径,这样在创建函数时,引用下面的地址就可以创建成功...

VsCode创建VUE项目
1. 首先安装Node.js和npm 通过网盘分享的文件:vsCode和Node(本人电脑Win11安装) 链接: https://pan.baidu.com/s/151gBWTFZh9qIDS9XWMJVUA 提取码: 1234 它们是运行和构建Vue.js应用程序所必需的。 1.1 Node安装,点击下一步即可 …...

x64、aarch64、arm与RISC-V64:详解四种处理器架构
x64、aarch64、arm与RISC-V64:详解四种处理器架构 x64架构aarch64架构ARM架构RISC-V64架构总结与展望在计算机科学领域,处理器架构是构建计算机系统的基石,它决定了计算机如何执行指令、管理内存和处理数据。x64、aarch64、arm与RISC-V64是当前主流的四种处理器架构,它们在…...

如何使用iframe来渲染ThingsBoard仪表盘
1、概述 当我们在使用ThingsBoard的时候,有时候需要再自己的前端项目中展示大屏,thingsboard的仪表盘是可以来做大屏的,虽然界面达不到非常的美观,但是对比之前的版本,现在的版本仪表盘做了很多的优化了。可以实现将thingsboard的仪表板嵌入到自己的vue界面中作为大屏显示…...

退格法记单词(类似甘特图)
退格法记单词,根据记忆次数或熟练程度退格,以示区分,该方法用于短时高频大量记单词: explosion爆炸,激增 mosquito蚊子granary粮仓,谷仓 offhand漫不经心的 transient短暂的slob懒惰而邋遢的…...

计算 MySQL 表行的成本是多少?
当计算表中的所有行时,将使用什么索引?好吧,MySQL文档文档对此提供了一个直接的答案,引用: InnoDB 通过遍历最小的可用二级索引来处理 SELECT COUNT(*) 语句除非索引或优化器提示指示优化器使用…...

Pygame介绍与游戏开发
提供pygame功能介绍的文档:Pygame Front Page — pygame v2.6.0 documentation 基础语法和实现逻辑 与CLI不同,pygame提供了图形化使用界面GUI(graphical user interface)基于图像的界面可以创建一个有图像和颜色的窗口 要让py…...

webpack配置方式
1. 基本配置文件 (webpack.config.js)(导出一个对象) 最常见的方式是通过 webpack.config.js 文件来配置 Webpack,导出一个对象。你可以在这个文件中导出一个配置对象,指定入口、输出、加载器、插件等。 // webpack.config.js m…...

10. k8s二进制集群之Kube Scheduler部署
在开始之前需要准备什么?创建kube-scheduler证书请求文件【即证书的生成⓵】根据上面证书配置文件生成kube-scheduler证书【即证书的生成⓶】创建与关联kube-scheduler配置文件(为后面生成系统服务做准备)创建kube-scheduler服务配置文件【准备系统服务⓵】创建kube-schedul…...

java实现8583报文解析技术详解
文章目录 概要整体架构流程技术名词解释技术细节小结概要 ISO 8583协议是金融交易系统中广泛使用的通信协议,用于规范报文的格式和数据交换。解析8583报文是实现金融交易系统的关键技术之一。本文将详细介绍8583报文解析的核心实现,重点关注解析算法和关键代码逻辑。 8583报…...

k8s服务发现有哪些方式?
在 Kubernetes 中,服务发现是指如何让应用程序在集群内互相找到并通信。Kubernetes 提供了多种服务发现的方式,适应不同的使用场景。以下是 Kubernetes 中常见的服务发现方式: 1. 环境变量(Environment Variables) 概…...

【SqlServer】SQL Server Management Studio (SSMS) 下载、安装、配置使用及卸载——保姆级教程
超详细的 SQL Server Management Studio (SSMS) 下载、安装、连接数据库配置及卸载教程 SQL Server Management Studio (SSMS) 是微软提供的图形化管理工具,主要用于连接、管理和开发 SQL Server 数据库。以下是详细的 SSMS 下载、安装、连接数据库以及卸载的完整教…...

[ESP32:Vscode+PlatformIO]添加第三方库 开源库 与Arduino导入第三方库的区别
前言 PlatformIO与Arduino在添加第三方库方面的原理存在显著差异 在PlatformIO中,第三方库的使用是基于项目(工程)的。具体而言,只有当你为一个特定的项目添加了某个第三方库后,该项目才能使用该库。这些第三方库的文…...

音频文件格式——AAC、OGG和FLAC
3.AAC文件格式 3.1 封装格式解析 高级音频编码 (Advanced Audio Coding) 是一种用于有损数字音频压缩的音频编码标准。它被设计为 MP3 格式的继承者,在相同比特率下通常可以获得比 MP3 更高的音质。AAC有两种封装格式: ADIF&am…...

BUU26 [极客大挑战 2019]HardSQL1
输入一些SQL关键词,发现空格,,union,and,by都被过滤了 被过滤,就用like替代 发现登录成功,可以注入 报错注入 注意 1.这里过滤了空格,就用()将内容包裹起来 比如说:…...

多光谱成像技术在华为Mate70系列的应用
华为Mate70系列搭载了光谱技术的产物——红枫原色摄像头,这是一款150万像素的多光谱摄像头。 相较于普通摄像头,它具有以下优势: 色彩还原度高:色彩还原准确度提升约 120%,能捕捉更多光谱信息,使拍摄照片色…...

借助 Cursor 快速实现小程序前端开发
借助 Cursor 快速实现小程序前端开发 在当今快节奏的互联网时代,小程序因其便捷性、高效性以及无需下载安装的特点,成为众多企业和开发者关注的焦点。然而,小程序的开发往往需要耗费大量的时间和精力,尤其是在前端开发阶段。幸运…...

【deepseek】ollama chatbox webui 本地部署deepseek 踩坑记录
部署 1、前往Ollama官网下载跨平台工具 官网直达:https://ollama.com/download 2、挑选适合自己设备的模型版本,获取运行指令 访问模型库:https://ollama.com/library/deepseek-r1 ▌配置建议: • 入门级:1.5B版本&…...

在离线的服务器上部署Python的安装库
在离线服务器上部署 Python 安装库(如 SQLAlchemy、pandas、pyodbc 等),可以使用以下方法: 方法 1:在联网机器上下载依赖,拷贝到离线服务器 适用于:服务器完全无法访问互联网。 步骤 1. 在联网…...

计算机网络笔记再战——理解几个经典的协议2
理解互联网与TCP/IP 下面,我们将会开始理解互联网这个东西,进一步的,我们会理解何为TCP/IP 我们的互联网就是一个巨大的网状结构,需要注意的是——每一个网状的节点之间都是使用一个叫做NOC,Network Operation Center…...

设计高效的测试用例:从需求到验证
在现代软件开发过程中,测试用例的设计一直是质量保证(QA)环节的核心。有效的测试用例不仅能够帮助发现潜在缺陷,提升软件质量,还能降低后期修复成本,提高开发效率。尽管如此,如何从需求出发&…...