ELK安装、部署、调试(六) logstash的安装和配置
1.介绍
Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。
管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。 输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。如:json、multiline等
inputs(输入阶段):
会生成事件。包括:file、kafka、beats等
filters(过滤器阶段):
可以将过滤器和条件语句结合使用对事件进行处理。包括:grok、mutate等
outputs(输出阶段):
将事件数据发送到特定的目的地,完成了所以输出处理,改事件就完成了执行。如:elasticsearch、file等
Codecs(解码器):
基本上是流过滤器,作为输入和输出的一部分进行操作,可以轻松地将消息的传输与序列化过程分开。
logstash为开源的数据处理pipeline管道,能接受多个源的数据,然后进行转换,再发送到指定目的地,logstash通过插件的机
制实现各种功能插件的下载地址为https://github.com/logstash-plugins
logstash 主要用于接收数据,解析过滤并转化,输出数据三个部分
对应的插件为input插件,filter插件,output插件
ingpu插件介绍,主要接收数据,支持多少数据源。
支持file : 读取一个文件,类似于tail命令,一行一行的实施读取。
支持syslog:监听系统的514端口的syslog messages,使用RFC3164格式进行解析
redis: 可以从Redis服务器读取数据,此时redis类似于一个消息缓存组件
kafka: 从kafka集群读取数据,kafka和logstash架构一般用于数据量较大的场景,kafka用于数据的换从和存储
filebeat: filebeat为一个文本的日志收集器,占系统资源小
filter插件介绍,主要进行数据的过滤,解析,格式化
grok:是logstash最重要的插件,可以解析任意数据,支持正则表达式,并提供很多内置的规则及模板
mutate 提供丰富的技术类型数据处理能力。包括类型转换,字符串处理,字段处理等
date,转化日志记录中的时间字符串格式
Geoip:更具ip地址提供对应的地域信息。包括国,省,经纬度等,对于可视化地图和区域统计非常有用。
output插件,输入日志到目的地
elasticsearch :发动到es中
file:发动数据到文件中
redis :发送日志到redis
kafka:发送日志到kafka
2.下载
https://www.elastic.co/cn/downloads/logstash
百度云elk上有
使用向导的连接
https://www.elastic.co/guide/en/logstash/7.9/installing-logstash.html#package-repositories
3.安装
logstashtar zxvf logstash-7.9.3.tar.gz -C /usr/local/cd /usr/local/ mv logstash-7.9.3 logstashlogstash/bin/logstash为启动文件
4.启动
cd /usr/local/logstash/bin
./logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
后台运行
nohup logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}' &
input 使用input插件
stdin{} 采用标准的终端输入,如键盘输入
output 使用output插件
stdout 为标准输出
codec 插件,定义输出格式
rubydebug 一般输出为jison格式的测试
5.测试
5.1 测试1:采用标准的输入和输出
root@localhost bin]# ./logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:22:35,442][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}
[2023-08-29T10:22:36,612][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because
modules or command line options are specified
[2023-08-29T10:22:40,097][INFO ][org.reflections.Reflections] Reflections took 77 ms to scan 1 urls, producing
22 keys and 45 values
[2023-08-29T10:22:42,937][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main",
"pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500,
"pipeline.sources"=>["config string"], :thread=>"#<Thread:0xdd19934 run>"}
[2023-08-29T10:22:44,380][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time
{"seconds"=>1.41}
[2023-08-29T10:22:44,466][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2023-08-29T10:22:44,590][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>
[:main], :non_running_pipelines=>[]}
[2023-08-29T10:22:45,030][INFO ][logstash.agent ] Successfully started Logstash API endpoint
{:port=>9600}
hello #我用键盘输入的,下面的信息是logstash的输出。
{"message" => "hello","@version" => "1","host" => "localhost.localdomain","@timestamp" => 2023-08-29T02:22:53.965Z
}
5.2 测试2:使用配置文件 +标准输入输出
参见logstash-1.conf
内容如下
[root@localhost logstash]# cat logstash-1.conf
input { stdin { }
}output {stdout { codec => rubydebug }
}
[root@localhost logstash]#
执行
./logstash -f ../logstash-1.conf
[root@localhost bin]# ./logstash -f ../logstash-1.conf
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:30:50,169][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}
[2023-08-29T10:30:51,542][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because
modules or command line options are specified
[2023-08-29T10:30:55,077][INFO ][org.reflections.Reflections] Reflections took 82 ms to scan 1 urls, producing
22 keys and 45 values
[2023-08-29T10:30:58,205][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main",
"pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500,
"pipeline.sources"=>["/usr/local/logstash/logstash-1.conf"], :thread=>"#<Thread:0x7b58200e run>"}
[2023-08-29T10:30:59,713][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time
{"seconds"=>1.48}
[2023-08-29T10:30:59,829][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2023-08-29T10:31:00,060][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>
[:main], :non_running_pipelines=>[]}
[2023-08-29T10:31:00,664][INFO ][logstash.agent ] Successfully started Logstash API endpoint
{:port=>9600}
5.3 测试三:配置文件+file输入 +标准的屏幕输出
vi logstash-2.conf
input {file {path => "/var/log/messages"}
}
output {stdout {codec=>rubydebug}
}
[root@localhost logstash]# vi logstash-2.conf
[root@localhost logstash]# ./bin/logstash -f logstash-2.conf
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:36:25,171][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}[2023-08-29T10:36:37,490][INFO ][logstash.agent ] Successfully started Logstash API endpoint
{:port=>9600}
{"@timestamp" => 2023-08-29T02:37:14.162Z,"path" => "/var/log/messages","host" => "localhost.localdomain","message" => "Aug 29 10:37:13 localhost systemd-logind: Removed session 992.","@version" => "1"
}
{"@timestamp" => 2023-08-29T02:37:14.231Z,"path" => "/var/log/messages","host" => "localhost.localdomain","message" => "Aug 29 10:37:13 localhost systemd-logind: Removed session 993.","@version" => "1"
}
5.4 测试4:配置文件+文件输入+kafka输出
[root@localhost logstash]# cat logstash-3.conf
input {file {path => "/var/log/messages"}
}
output {kafka {bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topic_id => "osmessages"}
}
[root@localhost logstash]# ./bin/logstash -f logstash-3.conf
到10.10.10.71上用kafka消费
./kafka-console-consumer.sh --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
osmessages
有信息输出,
2023-08-29T02:49:42.383Z localhost.localdomain Aug 29 10:49:41 localhost systemd-logind: New session 996 of user
root.
2023-08-29T02:49:42.300Z localhost.localdomain Aug 29 10:49:41 localhost systemd-logind: New session 997 of user
root.
2023-08-29T02:49:42.380Z localhost.localdomain Aug 29 10:49:41 localhost systemd: Started Session 997 of user
root.
2023-08-29T02:49:42.384Z localhost.localdomain Aug 29 10:49:41 localhost systemd: Started Session 996 of user
root.
2023-08-29T02:49:43.465Z localhost.localdomain Aug 29 10:49:42 localhost dbus[745]: [system] Activating service
name='org.freedesktop.problems' (using servicehelper)
2023-08-29T02:49:43.467Z localhost.localdomain Aug 29 10:49:42 localhost dbus[745]: [system] Successfully
activated service 'org.freedesktop.problems'
2023-08-29T02:50:01.531Z localhost.localdomain Aug 29 10:50:01 localhost systemd: Started Session 998 of user
root.
5.5 测试5:配置文件+filebeat端口输入+标准输出
服务器产生日志(filebeat)---》logstash服务器
[root@localhost logstash]# cat logstash-4.conf
input {beats {port => 5044}
}
output {stdout {codec => rubydebug}
}
[root@localhost logstash]# ./bin/logstash -f logstash-4.conf
启动后会在本机启动一个5044端口,此段欧在配置文件中可
以任意配置。不要和系统已启动的端口冲突即可
[root@localhost logstash]# netstat -anp | grep 5044
tcp6 0 0 :::5044 :::* LISTEN 10545/java
[root@localhost logstash]#
配合测试我们在filebeat服务器上修改配置文件
# ------------------------------ Logstash Output -------------------------------
output.logstash:hosts: ["10.10.10.74:5044"]
##filebeat使用logstash作为输出,
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/ce
修改filebeat配置后,重启filebeat
nohup /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat-to-logstash.yml &
经过测试在logstash下看到了filebeat的日志内容。
5.6测试6 配置文件+filebeat端口输入+输出到kafka
场景
服务器产生日志(filebeat)---> logstash服务器---->kafka服务器
[root@localhost logstash]# cat logstash-5.conf
input {beats {port => 5044}
}
output {kafka {bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topic_id => "osmessages"}
}
filebeat服务器:
filebeat.yml配置同5
# ------------------------------ Logstash Output -------------------------------
output.logstash:hosts: ["10.10.10.74:5045"]
logstash服务器:
配置
[root@localhost logstash]# cat logstash-5.conf
input {beats {port => 5044}
}
output {kafka {
# codec ==> json #打开注释将会使用json格式传输,使用json感觉更乱bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topic_id => "osmessages"}
}
./bin/logstash -f logstash-5.conf
kafka服务器:
./kafka-console-consumer.sh --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
osmessages
消费了一下日志,可以看到filebeat服务器同步过来的信息。
5.7 测试7 配置文件+kafka读取当输入+输出到es
服务器产生日志(filebeat)---> kafka服务器__抽取数据___> logstash服务器---->ESlogstash的配置
[root@localhost logstash]# cat f-kafka-logs-es.conf
input {kafka {bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topics => ["osmessages"]}
}
output {elasticsearch {hosts => ["10.10.10.65:9200,10.10.10.66:9200,10.10.10.67:9200"]index => "osmessageslog-%{+YYYY-MM-dd}"}
}
./bin/logstash -f f-kafka-logs-es.conf
nohup /usr/local/logstash/bin logstash -f /usr/local/logstash/f-kafka-logs-es.conf
filebeat.yml配置
# ------------------------------ KAFKA Output -------------------------------
output.kafka:eanbled: truehosts: ["10.10.10.71:9092","10.10.10.72:9092","10.10.10.73:9092"]version: "2.0.1"topic: '%{[fields][log_topic]}'partition.round_robin:reachable_only: trueworker: 2required_acks: 1compression: gzipmax_message_bytes: 10000000
nohup /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml &
完成到kafka的输出
完成
相关文章:

ELK安装、部署、调试(六) logstash的安装和配置
1.介绍 Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。 管道(Logs…...
【Spring Security】UserDetails 接口介绍
文章目录 UserDetails 的作用UserDetails 接口中各个方法详解 UserDetails 的作用 UserDetails 在 Spring Security 框架中主要担任获取用户信息的接口,通过该接口就能拿到用户的信息和验证用户的信息,这些信息在下面的方法中会有讲述。 UserDetails 接…...

C# Linq源码分析之Take(四)
概要 本文主要对Take的优化方法进行源码分析,分析Take在配合Select,Where等常用的Linq扩展方法使用时候,如何实现优化处理。 本文涉及到Select, Where和Take和三个方法的源码分析,其中Select, Where, Take更详尽的源码分析&…...
Python 和 C++ 使用细节差别
1. 循环中的可迭代对象长度 1. 循环中的可迭代对象长度 C 中,for循环中写明a.size(),每次循环这个值是重新计算的; # include “iostream” # include <vector> using namespace std;int main() {vector<int> a(10);int cnt 0…...

在Ubuntu Linux系统上安装RabbitMQ服务并解决公网远程访问问题
文章目录 前言1.安装erlang 语言2.安装rabbitMQ3. 内网穿透3.1 安装cpolar内网穿透(支持一键自动安装脚本)3.2 创建HTTP隧道 4. 公网远程连接5.固定公网TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址 前言 RabbitMQ是一个在 AMQP(高级消息队列协议)基…...

葫芦娃自动预约-公众号代挂
效果 #小程序://航旅黔购/1nkYlNRVzm0Gg9x #小程序://贵旅优品/7zz6mtnSVgDfyqa #小程序://新联惠购/ibFdsuhWqIbczEd #小程序://贵盐黔品/u2TgExCUdkavrFe #小程序://空港乐购/ANkOOdqEeo71kah #小程序://遵航出山/ZkR7DQy1raoPxKD #小程序://乐旅商城/Ip5cgpJ7TLmRrWF #小程序…...

ESP32应用教程(0)— PMW3901MB光流传感器
文章目录 前言 1 传感器介绍 1.1 关键特征 1.2 关键参数 2 硬件概述 2.1 信号引脚 2.2 参考电路图 3 寄存器 3.1 寄存器列表 3.2 性能优化寄存器 4 代码说明 4.1 结构体说明 4.2 编译说明 5 波形分析 前言 本文介绍了在 ESP32 DEVKIT V1 开发板上开发 PMW3901MB…...

docker部署nginx,部署springboot项目,并实现访问
一、先部署springboot项目 1、安装docker: yum install docker -y 2、启动docker: service docker start 重启: service docker restart 3、查看版本: docker -v 4、使设置docker.service生效(路径:…...

十五、模板方法模式
一、什么是模板方法模式 模板方法(Template Method)模式的定义如下:定义一个操作中的算法骨架,而将算法的一些步骤延迟到子类中,使得子类可以不改变该算法结构的情况下重定义该算法的某些特定步骤。 模板方法模式包含以…...

jvm 什么是常量池,常量池定义 class常量池
首先需要理解下Java的class文件,以及class文件结构: 1.Class文件是一组以8个字节为基础单位的二进制流,各个数据项目严格按照顺序紧凑地排列在文 件之中,中间没有任何分隔符,这使得整个Class文件中存储的内容几乎全部…...

CA证书颁发机构服务器
目录 一、CA证书颁发机构是什么? 二、数字证书可以干什么? 三、PKI:即公钥加密体系(public key cryptography) 四、CA在网络中的工作流程及原理(以网站为例) 五、HTTPS 的工作原理 六、CA私有证…...

5. 线性层及其他层
5.1 神经网络结构 5.2 线性拉平 import torch import torchvision from torch import nn from torch.nn import ReLU from torch.nn import Sigmoid from torch.utils.data import DataLoader from torch.utils.tensorboard import SummaryWriterdataset torchvision.datase…...

PhpStorm安装篇
PhpStorm安装篇: 下载地址 : 进入官网下PhpStorm: PHP IDE and Code Editor from JetBrains 下载完之后,安装包 安装目录建议不要放C盘,放其他盘,其他直接一直点 next ,到结束 安装完,打开编辑器 注册账号并登录后…...
麒麟Linux常见问题
文章目录 麒麟Linux常见问题 1. yum下载不了文件1)报错信息2)原因3)解决(1)更换下载源(2)缓存处理 4)验证 2. 麒麟Linux常见问题 1. yum下载不了文件 1)报错信息 faile…...

