当前位置: 首页 > news >正文

04数据平台Flume

Flume 功能

在这里插入图片描述
Flume主要作用,就是实时读取服务器本地磁盘数据,将数据写入到 HDFS。

Flume是 Cloudera提供的高可用,高可靠性,分布式的海量日志采集、聚合和传输的系统工具。

Flume 架构

Flume组成架构如下图所示:
在这里插入图片描述

Agent

每个 Agent 代表着一个 JVM 进程,它以事件的方式将数据从源头送至目的地。
Agent 由 3 个部分组成,Source、Channel、Sink。

  • Source:Source是负责接收数据到Flume Agent 的组件,Source 组件可以处理各种类型、各种格式的日志数据。
  • Sink:Sink不断轮询Channel中的事件并且批量移除他们,并将这些数据批量写入到存储或者索引系统,或被发送到另一个Flume Agent。
  • Channel:Channel是位于Source 和Sink 之间的缓冲区。因此,Channel允许 Source和 Sink 有不同的运行速率。Channel 是线程安全的, 可以同时处理几个 Source 的写操作和多个 Sink 的读操作。
    Flume自带两种 Channel:Memory Channel 和File Channel。
    Memory Channel 是一个内存队列,Source将事件写入其尾部,Sink从其头部读取事件。 Memory Channel 将源写入的事件存储在堆上。由于它将所有数据存储在内存中,因此提供了高吞吐量。 它最适合那些不担心数据丢失的流。 它不适合涉及数据丢失的数据流。
    File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
  • Event:Flume 传输数据的基本单元,数据以 Event的形式将数据从源头送至目的地。Event由Header 和 Body 组成,Header用来存放event 的属性,为 K-V结构,Body 用于存放数据,为字节数组结构。

安装 flume

话不多说,直接开始安装 flume。

  1. 前往 http://archive.apache.org/dist/flume/ , 选择1.9.0
  2. 将apache-flume-1.9.0-bin.tar.gz使用 sftp上传到linux的/opt/software目录下
    在这里插入图片描述
  3. 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[logan@hadoop101 hadoop]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  1. 创建软链接
cd /opt/module
ln -snf flume-1.9.0/ flume
  1. 删除jar 包,否则会报错
cd /opt/module/flume
rm lib/guava-11.0.2.jar

注意删除guava,一定要配置 Hadoop 环境变量,否则会报错:

Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Listsat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 1 more
  1. 修改log4j.properties配置文件
[logan@hadoop101 flume]$ vim conf/log4j.properties 
# 注意是修改内容
flume.log.dir=/opt/module/flume/logs
  1. 验证 flume
[logan@hadoop101 flume]$ /opt/module/flume/bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume日志采集

配置解析

需要采集的日志文件分布在hadoop101,hadoop102两台日志服务器,故需要在hadoop101,hadoop102上配置日志采集 Flume。Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到 Kafka。
此处选择TailDirSource和 KafkaChannel。

  • TailDirSource优势:支持断点续传、多目录。
  • KafkaChannel优势:省去了 Sink,提高了效率。
  • 日志采集 Flume关键配置 :
    在这里插入图片描述

Flume日志采集配置

  1. 创建Flume 配置文件
[logan@hadoop101 flume]$ pwd
/opt/module/flume
[logan@hadoop101 flume]$ mkdir job
[logan@hadoop101 flume]$ vim job/file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false#组装 
a1.sources.r1.channels = c1
  1. 编写拦截器
    • 创建Maven工程flume-interceptor
    • 创建包:com.logan.gmall.flume.interceptor
    • 在pom.xml文件中添加如下配置
    <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
    </dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.4</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
    </build>
    
    • 在com.logan.gmall.flume.utils包下创建JSONUtil类
    package com.logan.gmall.flume.utils;import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONException;public class JSONUtil {/*** 通过异常判定是否是JSON字符串*/public static boolean isJSONValidate(String input){try {JSON.parseObject(input);return true;} catch (JSONException e) {return false;}}
    }
    
    • 在com.logan.gmall.flume.interceptor包下创建ETLInterceptor类
    package com.logan.gmall.flume.interceptor;import com.logan.gmall.flume.utils.JSONUtil;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
    }
    
    • 打包
      在这里插入图片描述
    • 将打包文件放到hadoop101 的/opt/moduie/flume/lib文件夹下

