当前位置: 首页 > news >正文

Logstash输入Kafka输出Es配置

Logstash介绍

Logstash是一个开源的数据收集引擎,具有实时管道功能。它可以从各种数据源中动态地统一和标准化数据,并将其发送到你选择的目的地。Logstash的早期目标主要是用于收集日志,但现在的功能已经远远超出这个范围。任何事件类型都可以通过Logstash进行分析,通过输入、过滤器和输出插件进行转换。

Logstash的工作原理是使用管道方式进行日志的搜集处理和输出。这个管道包括三个阶段:输入、处理和输出。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。

Logstash的输入支持各种选择,可以同时从众多常用来源捕捉事件,如日志、指标、Web应用、数据存储以及各种AWS服务等。在数据从源传输到存储库的过程中,Logstash的过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash的输出也可以根据需要选择不同的存储方式,除了Elasticsearch作为首选输出方向外,还有其他的输出选择。

Logstash是一个强大的开源工具,可以用于实时处理和转换来自各种数据源的数据,为数据分析和商业决策提供支持。

Kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。它是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,Kafka是一个可行的解决方案。

Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Es介绍

ES指的是Elasticsearch,它是一个基于RESTful web接口并且构建在Apache Lucene之上的开源分布式搜索引擎。它还是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索。它能够横向扩展至数以百计的服务器存储以及处理PB级的数据,可以在极短的时间内存储、搜索和分析大量的数据。通常作为具有复杂搜索场景情况下的核心发动机。

Logstash输入输出配置

Logstash的输入输出配置主要是针对其输入和输出插件进行设置。以下是一些常见的输入和输出插件的配置示例:

输入配置:

  1. file:从文件读取日志信息,例如:
input {file {path => "/var/log/error.log"type => "error"start_position => "beginning"}
}
  1. stdin:从标准输入读取日志信息,例如:
input {stdin {}
}
  1. syslog:从系统日志读取日志信息,例如:
input {syslog {type => "syslog"}
}

输出配置:

  1. stdout:将日志信息输出到标准输出,例如:
output {stdout {}
}
  1. elasticsearch:将日志信息输出到Elasticsearch集群,例如:
output {elasticsearch {hosts => "localhost:9200"index => "myindex"}
}

以上是一些常见的输入输出插件配置示例,Logstash还支持其他多种输入输出插件,可以根据具体需求进行选择和配置。

Logstash输入Kafka输出Es配置

Logstash的输入配置可以通过Kafka插件从Kafka中读取数据,输出配置可以通过Elasticsearch插件将数据写入Elasticsearch集群。以下是一个示例配置:

input {kafka {bootstrap_servers => "your_kafka_server:9092"client_id => "your_client_id"group_id => "your_group_id"auto_offset_reset => "latest"consumer_threads => 1decorate_events => truetopics => ["your_topic"]}
}output {if [@metadata][kafka][topic] == "your_topic" {elasticsearch {hosts => "your_elasticsearch_server:9200"index => "your_index"timeout => 300}}
}

在这个配置中,Logstash通过Kafka插件从指定的Kafka服务器和主题中读取数据,然后通过Elasticsearch插件将数据写入指定的Elasticsearch索引。你可以根据实际情况修改配置中的参数,例如Kafka服务器的地址、客户端ID、组ID、主题等。

  • 上面的配置参数的含义如下所示:
  1. bootstrap_servers: 这是Kafka服务器的地址和端口。你需要提供Kafka集群中至少一个服务器的地址。
  2. client_id: 这是客户端的唯一标识符,用于标识连接到Kafka集群的客户端。
  3. group_id: 这是消费者组的ID。如果你有多个Logstash实例读取同一个Kafka主题,并且你想将它们作为一个消费者组来处理,那么你需要使用这个参数。
  4. auto_offset_reset: 这个参数决定了当Logstash无法找到其之前读取的偏移量时应该怎么做。设置为"latest"意味着从最新的记录开始读取。
  5. consumer_threads: 这是用于消费Kafka消息的线程数。增加线程数可以加快数据读取速度,但也会增加CPU和内存的使用。
  6. decorate_events: 如果设置为true,Logstash会为每个事件添加额外的元数据,例如Kafka主题和分区信息。
  7. topics: 这是Logstash要读取的Kafka主题列表。
  8. if [@metadata][kafka][topic] == “your_topic”: 这是一个条件语句,用于确定是否将事件发送到Elasticsearch。只有当事件的主题与指定的"your_topic"匹配时,事件才会被发送到Elasticsearch。
  9. hosts: 这是Elasticsearch集群的地址和端口。
  10. index: 这是Logstash将数据写入Elasticsearch的索引名称。
  11. timeout: 这是Logstash与Elasticsearch集群通信的超时时间(以秒为单位)。

