当前位置: 首页 > news >正文

Flume 简介及基本使用

1.Flume简介

Apache Flume 是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本,NG 在 OG 的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以 NG 为基础。

2. Flume架构和基本概念

下图为 Flume 的基本架构图:

2.1 基本架构

外部数据源以特定格式向 Flume 发送 `events` (事件),当 `source` 接收到 `events` 时,它将其存储到一个或多个 `channel`,`channe` 会一直保存 `events` 直到它被 `sink` 所消费。`sink` 的主要功能从 `channel` 中读取 `events`,并将其存入外部存储系统或转发到下一个 `source`,成功后再从 `channel` 中移除 `events`。

2.2 基本概念

1. Event

`Event` 是 Flume NG 数据传输的基本单元。类似于 JMS 和消息系统中的消息。一个 `Event` 由标题和正文组成:前者是键/值映射,后者是任意字节数组。

2. Source 

数据收集组件,从外部数据源收集数据,并存储到 Channel 中。

3. Channel

`Channel` 是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:

+ `Memory Channel` : 使用内存,优点是速度快,但数据可能会丢失 (如突然宕机);

+ `File Channel` : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。

4. Sink

`Sink` 的主要功能从 `Channel` 中读取 `Event`,并将其存入外部存储系统或将其转发到下一个 `Source`,成功后再从 `Channel` 中移除 `Event`。

5. Agent

是一个独立的 (JVM) 进程,包含 `Source`、 `Channel`、 `Sink` 等组件。

2.3 组件种类

Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:

- Source 类型 :内置了几十种类型,如 `Avro Source`,`Thrift Source`,`Kafka Source`,`JMS Source`;

- Sink 类型 :`HDFS Sink`,`Hive Sink`,`HBaseSinks`,`Avro Sink` 等;

- Channel 类型 :`Memory Channel`,`JDBC Channel`,`Kafka Channel`,`File Channel` 等。

对于 Flume 的使用,除非有特别的需求,否则通过组合内置的各种类型的 Source,Sink 和 Channel 就能满足大多数的需求。

3. Flume架构模式

Flume 支持多种架构模式,分别介绍如下

3.1 multi-agent flow

Flume 支持跨越多个 Agent 的数据传递,这要求前一个 Agent 的 Sink 和下一个 Agent 的 Source 都必须是 `Avro` 类型,Sink 指向 Source 所在主机名 (或 IP 地址) 和端口(详细配置见下文案例三)。

3.2 Consolidation

日志收集中常常存在大量的客户端(比如分布式 web 服务),Flume 支持使用多个 Agent 分别收集日志,然后通过一个或者多个 Agent 聚合后再存储到文件系统中。

3.3 Multiplexing the flow

Flume 支持从一个 Source 向多个 Channel,也就是向多个 Sink 传递事件,这个操作称之为 `Fan Out`(扇出)。默认情况下 `Fan Out` 是向所有的 Channel 复制 `Event`,即所有 Channel 收到的数据都是相同的。同时 Flume 也支持在 `Source` 上自定义一个复用选择器 (multiplexing selector) 来实现自定义的路由规则。

4.Flume配置格式

Flume 配置通常需要以下两个步骤:

1. 分别定义好 Agent 的 Sources,Sinks,Channels,然后将 Sources 和 Sinks 与通道进行绑定。需要注意的是一个 Source 可以配置多个 Channel,但一个 Sink 只能配置一个 Channel。基本格式如下:

<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

2. 分别定义 Source,Sink,Channel 的具体属性。基本格式如下:

<Agent>.sources.<Source>.<someProperty> = <someValue>properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

5. Flume使用案例

介绍几个 Flume 的使用案例:

+ 案例一:使用 Flume 监听文件内容变动,将新增加的内容输出到控制台。

+ 案例二:使用 Flume 监听指定目录,将目录下新增加的文件存储到 HDFS。

