【Flume Kafaka实战】Using Kafka with Flume
一 目标
在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。
二 实战
2.1 Kafka Sink
第一步,在Cloudera Manager中安装Flume,安装时指定两个Agent。这一步很简单。
第二步,创建一个新Role Group。默认情况下,所有的Agent都处于一个叫Agent Default Group的角色组中,处于同一角色组中的Agent共享相同的配置。但是在我们这个例子中,两个Agent要完成不同的工作,需要不同的配置。所有新建一个Role Group,并把其中一个Agent移到到这个新的Group中,如下图所示。

第三步,分别编辑两个Agent的配置文件,我的第一个Agent名字为file2Kafka,配置文件内容如下。不难看出,这个配置的source就是去tail一个本地文件,然后写入到kafka的消息队列中。
即:Kafka Sink
# Name the components on this agent
file2Kafka.sources = file2Kafka_source
file2Kafka.sinks = file2Kafka_sink
file2Kafka.channels = file2Kafka_channel# Describe/configure the source
file2Kafka.sources.file2Kafka_source.type = exec
file2Kafka.sources.file2Kafka_source.command = tail -F /home/demo/flume-exec.txt# Describe the sink
file2Kafka.sinks.file2Kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
# topic前不加kafka
file2Kafka.sinks.file2Kafka_sink.topic = flumetest
file2Kafka.sinks.file2Kafka_sink.kafka.bootstrap.servers= slave1:9092,slave2:9092
file2Kafka.sinks.file2Kafka_sink.kafka.flumeBatchSize= 20# Use a channel which buffers events in memory
file2Kafka.channels.file2Kafka_channel.type = memory
file2Kafka.channels.file2Kafka_channel.capacity = 1000
file2Kafka.channels.file2Kafka_channel.transactionCapacity = 1000# Bind the source and sink to the channel
file2Kafka.sources.file2Kafka_source.channels = file2Kafka_channel
file2Kafka.sinks.file2Kafka_sink.channel = file2Kafka_channel
2.2 Kafka Source
第二Agent的名字是kafka2Hdfs,配置文件如下。这个配置的内容就是把Agent1中写到kafka的数据读出来,然后写入到HDFS中。注意hdfs.path这个配置,由于在Cloudera Manager中,Flume知道HDFS相关的配置,所以无需去加入hdfs://my-cluster这样的协议前缀。
# Name the components on this agent
kafka2Hdfs.sources = kafka2Hdfs_source
kafka2Hdfs.sinks = kafka2Hdfs_sink
kafka2Hdfs.channels = kafka2Hdfs_channel# Describe/configure the source
kafka2Hdfs.sources.kafka2Hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
kafka2Hdfs.sources.kafka2Hdfs_source.batchSize = 10
kafka2Hdfs.sources.kafka2Hdfs_source.batchDurationMillis = 1000
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.topics = flumetest
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.consumer.group.id = flume# Describe the sink
kafka2Hdfs.sinks.kafka2Hdfs_sink.type = hdfs
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.path = /flume/
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.fileType = DataStream
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.filePrefix=sxt
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollCount=0
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollInterval=0# Use a channel which buffers events in memory
kafka2Hdfs.channels.kafka2Hdfs_channel.type = memory
kafka2Hdfs.channels.kafka2Hdfs_channel.capacity = 1000
kafka2Hdfs.channels.kafka2Hdfs_channel.transactionCapacity = 100# Bind the source and sink to the channel
kafka2Hdfs.sources.kafka2Hdfs_source.channels = kafka2Hdfs_channel
kafka2Hdfs.sinks.kafka2Hdfs_sink.channel = kafka2Hdfs_channel
整个配置完成之后,Cloudera Manager中的界面如下图:

在运行中可能会出现一些目录读写的权限问题,需要去修改hdfs中相关目录的权限。比如我的配置中,数据是写到/flume这个目录下的,这个目录我是用root用户去创建的,但flume运行是使用一个叫flume的用户名来运行的,所以用hdfs dfs -chmod 777 /flume把这个目录的读写权限放开了。
这是一个例子,主要演示如何在cloudera manager中把两个flume的agent串联在一起使用。在现实的生产中,如果需要把一个文本数据通过kakfa写入到hdfs中,更合理的做法是使用一个agent,把kafka作为channel来使用。具体可以参考https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
2.3 Kafka Channel
# Name the components on this agent
kafkaCh.sources = src_1_file
kafkaCh.channels = ch_1_kafka
kafkaCh.sinks = sink_1_hdfs# Describe/configure the source
kafkaCh.sources.src_1_file.type = exec
kafkaCh.sources.src_1_file.command = tail -F /home/demo/flume-exec.txt# Define a kafka channel
kafkaCh.channels.ch_1_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
kafkaCh.channels.ch_1_kafka.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafkaCh.channels.ch_1_kafka.kafka.topic = kafka_channel
kafkaCh.channels.ch_1_kafka.kafka.consumer.group.id = flume-consumer# Describe the sink
kafkaCh.sinks.sink_1_hdfs.type = hdfs
kafkaCh.sinks.sink_1_hdfs.hdfs.path = /flume/kafka/channel
kafkaCh.sinks.sink_1_hdfs.hdfs.fileType = DataStream
kafkaCh.sinks.sink_1_hdfs.hdfs.filePrefix=sxt
kafkaCh.sinks.sink_1_hdfs.hdfs.rollCount=0
kafkaCh.sinks.sink_1_hdfs.hdfs.rollInterval=0# Bind the source and sink to the channel
kafkaCh.sources.src_1_file.channels = ch_1_kafka
kafkaCh.sinks.sink_1_hdfs.channel = ch_1_kafka
将上面两个Agent放在一个Agent中,用Kafka Channel实现。
注意:hdfs.path 必须存在,且有权限进行操作
相关文章:
【Flume Kafaka实战】Using Kafka with Flume
一 目标 在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。 二 实战 2.1 Kafka Sink 第一步࿰…...
5G NR物理信号
文章目录 NR 物理信号与LTE的区别上行参考信号DMRS (UL)SRSPT-RS(UL) 下行参考信号DMRS(DL)PT-RS(DL)CSI-RSPSSSSS NR 物理信号与LTE的区别 用SSS、CSI-RS和DMRS 取代了CRS信号。下行业务信道采用TM1波束赋形传输模式。基于SSB 或者CSI-RS进行RSRP和SINR测量。基于DMRS 进行共…...
Pikachu-Cross-Site Scripting-存储型xss
存储型xss ,随便输入点内容,都能保存下来;刷新后也不会丢失;输入特殊字符,也能原样返回; 查看代码,也可以看到输出结果直接原路返回,不做处理 构造payload <script>alert(1)…...
媲美GPT-4o mini的小模型,Meta Llama 3.2模型全面解读!
大家好,我是木易,一个持续关注AI领域的互联网技术产品经理,国内Top2本科,美国Top10 CS研究生,MBA。我坚信AI是普通人变强的“外挂”,专注于分享AI全维度知识,包括但不限于AI科普,AI工…...
【leetcode】 45.跳跃游戏 ||
如果我们「贪心」地进行正向查找,每次找到可到达的最远位置,就可以在线性时间内得到最少的跳跃次数。 例如,对于数组 [2,3,1,2,4,2,3],初始位置是下标 0,从下标 0 出发,最远可到达下标 2。下标 0 可到达的…...
coco(json)、yolo(txt)、voc(xml)标注格式的相互转换
一般都是用labeleme进行标注 标注格式都是json 然后根据不同的格式进行数据标注转换: 1.逐个json转xml: 当我们在使用数据集训练计算机视觉模型时,常常会遇到有的数据集只给了单个的json annotation文件,而模型所需要的annotation是基于每…...
以太网交换安全:端口安全
一、端口安全介绍 端口安全是一种网络设备防护措施,通过将接口学习到的动态MAC地址转换为安全MAC地址(包括安全动态MAC和Sticky MAC),阻止除安全MAC和静态MAC之外的主机通过本接口和设备通信,从而增强设备的安全性。以…...
[题解] Codeforces Round 976 (Div. 2) A ~ E
A. Find Minimum Operations 签到. void solve() {int n, k;cin >> n >> k;if (k 1) {cout << n << endl;return;}int ans 0;while (n) {ans n % k;n / k;}cout << ans << endl; }B. Brightness Begins 打表发现, 翻转完后的序列为: 0…...
【零基础入门产品经理】学习准备篇 | 需要学一些什么呢?
前言: 零实习转行产品经理经验分享01-学习准备篇_哔哩哔哩_bilibili 该篇内容主要是对bilibili这个视频的观后笔记~谢谢美丽滴up主友情分享。 全文摘要:如何在0实习且没有任何产品相关经验下,如何上岸产品经理~ 目录 一、想清楚为什么…...
第四届机器人、自动化与智能控制国际会议(ICRAIC 2024)征稿
第四届机器人、自动化与智能控制国际会议(ICRAIC 2024)由湖南第一师范学院主办,南京师范大学、山东女子学院、爱迩思出版社(ELSP)协办。 大会将专注于机器人、数字化、自动化、人工智能等技术的开发和融合,…...
[数据集][目标检测]电力场景防震锤缺陷检测数据集VOC+YOLO格式705张1类别
重要说明:防震锤缺陷图片太难找,数据集里面存在大量单一场景图片,请仔细查看图片预览谨慎下载,此外数据集均为小目标检测,如果训练map偏低属于正常现象 数据集格式:Pascal VOC格式YOLO格式(不包含分割路径…...
【SpringBoot】
目录 一、Spring Boot概要 1. SpringBoot介绍 2. SpringBoot优点 3. SpringBoot缺点 4. 时代背景-微服务 二、Spring Boot 核心配置 1. Spring Boot配置文件分类 1.1 application.properties 1.2 application.yml 1.3 小结 2. YAML概述 3. YAML基础语法 3.1 注意事…...
Linux操作系统中MongoDB
1、什么是MongoDB 1、非关系型数据库 NoSQL,泛指非关系型的数据库。随着互联网web2.0网站的兴起,传统的关系数据库在处理web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,出现了很多难以克服的问…...
2、.Net 前端框架:OpenAuth.Net - .Net宣传系列文章
OpenAuth.Net 是一个开源的身份验证框架,由开发者 Yubaolee 创建,它旨在简化 Web 应用和服务的安全授权过程。这个框架以其强大的功能和易用性,为开发人员提供了一种高效的方式来处理用户认证和授权问题。 OpenAuth.Net 的关键特性包括&#…...
unreal engine5制作动作类游戏时,我们使用刀剑等武器攻击怪物或敌方单位时,发现攻击特效、伤害等没有触发
UE5系列文章目录 文章目录 UE5系列文章目录前言一、问题分析二、解决方法1. 添加项目设置碰撞检测通道2.玩家角色碰撞设置3.怪物角色碰撞预设 最终效果 前言 在使用unreal engine5制作动作类游戏时,我们使用刀剑等武器攻击怪物或敌方单位时,发现攻击特效…...
数据权限的设计与实现系列11——前端筛选器组件Everright-filter集成功能完善2
筛选条件数据类型完善 文本类 筛选器组件给了一个文本类操作的范例,如下: Text: [{label: 等于,en_label: Equal,style: noop},{label: 等于其中之一,en_label: Equal to one of,value: one_of,style: tags},{label: 不等于,en_label: Not equal,v…...
C++ 游戏开发
C游戏开发 C 是一种高效、灵活且功能强大的编程语言,因其性能和控制能力而在游戏开发中被广泛应用。许多著名的游戏引擎,如 Unreal Engine、CryEngine 和 Godot 等,都依赖于 C 进行核心开发。本文将详细介绍 C 在游戏开发中的应用࿰…...
【历年CSP-S复赛第一题】暴力解法与正解合集(2019-2022)
P5657 [CSP-S2019] 格雷码P7076 [CSP-S2020] 动物园P7913 [CSP-S 2021] 廊桥分配P8817 [CSP-S 2022] 假期计划 P5657 [CSP-S2019] 格雷码 暴力50分 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(false),cin.tie(0),cout.tie(0) #define int long long #d…...
基于PyQt5和SQLite的数据库操作程序
基于PyQt5和SQLite的数据库操作程序:功能解析 在现代办公和数据处理中,数据库操作是不可或缺的一部分。然而,传统的数据库管理工具往往界面复杂,操作繁琐,对于非专业人士来说存在一定的学习曲线。为了解决这个问题,我们开发了一款基于PyQt5和SQLite的数据库操作程序。该…...
在Ubuntu 20.04中安装CARLA
0. 引言 CARLA (Car Learning to Act) 是一款开源自动驾驶模拟器,其支持自动驾驶系统全管线的开发、训练和验证(Development, Training, and Validation of autonomous driving systems)。Carla提供了丰富的数字资产,例如城市布局…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...
从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...
