Idea中flume的Interceptor的编写教程
1.新建-项目-新建项目
注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找
2. 在maven项目中导入依赖
Pom.xml文件中写入
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
3.创建包(scr-main-java右键-新建-软件包)
4.创建Java类(右键包名-新建-java类)
5. 继承(implements)flume 的拦截器接口
//键入implements Interceptor{} 光标定位到Interceptor alt + enter键选择导入类导入flume的Interceptor即可 import org.apache.flume.interceptor.Interceptor;
//此时会报错,点击红色灯泡,选择 实现方法 就会在下文写出需要Override的四个抽象类
6.实现方法
public class MyInterceptor implements Interceptor {@Override//初始化方法public void initialize() {}//单个事件拦截//需求:在event的头部信息中添加标记//提供给channel selector 选择发送给不同的channel@Overridepublic Event intercept(Event event)//Map也需要alt + enter 导入Map<String, String> headers = event.getHeaders();//输入even.getHeaders().var回车即可自行填充等号前面的变量信息String log = new String(event.getBody());//envent.getBody().var自行判断变量类型为byte,为方便使用改为String类型// 键入new String(envent.getBody()).var回车,然后根据需要自行修改变量名//判断log开头的第一个字符,字母则发到channel1,数字则发到channel2char c = log.charAt(0);//log.charAt(0).var回车即可自行填充等号前面的变量信息if(c >= '0' && c <= '9'){headers.put("type","number");}else if ((c >= 'A' && c<= 'Z') || (c >= 'a' && c <= 'z')){// 注意字符串类型要使用>=需要用单引号而不能用双引号headers.put("type","letter");}//因为头部信息属性是一个引用数据类型 直接修改对象即可,也可以不调用以下的set方法 event.setHeaders(headers);//返回eventreturn event;}//批量事件拦截(处理多个event,系统调用这个方法)@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list){intercept(event);}return list;}//重写静态内部类Builder@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{//创建一个拦截器对象@Overridepublic Interceptor build() {return new MyInterceptor();}//配置方法@Overridepublic void configure(Context context) {}}}
7.打包(idea右侧菜单栏maven-生命周期-package)
打包完成在idea左侧菜单栏 target 中可以看到我们的包
8.将建好的包复制到flume家目录下的lib中即可使用
cp /export/data/flume-interceptor-demo/target/flume-interceptor-demo-1.0-SNAPSHOT.jar $FLUME_HOME/lib
9.测试
9.1 编辑 flume 配置文件
vim flume1.conf
# agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = node1 a1.sources.r1.port = 44444 # channel selector: multiplexing 多路复用 ;默认为replicating 复制 a1.sources.r1.selector.type = multiplexing # 填写相应inerceptor的header上的key a1.sources.r1.selector.header = type # 分配不同value发送到的channel,number到c2,letter到 c1 a1.sources.r1.selector.mapping.number = c2 a1.sources.r1.selector.mapping.letter = c1 #如果匹配不上默认选择的channel a1.sources.r1.selector.default = c2 #interceptor a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.ljr.flume.MyInterceptor$Builder # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = node1 a1.sinks.k1.port = 4545 a1.sinks.k2.type = avro a1.sinks.k2.hostname = node1 a1.sinks.k2.port = 4546 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 # 接收c1中的数据 a1.sinks.k1.channel = c1 # 接收c2中的数据 a1.sinks.k2.channel = c2 |
vim flume2.conf
a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = avro a2.sources.r2.bind = node1 # flume1 中sink的输出端口 a2.sources.r2.port = 4545 # Describe the sink a2.sinks.k2.type = logger # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 |
vim flume3.conf
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = avro a3.sources.r3.bind = node1 # flume1 中sink的输出端口 a3.sources.r3.port = 4546 # Describe the sink a3.sinks.k3.type = logger # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3 |
9.2测试
打开四个窗口,前三个分别运行flume1.conf、flume2.conf、flume3.conf 配置的进程
第四个窗口启用necat,输入内容进行测试
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume1.conf -n a1
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume2.conf -n a2
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume3.conf -n a3
nc nc node1 44444 (flume1.conf中 source 填的主机名或IP地址 和端口号)
第一个窗口报错 ConnectException: 拒绝连接 可先忽略,运行二、三窗口后即可连接
在窗口4中输入数字、字母、符号
分别在窗口二看到输出字母,窗口三输出数字和符号
恭喜,Interceptor起作用!
相关文章:

Idea中flume的Interceptor的编写教程
1.新建-项目-新建项目 注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找 2. 在maven项目中导入依赖 Pom.xml文件中写入 <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifa…...
java单元测试:JUnit测试运行器
JUnit测试运行器(Test Runner)决定了JUnit如何执行测试。JUnit有多个测试运行器,每个运行器都有特定的功能和用途。 1. 默认运行器 当没有显式指定运行器时,JUnit会使用默认运行器,这在JUnit 4和JUnit 5之间有所不同…...

网络模型—BIO、NIO、IO多路复用、信号驱动IO、异步IO
一、用户空间和内核空间 以Linux系统为例,ubuntu和CentOS是Linux的两种比较常见的发行版,任何Linux发行版,其系统内核都是Linux。我们在发行版上操作应用,如Redis、Mysql等其实是无法直接执行访问计算机硬件(如cpu,内存…...

智能语义识别电影机器人的rasa实现
文章目录 0.前言1.项目整体框架2.rasa训练数据结构4.rasa启动命令及用到的API 0.前言 最近做了一个智能电影机器人的项目,我主要负责用户语义意图识别,用的框架是rasa,对应的版本为 3.6.15,对应的安装命令为: pip3 install rasa…...
C# 实现腾讯云 IM 常用 REST API 之会话管理
目录 关于腾讯 IM REST API 开发前准备 范例运行环境 常用会话管理API 查询账号会话总未读数 查询单聊会话消息记录 下载最近会话记录 小结 关于腾讯 IM REST API REST API 是腾讯即时通信 IM 提供给服务端的一组 HTTP 后台管理接口,如消息管理、群组管理…...

MySQL之Schema与数据类型优化(三)
Schema与数据类型优化 BLOB和TEXT类型 BLOB和TEXT都是为存储很大的数据而设计的字符串数据类型,分别采用二进制和字符方式存储。 实际上它们分别属于两组不同的数据类型家族:字符类型是TINYTEXT,SMALLTEXT,TEXT,MEDIUMTEXT,LONG…...
大语言模型发展历史
大语言模型的发展历史可以追溯到自然语言处理(NLP)和机器学习早期的探索,但真正快速发展起来是在深度学习技术兴起之后。以下是大语言模型发展的一个简要历史概述: 早期阶段(20世纪50-90年代): …...

Nginx - 安全基线配置与操作指南
文章目录 概述中间件安全基线配置手册1. 概述1.1 目的1.2 适用范围 2. Nginx基线配置2.1 版本说明2.2 安装目录2.3 用户创建2.4 二进制文件权限2.5 关闭服务器标记2.6 设置 timeout2.7 设置 NGINX 缓冲区2.8 日志配置2.9 日志切割2.10 限制访问 IP2.11 限制仅允许域名访问2.12 …...
简述js的事件循环以及宏任务和微任务
前言 在JavaScript中,任务被分为同步任务和异步任务。 同步任务:这些任务在主线程上顺序执行,不会进入任务队列,而是直接在主线程上排队等待执行。每个同步任务都会阻塞后续任务的执行,直到它自身完成。常见的同步任…...
[力扣题解] 797. 所有可能的路径
题目:797. 所有可能的路径 思路 深度搜索 代码 // 图论哦!class Solution { private:vector<vector<int>> result;vector<int> path;// x : 当前节点void function(vector<vector<int>>& graph, int x){int i;// cout <&l…...

【QT八股文】系列之篇章3 | QT的多线程以及QThread与QObject
【QT八股文】系列之篇章3 | QT的多线程 前言4. 多线程为什么需要使用线程池线程池的基础知识python中创建线程池的方法使用threading库队列Queue来实现线程池使用threadpool模块,这是个python的第三方模块,支持python2和python3 QThread的定义QT多线程知…...
基于python flask的web服务
基本例子 from flask import Flask app Flask(__name__) app.route(/)#检查访问的网址,根路径走这里 def hello_world():return hello world#返回hello worldif __name__ __main__:# 绑定到指定的IP地址和端口app.run(host0.0.0.0, port1000, debugTrue)##绑定端…...
HTTP 响应分割漏洞
HTTP 响应分割漏洞 1.漏洞概述2.漏洞案例 1.漏洞概述 HTTP 响应拆分发生在以下情况: 数据通过不受信任的来源(最常见的是 HTTP 请求)进入 Web 应用程序。该数据包含在发送给 Web 用户的 HTTP 响应标头中,且未经过恶意字符验证。…...

Algoriddim djay Pro Ai for Mac:AI引领,混音新篇章
当AI遇上音乐,会碰撞出怎样的火花?Algoriddim djay Pro Ai for Mac给出了答案。这款专业的DJ混音软件,以AI为引擎,引领我们进入混音的新篇章。 djay Pro Ai for Mac的智能混音功能,让每一位DJ都能感受到前所未有的创作…...

常见算法(3)
1.Arrays 它是一个工具类,主要掌握的其中一个方法是srot(数组,排序规则)。 o1-o2是升序排列,o2-o1是降序排列。 package test02; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparat…...

集中抄表电表是什么?
1.集中抄表电表:简述 集中抄表电表,又称为远程抄表系统,是一种现代化电力计量技术,为提升电力行业的经营效率和客户服务质量。它通过自动化的形式,取代了传统人工抄水表,完成了数据信息实时、精确、高效率…...

第八届能源、环境与材料科学国际学术会议(EEMS 2024)
文章目录 一、重要信息二、大会简介三、委员会四、征稿主题五、论文出版六、会议议程七、出版信息八、征稿编辑 一、重要信息 会议官网:http://ic-eems.com主办方:常州大学大会时间:2024年06月7-9日大会地点:新加坡 Holiday Inn …...

09.自注意力机制
文章目录 输入输出运行如何运行解决关联性attention score额外的Q K V Multi-head self-attentionPositional EncodingTruncated Self-attention影像处理vs CNNvs RNN图上的应用 输入 输出 运行 链接(Attention Is All You Need) 如何运行 解决关联性 a…...
时政|杂粮产业
政策支持 《新一轮千亿斤粮食产能提升行动方案(2024—2030年)》明确,按照“巩固提升口粮、主攻玉米大豆、兼顾薯类杂粮”的思路,因地制宜发展马铃薯、杂粮杂豆等品种,根据市场需求优产稳供。 产地发展 河北省石家庄…...
docker 安装 私有云盘 nextcloud
拉取镜像 # 拉取镜像 sudo docker pull nextcloud运行nextcloud 容器 # 内存足够可以不进行内存 --memory512m --memory-swap6g # 桥接网络 --network suixinnet --network-alias nextcloud \ sudo docker run -itd --name nextcloud --restartalways \ -p 9999:80 \ -v /m…...

CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...

【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)
前言: 最近在做行为检测相关的模型,用的是时空图卷积网络(STGCN),但原有kinetic-400数据集数据质量较低,需要进行细粒度的标注,同时粗略搜了下已有开源工具基本都集中于图像分割这块,…...