一百六十八、Kettle——用海豚调度器定时调度从Kafka到HDFS的任务脚本(持续更新追踪、持续完善)
一、目的 在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件? 为了测试实际项目中的海豚定时调度从…...

Linux centos7 bash编程(小练习)
一、打印九九乘法口诀 这一个for循环嵌套的小练习,难度不大。提供一种写法,供参考: #!/bin/bash # 文件名:99table.sh # 打印输出九九乘法口诀表 for i in {1..9} do for ((j1;j<$i;j)) do …...
【SpringBoot】Web server failed to start. Port 8080 was already in use.
问题描述 SpringBoot启动Web服务器失败。 *************************** APPLICATION FAILED TO START ***************************Description:Web server failed to start. Port 8080 was already in use.Action:Identify and stop the process thats listening on port 80…...
day-36 代码随想录算法训练营(19)part05
435.无重叠区间 思路:首先对数组排序,只需要关注重叠区间就行,有重叠时计数1,然后更新当前右边界为重叠区间中的最小右边界。 763.划分字母区间 思路:记录每一个字母的最远位置,然后从头开始遍历…...
Vue3 实现JS动态改变CSS样式
以颜色为例子 定义颜色变量 import { reactive } from vue; // 可变的主题颜色 let chooseColor reactive({--color: #be2a27 }) CSS中使用 var() 函数引用颜色变量(这里是用elementPlus为例, 也可以改) :deep(.is-active) {color: var(--color);border-bottom: 2px solid…...

最新社区团购电商小程序源码 无bug完美运营版+详细搭建部署教程
分享一个开源社区团购电商小程序源码,无bug完美运营版,含完整前后端详细搭建部署教程。 系统运营模式:整合线下社区资源,由各快递代收点、社区便利店、社区物业、业主等发起的社区微信群,推送商品信息,消费…...

SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...

使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...

宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...

C++ 设计模式 《小明的奶茶加料风波》
👨🎓 模式名称:装饰器模式(Decorator Pattern) 👦 小明最近上线了校园奶茶配送功能,业务火爆,大家都在加料: 有的同学要加波霸 🟤,有的要加椰果…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...