当前位置: 首页 > news >正文

基于Kafka的日志采集

目录

前言

架构图

资源列表

基础环境

关闭防护墙

关闭内核安全机制

修改主机名

添加hosts映射

一、部署elasticsearch

修改limit限制

部署elasticsearch

修改配置文件

启动

二、部署filebeat

部署filebeat

添加配置文件

启动

三、部署kibana

部署kibana

修改配置文件

启动

四、部署Kafka

安装java

安装kafka

配置环境变量

创建数据存储目录和日志存储目录

修改zk配置文件

修改Kafka配置文件

启动zk

启动Kafka

测试

五、部署logstash

部署logstash

添加配置文件

启动


前言

        当日志量变得非常大时,传统的日志收集平台可能会遇到性能瓶颈、单点故障或扩展性问题。在这种情况下,引入消息队列(如Kafka)可以显著增强日志收集系统的健壮性、可扩展性和实时性。

以下是当在日志收集平台中加入Kafka时,可以带来的优势和改进:

  1. 缓冲和异步处理
    Kafka作为一个消息队列,可以充当Filebeat(或其他日志收集器)和Logstash(或其他日志处理组件)之间的缓冲层。Filebeat可以将日志数据异步地发送到Kafka,而不需要等待Logstash的即时响应。这样,即使Logstash暂时无法处理数据,Kafka也可以暂时存储数据,直到Logstash恢复处理能力。

  2. 水平扩展
    随着日志量的增长,Kafka可以通过添加更多的节点(brokers)来实现水平扩展。这种扩展方式使得Kafka能够处理更多的并发写入和读取操作,而不会遇到单点故障或性能瓶颈。此外,Kafka的分布式架构还允许数据在多个节点之间进行复制,以提高数据的可靠性和容错性。

  3. 实时数据处理
    Kafka支持实时数据流处理,使得日志数据可以立即被消费和处理。这意味着一旦日志数据被写入Kafka,就可以立即被Logstash(或其他流处理工具)读取和处理,以满足实时分析、监控和告警的需求。

  4. 数据持久化
    Kafka将数据持久化到磁盘上,以确保即使在系统崩溃或重启的情况下,数据也不会丢失。这种持久化机制使得Kafka成为了一个可靠的数据传输和存储平台,特别适用于对日志数据进行长期存储和分析的场景。

  5. 多消费者支持
    Kafka允许多个消费者(如Logstash、其他数据分析工具或应用)从同一个主题(topic)中消费数据。这意味着您可以同时运行多个消费者来处理和分析日志数据,以满足不同的业务需求和数据使用场景。

  6. 可定制性和灵活性
    Kafka提供了丰富的API和工具,使得您可以轻松地定制和扩展日志收集系统。例如,您可以编写自定义的Kafka生产者来收集特定格式的日志数据,或者编写自定义的Kafka消费者来处理和分析日志数据。

  7. 与其他系统的集成
    Kafka是一个广泛使用的消息队列系统,它支持与其他各种系统和工具进行集成。这意味着您可以将Kafka轻松地集成到现有的日志收集、处理、存储和分析系统中,以构建一个更加健壮、可扩展和灵活的日志收集平台。

        综上所述,当日志量变得非常大时,在日志收集平台中加入Kafka可以显著提高系统的性能、可靠性和可扩展性。通过利用Kafka的缓冲、异步处理、水平扩展、实时数据处理、数据持久化、多消费者支持、可定制性和与其他系统的集成能力,您可以构建一个更加健壮、高效和灵活的日志收集系统。

        有需要本次实验软件包的评论区可以找我要,无偿提供。

架构图

资源列表

操作系统配置主机名IP
CentOS7.3.16112C4Ges01192.168.207.131
CentOS7.3.16112C4Gkibana192.168.207.165
CentOS7.3.16112C4Gfilebeat192.168.207.166
CentOS7.3.16112C4Gkafka192.168.207.167
CentOS7.3.16112C4Glogstash192.168.207.168

基础环境

关闭防护墙

systemctl stop firewalld
systemctl disable firewalld

关闭内核安全机制

sed -i "s/.*SELINUX=.*/SELINUX=disabled/g" /etc/selinux/config
reboot

修改主机名

hostnamectl set-hostname es01
hostnamectl set-hostname kibana
hostnamectl set-hostname filebeat
hostnamectl set-hostname kafka
hostnamectl set-hostname logstash

添加hosts映射

cat >> /etc/hosts << EOF
192.168.207.131 es01
192.168.207.165 kibana
192.168.207.166 filebeat
192.168.207.167 kafka
192.168.207.168 logstash
EOF

一、部署elasticsearch

修改limit限制