+ 案例三:使用 Avro 将本服务器收集到的日志数据发送到另外一台服务器。

5.1 案例一

需求: 监听文件内容变动,将新增加的内容输出到控制台。

实现: 主要使用 `Exec Source` 配合 `tail` 命令实现。

1. 配置

新建配置文件 `exec-memory-logger.properties`,其内容如下:

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  #配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c#将sourceschannels进行绑定
a1.sources.s1.channels = c1#配置sink 
a1.sinks.k1.type = logger#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1  #配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console

3. 测试

向文件中追加数据:

控制台的显示:

5.2 案例二

需求: 监听指定目录,将目录下新增加的文件存储到 HDFS。

实现:使用 `Spooling Directory Source` 和 `HDFS Sink`。

1. 配置

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  #配置sources属性
a1.sources.s1.type =spooldir  
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName 
#将sourceschannels进行绑定  
a1.sources.s1.channels =c1 #配置sink 
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream  
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1#配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console

3. 测试

拷贝任意文件到监听目录下,可以从日志看到文件上传到 HDFS 的路径:

cp log.txt logs/

查看上传到 HDFS 上的文件内容与本地是否一致:

hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801

5.3 案例三

需求: 将本服务器收集到的数据发送到另外一台服务器。

实现:使用 `avro sources` 和 `avro Sink` 实现。

1. 配置日志收集Flume

新建配置 `netcat-memory-avro.properties`,监听文件内容变化,然后将新的文件内容通过 `avro sink` 发送到 hadoop001 这台服务器的 8888 端口:

#指定agentsources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. 配置日志聚合Flume

使用 `avro source` 监听 hadoop001 服务器的 8888 端口,将获取到内容输出到控制台:

#指定agentsources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2#配置sources属性
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888#将sourceschannels进行绑定
a2.sources.s2.channels = c2#配置sink
a2.sinks.k2.type = logger#将sinkschannels进行绑定
a2.sinks.k2.channel = c2#配置channel类型
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

3. 启动

启动日志聚集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console

在启动日志收集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console

这里建议按以上顺序启动,原因是 `avro.source` 会先与端口进行绑定,这样 `avro sink` 连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,`sink` 会一直重试,直至建立好连接。

4.测试

向文件 `tmp/log.txt` 中追加内容:

可以看到已经从 8888 端口监听到内容,并成功输出到控制台:

相关文章:

Flume 简介及基本使用

1.Flume简介 Apache Flume 是一个分布式&#xff0c;高可用的数据收集系统。它可以从不同的数据源收集数据&#xff0c;经过聚合后发送到存储系统中&#xff0c;通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本&#xff0c;NG 在 OG 的基础上进行了完全的重构…...

行业追踪,2023-10-11

自动复盘 2023-10-11 凡所有相&#xff0c;皆是虚妄。若见诸相非相&#xff0c;即见如来。 k 线图是最好的老师&#xff0c;每天持续发布板块的rps排名&#xff0c;追踪板块&#xff0c;板块来开仓&#xff0c;板块去清仓&#xff0c;丢弃自以为是的想法&#xff0c;板块去留让…...

Linux:进程控制

目录 一、进程创建 写时拷贝 二、进程终止 echo $? 如何终止进程 _exit与exit 三、进程等待 进程等待的必要性 进程等待的操作 wait waitpid status 异常退出情况 status相关宏 options 四、进程程序替换 1、关于进程程序替换 2、如何进行进程程序替换 程序…...

HTTP中的GET方法与POST方法

1、GET 和 POST方法之间的区别 根据 RFC 规范&#xff0c;GET 的语义是从服务器获取指定的资源&#xff0c;这个资源可以是静态的文本、页面、图片视频等。GET 请求的参数位置一般是写在 URL 中&#xff0c;URL 规定只能支持 ASCII&#xff0c;所以 GET 请求的参数只允许 ASCI…...