采集测试

  1. 启动Zookeeper、Kafka集群
  2. 启动 hadoop101上的日志采集Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
  1. 启动一个Kafka的Console-Consumer
[logan@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topic_log
  1. 生成模拟数据
[logan@hadoop101 module]$ mkdir -p /opt/module/applog/log/
[logan@hadoop101 module]$ vim /opt/module/applog/log/app.2023-12-02.log
{"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":5},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":5},{"display_type":"query","item":"19","item_type":"sku_id","order":3,"pos_id":4},{"display_type":"query","item":"3","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"query","item":"5","item_type":"sku_id","order":5,"pos_id":2},{"display_type":"promotion","item":"19","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"query","item":"14","item_type":"sku_id","order":7,"pos_id":2},{"display_type":"query","item":"9","item_type":"sku_id","order":8,"pos_id":2},{"display_type":"promotion","item":"35","item_type":"sku_id","order":9,"pos_id":1}],"page":{"during_time":9853,"page_id":"home"},"ts":1672512476000}
{"actions":[{"action_id":"favor_add","item":"9","item_type":"sku_id","ts":1672512480386},{"action_id":"get_coupon","item":"2","item_type":"coupon_id","ts":1672512483772}],"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"promotion","item":"19","item_type":"sku_id","order":1,"pos_id":4},{"display_type":"promotion","item":"14","item_type":"sku_id","order":2,"pos_id":5},{"display_type":"query","item":"21","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"11","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"promotion","item":"28","item_type":"sku_id","order":5,"pos_id":1}],"page":{"during_time":10158,"item":"9","item_type":"sku_id","last_page_id":"home","page_id":"good_detail","source_type":"promotion"},"ts":1672512477000}
  1. 观察kafka是否有消费到数据

相关文章:

04数据平台Flume

Flume 功能 Flume主要作用&#xff0c;就是实时读取服务器本地磁盘数据&#xff0c;将数据写入到 HDFS。 Flume是 Cloudera提供的高可用&#xff0c;高可靠性&#xff0c;分布式的海量日志采集、聚合和传输的系统工具。 Flume 架构 Flume组成架构如下图所示&#xff1a; A…...

Redis--13--缓存一致性问题

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 缓存一致性问题1、先更新缓存&#xff0c;再更新DB方案二&#xff1a;先更新DB&#xff0c;再更新缓存方案三&#xff1a;先删缓存&#xff0c;再写数据库推荐1&…...

12.7作业

1. #include "mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent) {//***********窗口相关设置***********//设置窗体大小this->resize(540,410);this->setFixedSize(540,410);//取消菜单栏this->setWindowFlag(Qt::FramelessWindowHint);/…...

ssl什么是公钥和私钥?

公钥&#xff08;Public Key&#xff09;与私钥&#xff08;Private Key&#xff09;是通过加密算法得到的一个密钥对&#xff08;即一个公钥和一个私钥&#xff0c;也就是非对称加密方式&#xff09;。公钥可对会话进行加密、验证数字签名&#xff0c;只有使用对应的私钥才能解…...

github首次将文件合到远端分支,发现名字不是master,而是main

其中&#xff0c;暂存区和本地仓库的信息都存储在.git目录中 在自己的github上实践 1、刚开始&#xff0c;git clone gitgithub.com:lingze8678/my_github.git到本地 2、在克隆后的代码中加入一个pdf文件 3、在git bash中操作&#xff08;当项目中有文件更改和删除&#xff…...

RTX 40 系彻底摆烂,NVIDIA 让三年老卡焕发第二春

AMD 前段时间发布的 RX 6750GRE 12/10G 两块新卡属实给了市场一波小小震撼。 有同学要说了&#xff0c;这不就是两年前的 RX 6700 系换皮嘛&#xff0c;典型的旧饭重恰它凭啥能火&#xff1f; 无他&#xff0c;性能合格&#xff0c;价格实惠&#xff0c;主打一个高性价比。 别…...

ELK技术栈介绍及简单使用实例

1. ELK技术栈介绍 引言 在当今数据驱动的世界里&#xff0c;有效地管理和分析大量日志数据变得至关重要。这里我们将深入探讨ELK技术栈&#xff0c;这是一种流行的日志管理解决方案&#xff0c;它结合了三个开源项目&#xff1a;Elasticsearch、Logstash和Kibana。ELK技术栈因…...

基于Java健身房课程管理系统

基于Java健身房课程管理系统 功能需求 1、课程信息管理&#xff1a;系统需要能够记录和管理所有课程的详细信息&#xff0c;包括课程名称、教练信息、课程时间、课程地点、课程容量等。管理员和教练可以添加、编辑和删除课程信息。 2、会员信息管理&#xff1a;系统需要能够…...

DAPP开发【02】Remix使用

系列文章目录 系列文章在DAPP开发专栏 文章目录 系列文章目录使用部署测试网上本地项目连接remix本地项目连接remix 使用 创建一个新的工作空间 部署测试网上 利用metaMask连接测试网络 添加成功&#xff0c;添加时需要签名 即可进行编译 即可部署 本地项目连接remix 方…...

大华DSS S2-045 OGNL表达式注入漏洞复现

0x01 产品简介 大华DSS安防监控系统平台是一款集视频、报警、存储、管理于一体的综合安防解决方案。该平台支持多种接入方式,包括网络视频、模拟视频、数字视频、IP电话、对讲机等。此外,该平台还支持多种报警方式,包括移动侦测、区域入侵、越线报警、人员聚集等。 0x02 漏…...

大数据之HBase(二)

Master详细架构 位置&#xff1a;namenode实现类&#xff1a;HMaster组成 负载均衡器&#xff1a;通过meta了解region的分配&#xff0c;通过zk了解rs的启动情况&#xff0c;5分钟调控一次分配平衡元数据表管理器&#xff1a;管理自己的预写日志&#xff0c;如果宕机&#xff…...

前后端数据传输格式(下)

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 上篇主要复习了HTTP以及…...

mysql pxc高可用离线部署(三)

pxc学习流程 mysql pxc高可用 单主机 多主机部署&#xff08;一&#xff09; mysql pxc 高可用多主机离线部署&#xff08;二&#xff09; mysql pxc高可用离线部署&#xff08;三&#xff09; mysql pxc高可用 跨主机部署pxc 本文使用docker进行安装&#xff0c;主机间通过…...

XXL-JOB 日志表和日志文件自动清理

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…...

常用sql记录

备份一张表 PostgreSQL CREATE TABLE new_table AS SELECT * FROM old_table;-- 下面这个比上面好&#xff0c;这个复制表结构时&#xff0c;会把默认值、约束、注释都复制 CREATE TABLE new_table (LIKE old_table INCLUDING ALL) WITHOUT OIDS; INSERT INTO new_table SELE…...

设备温度和振动综合监测:温振一体式传感器的优点和应用

随着工业设备的复杂性和自动化程度的提高&#xff0c;对设备状态监测的需求也日益增加。温振一体式传感器作为一种集振动和温度监测于一体的传感器&#xff0c;具备多项优势&#xff0c;因此在工业设备状态监测领域得到广泛应用。 温振一体式传感器基于振动传感器和温度传感器的…...

彻底解决ModuleNotFoundError: No module named ‘exceptions‘【Bug完美解决】

文章目录 项目场景:问题描述原因分析:解决方案:此Bug解决方案总结心得项目场景: 根据本文可找到bug原因并彻底解决**ModuleNotFoundError: No module named ‘exceptions‘**Bug 报错: E:\Anconda\python.exe c:\Users\24190\PycharmProjects\pythonProject4py尝试 gong…...

yarn和npm的区别

2023-12-8 yarn和npm的区别 是常用的包管理工具&#xff0c;用于node.js项目中安装、管理、和更新依赖项 有以下几个区别&#xff1a; 性能和速度&#xff1a;在包的安装和下载方面&#xff0c;yarn比npm更快速&#xff0c;yarn通过并行下载和缓存等优化策略&#xff0c;可以…...

设计图中时序图

设计图中的时序图通常用于展示两个或多个对象之间的交互和消息传递的顺序。它是一种用于描述软件或系统中的并发性和时序行为的工具。 以下是一个简单的时序图的示例&#xff1a; 首先&#xff0c;在时序图中创建两个对象&#xff0c;例如"对象A"和"对象B&quo…...

反射实现tomcat

获取类信息的方法 1.通过类对象 x.getClass() 2.通过class.forname方法 Class.forname(className);这里className是存储类名的字符串 3.通过类名.class 类名.class 通过类名创建对象 类名.newInstance&#xff08;&#xff09;&#xff1b; 反射可以看到类的一切信息&#xff1…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

在rocky linux 9.5上在线安装 docker

前面是指南&#xff0c;后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

Matlab | matlab常用命令总结

常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...