关于Flume-Kafka-Flume的模式进行数据采集操作


测试是否连接成功:
在主节点flume目录下输入命令:
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
# 这个file_to_kafka.conf文件就是我们的配置文件

然后在另一台节点输入命令进行消费数据:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

然后再开一个主节点终端,在这个主节点上面在对应生成数据的文件追加数据

这样就可以看见第一个主节点的终端和消费节点上面有数据变化了!

下面这个是配置拦截器,把json格式的内容进行消费,其他的进行拦截
Flume采集数据到kafka的配置conf文件内容:
#定义组件
#1、定义source、channel、agent名称
a1.sources = r1
a1.channels = c1
#配置source#2、描述source
a1.sources.r1.type = TAILDIR#指定监控的组名
a1.sources.r1.filegroups = f1#指定f1组监控的路径
a1.sources.r1.filegroups.f1 = /opt/software/applog/log/app.*#指定断点续传的文件
a1.sources.r1.positionFile = /opt/software/flume/taildir_position.json
# 配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder
#配置channel#3、描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel#指定kafka集群
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092#指定数据写到kafka哪个topic
a1.channels.c1.kafka.topic = topic_log#是否以Event对象的形式写入kafka
a1.channels.c1.parseAsFlumeEvent = false
#组装#4、关联source->channel
a1.sources.r1.channels = c1
如果一开始测试我们flume和kafka是否能成功采集数据的时候,我们应该先把拦截器的两行配置先删除,后面再根据我们需要的内容进行拦截对应的内容。就比如:我们期望我们采集到数据是json格式的,如果不是json格式的话,我们就放弃这个数据。
具体操作:
(1)创建Maven工程flume-interceptor
(2)创建包:com.gugu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
在com.gugu.gmall.flume.utils包下创建JSONUtil类
package com.gugu.gmall.flume.utils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true 不是:返回false
* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}
在com.gugu.gmall.flume.interceptor包下创建ETLInterceptor类
package com.gugu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
}
然后进行打包,复制到我们flume下的lib目录下就可以了!
然后再和上面测试一样进行测试连接,是否成功把非json格式的数据拦截成功!