2023年10月16日-10月22日,(光追+ue+osg继续按部就班进行即可。)

根据月计划&#xff0c; 本周计划如下&#xff1a; 2023年10月16日-10月22日&#xff0c;光追10.7-10.13&#xff0c;ue rpg(p47-p53),ue5底层渲染01A19-01B4,osg29,osg30,filament文档每天看 落实到天就是 2023年10月16日光追10.7&#xff0c;ue rpg(p47),ue5底层渲染01A19,o…...

【Docker】命令使用大全

【Docker】命令使用大全 目录 【Docker】命令使用大全 简述 Docker 的主要用途 基本概念 容器周期管理 run start/stop/restart kill rm pause/unpause create exec 容器操作 ps inspect top attach events logs wait export port 容器 rootfs 命令 c…...

查找算法:二分查找、插值查找、斐波那契查找

二分查找 查找的前提是数组有序 思路分析 代码实现 # 二分查找&#xff08;递归法实现&#xff09; # 找到一个相等的值就返回该值的下标 def binary_search(arr: list, find_val: int, left: int, right: int):mid (left right) // 2 # 寻找数组中间位置的下标if left &…...

python+django高校教室资源预约管理系统lqg8u

技术栈 后端&#xff1a;pythondjango 前端&#xff1a;vueCSSJavaScriptjQueryelementui 开发语言&#xff1a;Python 框架&#xff1a;django/flask Python版本&#xff1a;python3.7.7 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat 开发软件&#xff1a;PyChar…...

Potato靶机

信息搜集 设备发现 扫描端口 综合扫描 开放了80端口的HTTP服务和7120端口的SSH服务 目录扫描 扫描目录 看看这个info.php&#xff0c;发现只有php的版本信息&#xff0c;没有可以利用的注入点 SSH突破 hydra 爆破 考虑到 7120 端口是 ssh 服务&#xff0c;尝试利用 hydra …...

【环境搭建】linux docker-compose安装gitlab和redis

gitlab需要redis&#xff0c;一起安装了 新建gitlab和redis挂载目录 mkdir -p /data/docker/redis/data mkdir -p /data/docker/redis/logs mkdir -p /data/docker/redis/confmkdir -p /data/docker/gitlab/data mkdir -p /data/docker/gitlab/logs mkdir -p /data/docker/gi…...

JAVAEE初阶相关内容第十三弹--文件操作 IO

写在前 终于完成了&#xff01;&#xff01;&#xff01;&#xff01;内容不多就是本人太拖拉&#xff01; 这里主要介绍文件input&#xff0c;output操作。File类&#xff0c;流对象&#xff08;分为字节流、字符流&#xff09; 需要掌握每个流对象的使用方式&#xff1a;打…...

POI报表的高级应用

POI报表的高级应用 掌握基于模板打印的POI报表导出理解自定义工具类的执行流程 熟练使用SXSSFWorkbook完成百万数据报表打印理解基于事件驱动的POI报表导入 模板打印 概述 自定义生成Excel报表文件还是有很多不尽如意的地方&#xff0c;特别是针对复杂报表头&#xff0c;单…...

【计算机毕设选题推荐】超市管理系统SpringBoot+SSM+Vue

前言&#xff1a;我是IT源码社&#xff0c;从事计算机开发行业数年&#xff0c;专注Java领域&#xff0c;专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 项目名 基于SpringBoot的超市管理系统 技术栈 SpringBootVueMySQLMaven 文章目录 一、超市管理系统…...

【算法1-4】递推与递归-P1002 [NOIP2002 普及组] 过河卒

## 题目描述 棋盘上 A 点有一个过河卒&#xff0c;需要走到目标 B 点。卒行走的规则&#xff1a;可以向下、或者向右。同时在棋盘上 C 点有一个对方的马&#xff0c;该马所在的点和所有跳跃一步可达的点称为对方马的控制点。因此称之为“马拦过河卒”。 棋盘用坐标表示&#…...

