大数据-玩转数据-Flume
一、Flume简介
-
- Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Unix环境下运行。
-
- Flume基于流式架构,容错性强,也很灵活简单。
-
- Flume、Kafka用来实时进行数据收集,Spark、Flink用来实时处理数据,impala用来实时查询。
二、Flume角色
2.1、Source
用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。
2.2、Channel
用于桥接Sources和Sinks,类似于一个队列。
2.3、Sink
从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。
2.4、Event
传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。
三、Flume传输过程
source监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个Event中,并put到channel后commit提交,channel队列先进先出,sink去channel队列中拉取数据,然后写入到HDFS中。
四、Flume部署及使用
4.1 采集架构
4.2 Flume安装
4.2.1 下载
apache-flume-1.6.0-bin.tar.gz
链接:https://pan.baidu.com/s/1ySmEEObFtKtyT7GsEldnfA
提取码:436t
4.2.2 安装
Flume的安装非常简单,只需要解压即可
tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME
在这里,我们使用集群模式,因此,需要把在master节点部署的flume分发到slave节点上:
]# scp -rp apache-flume-1.7.0-bin slave1:KaTeX parse error: Expected 'EOF', got '#' at position 6: PWD ]#̲ scp -rp apache…PWD
4.2.3 测试
采集配置:
vi netcat-logger.conf
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动agent去采集数据
启动命令:
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf 指定flume自身的配置文件所在目录
-f conf/netcat-logger.con 指定我们所描述的采集方案
-n a1 指定我们这个agent的名字
先要往agent采集监听的端口上发送数据,让agent有数据可采
发送命令:
安装telnet:
]# yum install telnet
]# telnet anget-hostname port (telnet localhost 44444)
测试输入输出如下图:
4.3 Flume配置
1)Flume 配置分析
Flume 直接读 log 日志的数据,log 日志的格式是 app-yyyy-mm-dd.log。
2)Flume 的具体配置如下:
(1)在/opt/module/flume/conf 目录下创建 file-flume-kafka.conf 文件
vim file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-03/app.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.zgjy.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_resource = c1
a1.sources.r1.selector.mapping.topic_action = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_resource
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# configure channe2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c2.kafka.topic = topic_action
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
测试日志:
配置说明如下:
4.4 Flume 的 ETL 和分类型拦截器
本项目中自定义了两个拦截器,分别是:ETL 拦截器、日志类型区分拦截器。
ETL 拦截器主要作用:过滤时间戳不合法和 Json 数据不完整的日志
日志类型区分拦截器主要作用:将启动日志和事件日志区分开来,方便发往 Kafka 的不 同 Topic。
1)创建 Maven 工程 flume-interceptor
2)创建包名:com.zgjy.flume.interceptor
3)在 pom.xml 文件中添加如下配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.zgjy</groupId><artifactId>flume-interceptor</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.41</version></dependency><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.3</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build></project>
4)在 com.zgjy.flume.interceptor 包下创建 LogETLInterceptor 类名
Flume ETL 拦截器 LogETLInterceptor实现代码如下:
package
相关文章:

大数据-玩转数据-Flume
一、Flume简介 Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Unix环境下运行。Flume基于流式架构,容错性强,也很灵活简单。Flume、Kafka用来实时进行数据收集,Spark、Flink用来实时处理数据,impala用来实时查询。二、Flume…...

【Linux】进程概念IV 进程地址空间
Halo,这里是Ppeua。平时主要更新C语言,C,数据结构算法…感兴趣就关注我吧!你定不会失望。 本篇导航 0. 数据在内存中的分布1. 虚拟地址与真实物理地址2. 进程地址空间2.1 进程地址空间概念2.2 进程->页表->内存 0. 数据在内…...

Flink在汽车行业的应用【面试加分系列】
很多同学问我为什么要发这些大数据前沿汇报? 一方面是自己学习完后觉得非常好,然后总结发出来方便大家阅读;另外一方面,看这些汇报对你的面试帮助会很大,特别是面试前可以看看即将面试公司在大数据前沿的发展动向&…...

智慧工地源码:助力数字建造、智慧建造、安全建造、绿色建造
智慧工地围绕建设过程管理,建设项目与智能生产、科学管理建设项目信息生态系统集成在一起,该数据在虚拟现实环境中,将物联网收集的工程信息用于数据挖掘和分析,提供过程趋势预测和专家计划,实现工程建设的智能化管理&a…...

Spring Boot(二)
1、运行维护 1.1、打包程序 SpringBoot程序是基于Maven创建的,在Maven中提供有打包的指令,叫做package。本操作可以在Idea环境下执行。 mvn package 打包后会产生一个与工程名类似的jar文件,其名称是由模块名版本号.jar组成的。 1.2、程序…...