感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!
好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!
私信,评论我呗!!!!!!
关注我下一篇不迷路哦!
相关文章:
关于Flume-Kafka-Flume的模式进行数据采集操作
测试是否连接成功: 在主节点flume目录下输入命令: bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console # 这个file_to_kafka.conf文件就是我们的配置文件 然后在另一台节点输入命令进行消费数据: kafka-cons…...
WeTab--颜值与实力并存的浏览器插件
一.前言 现在的浏览器花花绿绿,有大量的广告与信息,令人目不暇接。有没有一款好用的浏览器插件可以解决这个问题呢?我愿称WeTab为版本答案。 WeTab的界面: 干净又整洁。最最关键的是还有智能AI供你服务。 这个WeTabAI就像chatgp…...
【整理】HTTP相关版本对比
1. HTTP/1 超文本传输协议,处于计算机网络中的应用层,HTTP是建立在TCP协议之上,所以HTTP协议的瓶颈及其优化技巧都是基于TCP协议本身的特性。 缺陷: 连接无法复用 ---------- 每次请求经历三次握手和慢启动HOLB(队头…...
spark性能调优 | 默认并行度
Spark Sql默认并行度 看官网,默认并行度200 https://spark.apache.org/docs/2.4.5/sql-performance-tuning.html#other-configuration-options 优化 在数仓中 task最好是cpu的两倍或者3倍(最好是倍数,不要使基数) 拓展 在本地 task需要自己设置&a…...
Python-pptx教程之二操作已有PPT模板文件
文章目录 简单的案例找到要修改的元素修改幻灯片中的文本代码使用示例 修改幻灯片的图片代码使用示例 删除幻灯片代码使用示例 获取PPT中所有的文本内容获取PPT中所有的图片总结 在上一篇中我们已经学会了如何从零开始生成PPT文件,从零开始生成较为复杂的PPT是非常消…...
生活总是自己的,请尽情打扮,尽情可爱,,
同色系拼接羽绒服了解一下 穿上时尚感一下子就突显出来了 90白鸭绒填充,不仅时尚还保暖 设计感满满的羽绒服不考虑一下吗?...
栈和队列的初始化,插入,删除,销毁。
目录 题外话 顺序表和链表优缺点以及特点 一.栈的特点 二. 栈的操作 2.1初始化 2.2 栈的销毁 2.3 栈的插入 2.3 输出top 2.4 栈的删除 2.5 输出栈 题外话 顺序表和链表优缺点以及特点 特点:顺序表,逻辑地址物理地址。可以任意访问,…...
重温《Unix设计哲学》
重温Unix设计哲学 这个世界是复杂的,但往往本质的东西都是简单的。这些原则,不光是用在程序开发,也适用于架构设计,产品设计等等地方。 简洁原则:以简洁为美 不要为了满足自己的虚荣心,企图搞一些花哨的东…...
AIGC创作系统ChatGPT源码,AI绘画源码,支持最新GPT-4-Turbo模型,支持DALL-E3文生图
一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…...
Spring条件注解@Conditoinal+ Profile环境切换应用@Profile
Spring条件注解 一、创建一个maven项目 <dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.1.5.RELEASE</version></dependency> </dependenc…...
Scrum框架中的Sprint
上图就是sprint里要做的事。Sprint是scrum框架的核心,是所有的想法、主意转换为价值的地方。所有实现产品目标的必要工作都在sprint里完成,这些工作主要包括Sprint 计划(Sprint planning)、每日站会(Daily Scrum&#…...
openfeign、nacos获取接口提供方真实IP
源码分析 client 是 LoadBalancerFeignClient org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient#execute public Response execute(Request request, Request.Options options) throws IOException {try {URI asUri URI.create(request.url());String c…...
Linux系统编程学习 NO.9——git、gdb
前言 本篇文章简单介绍了Linux操作系统中两个实用的开发工具git版本控制器和gdb调试器。 git 什么是git? git是一款开源的分布式版本控制软件。它不仅具有网络功能,还是服务端与客户端一体的软件。它可以高效的处理程序项目中的版本管理。它是Linux内…...
【联邦学习+区块链】TORR: A Lightweight Blockchain for Decentralized Federated Learning
文章目录 I.CONTRIBUTIONII. ASSUMPTIONS AND THREAT MODELA. AssumptionsB. Threat Model III. SYSTEM DESIGNA. Design OverviewB. Block DesignC. InitializationD. Role SelectionE. Storage ProtocolF. Aggregation ProtocolG. Proof of ReliabilityH. Blockchain Consens…...
《网络协议》08. 概念补充
title: 《网络协议》08. 概念补充 date: 2022-10-06 18:33:04 updated: 2023-11-17 10:35:52 categories: 学习记录:网络协议 excerpt: 代理、VPN、CDN、网络爬虫、无线网络、缓存、Cookie & Session、RESTful。 comments: false tags: top_image: /images/back…...
利用NVIDIA DALI读取视频帧
1. NVIDIA DALI简介 NVIDIA DALI全称是NVIDIA Data Loading Library,是一个用GPU加速的数据加载和预处理库,可用于图像、视频和语音数据的加载和处理,从而为深度学习的训练和推理加速。 NVIDIA DALI库的出发点是,深度学习应用中…...
TSINGSEE青犀AI智能分析+视频监控工业园区周界安全防范方案
一、背景需求分析 在工业产业园、化工园或生产制造园区中,周界防范意义重大,对园区的安全起到重要的作用。常规的安防方式是采用人员巡查,人力投入成本大而且效率低。周界一旦被破坏或入侵,会影响园区人员和资产安全,…...
【算法每日一练]-图论(保姆级教程 篇5(LCA,最短路,分层图)) #LCA #最短路计数 #社交网络 #飞行路线 # 第二短路
今天讲最短路统计和分层图 目录 题目:LCA 思路: 题目:最短路计数 思路: 题目:社交网络 思路: 题目:飞行路线 思路: 题目:第二短路 思路: 题目&a…...
德迅云安全为您介绍关于抗D盾的一些事
抗D盾概述: 抗D盾是新一代的智能分布式云接入系统,接入节点采用多机房集群部署模式,隐藏真实服务器IP,类似于网站CDN的节点接入,但是“抗D盾”是比CDN应用范围更广的接入方式,适合任何TCP 端类应用包括&am…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