cat > /etc/security/limits.d/es.conf << EOF
* soft nproc 655360
* hard nproc 655360
* soft nofile 655360
* hard nofile 655360
EOF
​
cat >> /etc/sysctl.conf << EOF
vm.max_map_count=655360
EOF
sysctl -p

部署elasticsearch

mkdir -p /data/elasticsearch
tar zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz -C /data/elasticsearch

修改配置文件

mkdir /data/elasticsearch/{data,logs}[root@es01 elasticsearch-7.14.0]# grep -v "^#" /data/elasticsearch/elasticsearch-7.14.0/config/elasticsearch.yml
cluster.name: my-application
node.name: es01
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["es01"]

启动

useradd es 
chown -R es:es /data/
su - es
/data/elasticsearch/elasticsearch-7.14.0/bin/elasticsearch -d

二、部署filebeat

部署filebeat

mkdir -p /data/filebeat
tar zxvf filebeat-7.14.0-linux-x86_64.tar.gz -C /data/filebeat/

添加配置文件

这里提供了两份filebeat配置文件的参考

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/messages         ###要监控的日志文件
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: 'topic_test1'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/httpd/access_log         ###要监控的日志文件fields:kafka_topic: httpd_access
- type: logenabled: truepaths:- /var/log/httpd/error_log         ###要监控的日志文件fields:kafka_topic: httpd_error
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: '%{[fields.kafka_topic]}'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000

启动

/data/filebeat/filebeat-7.14.0-linux-x86_64/filebeat -e -c filebeat.yml

三、部署kibana

部署kibana

mkdir -p /data/kibana
tar zxvf kibana-7.14.0-linux-x86_64.tar.gz -C /data/kibana/

修改配置文件

grep -v "^#" /data/kibana/kibana-7.14.0-linux-x86_64/config/kibana.yml  | grep -v "^$"
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.207.131:9200"]
kibana.index: ".kibana"

启动

useradd kibana
chown -R kibana:kibana /data 
su - kibana
/data/kibana/kibana-7.14.0-linux-x86_64/bin/kibana

四、部署Kafka

安装java

# 安装java环境
yum -y install java-1.8.0-openjdk

安装kafka

tar zxvf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/kafka

配置环境变量

# 配置环境变量
cat > /etc/profile.d/zookeeper.sh << 'EOF'
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$ZOOKEEPER_HOME/bin:$PATH
EOFcat > /etc/profile.d/kafka.sh << 'EOF'
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOFsource /etc/profile

创建数据存储目录和日志存储目录

mkdir -p /usr/local/kafka/zookeeper
mkdir -p /usr/local/kafka/log/zookeeper
mkdir -p /usr/local/kafka/log/kafka# 创建zk需要的myid文件
echo 0 > /usr/local/kafka/zookeeper/myid

修改zk配置文件

# 注意Kafka安装目录下的config目录里
server.properties             #是Kafka的配置文件
zookeeper.properties          #是zookeeper的配置文件
cat >> /usr/local/kafka/config/zookeeper.properties << EOF
dataLogDir=/usr/local/kafka/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.207.167:2888:3888
EOFsed -i "s/dataDir\=\/tmp\/zookeeper/dataDir\=\/usr\/local\/kafka\/zookeeper/g" /usr/local/kafka/config/zookeeper.properties

修改Kafka配置文件

# /usr/local/kafka/config/server.properties修改listeners=PLAINTEXT://192.168.207.167:9092
advertised.listeners=PLAINTEXT://192.168.207.167:9092
log.dirs=/usr/local/kafka/log/kafka
delete.topic.enable=true
zookeeper.connect=192.168.207.167:2181

启动zk

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

测试

# 创建一个topic
[root@kafka kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.207.167:9092 --replication-factor 1   --partitions 1 --topic Hello-Kafka
Created topic Hello-Kafka.# 往topic里面输入消息
[root@kafka kafka]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.207.167:9092 --topic Hello-Kafka# 从topic里面消费消息
[root@kafka ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka --from-beginning# 查看topic列表
[root@kafka kafka]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.207.167:9092 --list
Hello-Kafka# 删除topic
[root@kafka kafka]# bin/kafka-topics.sh --delete --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka

五、部署logstash

部署logstash

mkdir -p /data/logstash
tar zxvf logstash-7.14.0-linux-x86_64.tar.gz -C /data/logstash/

添加配置文件

mkdir /data/logstash/logstash-7.14.0/conf.dcat > /data/logstash/logstash-7.14.0/conf.d/system.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"topic_test1" type =>"topic_test1"codec =>"json" } 
}
output { if [type] == "topic_test1" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"kafka-system-%{+YYYY.MM.dd}" } }
}EOF
cat > /data/logstash/logstash-7.14.0/conf.d/httpd.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_access" type =>"httpd_access"codec =>"json" } kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_error" type =>"httpd_error"codec =>"json" }
}
output { if [type] == "httpd_access" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-access-%{+YYYY.MM.dd}" } }if [type] == "httpd_error" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-error-%{+YYYY.MM.dd}" } }
}EOF

