大数据课程E7——Flume的Interceptor
文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州
▲ 本章节目的
⚪ 了解Interceptor的概念和配置参数;
⚪ 掌握Interceptor的使用方法;
⚪ 掌握Interceptor的Host Interceptor;
⚪ 掌握Interceptor的Static Interceptor;
⚪ 掌握Interceptor的UUID Interceptor;
⚪ 掌握Interceptor的Search And Replace Interceptor;
⚪ 掌握Interceptor的Regex Filtering Interceptor;
⚪ 掌握Interceptor的Custom Interceptor;
一、Timestamp Interceptor
1. 概述
1. Timestamp Interceptor是在headers中来添加一个timestamp字段来标记数据被收集的时间。
2. Timestamp Interceptor结合HDFS Sink可以实现数据按天存储。
2. 配置属性
| 属性 | 解释 |
| type | timestamp |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 给Interceptor起名
a1.sources.s1.interceptors = i1
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
4. 数据按天存放
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop01
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata/date=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f hdfsin.conf -
Dflume.root.logger=INFO,console
二、Host Interceptor
1. 概述
1. Host Interceptor是在headers中添加一个字段host。
2. Host Interceptor可以用于标记数据来源于哪一台主机。
2. 配置属性
| 属性 | 解释 |
| type | 必须是host |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 给Interceptor起名
a1.sources.s1.interceptors = i1 i2
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
# 指定Host Interceptor
a1.sources.s1.interceptors.i2.type = host
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
三、Static Interceptor
1. 概述
1. Static Interceptor是在headers中添加指定字段。
2. 可以利用这个Interceptor来标记数据的类型。
2. 配置属性
| 属性 | 解释 |
| type | 必须是static |
| key | 指定在headers中的字段名 |
| value | 指定在headers中的字段值 |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 给Interceptor起名
a1.sources.s1.interceptors = i1 i2 i3
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
# 指定Host Interceptor
a1.sources.s1.interceptors.i2.type = host
# 指定Static Interceptor
a1.sources.s1.interceptors.i3.type = static
a1.sources.s1.interceptors.i3.key = kind
a1.sources.s1.interceptors.i3.value = log
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
四、UUID Interceptor
1. 概述
1. UUID Interceptor是在headers中添加一个id字段。
2. 可以用于标记数据的唯一性。
2. 配置属性
| 属性 | 解释 |
| type | 必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 给Interceptor起名
a1.sources.s1.interceptors = i1 i2 i3 i4
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
# 指定Host Interceptor
a1.sources.s1.interceptors.i2.type = host
# 指定Static Interceptor
a1.sources.s1.interceptors.i3.type = static
a1.sources.s1.interceptors.i3.key = kind
a1.sources.s1.interceptors.i3.value = log
# 指定UUID Interceptor
a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
五、Search And Replace Interceptor
1. 概述
1. Search And Replace Interceptor在使用的时候,需要指定正则表达式,会根据正则表达式的规则,将符合正则表达式的数据替换为指定形式的数据。
2. 在替换的时候,不会替换headers中的数据,而是会替换body中的数据。
2. 配置属性
| 属性 | 解释 |
| type | 必须是search_replace |
| searchPattern | 指定要匹配的正则形式 |
| replaceString | 指定要替换的字符串 |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = http
a1.sources.s1.port = 8090
# 给拦截器起名
a1.sources.s1.interceptors = i1
# 指定类型
a1.sources.s1.interceptors.i1.type = search_replace
a1.sources.s1.interceptors.i1.searchPattern = [0-9]
a1.sources.s1.interceptors.i1.replaceString = *
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f searchin.conf -
Dflume.root.logger=INFO,console
六、Regex Filtering Interceptor
1. 概述
1. Regex Filtering Interceptor在使用的时候需要指定正则表达式。
2. 属性excludeEvents的值如果不指定,默认是false。
3. 如果没有配置excludeEvents的值或者配置excludeEvents的值配置为false,则只有符合正则表达式的数据会留下来,其他不符合正则表达式的数据会被过滤掉;如果excludeEvents的值,那么符合正则表达式的数据会被过滤掉,其他的数据则会被留下来。
2. 配置属性
| 属性 | 解释 |
| type | 必须是regex_filter |
| regex | 指定正则表达式 |
| excludeEvents | true或者false |
3. 案例
1. 编写格式文件,添加如下内容:
# 定义 数据源(输入端) 缓冲区 输出源(输出端)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 输入端
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/upload
a1.sources.r1.fileSuffix = .done
# 拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
#全部都是符合条件的数据
a1.sources.r1.interceptors.i1.regex = ^.*INFO.*$
#排除符合正则表达式的数据
# a1.sources.r1.interceptors.i1.excludeEvents = true
# 输出端
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://flume45:9000/interceptors/%Y%m%d/%H
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 序列化
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 0
# 使用一个在内存中缓冲事件的通道
a1.channels.c1.type = memory
# 连接通道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f regexin.conf -
Dflume.root.logger=INFO,console
七、Custom Interceptor
1. 概述
1. 在Flume中,也允许自定义拦截器。但是不同于其他组件,自定义Interceptor的时候,需要再额外覆盖其中的内部接口。
2. 步骤:
a. 构建Maven工程,导入对应的依赖。
b. 自定义一个类实现Interceptor接口,覆盖其中initialize,intercept和close方法。
c. 定义静态内部类,实现Interceptor.Builder内部接口。
d. 打成jar包方法Flume安装目录的lib目录下。
e. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 指定拦截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = cn.tedu.flume.interceptor.AuthInterceptor$Builder
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
f. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f authin.conf -
Dflume.root.logger=INFO,console
相关文章:
大数据课程E7——Flume的Interceptor
文章作者邮箱:yugongshiyesina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 了解Interceptor的概念和配置参数; ⚪ 掌握Interceptor的使用方法; ⚪ 掌握Interceptor的Host Interceptor; ⚪ 掌握Interceptor的…...
P2P网络NAT穿透原理(打洞方案)
1.关于NAT NAT技术(Network Address Translation,网络地址转换)是一种把内部网络(简称为内网)私有IP地址转换为外部网络(简称为外网)公共IP地址的技术,它使得一定范围内的多台主机只…...
Gof23设计模式之桥接外观模式
1.概述 又名门面模式,是一种通过为多个复杂的子系统提供一个一致的接口,而使这些子系统更加容易被访问的模式。该模式对外有一个统一接口,外部应用程序不用关心内部子系统的具体的细节,这样会大大降低应用程序的复杂度࿰…...
微服务性能分析工具 Pyroscope 初体验
Go 自带接口性能分析工具 pprof,较为常用的有以下 4 种分析: CPU Profiling: CPU 分析,按照一定的频率采集所监听的应用程序 CPU(含寄存器)的使用情况,可确定应用程序在主动消耗 CPU 周期时花费时间的位置…...
工作记录------单元测试(持续更新)
工作记录------单元测试 之前的工作中从来没有写过单元测试,新入职公司要求写单元测试, 个人觉得,作为程序员单元测试还是必须会写的 于此记录一下首次编写单元测试的过程。 首先引入单元测试相关的依赖 <dependency><groupId>…...
C#再windowForm窗体中绘画扇形并给其填充颜色
C#再windowForm窗体中绘画扇形并给其填充颜色 Graphics graphics this.CreateGraphics();graphics.SmoothingMode SmoothingMode.AntiAlias;int width this.Width;int height this.Height;h this.Height;w this.Width;Rectangle rct new Rectangle(0 - h / 6, 0 - h / 6…...
MBA拓展有感-见好就收,还是挑战到底?MBA拓展有感-见好就收,还是挑战到底?
今天看到新闻提到某位坚持了14年高考的同学滑档,让人心生感叹:无论在日常工作还是生活中,选择都是非常重要的。不由想起前段时间我参加研究生新生拓展时的一些感悟,和大家分享一下。 事情的起因是拓展活动中的一个分队竞技类的活…...
综合布线系统光缆分类及其特点?
综合布线系统光缆是一种用于数据传输和通信的电缆,常用于建筑物内部网络和通信系统的布线。光缆采用光纤作为传输介质,能够以光的形式传输大量数据,具有高带宽、低延迟、抗干扰等特点,适用于高速数据传输和长距离通信需求。 光缆…...
前端构建(打包)工具发展史
大多同学的前端学习路线:三件套框架慢慢延伸到其他,在这个过程中,有一个词出现的频率很高:webpack 。 作为一个很出名的前端构建工具我们在网上随便一搜,就会有各种教程:loader plugin entry吧啦吧啦。 但…...
【数据可视化】(一)数据可视化概述
目录 0.本章节概述 一、数据可视化 1、什么是数据可视化? 2、数据可视化的好处 3、数据可视化的用途 二、数据探索 1、数据相关工具的使用情景: 2、探索性查询 三、数据挑战 1、什么是数据挑战?...
GoogleLeNet Inception V2 V3
文章目录 卷积核分解第一步分解,对称分解第二步分解,非对称分解在Inception中的改造一般模型的参数节省量可能导致的问题 针对两个辅助分类起的改造特征图尺寸缩减Model Regularization via Label Smoothing——LSR问题描述,也就是LSR解决什么…...
【css】背景图片附着
属性:background-attachment 属性指定背景图像是应该滚动还是固定的(不会随页面的其余部分一起滚动)。 background-attachment: fixed:为固定; background-attachment: scroll为滚动 代码: <!DOCTYPE h…...
解决运行flutter doctor --android-licenses时报错
问题描述: 配置flutter环境时,会使用flutter doctor命令来检查运行flutter的相关依赖是否配好。能看到还差 Android license status unknown.未解决。 C:\Users\ipkiss.wu>flutter doctor Flutter assets will be downloaded from https://storage.…...
在使用Python爬虫时遇到503 Service Unavailable错误解决办法汇总
在进行Python爬虫的过程中,有时会遇到503 Service Unavailable错误,这意味着所请求的服务不可用,无法获取所需的数据。为了解决这个常见的问题,本文将提供一些解决办法,希望能提供实战价值,让爬虫任务顺利完…...
小研究 - 主动式微服务细粒度弹性缩放算法研究(一)
微服务架构已成为云数据中心的基本服务架构。但目前关于微服务系统弹性缩放的研究大多是基于服务或实例级别的水平缩放,忽略了能够充分利用单台服务器资源的细粒度垂直缩放,从而导致资源浪费。为此,本文设计了主动式微服务细粒度弹性缩放算法…...
【LeetCode】215.数组中的第K个最大元素
题目 给定整数数组 nums 和整数 k,请返回数组中第 k 个最大的元素。 请注意,你需要找的是数组排序后的第 k 个最大的元素,而不是第 k 个不同的元素。 你必须设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 1: 输入: [3,2,1,5,6,4…...
MySQL学习记录:第七章 存储过程和函数
文章目录 第七章 存储过程和函数一、存储过程1、 创建语法*2、调用语法(1)空参列表(2)创建带in参数模式的存储过程,需终端运行(3)创建带out参数模式的存储过程,需终端运行(4)创建带inout参数模式的存储过程,需终端运行3、删除存储过程4、查看存储过程的信息二、函数…...
Docker中gitlab以及gitlab-runner的安装与使用
1、本文主要讲述如何使用Docker安装gitlab以及gitlab-runner,并且会讲述gitlab-runner如何使用 2、gitlab部分不需要修改过多的配置即可使用,本文未讲述https配置,如有需求,可自行百度 3、Docker如何安装可以自行百度 一、Docker安…...
一起学SF框架系列5.12-spring-beans-数据绑定dataBinding
数据绑定有助于将用户输入动态绑定到应用程序的域模型(或用于处理用户输入的任何对象),主要用于web层,但实际可用于任何层。Spring提供了DataBinder来做到这一点,并提供了Validator进行数据验证,两者组成了…...
火热报名中 | 赛宁独家技术支持第七届“蓝帽杯”网络安全技能大赛
由公安部网络安全保卫局、教育部教育管理信息中心、中国教育协会指导,中国人民公安大学主办,奇安信科技集团股份有限公司协办,南京赛宁信息技术有限公司提供技术支持的2023第七届“蓝帽杯”全国大学生网络安全技能大赛于近日正式开启报名。 …...
Pixel Dimension Fissioner步骤详解:如何导出维度手稿为Markdown/PDF/JSON
Pixel Dimension Fissioner步骤详解:如何导出维度手稿为Markdown/PDF/JSON 1. 工具概览 Pixel Dimension Fissioner是一款基于MT5-Zero-Shot-Augment核心引擎构建的文本增强工具,它将传统AI工具转化为一个充满活力的16-bit像素冒险工坊。通过这款工具&…...
STC8A8K寄存器操作避坑指南:硬件PWM配置常见错误排查
STC8A8K硬件PWM实战避坑手册:从寄存器操作到波形调优 第一次用STC8A8K的硬件PWM模块时,我盯着示波器上那串扭曲的波形发了半小时呆——明明按照手册配置了寄存器,为什么输出的PWM信号像心电图一样抽搐?后来才发现是时钟源分频系数…...
别再死记公式了!用MATLAB复现脉冲多普勒雷达(PD)信号处理全流程
用MATLAB实战脉冲多普勒雷达:从信号建模到速度测量全解析 雷达工程师常被复杂的公式和抽象概念困扰,而真正的理解往往来自动手实践。本文将带您用MATLAB完整实现脉冲多普勒(PD)雷达的信号处理流程,通过可运行的代码示例,让每个处理…...
Nanbeige 4.1-3B镜像免配置教程:预装依赖+自动模型缓存机制
Nanbeige 4.1-3B镜像免配置教程:预装依赖自动模型缓存机制 1. 项目介绍 Nanbeige 4.1-3B像素冒险聊天终端是一款专为中文对话优化的AI交互界面,将传统聊天机器人转变为充满游戏乐趣的冒险体验。这个镜像已经预装所有必要依赖,并采用智能缓存…...
python基础学习笔记第七章——文件操作
一、文件的编码1. 编码概念编码是内容和二进制间相互转换的规则集合,由于计算机仅识别0和1,所以需通过编码将文本转二进制存储,也需编码将二进制转回可识别内容。不同编码的转换规则不同,使用错误编码读写文件会导致内容乱码。2. …...
Qwen3.5-9B教育科技:习题截图→知识点定位→举一反三题目生成
Qwen3.5-9B教育科技:习题截图→知识点定位→举一反三题目生成 1. 教育场景的创新应用 在传统教育场景中,教师经常面临一个普遍难题:如何快速识别学生习题中的知识薄弱点,并针对性地提供拓展练习。Qwen3.5-9B通过其强大的多模态理…...
Unity移动物体别再只用Update了!协程、iTween、Lerp实战对比与避坑指南
Unity移动物体方案深度对比:从协程到iTween的实战避坑指南 在Unity开发中,物体移动是最基础也最频繁的需求之一。很多开发者习惯性地在Update中直接修改Transform,但这种方式往往会导致性能浪费、代码难以维护,甚至产生意想不到的…...
CRUISE纯电动车仿真模型与Simulink DLL联合仿真:电制动优先能量回收策略实现指南...
CRUISE纯电动车仿真模型,simulink DLL联合仿真,实现电制动优先能量回收策略。 关于模型:策略是用64位软件编译的,如果模型运行不了请将软件切换成64位。 切换位置在启动界面platform,或者进入软件后点option→ layout。…...
本地大模型系列:2.通过API让本地大模型为你服务
上一篇我们介绍了ollama和lmstudio,这两个工具都是个人版的运行大模型的工具,可以用来运行本地小参数的LLM(所谓小参数一般指不超过27B的4bit量化LLM,按照1B参与大约需要0.7G显存加载进行预测,加上其他开销月1-1.5G&am…...
disposable-email-domains的监控告警系统:异常域名检测与实时通知
disposable-email-domains的监控告警系统:异常域名检测与实时通知 【免费下载链接】disposable-email-domains a list of disposable and temporary email address domains 项目地址: https://gitcode.com/GitHub_Trending/di/disposable-email-domains 在当…...
