当前位置: 首页 > news >正文

Flume拦截器的实现

Flume conf文件编写

vim file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /Users/zhangjin/model/project/realtime-flink/applog/log/app.*
# 设置断点续传的位置
a1.sources.r1.positionFile = /Users/zhangjin/model/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = topic_log
# 设置不以Flume event 写入数据,以Body数据进行写入
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1

Flume ETLInterceptor拦截器的编写

maven依赖

	<dependencies><!--Flume依赖 --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><!--Json格式校验--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies>

maven package打包依赖

    <build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

判断是否是JSON字符串

public class JSONUtil {/** 通过异常判断是否是json字符串* 是:返回true  不是:返回false* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}

拦截器实现

  1. 继承Interceptor接口
  2. 实现单event处理
  3. 实现批量event处理
public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 单个event处理* 检验是否是Json格式* @param event* @return*/@Overridepublic Event intercept(Event event) {//1 获取json数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2 校验json数据if (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}/*** 多个event处理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}/*** 拦截器重写Builder方法*/public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}

测试

maven package打包,将生成的jar包放在了Flume的lib目录下
启动kafka

# 启动命令
./bin/kafka-server-start.sh -daemon ./config/server.properties &# 开启消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_log

启动Flume

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

相关文章:

Flume拦截器的实现

Flume conf文件编写 vim file_to_kafka.conf#定义组件 a1.sources r1 a1.channels c1#配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /Users/zhangjin/model/project/realtime-flink/applog/log/app.* # 设置断点续传…...

Swift Combine 学习(四):操作符 Operator

Swift Combine 学习&#xff08;一&#xff09;&#xff1a;Combine 初印象Swift Combine 学习&#xff08;二&#xff09;&#xff1a;发布者 PublisherSwift Combine 学习&#xff08;三&#xff09;&#xff1a;Subscription和 SubscriberSwift Combine 学习&#xff08;四&…...

leetcode 173.二叉搜索树迭代器栈绝妙思路

以上算法题中一个比较好的实现思路就是利用栈来进行实现&#xff0c;以下方法三就是利用栈来进行实现的&#xff0c;思路很好&#xff0c;很简练。进行next的时候&#xff0c;先是一直拿到左边的子树&#xff0c;直到null为止&#xff0c;这一步比较好思考一点&#xff0c;下一…...

df.groupby([pd.Grouper(freq=‘1M‘, key=‘Date‘), ‘Buyer‘]).sum()

df.groupby([pd.Grouper(freq1M, keyDate), Buyer]).sum() 用于根据特定的时间频率和买家&#xff08;Buyer&#xff09;对 DataFrame 进行分组&#xff0c;然后计算每个分组的总和。下面是对这行代码的逐步解释&#xff1a; df.groupby([...])&#xff1a;这个操作会根据传入的…...

LLM - 使用 LLaMA-Factory 部署大模型 HTTP 多模态服务 (4)

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/144881432 大模型的 HTTP 服务&#xff0c;通过网络接口&#xff0c;提供 AI 模型功能的服务&#xff0c;允许通过发送 HTTP 请求&#xff0c;交互…...

icp备案网站个人备案与企业备案的区别

个人备案和企业备案是在进行ICP备案时需要考虑的两种不同情况。个人备案是指个人拥有的网站进行备案&#xff0c;而企业备案则是指企业或组织名下的网站进行备案。这两者在备案过程中有一些明显的区别。 首先&#xff0c;个人备案相对来说流程较为简单。个人备案只需要提供个人…...

如何不修改模型参数来强化大语言模型 (LLM) 能力?

前言 如果你对这篇文章感兴趣&#xff0c;可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」&#xff0c;查看完整博客分类与对应链接。 大语言模型 (Large Language Model, LLM, e.g. ChatGPT) 的参数量少则几十亿&#xff0c;多则上千亿&#xff0c;对其的训…...

AF3 AtomAttentionEncoder类的init_pair_repr方法解读

AlphaFold3 的 AtomAttentionEncoder 类中,init_pair_repr 方法方法负责为原子之间的关系计算成对表示(pair representation),这是原子转变器(atom transformer)模型的关键组成部分,直接影响对蛋白质/分子相互作用的建模。 init_pair_repr源代码: def init_pair_repr(…...

DDoS攻击防御方案大全

1. 引言 随着互联网的迅猛发展&#xff0c;DDoS&#xff08;分布式拒绝服务&#xff09;攻击成为了网络安全领域中最常见且危害严重的攻击方式之一。DDoS攻击通过向目标网络或服务发送大量流量&#xff0c;导致服务器过载&#xff0c;最终使其无法响应合法用户的请求。本文将深…...

Vue中常用指令

一、内容渲染指令 1.v-text&#xff1a;操作纯文本&#xff0c;用于更新标签包含的文本&#xff0c;但是使用不灵活&#xff0c;无法拼接字符串&#xff0c;会覆盖文本&#xff0c;可以简写为{{}}&#xff0c;{{}}支持逻辑运算。 用法示例&#xff1a; //把name对应的值渲染到…...

Servlet解析

概念 Servlet是运行在服务端的小程序&#xff08;Server Applet)&#xff0c;可以处理客户端的请求并返回响应&#xff0c;主要用于构建动态的Web应用&#xff0c;是SpringMVC的基础。 生命周期 加载和初始化 默认在客户端第一次请求加载到容器中&#xff0c;通过反射实例化…...

带虚继承的类对象模型

文章目录 1、代码2、 单个虚继承3、vbptr是什么4、虚继承的多继承 1、代码 #include<iostream> using namespace std;class Base { public:int ma; };class Derive1 :virtual public Base { public:int mb; };class Derive2 :public Base { public:int mc; };class Deri…...

深度学习中的离群值

文章目录 深度学习中有离群值吗&#xff1f;深度学习中的离群值来源&#xff1a;处理离群值的策略&#xff1a;1. 数据预处理阶段&#xff1a;2. 数据增强和鲁棒模型&#xff1a;3. 模型训练阶段&#xff1a;4. 异常检测集成模型&#xff1a; 如何处理对抗样本&#xff1f;总结…...

如何利用Logo设计免费生成器创建专业级Logo

在当今的商业世界中&#xff0c;一个好的Logo是品牌身份的象征&#xff0c;它承载着公司的形象与理念。设计一个专业级的Logo不再需要花费大量的金钱和时间&#xff0c;尤其是当我们拥有Logo设计免费生成器这样的工具时。接下来&#xff0c;让我们深入探讨如何利用这些工具来创…...

Mysql SQL 超实用的7个日期算术运算实例(10k)

文章目录 前言1. 加上或减去若干天、若干月或若干年基本语法使用场景注意事项运用实例分析说明2. 确定两个日期相差多少天基本语法使用场景注意事项运用实例分析说明3. 确定两个日期之间有多少个工作日基本语法使用场景注意事项运用实例分析说明4. 确定两个日期相隔多少个月或多…...

运算指令(PLC)

加 ADD 减 SUB 乘 MUL 除 DIV 浮点运算 整数运算...

「Mac畅玩鸿蒙与硬件49」UI互动应用篇26 - 数字填色游戏

本篇教程将带你实现一个数字填色小游戏&#xff0c;通过简单的交互逻辑&#xff0c;学习如何使用鸿蒙开发组件创建趣味性强的应用。 关键词 UI互动应用数字填色动态交互逻辑判断游戏开发 一、功能说明 数字填色小游戏包含以下功能&#xff1a; 数字选择&#xff1a;用户点击…...

机器学习经典算法——逻辑回归

目录 算法介绍 算法概念 算法的优缺点 LogisticRegression()函数理解 环境准备 算法练习 算法介绍 算法概念 逻辑回归&#xff08;Logistic Regression&#xff09;是一种广泛应用于分类问题的机器学习算法。 它基于线性回归的思想&#xff0c;但通过引入一个逻辑函数&…...

【数据仓库金典面试题】—— 包含详细解答

大家好&#xff0c;我是摇光~&#xff0c;用大白话讲解所有你难懂的知识点 该篇面试题主要针对面试涉及到数据仓库的数据岗位。 以下都是经典的关于数据仓库的问题&#xff0c;希望对大家面试有用~ 1、什么是数据仓库&#xff1f;它与传统数据库有何区别&#xff1f; 数据仓库…...

【UE5 C++课程系列笔记】19——通过GConfig读写.ini文件

步骤 1. 新建一个Actor类&#xff0c;这里命名为“INIActor” 2. 新建一个配置文件“Test.ini” 添加一个自定义配置项 3. 接下来我们在“INIActor”类中获取并修改“CustomInt”的值。这里定义一个方法“GetINIVariable” 方法实现如下&#xff0c;其中第16行代码用于构建配…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

Yolov8 目标检测蒸馏学习记录

yolov8系列模型蒸馏基本流程&#xff0c;代码下载&#xff1a;这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中&#xff0c;**知识蒸馏&#xff08;Knowledge Distillation&#xff09;**被广泛应用&#xff0c;作为提升模型…...

Go 语言并发编程基础:无缓冲与有缓冲通道

在上一章节中&#xff0c;我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道&#xff0c;它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好&#xff0…...

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...

c# 局部函数 定义、功能与示例

C# 局部函数&#xff1a;定义、功能与示例 1. 定义与功能 局部函数&#xff08;Local Function&#xff09;是嵌套在另一个方法内部的私有方法&#xff0c;仅在包含它的方法内可见。 • 作用&#xff1a;封装仅用于当前方法的逻辑&#xff0c;避免污染类作用域&#xff0c;提升…...

DiscuzX3.5发帖json api

参考文章&#xff1a;PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下&#xff0c;适配我自己的需求 有一个站点存在多个采集站&#xff0c;我想通过主站拿标题&#xff0c;采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...