启动

/data/logstash/logstash-7.14.0/bin/logstash -f /data/logstash/logstash-7.14.0/conf.d/

相关文章:

基于Kafka的日志采集

目录 前言 架构图 资源列表 基础环境 关闭防护墙 关闭内核安全机制 修改主机名 添加hosts映射 一、部署elasticsearch 修改limit限制 部署elasticsearch 修改配置文件 启动 二、部署filebeat 部署filebeat 添加配置文件 启动 三、部署kibana 部署kibana 修…...

某烟草企业数字化转型物流信息化咨询项目规划方案(117页PPT)

方案介绍&#xff1a; 烟草企业数字化转型物流信息化咨询项目规划方案将为企业带来多方面的价值&#xff0c;包括提升物流运营效率、降低物流成本、优化供应链管理、增强企业竞争力和促进可持续发展等。这些价值的实现将有助于企业在激烈的市场竞争中保持领先地位并实现可持续…...

失落的方舟 命运方舟台服封号严重 游戏封IP怎么办

步入《失落的方舟》&#xff08;Lost Ark&#xff09;&#xff0c;这款由Smilegate精心打造的宏大规模在线角色扮演游戏&#xff08;MMORPG&#xff09;&#xff0c;您将启程前往阿克拉西亚这片饱经沧桑的奇幻大陆&#xff0c;展开一场穿越时空的壮阔探索。在这里&#xff0c;一…...

2.10 mysql设置远程访问权限

2.10 mysql设置远程访问权限 目录1. 管理员运行mysql命令窗口2. 使用 root 用户重新登录 MySQL3. 修改用户权限4. 修改mysql安装目录下的my.ini 目录 说明&#xff1a; Mysql8.0 设置远程访问权限 一、Mysql8.0 设置远程访问权限 1. 管理员运行mysql命令窗口 2. 使用 root 用…...

C# 证件照替换底色与设置背景图---PaddleSegSharp

PaddleSegSharp 是一个基于百度飞桨PaddleSeg项目的人像分割模块而开发的.NET的工具类库。 PaddleSegSharp 中PaddleSeg.dll文件是基于开源项目最新发布版本PaddleSeg PaddleSeg的版本修改而成的C动态库&#xff0c;基于opencv的x64编译而成的。 PaddleSeg是基于飞桨PaddlePa…...

HCIA-HarmonyOS Device Developer 课程大纲

一&#xff1a;OpenHarmony 介绍 - &#xff08; 3 课时&#xff09; - OpenHarmony 简介&#xff1b;OpenHarmony 设计理念&#xff1b;OpenHarmony 设计理念概述&#xff1b; - OpenHarmony 试图解决的问题&#xff1b;应用生态割裂问题&#xff1b;用户数据割裂问题&#…...

洗地机哪个牌子最好用?十大名牌洗地机排行榜

作为一种新兴的智能家居产品&#xff0c;洗地机的市场规模已经突破了百亿大关。如此庞大的市场自然吸引了大量资本的涌入&#xff0c;许多品牌纷纷推出自己的洗地机产品&#xff0c;试图在这个竞争激烈的市场中占据一席之地。然而&#xff0c;面对如此多的品牌和型号&#xff0…...

Unity开发——XLua热更新之Hotfix配置(包含xlua获取与导入)

一、Git上获取xlua 最新的xlua包&#xff0c;下载地址链接&#xff1a;https://github.com/Tencent/xLua 二、Unity添加xlua 解压xlua压缩包后&#xff0c;将xlua里的Assets里的文件直接复制进Unity的Assets文件夹下。 成功导入后&#xff0c;unity工具栏会出现xlua选项。 …...

Qt 基于FFmpeg的视频转换器 - 转GIF动图

Qt 基于FFmpeg的视频转换器 - 转GIF动图 引言一、设计思路二、核心源码三、参考链接 引言 gif格式的动图可以通过连续播放一系列图像或视频片段来展示动态效果&#xff0c;使信息更加生动形象&#xff0c;可以很方便的嵌入到网页或者ppt中。上图展示了视频的前几帧转为gif动图的…...

HTML新春烟花盛宴

目录 写在前面 烟花盛宴 完整代码 修改文字...

第十四届蓝桥杯c++研究生组