浅谈压力测试的作用是什么

随着现代应用程序变得越来越复杂&#xff0c;用户的期望也在不断提高&#xff0c;对性能和可靠性的要求变得更加苛刻。在应用程序开发和维护的过程中&#xff0c;压力测试是一项至关重要的活动&#xff0c;它可以帮助发现潜在的问题、评估系统的性能极限&#xff0c;以及确保在…...

互联网Java工程师面试题·Java 总结篇·第一弹

目录 1、面向对象的特征有哪些方面&#xff1f; 2、访问修饰符 public,private,protected,以及不写&#xff08;默认&#xff09;时的区别&#xff1f; 3、String 是最基本的数据类型吗&#xff1f; 4、float f3.4;是否正确&#xff1f; 5、short s1 1; s1 s1 1;有错吗…...

Anylogic 读取和写入Excel文件

1、选择面板-连接-Excel文件&#xff0c;拖入到视图中 然后在excel文件的属性中进行绑定外部excel文件。 绑定完之后&#xff0c;在你需要读取的地方进行写代码&#xff0c; //定义开始读取的行数 //这里设为2&#xff0c;是因为第一行是数据名称 int row12; //读取excel文件信…...

茶百道全链路可观测实战

作者&#xff1a;山猎 茶百道是四川成都的本土茶饮连锁品牌&#xff0c;创立于 2008 年 。经过 15 年的发展&#xff0c;茶百道已成为餐饮标杆品牌&#xff0c;全国门店超 7000 家&#xff0c;遍布全国 31 个省市&#xff0c;实现中国大陆所有省份及各线级城市的全覆盖。2021 …...

Java-JDBC

JDBC JDBC英文名为&#xff1a;Java Data Base Connectivity(Java数据库连接)&#xff0c;官方解释它是Java编程语言和广泛的数据库之间独立于数据库的连接标准的Java API 根本上说JDBC是一种规范&#xff0c;它提供的接口&#xff0c;一套完整的&#xff0c;允许便捷式访问底…...

【ROS】Nav2源码之nav2_planner详解

【ROS】郭老二博文之:ROS目录 1、简述 nav2_planner是路径规划器,把起始位置、姿势的信息输入nav2_planner模块,将会生成可行路径。 nav2_planner路径规划器和nav2_controller控制器相似,也使用插件的形式加载不同的路径规划器。 常用的路径规划器插件有: 1)NavFn Plan…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中&#xff0c;我们会遇到使用 java 调用 dll文件 的情况&#xff0c;此时大概率出现UnsatisfiedLinkError链接错误&#xff0c;原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用&#xff0c;结果 dll 未实现 JNI 协…...

Matlab | matlab常用命令总结

常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

关于uniapp展示PDF的解决方案

在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项&#xff1a; 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库&#xff1a; npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

离线语音识别方案分析

随着人工智能技术的不断发展&#xff0c;语音识别技术也得到了广泛的应用&#xff0c;从智能家居到车载系统&#xff0c;语音识别正在改变我们与设备的交互方式。尤其是离线语音识别&#xff0c;由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力&#xff0c;广…...

倒装芯片凸点成型工艺

UBM&#xff08;Under Bump Metallization&#xff09;与Bump&#xff08;焊球&#xff09;形成工艺流程。我们可以将整张流程图分为三大阶段来理解&#xff1a; &#x1f527; 一、UBM&#xff08;Under Bump Metallization&#xff09;工艺流程&#xff08;黄色区域&#xff…...

如何把工业通信协议转换成http websocket

1.现状 工业通信协议多数工作在边缘设备上&#xff0c;比如&#xff1a;PLC、IOT盒子等。上层业务系统需要根据不同的工业协议做对应开发&#xff0c;当设备上用的是modbus从站时&#xff0c;采集设备数据需要开发modbus主站&#xff1b;当设备上用的是西门子PN协议时&#xf…...