上海亚商投顾:沪指缩量调整跌 高位强势股继续退潮
上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。 一.市场情绪 三大指数11月10日弱势震荡,上证50盘中跌超1%,以保险为首的权重板块走势较弱。 高位强…...
药理学试卷
1【单选题】关于尼可刹米,错误的是 C A、直接兴奋延脑呼吸中枢 B、刺激颈动脉体化学感受器 C、作用时间较长 D、过量可致惊厥 2【单选题】属于第三代头孢菌素的药物是 C A、头孢克洛 B、头孢噻吩 C、头孢曲松 D、头孢匹罗 3【单选题】不属于β受体阻断药禁…...
SpringBoot3-快速入门
1.前置知识 Java17Spring、SpringMVC、MyBatisMaven、IDEA\ 2. 环境要求 环境&工具 版本(or later) SpringBoot 3.0.5 IDEA 2021.2.1 Java 17 Maven 3.5 Tomcat 10.0 Servlet 5.0 GraalVM Community 22.3 Native Build Tools 0.9…...

具名挂载和匿名挂载
匿名卷挂载 : -v 的时候只指定容器内的路径 如下面这个:/etc/nginx 1.docker run -d -P --name nginx -v /etc/nginx nginx 2.查看所有卷 docker volume ls 这里发现,这就是匿名挂载,只指定容器内的路径,没有指定…...

ARM串口
...
C++ Qt 学习(文章链接汇总)
C Qt 学习(一):Qt 入门 C Qt 学习(二):常用控件使用与界面布局 C Qt 学习(三):无边框窗口设计 C Qt 学习(四):自定义控件与 qss 应用 …...
2311d9月会议
DLF2023年9月月度会议摘要 Robert Robert,在DConf上做了一些初步的JSON5工作.他还更新了Bugzilla到GitHub的迁移脚本.他使用了"隐藏"API,现在脚本要快得多. 除此外,他在DScanner上做了一些小事,并等待JanJurzitza(Webfreak)合并它们.他指出,沃尔特曾要求他写一篇演…...

《算法通关村——二分查找在旋转数字中的应用》
《算法通关村——二分查找在旋转数字中的应用》 这里我们直接通过一个题目,来了解二分查找的应用。 153. 寻找旋转排序数组中的最小值 已知一个长度为 n 的数组,预先按照升序排列,经由 1 到 n 次 旋转 后,得到输入数组。例如&a…...
C/S架构学习之基于TCP的本地通信(服务器)
基于TCP的本地通信(服务器):创建流程:一、创建字节流式套接字(socket函数): int sock_fd socket(AF_LOCAL,SOCK_STREAM,0);二、创建服务器和客户机的本地网络信息结构体并填充服务器本地网络信…...

乡镇村污水处理智慧水务智能监管平台,助力污水监管智慧化、高效化
一、背景与需求 随着城市化进程的加速,排放的污水量也日益增加,导致水污染严重。深入打好污染防治攻坚战的重要抓手,对于改善城镇人居环境,推进城市治理体系和治理能力现代化,加快生态文明建设,推动高质量…...

OSPF综合
实验拓扑 实验需求: 1 R4为ISP,其上只能配置IP地址; R4与其他所有直连设备间均使用公有IP 2 R3-R5/6/7为MGRE环境,R3为中心站点 ; 3 整个OSPF环境IP基于172.16.0.0/16划分; 4 所有设备均可访问R4的环回; 5 减少LSA的更新量,加快收…...

vue分片上传视频并转换为m3u8文件并播放
开发环境: 基于若依开源框架的前后端分离版本的实践,后端java的springboot,前端若依的vue2,做一个分片上传视频并分段播放的功能,因为是小项目,并没有专门准备文件服务器和CDN服务,后端也是套用…...

【MySQL】对表结构进行增删查改的操作
表的操作 前言正式开始建表查看表show tables;desc xxx;show create table xxx; 修改表修改表名 rename to对表结构进行修改新增一个列 add 对指定列的属性做修改 modify修改列名 change 删除某列 drop 删除表 drop 前言 前一篇讲了库相关的操作,如果你不太懂&…...

Hadoop原理,HDFS架构,MapReduce原理
Hadoop原理,HDFS架构,MapReduce原理 2022找工作是学历、能力和运气的超强结合体,遇到寒冬,大厂不招人,可能很多算法学生都得去找开发,测开 测开的话,你就得学数据库,sql,…...

【Spring Boot】035-Spring Boot 整合 MyBatis Plus
【Spring Boot】035-Spring Boot 整合 MyBatis Plus 【Spring Boot】010-Spring Boot整合Mybatis https://blog.csdn.net/qq_29689343/article/details/108621835 文章目录 【Spring Boot】035-Spring Boot 整合 MyBatis Plus一、MyBatis Plus 概述1、简介2、特性3、结构图4、相…...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...

C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...