A 混乘数字 关键思路是求每个十进制数的数字以及怎么在一个数组中让判断所有的数字次数相等。 求每个十进制的数字 while(n!0){int x n%10;//x获取了n的每一个位数字n/10;}扩展&#xff1a;求二进制的每位数字 &#xff08;注意&#xff1a;进制转换、1的个数、位运算&#…...

KDD 2024|基于隐空间因果推断的微服务系统根因定位

简介&#xff1a;本文介绍了由清华大学、南开大学、eBay、微软、中国科学院计算机网络信息中心等单位共同合作的论文《基于隐空间因果推断的受限可观测性场景的微服务系统根因定位》。该论文已被KDD 2024会议录用。 论文标题&#xff1a;Microservice Root Cause Analysis Wit…...

白鹭群优化算法,原理详解,MATLAB代码免费获取

白鹭群优化算法&#xff08;Egret Swarm Optimization Algorithm&#xff0c;ESOA&#xff09;是一种受自然启发的群智能优化算法。该算法从白鹭和白鹭的捕食行为出发&#xff0c;由三个主要部分组成:坐等策略、主动策略和判别条件。将ESOA算法与粒子群算法(PSO)、遗传算法(GA)…...

【源码】2024完美运营版商城/拼团/团购/秒杀/积分/砍价/实物商品/虚拟商品等全功能商城

后台可以自由拖曳修改前端UI页面 还支持虚拟商品自动发货等功能 前端UNIAPP 后端PHP 一键部署版本 获取方式&#xff1a; 微&#xff1a;uucodes...

Java-数组内存解析

文章目录 1.内存的主要结构&#xff1a;栈、堆2.一维数组的内存解析3.二维数组的内存解析 1.内存的主要结构&#xff1a;栈、堆 2.一维数组的内存解析 举例1&#xff1a;基本使用 举例2&#xff1a;两个变量指向一个数组 3.二维数组的内存解析 举例1&#xff1a; 举例2&am…...

Spring Cache --学习笔记

一、概述 Spring Cache 是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;只需要简单地加一个注解&#xff0c;就能实现缓存功能。 Spring Cache 提供了一层抽象&#xff0c;底层可以切换不同的缓存实现&#xff0c;例如&#xff1a; EHCache Caffeine Redis(常…...

NTP服务的DDoS攻击:原理和防御

NTP协议作为一种关键的互联网基础设施组件&#xff0c;旨在确保全球网络设备间的时钟同步&#xff0c;对于维护数据一致性和安全性至关重要。然而&#xff0c;其设计上的某些特性也为恶意行为者提供了发动大规模分布式拒绝服务(DDoS)攻击的机会。以下是NTP服务DDoS攻击及其防御…...

【面试干货】事务的并发问题(脏读、不可重复读、幻读)与解决策略

【面试干货】事务的并发问题&#xff08;脏读、不可重复读、幻读&#xff09;与解决策略 一、脏读&#xff08;Dirty Read&#xff09;二、不可重复读&#xff08;Non-repeatable Read&#xff09;三、幻读&#xff08;Phantom Read&#xff09;四、总结 &#x1f496;The Begi…...

函数式接口:现代编程的利器

1. 引言 在软件开发的演进过程中&#xff0c;函数式编程&#xff08;Functional Programming, FP&#xff09;逐渐显露头角&#xff0c;成为解决复杂问题的有效工具之一。函数式接口作为函数式编程的核心概念之一&#xff0c;其重要性不言而喻。本文将深入探讨函数式接口的概念…...

2022职称继续教育--深入实施新时代人才强国战略 加快建设世界重要人才中心和创新高地

单选题&#xff08;共7题&#xff0c;每题5分&#xff09; 1、&#xff08;&#xff09;实行职位职级制工资为主。 D、中长线科研重要岗位人员 2、建设世界重要人才中心和创新高地有&#xff08;&#xff09;个阶段目标。 B、三 3、综合国力竞争说到底是&#xff08;&#xf…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下&#xff0c;越来越多的求职者将目光投向了日本及中日双语岗位。但是&#xff0c;一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧&#xff1f;面对生疏的日语交流环境&#xff0c;即便提前恶补了…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

高危文件识别的常用算法:原理、应用与企业场景

高危文件识别的常用算法&#xff1a;原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件&#xff0c;如包含恶意代码、敏感数据或欺诈内容的文档&#xff0c;在企业协同办公环境中&#xff08;如Teams、Google Workspace&#xff09;尤为重要。结合大模型技术&…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

PostgreSQL——环境搭建

一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在&#xff0…...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程

STM32F1 本教程使用零知标准板&#xff08;STM32F103RBT6&#xff09;通过I2C驱动ICM20948九轴传感器&#xff0c;实现姿态解算&#xff0c;并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化&#xff0c;适合嵌入式及物联网开发者。在基础驱动上新增…...