【Flume Kafaka实战】Using Kafka with Flume
一 目标
在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。
二 实战
2.1 Kafka Sink
第一步,在Cloudera Manager中安装Flume,安装时指定两个Agent。这一步很简单。
第二步,创建一个新Role Group。默认情况下,所有的Agent都处于一个叫Agent Default Group的角色组中,处于同一角色组中的Agent共享相同的配置。但是在我们这个例子中,两个Agent要完成不同的工作,需要不同的配置。所有新建一个Role Group,并把其中一个Agent移到到这个新的Group中,如下图所示。

第三步,分别编辑两个Agent的配置文件,我的第一个Agent名字为file2Kafka,配置文件内容如下。不难看出,这个配置的source就是去tail一个本地文件,然后写入到kafka的消息队列中。
即:Kafka Sink
# Name the components on this agent
file2Kafka.sources = file2Kafka_source
file2Kafka.sinks = file2Kafka_sink
file2Kafka.channels = file2Kafka_channel# Describe/configure the source
file2Kafka.sources.file2Kafka_source.type = exec
file2Kafka.sources.file2Kafka_source.command = tail -F /home/demo/flume-exec.txt# Describe the sink
file2Kafka.sinks.file2Kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
# topic前不加kafka
file2Kafka.sinks.file2Kafka_sink.topic = flumetest
file2Kafka.sinks.file2Kafka_sink.kafka.bootstrap.servers= slave1:9092,slave2:9092
file2Kafka.sinks.file2Kafka_sink.kafka.flumeBatchSize= 20# Use a channel which buffers events in memory
file2Kafka.channels.file2Kafka_channel.type = memory
file2Kafka.channels.file2Kafka_channel.capacity = 1000
file2Kafka.channels.file2Kafka_channel.transactionCapacity = 1000# Bind the source and sink to the channel
file2Kafka.sources.file2Kafka_source.channels = file2Kafka_channel
file2Kafka.sinks.file2Kafka_sink.channel = file2Kafka_channel
2.2 Kafka Source
第二Agent的名字是kafka2Hdfs,配置文件如下。这个配置的内容就是把Agent1中写到kafka的数据读出来,然后写入到HDFS中。注意hdfs.path这个配置,由于在Cloudera Manager中,Flume知道HDFS相关的配置,所以无需去加入hdfs://my-cluster这样的协议前缀。
# Name the components on this agent
kafka2Hdfs.sources = kafka2Hdfs_source
kafka2Hdfs.sinks = kafka2Hdfs_sink
kafka2Hdfs.channels = kafka2Hdfs_channel# Describe/configure the source
kafka2Hdfs.sources.kafka2Hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
kafka2Hdfs.sources.kafka2Hdfs_source.batchSize = 10
kafka2Hdfs.sources.kafka2Hdfs_source.batchDurationMillis = 1000
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.topics = flumetest
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.consumer.group.id = flume# Describe the sink
kafka2Hdfs.sinks.kafka2Hdfs_sink.type = hdfs
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.path = /flume/
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.fileType = DataStream
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.filePrefix=sxt
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollCount=0
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollInterval=0# Use a channel which buffers events in memory
kafka2Hdfs.channels.kafka2Hdfs_channel.type = memory
kafka2Hdfs.channels.kafka2Hdfs_channel.capacity = 1000
kafka2Hdfs.channels.kafka2Hdfs_channel.transactionCapacity = 100# Bind the source and sink to the channel
kafka2Hdfs.sources.kafka2Hdfs_source.channels = kafka2Hdfs_channel
kafka2Hdfs.sinks.kafka2Hdfs_sink.channel = kafka2Hdfs_channel
整个配置完成之后,Cloudera Manager中的界面如下图:

在运行中可能会出现一些目录读写的权限问题,需要去修改hdfs中相关目录的权限。比如我的配置中,数据是写到/flume这个目录下的,这个目录我是用root用户去创建的,但flume运行是使用一个叫flume的用户名来运行的,所以用hdfs dfs -chmod 777 /flume把这个目录的读写权限放开了。
这是一个例子,主要演示如何在cloudera manager中把两个flume的agent串联在一起使用。在现实的生产中,如果需要把一个文本数据通过kakfa写入到hdfs中,更合理的做法是使用一个agent,把kafka作为channel来使用。具体可以参考https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
2.3 Kafka Channel
# Name the components on this agent
kafkaCh.sources = src_1_file
kafkaCh.channels = ch_1_kafka
kafkaCh.sinks = sink_1_hdfs# Describe/configure the source
kafkaCh.sources.src_1_file.type = exec
kafkaCh.sources.src_1_file.command = tail -F /home/demo/flume-exec.txt# Define a kafka channel
kafkaCh.channels.ch_1_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
kafkaCh.channels.ch_1_kafka.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafkaCh.channels.ch_1_kafka.kafka.topic = kafka_channel
kafkaCh.channels.ch_1_kafka.kafka.consumer.group.id = flume-consumer# Describe the sink
kafkaCh.sinks.sink_1_hdfs.type = hdfs
kafkaCh.sinks.sink_1_hdfs.hdfs.path = /flume/kafka/channel
kafkaCh.sinks.sink_1_hdfs.hdfs.fileType = DataStream
kafkaCh.sinks.sink_1_hdfs.hdfs.filePrefix=sxt
kafkaCh.sinks.sink_1_hdfs.hdfs.rollCount=0
kafkaCh.sinks.sink_1_hdfs.hdfs.rollInterval=0# Bind the source and sink to the channel
kafkaCh.sources.src_1_file.channels = ch_1_kafka
kafkaCh.sinks.sink_1_hdfs.channel = ch_1_kafka
将上面两个Agent放在一个Agent中,用Kafka Channel实现。
注意:hdfs.path 必须存在,且有权限进行操作
相关文章:
【Flume Kafaka实战】Using Kafka with Flume
一 目标 在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。 二 实战 2.1 Kafka Sink 第一步࿰…...
5G NR物理信号
文章目录 NR 物理信号与LTE的区别上行参考信号DMRS (UL)SRSPT-RS(UL) 下行参考信号DMRS(DL)PT-RS(DL)CSI-RSPSSSSS NR 物理信号与LTE的区别 用SSS、CSI-RS和DMRS 取代了CRS信号。下行业务信道采用TM1波束赋形传输模式。基于SSB 或者CSI-RS进行RSRP和SINR测量。基于DMRS 进行共…...
Pikachu-Cross-Site Scripting-存储型xss
存储型xss ,随便输入点内容,都能保存下来;刷新后也不会丢失;输入特殊字符,也能原样返回; 查看代码,也可以看到输出结果直接原路返回,不做处理 构造payload <script>alert(1)…...
媲美GPT-4o mini的小模型,Meta Llama 3.2模型全面解读!
大家好,我是木易,一个持续关注AI领域的互联网技术产品经理,国内Top2本科,美国Top10 CS研究生,MBA。我坚信AI是普通人变强的“外挂”,专注于分享AI全维度知识,包括但不限于AI科普,AI工…...
【leetcode】 45.跳跃游戏 ||
如果我们「贪心」地进行正向查找,每次找到可到达的最远位置,就可以在线性时间内得到最少的跳跃次数。 例如,对于数组 [2,3,1,2,4,2,3],初始位置是下标 0,从下标 0 出发,最远可到达下标 2。下标 0 可到达的…...
coco(json)、yolo(txt)、voc(xml)标注格式的相互转换
一般都是用labeleme进行标注 标注格式都是json 然后根据不同的格式进行数据标注转换: 1.逐个json转xml: 当我们在使用数据集训练计算机视觉模型时,常常会遇到有的数据集只给了单个的json annotation文件,而模型所需要的annotation是基于每…...
以太网交换安全:端口安全
一、端口安全介绍 端口安全是一种网络设备防护措施,通过将接口学习到的动态MAC地址转换为安全MAC地址(包括安全动态MAC和Sticky MAC),阻止除安全MAC和静态MAC之外的主机通过本接口和设备通信,从而增强设备的安全性。以…...
[题解] Codeforces Round 976 (Div. 2) A ~ E
A. Find Minimum Operations 签到. void solve() {int n, k;cin >> n >> k;if (k 1) {cout << n << endl;return;}int ans 0;while (n) {ans n % k;n / k;}cout << ans << endl; }B. Brightness Begins 打表发现, 翻转完后的序列为: 0…...
【零基础入门产品经理】学习准备篇 | 需要学一些什么呢?
前言: 零实习转行产品经理经验分享01-学习准备篇_哔哩哔哩_bilibili 该篇内容主要是对bilibili这个视频的观后笔记~谢谢美丽滴up主友情分享。 全文摘要:如何在0实习且没有任何产品相关经验下,如何上岸产品经理~ 目录 一、想清楚为什么…...
第四届机器人、自动化与智能控制国际会议(ICRAIC 2024)征稿
第四届机器人、自动化与智能控制国际会议(ICRAIC 2024)由湖南第一师范学院主办,南京师范大学、山东女子学院、爱迩思出版社(ELSP)协办。 大会将专注于机器人、数字化、自动化、人工智能等技术的开发和融合,…...
[数据集][目标检测]电力场景防震锤缺陷检测数据集VOC+YOLO格式705张1类别
重要说明:防震锤缺陷图片太难找,数据集里面存在大量单一场景图片,请仔细查看图片预览谨慎下载,此外数据集均为小目标检测,如果训练map偏低属于正常现象 数据集格式:Pascal VOC格式YOLO格式(不包含分割路径…...
【SpringBoot】
目录 一、Spring Boot概要 1. SpringBoot介绍 2. SpringBoot优点 3. SpringBoot缺点 4. 时代背景-微服务 二、Spring Boot 核心配置 1. Spring Boot配置文件分类 1.1 application.properties 1.2 application.yml 1.3 小结 2. YAML概述 3. YAML基础语法 3.1 注意事…...
Linux操作系统中MongoDB
1、什么是MongoDB 1、非关系型数据库 NoSQL,泛指非关系型的数据库。随着互联网web2.0网站的兴起,传统的关系数据库在处理web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,出现了很多难以克服的问…...
2、.Net 前端框架:OpenAuth.Net - .Net宣传系列文章
OpenAuth.Net 是一个开源的身份验证框架,由开发者 Yubaolee 创建,它旨在简化 Web 应用和服务的安全授权过程。这个框架以其强大的功能和易用性,为开发人员提供了一种高效的方式来处理用户认证和授权问题。 OpenAuth.Net 的关键特性包括&#…...
unreal engine5制作动作类游戏时,我们使用刀剑等武器攻击怪物或敌方单位时,发现攻击特效、伤害等没有触发
UE5系列文章目录 文章目录 UE5系列文章目录前言一、问题分析二、解决方法1. 添加项目设置碰撞检测通道2.玩家角色碰撞设置3.怪物角色碰撞预设 最终效果 前言 在使用unreal engine5制作动作类游戏时,我们使用刀剑等武器攻击怪物或敌方单位时,发现攻击特效…...
数据权限的设计与实现系列11——前端筛选器组件Everright-filter集成功能完善2
筛选条件数据类型完善 文本类 筛选器组件给了一个文本类操作的范例,如下: Text: [{label: 等于,en_label: Equal,style: noop},{label: 等于其中之一,en_label: Equal to one of,value: one_of,style: tags},{label: 不等于,en_label: Not equal,v…...
C++ 游戏开发
C游戏开发 C 是一种高效、灵活且功能强大的编程语言,因其性能和控制能力而在游戏开发中被广泛应用。许多著名的游戏引擎,如 Unreal Engine、CryEngine 和 Godot 等,都依赖于 C 进行核心开发。本文将详细介绍 C 在游戏开发中的应用࿰…...
【历年CSP-S复赛第一题】暴力解法与正解合集(2019-2022)
P5657 [CSP-S2019] 格雷码P7076 [CSP-S2020] 动物园P7913 [CSP-S 2021] 廊桥分配P8817 [CSP-S 2022] 假期计划 P5657 [CSP-S2019] 格雷码 暴力50分 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(false),cin.tie(0),cout.tie(0) #define int long long #d…...
基于PyQt5和SQLite的数据库操作程序
基于PyQt5和SQLite的数据库操作程序:功能解析 在现代办公和数据处理中,数据库操作是不可或缺的一部分。然而,传统的数据库管理工具往往界面复杂,操作繁琐,对于非专业人士来说存在一定的学习曲线。为了解决这个问题,我们开发了一款基于PyQt5和SQLite的数据库操作程序。该…...
在Ubuntu 20.04中安装CARLA
0. 引言 CARLA (Car Learning to Act) 是一款开源自动驾驶模拟器,其支持自动驾驶系统全管线的开发、训练和验证(Development, Training, and Validation of autonomous driving systems)。Carla提供了丰富的数字资产,例如城市布局…...
实战指南:基于快马平台,快速构建可部署的unet卫星图像分割系统
今天想和大家分享一个实战项目:基于UNet的卫星图像建筑物分割系统。这个项目特别适合在InsCode(快马)平台上快速搭建,因为它涉及从数据处理到模型部署的完整流程,而平台的一键部署功能正好能省去繁琐的环境配置工作。 项目背景与需求分析 卫星…...
如何通过InstantClick事件回调实现精准的性能监控:开发者必备指南
如何通过InstantClick事件回调实现精准的性能监控:开发者必备指南 【免费下载链接】instantclick InstantClick makes following links in your website instant. 项目地址: https://gitcode.com/gh_mirrors/in/instantclick InstantClick是一款能让网站链接…...
FreeRTOS进阶:任务优先级与调度策略深度解析
1. FreeRTOS任务优先级基础 在嵌入式实时操作系统中,任务优先级决定了任务执行的先后顺序。FreeRTOS采用数值越大优先级越高的设计,优先级范围通常为0到(configMAX_PRIORITIES-1)。我刚开始接触FreeRTOS时,经常混淆这个概念,直到在…...
告别软件盗版烦恼:用YT88加密狗5分钟搞定C#/Java/Python源代码加密(附完整开发包下载)
5分钟实现多语言源代码加密:YT88加密狗实战指南 独立开发者最头疼的问题之一,就是辛苦编写的代码被轻易反编译或盗用。上周我的一个朋友就遇到了这种情况——他花了三个月开发的Python数据分析工具,刚上线两周就被破解并免费传播。这种经历在…...
5分钟快速上手:AsrTools智能语音转文字工具全攻略
5分钟快速上手:AsrTools智能语音转文字工具全攻略 【免费下载链接】AsrTools ✨ AsrTools: Smart Voice-to-Text Tool | Efficient Batch Processing | User-Friendly Interface | No GPU Required | Supports SRT/TXT Output | Turn your audio into accurate text…...
迪文串口屏C51开发避坑指南:从ModBus ASCII模式到音乐播放实战
迪文串口屏C51开发实战:从ModBus ASCII到音乐播放的深度解析 迪文串口屏在工业控制领域占据重要地位,其C51开发环境为开发者提供了高度灵活的定制能力。本文将聚焦三个典型开发场景:ModBus ASCII模式移植、C51变量定义导致的定时问题以及音乐…...
为什么小数据集上神经网络会突然‘开窍‘?揭秘Grokking现象背后的LU机制
为什么小数据集上神经网络会突然"开窍"?揭秘Grokking现象背后的LU机制 在机器学习实践中,我们常常观察到一种反直觉的现象:当神经网络在小规模算法数据集上训练时,测试准确率会在长时间停滞于随机猜测水平后突然跃升至接…...
XPath与lxml解析库
test.xml<?xml version"1.0" encoding"utf-8"?><bookstore><book name"halibote"><title lang"en">Harry Potter</title><author>J K. Rowling</author><year>2005</year>&l…...
Flutter项目卡在‘assembleDebug’?Gradle配置优化全攻略
1. 为什么Flutter项目会卡在assembleDebug阶段? 这个问题困扰过无数Flutter开发者,尤其是刚入门的新手。当你满怀期待地运行flutter run命令,结果控制台卡在Running Gradle task assembleDebug...一动不动,那种感觉就像等一辆永远…...
UniApp跨平台开发入门:用现有Vue代码快速生成小程序/App(2023最新版)
UniApp跨平台开发实战:2023年Vue代码高效迁移指南 移动互联网时代,开发者常面临一个核心挑战:如何用最小成本将Web应用扩展到移动端。如果你手头已有成熟的Vue项目,UniApp可能是最经济的跨平台解决方案——它允许你复用80%以上的现…...