这些参数可以根据你的具体需求进行调整,以满足你的数据收集和处理需求。

java发送消息到Kafka示例

Apache Kafka是一种分布式流处理平台,你可以使用它来处理各种数据。以下是使用Java向Kafka发送消息的示例代码:

首先,你需要添加Apache Kafka的依赖到你的项目中。如果你正在使用Maven,那么你可以在pom.xml文件中添加如下依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency>
</dependencies>

以下是使用Java发送消息的示例代码:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class ProducerDemo {public static void main(String[] args) {// 1. 配置生产者客户端参数Properties props = new Properties();// Kafka集群地址props.put("bootstrap.servers", "your_kafka_server:9092");// 消息ack模式: all表示消息被leader和follower都写入后才返回ack, -1表示只被leader写入就返回ackprops.put("acks", "all");// 重试次数props.put("retries", 0);// 批量发送大小props.put("batch.size", 16384);// 发送延时,用于控制producer发送请求的延迟时间,可以提高吞吐量props.put("linger.ms", 1);// 缓冲区大小props.put("buffer.memory", 33554432);// key序列化类props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化类props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建生产者对象,传入配置参数Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {// 3. 创建消息对象,指定topic、消息key和消息体valueProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key" + i, "value" + i);// 4. 发送消息到Kafka集群,并获取返回结果RecordMetadata metadata = producer.send(record).get();// 打印结果,发送是否成功,以及发送到的分区和offset等信息System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());}// 5. 关闭生产者对象,释放资源producer.close();}
}

在这个示例中,我们创建了一个名为ProducerDemo的类,这个类使用Kafka的生产者API发送消息到名为"my-topic"的主题。请注意你需要替换"bootstrap.servers"属性的值为你的Kafka集群的实际地址。如果你的集群在本地运行,并且使用的是默认的端口,那么你可以使用"localhost:9092"。

Logstash常用输入插件

Logstash的常用输入插件包括以下几种:

  1. file:该插件可以从文件中读取事件。它使用了FileWatch库来监听文件变化,并跟踪被监听的日志文件的当前读取位置,从而确保不会漏过任何数据。
  2. stdin:该插件是标准的输入插件,能够从命令行中读取事件。
  3. TCP:从TCP连接中读取数据。
  4. UDP:从UDP套接字中读取数据。
  5. Redis:从Redis中读取数据。
  6. JDBC:从关系型数据库中读取数据。
  7. HTTP:从HTTP服务器中读取数据。

Logstash常用输出插件

Logstash常用的输出插件包括以下几种:

  1. Elasticsearch:将日志数据输出到Elasticsearch,用于后续的搜索和分析。
  2. Kafka:将日志数据发送到Kafka集群,供其他消费者使用。
  3. File:将日志数据输出到文件中,便于后续查看和审计。
  4. Gelf:将日志数据输出到Gelf兼容的服务器,用于远程监控和报警。
  5. Fluentd:将日志数据输出到Fluentd,用于统一日志收集和转发。

拓展

Logstash使用指南

Kafka使用指南

Elasticsearch使用指南

在这里插入图片描述

相关文章:

Logstash输入Kafka输出Es配置

Logstash介绍 Logstash是一个开源的数据收集引擎&#xff0c;具有实时管道功能。它可以从各种数据源中动态地统一和标准化数据&#xff0c;并将其发送到你选择的目的地。Logstash的早期目标主要是用于收集日志&#xff0c;但现在的功能已经远远超出这个范围。任何事件类型都可…...

Bash脚本处理ogg、flac格式到mp3格式的批量转换

现在下载的许多音乐文件是flac和ogg格式的&#xff0c;QQ音乐上下载的就是这样的&#xff0c;这些文件尺寸比较大&#xff0c;在某些场合使用不便&#xff0c;比如在车机上播放还是mp3格式合适&#xff0c;音质这些在车机上播放足够了&#xff0c;要求不高。比如本人就喜欢下载…...

Android 依据Build相关信息判断机型

Android 依据Build相关信息判断机型 本文主要通过Build的相关信息获取机型,目前机型判断的较少,后续继续维护更新 public static String parseBuild() {StringBuilder sb new StringBuilder();String deriveFingerprint Build.FINGERPRINT;String manufacturer Build.MANU…...

2024年甘肃省职业院校技能大赛信息安全管理与评估赛项一阶段样题一

2024年甘肃省职业院校技能大赛高职学生组电子与信息大类信息安全管理与评估赛项样题一 竞赛需要完成三个阶段的任务&#xff0c;分别完成三个模块&#xff0c;总分共计 1000分。三个模块内容和分值分别是&#xff1a; 1.第一阶段&#xff1a;模块一 网络平台搭建与设备安全防…...

ARM:作业3

按键中断代码编写 代码: key_it.h #ifndef __KEY_IT_H__ #define __KEY_IT_H__#include "stm32mp1xx_gpio.h" #include "stm32mp1xx_exti.h" #include "stm32mp1xx_rcc.h" #include "stm32mp1xx_gic.h"void key1_it_config(); voi…...

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含python、JS工程源码)+数据集+模型(二)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 相关其它博客工程源代码下载其它资料下载 前言 本项目基于Keras框架&#xff0c;引入CNN进行模型训练&#xff0c;采用Dropout梯度…...

*上位机的定义

上位机是指在分布式控制系统中&#xff0c;负责监控和控制下位机&#xff08;也称为远程终端设备&#xff09;的计算机或者计算机网络。它通常是一个高性能的计算设备&#xff0c;运行着特定的监控软件&#xff0c;用于实时监测、控制和管理下位机设备。 上位机负责与各个下位…...

架构LAMP

目录 1.什么是LAMP 2.LAMP组成及作用 3.搭建Apache httpd服务 4.编译安装mysqld 服务 5.编译安装PHP 解析环境 6.安装论坛 1.什么是LAMP LAMP架构是目前成熟的企业网站应用模式之一&#xff0c;指的是协同工作的一整套系统和相关软件&#xff0c;能够提供动态Web站点服务…...

vue实现浏览器不同分辨率下的不同样式,css的媒体查询与js判断当前浏览器宽度

前言&#xff1a; 实现实现浏览器不同分辨率下的不同样式的方法很多&#xff0c;这里整理两种&#xff0c;1个是css的媒体查询来实现&#xff0c;另一个是js判断当前浏览器的宽度&#xff0c;然后动态给他添加不同的class名&#xff0c;或者动态用style修改样式&#xff0c;添加…...

CentOS7 安装包 MariaDB 10.4.x

CentOS7 安装包 MariaDB 10.4.x 统一 MariaDB安装包 https://www.alipan.com/s/fvLg3gN7LPX 提取码: nh81 打开「阿里云盘」...

js中箭头函数简单介绍

1.箭头函数是 ES6 中新增的一种函数定义方式&#xff0c; 简单举例为 var nameA function(a){return a} 可以用箭头函数简化为 var nameA a >a; 返回的是你输入的值 比如 nameA(5) 返回的就是5 nameA(2) 返回的就是2 以上两个表达的含义是一样的。nameA为名字 2.…...

分布式ID服务实践

背景 分布式场景下需要一个全局 ID 来标识唯一性&#xff0c;比如在单数据库时通过表唯一主键即可实现唯一 ID&#xff0c;分库分表时就需要全局唯一 ID。 业务对唯一 ID 的要求如下&#xff1a; 全局唯一性 不能出现重复的 ID 号&#xff0c;既然是唯一标识&#xff0c;这…...

YOLOv8改进 | 2023主干篇 | 利用RT-DETR特征提取网络PPHGNetV2改进YOLOv8(超级轻量化精度更高)

一、本文介绍 本文给大家带来利用RT-DETR模型主干HGNet去替换YOLOv8的主干&#xff0c;RT-DETR是今年由百度推出的第一款实时的ViT模型&#xff0c;其在实时检测的领域上号称是打败了YOLO系列&#xff0c;其利用两个主干一个是HGNet一个是ResNet&#xff0c;其中HGNet就是我们…...

C现代方法(第26章)笔记——<stdarg.h>、<stdlib.h>和<time.h>标准库

文章目录 第26章 <stdarg.h>、<stdlib.h>和<time.h>标准库26.1 <stdarg.h>: 可变参数26.1.1 调用带有可变参数列表的函数26.1.2 v...printf函数26.1.3 v...scanf函数(C99) 26.2 <stdlib.h>: 通用的实用工具26.2.1 数值转换函数26.2.1.1 测试数值…...

CCKS2023-面向金融领域的主体事件检测-亚军方案分享

赛题分析 大赛地址 https://tianchi.aliyun.com/competition/entrance/532098/introduction?spma2c22.12281925.0.0.52b97137bpVnmh 任务描述 主体事件检测是语言文本分析和金融领域智能应用的重要任务之一&#xff0c;如在金融风控领域往往会对公司主体进行风险事件的检测…...

Linux下通过find找文件---通过修改时间查找(-mtime)

通过man手册查找和-mtime选项相关的内容 man find | grep -A 3 mtime # 这里简单介绍了 -mtime &#xff0c;还有一个简单的示例-mtime n Files data was last modified n*24 hours ago. See the comments for -atime to understand how rounding affects the interpretati…...

图文教程:stable-diffusion的基本使用教程 txt2img(多图)

之前我介绍了SD的安装过程&#xff0c;那么这篇将介绍怎么使用SD 使用模型 SD安装好之后&#xff0c;我们只有一个默认的模型。这个模型很难满足我们的绘图需求&#xff0c;那么有2种方法。 1是自己训练一个模型&#xff08;有门槛&#xff09;2是去网站上找一个别人练好的模…...

VisualSVN Server的安装全过程

目录 背景: 安装过程&#xff1a; 步骤1&#xff1a; 步骤2&#xff1a; 步骤3&#xff1a; 步骤4&#xff1a; 步骤5&#xff1a; 安装出现的bug&#xff1a; 问题: 解决办法: 总结: 背景: VisualSVN Server 是一款免费的 SVN (Subversion) 服务器软件&#xff0c…...

Python 进阶(十六):二进制和ASCII码的转换(binascii 模块)

大家好&#xff0c;我是水滴~~ 本文详细介绍了Python中的binascii模块及其使用方法。通过binascii模块&#xff0c;我们可以方便地进行二进制和ASCII字符串之间的转换操作。文章中包含大量的示例代码&#xff0c;希望能够帮助新手同学快速入门。 《Python入门核心技术》专栏总…...

CSS Grid布局入门:从零开始创建一个网格系统

CSS Grid布局入门&#xff1a;从零开始创建一个网格系统 引言 在响应式设计日益重要的今天&#xff0c;CSS Grid布局系统是前端开发中的一次革新。它使得创建复杂、灵活的布局变得简单而直观。本教程将通过分步骤的方式&#xff0c;让你从零开始掌握CSS Grid&#xff0c;并在…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

基于SpringBoot在线拍卖系统的设计和实现

摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统&#xff0c;主要的模块包括管理员&#xff1b;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

省略号和可变参数模板

本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

WPF八大法则:告别模态窗口卡顿

⚙️ 核心问题&#xff1a;阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程&#xff0c;导致后续逻辑无法执行&#xff1a; var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题&#xff1a…...

Ubuntu系统复制(U盘-电脑硬盘)

所需环境 电脑自带硬盘&#xff1a;1块 (1T) U盘1&#xff1a;Ubuntu系统引导盘&#xff08;用于“U盘2”复制到“电脑自带硬盘”&#xff09; U盘2&#xff1a;Ubuntu系统盘&#xff08;1T&#xff0c;用于被复制&#xff09; &#xff01;&#xff01;&#xff01;建议“电脑…...