Flume拦截器的实现
Flume conf文件编写
vim file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /Users/zhangjin/model/project/realtime-flink/applog/log/app.*
# 设置断点续传的位置
a1.sources.r1.positionFile = /Users/zhangjin/model/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = topic_log
# 设置不以Flume event 写入数据,以Body数据进行写入
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1
Flume ETLInterceptor拦截器的编写
maven依赖
<dependencies><!--Flume依赖 --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><!--Json格式校验--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies>
maven package打包依赖
<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
判断是否是JSON字符串
public class JSONUtil {/** 通过异常判断是否是json字符串* 是:返回true 不是:返回false* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}
拦截器实现
- 继承Interceptor接口
- 实现单event处理
- 实现批量event处理
public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 单个event处理* 检验是否是Json格式* @param event* @return*/@Overridepublic Event intercept(Event event) {//1 获取json数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2 校验json数据if (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}/*** 多个event处理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}/*** 拦截器重写Builder方法*/public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}
测试
maven package打包,将生成的jar包放在了Flume的lib目录下
启动kafka
# 启动命令
./bin/kafka-server-start.sh -daemon ./config/server.properties &# 开启消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_log
启动Flume
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
相关文章:
Flume拦截器的实现
Flume conf文件编写 vim file_to_kafka.conf#定义组件 a1.sources r1 a1.channels c1#配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /Users/zhangjin/model/project/realtime-flink/applog/log/app.* # 设置断点续传…...
Swift Combine 学习(四):操作符 Operator
Swift Combine 学习(一):Combine 初印象Swift Combine 学习(二):发布者 PublisherSwift Combine 学习(三):Subscription和 SubscriberSwift Combine 学习(四&…...
leetcode 173.二叉搜索树迭代器栈绝妙思路
以上算法题中一个比较好的实现思路就是利用栈来进行实现,以下方法三就是利用栈来进行实现的,思路很好,很简练。进行next的时候,先是一直拿到左边的子树,直到null为止,这一步比较好思考一点,下一…...
df.groupby([pd.Grouper(freq=‘1M‘, key=‘Date‘), ‘Buyer‘]).sum()
df.groupby([pd.Grouper(freq1M, keyDate), Buyer]).sum() 用于根据特定的时间频率和买家(Buyer)对 DataFrame 进行分组,然后计算每个分组的总和。下面是对这行代码的逐步解释: df.groupby([...]):这个操作会根据传入的…...
LLM - 使用 LLaMA-Factory 部署大模型 HTTP 多模态服务 (4)
欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/144881432 大模型的 HTTP 服务,通过网络接口,提供 AI 模型功能的服务,允许通过发送 HTTP 请求,交互…...
icp备案网站个人备案与企业备案的区别
个人备案和企业备案是在进行ICP备案时需要考虑的两种不同情况。个人备案是指个人拥有的网站进行备案,而企业备案则是指企业或组织名下的网站进行备案。这两者在备案过程中有一些明显的区别。 首先,个人备案相对来说流程较为简单。个人备案只需要提供个人…...
如何不修改模型参数来强化大语言模型 (LLM) 能力?
前言 如果你对这篇文章感兴趣,可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」,查看完整博客分类与对应链接。 大语言模型 (Large Language Model, LLM, e.g. ChatGPT) 的参数量少则几十亿,多则上千亿,对其的训…...
AF3 AtomAttentionEncoder类的init_pair_repr方法解读
AlphaFold3 的 AtomAttentionEncoder 类中,init_pair_repr 方法方法负责为原子之间的关系计算成对表示(pair representation),这是原子转变器(atom transformer)模型的关键组成部分,直接影响对蛋白质/分子相互作用的建模。 init_pair_repr源代码: def init_pair_repr(…...
DDoS攻击防御方案大全
1. 引言 随着互联网的迅猛发展,DDoS(分布式拒绝服务)攻击成为了网络安全领域中最常见且危害严重的攻击方式之一。DDoS攻击通过向目标网络或服务发送大量流量,导致服务器过载,最终使其无法响应合法用户的请求。本文将深…...
Vue中常用指令
一、内容渲染指令 1.v-text:操作纯文本,用于更新标签包含的文本,但是使用不灵活,无法拼接字符串,会覆盖文本,可以简写为{{}},{{}}支持逻辑运算。 用法示例: //把name对应的值渲染到…...
Servlet解析
概念 Servlet是运行在服务端的小程序(Server Applet),可以处理客户端的请求并返回响应,主要用于构建动态的Web应用,是SpringMVC的基础。 生命周期 加载和初始化 默认在客户端第一次请求加载到容器中,通过反射实例化…...
带虚继承的类对象模型
文章目录 1、代码2、 单个虚继承3、vbptr是什么4、虚继承的多继承 1、代码 #include<iostream> using namespace std;class Base { public:int ma; };class Derive1 :virtual public Base { public:int mb; };class Derive2 :public Base { public:int mc; };class Deri…...
深度学习中的离群值
文章目录 深度学习中有离群值吗?深度学习中的离群值来源:处理离群值的策略:1. 数据预处理阶段:2. 数据增强和鲁棒模型:3. 模型训练阶段:4. 异常检测集成模型: 如何处理对抗样本?总结…...
如何利用Logo设计免费生成器创建专业级Logo
在当今的商业世界中,一个好的Logo是品牌身份的象征,它承载着公司的形象与理念。设计一个专业级的Logo不再需要花费大量的金钱和时间,尤其是当我们拥有Logo设计免费生成器这样的工具时。接下来,让我们深入探讨如何利用这些工具来创…...
Mysql SQL 超实用的7个日期算术运算实例(10k)
文章目录 前言1. 加上或减去若干天、若干月或若干年基本语法使用场景注意事项运用实例分析说明2. 确定两个日期相差多少天基本语法使用场景注意事项运用实例分析说明3. 确定两个日期之间有多少个工作日基本语法使用场景注意事项运用实例分析说明4. 确定两个日期相隔多少个月或多…...
运算指令(PLC)
加 ADD 减 SUB 乘 MUL 除 DIV 浮点运算 整数运算...
「Mac畅玩鸿蒙与硬件49」UI互动应用篇26 - 数字填色游戏
本篇教程将带你实现一个数字填色小游戏,通过简单的交互逻辑,学习如何使用鸿蒙开发组件创建趣味性强的应用。 关键词 UI互动应用数字填色动态交互逻辑判断游戏开发 一、功能说明 数字填色小游戏包含以下功能: 数字选择:用户点击…...
机器学习经典算法——逻辑回归
目录 算法介绍 算法概念 算法的优缺点 LogisticRegression()函数理解 环境准备 算法练习 算法介绍 算法概念 逻辑回归(Logistic Regression)是一种广泛应用于分类问题的机器学习算法。 它基于线性回归的思想,但通过引入一个逻辑函数&…...
【数据仓库金典面试题】—— 包含详细解答
大家好,我是摇光~,用大白话讲解所有你难懂的知识点 该篇面试题主要针对面试涉及到数据仓库的数据岗位。 以下都是经典的关于数据仓库的问题,希望对大家面试有用~ 1、什么是数据仓库?它与传统数据库有何区别? 数据仓库…...
【UE5 C++课程系列笔记】19——通过GConfig读写.ini文件
步骤 1. 新建一个Actor类,这里命名为“INIActor” 2. 新建一个配置文件“Test.ini” 添加一个自定义配置项 3. 接下来我们在“INIActor”类中获取并修改“CustomInt”的值。这里定义一个方法“GetINIVariable” 方法实现如下,其中第16行代码用于构建配…...
AX-MES生产制造管理系统-总览
前言说起 MES 就不得不说 ERP,但是 ERP 大家基本上都知道,MES 就不一定了,常见的 ERP 系统包括 SAP、金蝶、用友等,ERP的流程相对来说也比较统一;MES就不同了,基本上熟悉业务流程的软件公司都可以开发并实施…...
ZjDroid命令大全:从DEX内存dump到Lua脚本注入的完整教程
ZjDroid命令大全:从DEX内存dump到Lua脚本注入的完整教程 【免费下载链接】ZjDroid Android app dynamic reverse tool based on Xposed framework. 项目地址: https://gitcode.com/gh_mirrors/zj/ZjDroid ZjDroid是一款基于Xposed框架的Android应用动态逆向分…...
DeepSeek系统设计辅助:如何在48小时内完成可审计、可回滚、可压测的AI服务架构图?
更多请点击: https://intelliparadigm.com 第一章:DeepSeek系统设计辅助 DeepSeek系统设计辅助模块面向架构师与后端工程师,提供模型能力调用、接口契约生成、异步任务编排等核心支撑能力。该模块不替代人工设计决策,而是通过结构…...
除了排错,你可能不知道OPC Expert v8.1还能做这些:数据归档、计算与冗余实战
解锁OPC Expert v8.1的隐藏潜力:数据归档、实时计算与冗余架构实战指南在工业自动化领域,OPC Expert常被视为故障排查的"急救箱",但它的能力远不止于此。当大多数工程师还在用它解决DCOM配置问题时,少数先行者已经用它重…...
收藏必看|2026 版大厂 AI 岗位薪资曝光!普通程序员转型大模型最全指南
深夜收到大厂 HR 好友发来的内部资料,再三叮嘱切勿对外泄露。如今网络信息传播速度极快,这份 2026 年企业 AI 岗真实薪资内幕,也值得给广大程序员、零基础入行小白参考借鉴。 翻看完整薪资台账后,真切感受到当下大模型赛道的薪资差…...
【紧急预警】92%的DeepSeek测试用例生成失败源于这4个隐性配置缺陷——资深SDET连夜整理修复清单
更多请点击: https://codechina.net 第一章:DeepSeek测试用例生成的现状与危机本质 当前,DeepSeek系列大模型(如DeepSeek-Coder、DeepSeek-VL)在代码生成与理解任务中展现出强大能力,但其测试用例自动生成…...
毕业设计 yolov11骨折检测医疗辅助系统(源码+论文)
文章目录 0 前言1 项目运行效果2 课题背景2.1 研究背景2.2 国内外研究现状2.3 研究意义 3 设计框架(骨折检测系统设计框架说明)3.1. 系统架构图3.2. 技术选型3.2.1 核心组件3.2.2 辅助工具 3.3. 核心模块设计3.3.1 YOLO模型训练模块训练流程图关键伪代码…...
qobuz-dl终极实战指南:专业无损音乐下载工具架构解析与高效应用
qobuz-dl终极实战指南:专业无损音乐下载工具架构解析与高效应用 【免费下载链接】qobuz-dl A complete Lossless and Hi-Res music downloader for Qobuz 项目地址: https://gitcode.com/gh_mirrors/qo/qobuz-dl 在数字音乐时代,追求极致音质的音…...
【C++】零基础入门 · 第 5 节:函数基础
前面四节我们写的代码都集中在 main 函数里。随着程序变复杂,所有逻辑堆在一起会越来越难维护。函数就是用来解决这个问题的——它把一段代码「打包」起来,取个名字,需要的时候调用就行。 1. 为什么需要函数 假设你需要在程序的不同地方打印一行分隔线: cout << &…...
Linux 负载均衡的 cache_nice_tries:缓存友好的迁移尝试
简介现如今服务器、嵌入式设备、工控主板普遍采用多核、NUMA 架构 CPU,多进程多线程并发运行模式成为常态。Linux 内核依靠调度域分层负载均衡机制,分散 CPU 运行压力,避免单核心负载过高、其余核心空闲浪费硬件算力。但任务跨核心迁移是一把…...
