大数据课程D5——hadoop的Sink
文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州
▲ 本章节目的
⚪ 掌握Sink的HDFS Sink;
⚪ 掌握Sink的Logger Sink;
⚪ 掌握Sink的File Roll Sink;
⚪ 掌握Sink的Null Sink;
⚪ 掌握Sink的AVRO Sink;
⚪ 掌握Sink的Custom Sink;
一、HDFS Sink
1. 概述
1. HDFS Sink将收集到的数据写到HDFS中。
2. 在往HDFS上写的时候,支持三种文件类型:文本类型,序列类型以及压缩类型。如果不指定,那么默认使用使得序列类型。
3. 在往HDFS上写数据的时候,数据的存储文件会定时的滚动,如果不指定,那么每隔30s会滚动一次,生成一个文件,那么此时会生成大量的小文件。
2. 配置属性
| 属性 | 解释 |
| type | 必须是hdfs |
| hdfs.path | 数据在HDFS上的存储路径 |
| hdfs.rollInterval | 指定文件的滚动的间隔时间 |
| hdfs.fileType | 指定文件的存储类型:DataSteam(文本),SequenceFile(序列),CompressedStream(压缩) |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop01
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置HDFS Sink
# 类型必须是hdfs
a1.sinks.k1.type = hdfs
# 指定数据在HDFS上的存储路径
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata
# 指定文件的存储类型
a1.sinks.k1.hdfs.fileType = DataStream
# 指定文件滚动的间隔时间
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f hdfssink.conf -
Dflume.root.logger=INFO,console
二、Logger Sink
1. 概述
1. Logger Sink是将Flume收集到的数据打印到控制台上。
2. 在打印的时候,为了防止过多的数据将屏幕占满,所以要求body部分的数据不能超过16个字节,超过的部分不打印。
3. Logger Sink在打印的时候,对中文支持不好。
2. 配置属性
| 属性 | 解释 |
| type | 必须是logger |
| maxBytesToLog | 指定body部分打印的字节数 |
三、File Roll Sink
1. 概述
1. File Roll Sink将数据写到本地磁盘上。
2. 同HDFS Sink类似,File Roll Sink在往磁盘上写的时候,也有一个滚动的间隔时间,同样是30s,因此在磁盘上同样会形成大量的小文件。
2. 配置属性
| 属性 | 解释 |
| type | 必须是file_roll |
| sink.directory | 指定数据的存储目录 |
| sink.rollInterval | 指定文件滚动的间隔时间 |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop01
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置File Roll Sink
# 类型必须是file_roll
a1.sinks.k1.type = file_roll
# 指定数据在磁盘上的存储目录
a1.sinks.k1.sink.directory = /home/flumedata
# 指定文件的滚动间隔时间
a1.sinks.k1.sink.rollInterval = 3600
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f filerollsink.conf -
Dflume.root.logger=INFO,console
四、Null Sink
1. 概述
1. Null Sink会抛弃所有接收到的数据。
2. 配置属性
| 属性 | 解释 |
| type | 必须是null |
3. 案例
1. 编写格式文件,添加如下内容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop01
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置Null Sink
# 类型必须是null
a1.sinks.k1.type = null
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1f
2. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f nullsink.conf -
Dflume.root.logger=INFO,console
五、AVRO Sink
1. 概述
1. AVRO Sink会将数据利用AVRO序列化之后写出到指定的节点的指定端口。
2. AVRO Sink结合AVRO Source实现多级、扇入、扇出流动效果。
2. 配置属性
| 属性 | 解释 |
| type | 必须是avro |
| hostname | 数据要发往的主机的主机名或者IP |
| port | 数据要发往的主机的接收端口 |
3. 多级流动
1. 第一个节点:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置多级流动
# 类型必须是avro
a1.sinks.k1.type = avro
# 指定主机名或者IP
a1.sinks.k1.hostname = hadoop02
# 指定端口
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 第二个节点:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置多级流动
# 类型必须是avro
a1.sinks.k1.type = avro
# 指定主机名或者IP
a1.sinks.k1.hostname = hadoop03
# 指定端口
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
3. 第三个节点:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
4. 启动Flume,启动的时候,谁接收数据,就先启动谁:
../bin/flume-ng agent -n a1 -c ../conf -f duoji.conf -
Dflume.root.logger=INFO,console
4. 扇入流动
1. 第一个和第二个节点:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置多级流动
# 类型必须是avro
a1.sinks.k1.type = avro
# 指定主机名或者IP
a1.sinks.k1.hostname = hadoop03
# 指定端口
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2. 第三个节点:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
3. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f shanru.conf -
Dflume.root.logger=INFO,console
5. 扇出流动
1. 第一个节点:
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2. 第二个和第三个节点::
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
3. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f shanchu.conf -
Dflume.root.logger=INFO,console
六、Custom Sink
1. 概述
1. 定义一个类实现Sink接口,考虑到需要获取配置属性,所以同样需要实现Configurable接口。
2. 不同于自定义Source,自定Sink需要考虑事务问题。
2. 事务

1. Source收集到数据之后,会通过doPut操作将树放到队列PutList(本质上是一个阻塞式队列)中。
2. PutList会试图将数据推送到Channel中。如果PutList成功将数据放到了Channel中,那么执行doCommit操作;反之执行doRollback操作。
3. Channel有了数据之后,会将数据通过doTake操作推送到TakeList中。
4. TakeList会将数据推送给Sink,如果Sink写出成功,那么执行doCommit;反之执行doRollvack。
3. 自定义Sink步骤
1. 构建Maven工程,导入对应的POM依赖。
2. 定义一个类继承AbstractSink,实现Sink接口和Configurable接口,覆盖configure,start,process和stop方法。
3. 完成之后打成jar包放到Flume安装目录的lib目录下。
4. 编写格式文件:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop01
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
# 配置自定义Sink
# 类型必须是类的全路径名
a1.sinks.k1.type = cn.tedu.flume.sink.AuthSink
# 指定文件的存储路径
a1.sinks.k1.path = /home/flumedata
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
5. 启动Flume:
../bin/flume-ng agent -n a1 -c ../conf -f authsink.conf -
Dflume.root.logger=INFO,console
相关文章:
大数据课程D5——hadoop的Sink
文章作者邮箱:yugongshiyesina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Sink的HDFS Sink; ⚪ 掌握Sink的Logger Sink; ⚪ 掌握Sink的File Roll Sink; ⚪ 掌握Sink的Null Sink; ⚪ 掌握Si…...
【数据结构】27.移除元素
💐 🌸 🌷 🍀 🌹 🌻 🌺 🍁 🍃 🍂 🌿 🍄🍝 🍛 🍤 📃个人主页 :阿然成长日记 …...
机器学习分布式框架ray运行xgboost实例
Ray是一个开源的分布式计算框架,专门用于构建高性能的机器学习和深度学习应用程序。它的目标是简化分布式计算的复杂性,使得用户能够轻松地将任务并行化并在多台机器上运行,以加速训练和推理的速度。Ray的主要特点包括支持分布式任务执行、Ac…...
C++设计模式笔记
设计模式 如何解决复杂性? 分解 核心思想:分而治之,将大问题分解为多个小问题,将复杂问题分解为多个简单的问题。 抽象 核心思想:从高层次角度讲,人们处理复杂性有一个通用的技术,及抽象。…...
简单聊聊创新与创造力
文章目录 前言一、大脑运行的两种方式1、聚焦模式2、发散模式3、影响想法的因素a、背景知识b、兴趣c、天赋 4、思维固化 二、想法的不可靠1、对想法进行验证2、颠覆性创新,挤牙膏式创新3、为什么模仿这么多 三、更多更多的idea1、个人的方面a、积累不同的背景知识b、…...
使用TensorFlow训练深度学习模型实战(上)
大家好,尽管大多数关于神经网络的文章都强调数学,而TensorFlow文档则强调使用现成数据集进行快速实现,但将这些资源应用于真实世界数据集是很有挑战性的,很难将数学概念和现成数据集与我的具体用例联系起来。本文旨在提供一个实用…...
【Spring】什么是Bean的生命周期及作用域,什么是Spring的执行流程?
博主简介:想进大厂的打工人博主主页:xyk:所属专栏: JavaEE进阶 在前面的播客中讲解了如何从Spring中存取Bean对象,那么本篇我们来讲解Bean对象的生命周期是什么,Bean对象的6种作用域分别是什么,都有哪些区别ÿ…...
立创EDA学习
学习树莓派3B的板子发现有个扩展板比较好,自己最好画一个,反正免费。 学习视频:立创EDA(专业版)电路设计与制作快速入门。 下载专业版,并激活。【分专业版和标准版,专业版也是免费的】 手机…...
清风学习笔记—层次分析法—matlab对判断矩阵的一致性检验
在判断矩阵是否为正互反矩阵这块,我写了两种代码,改进前很麻烦且有错误,改进后简洁多了,改进前的代码还有错误,忽略了对角线的值必须都是1,只考虑了除开对角线的元素相乘为1。 %% 改进前代码 A[3 2 4;1/2 …...
大众安徽内推
大众汽车(安徽)有限公司是大众汽车集团在中国第一家专注于新能源汽车的合资企业,是集团在中国首家拥有全面运营管理权的合资企业,担负着产品研发及数字化研发的重任,将成为集团全球电动出行中心之一。 VW Anhui Offic…...
Meta “地平线世界”移动端应用即将上线,手机快乐元宇宙?
根据海外记者 Janko Roettgers 的报道,Meta 预计很快推出移动版的 VR 元宇宙服务 "地平线世界",这是Meta 长期开发的产品。 根据最新报道,Meta宣布正在研发“地平线世界”的移动版,并表示这一服务已经可以在Quest VR设…...
更省更快更安全的云服务器,一站式集中管理,随时随地远程——站斧云桌面
随着全球化和数字化经济的发展,越来越多的企业开始海外扩张和拓展国际市场。而云服务器作为一种高效、灵活且可靠的IT基础设施方案,已成为出海企业不可或缺的重要工具。这里就为大家介绍云服务器在出海企业中的几个使用场景。 1.全球范围内协同办公 对…...
出现 Try run Maven import with -U flag (force update snapshots) 的解决方法
目录 1. 问题所示2. 原理分析3. 解决方法1. 问题所示 在配置Maven依赖信息的时候,出现如下问题: com.alibaba.nacos:nacos‐client:pom:1.1.3 failed to transfer from http://nexus.hepengju.cn:8081/nexus/content/groups/public/ during a previous attempt. This failu…...
python多线程
目录 一.多线程的定义 A.什么是多线程? B.多线程如今遇到的挑战 C.总结 二.python中的多线程 A.python中的多线程底层原理: B.全局解释器锁导致python多线程不能实现真正的并行执行! C.总结应用场景 三.java多线程,以及…...
Spring Framework 提供缓存管理器Caffeine
说明 Spring Framework 提供了一个名为 Caffeine 的缓存管理器。Caffeine 是一个基于 Java 的高性能缓存库,被广泛用于处理大规模缓存数据。 使用 Caffeine 缓存管理器,可以轻松地在 Spring 应用程序中添加缓存功能。它提供了以下主要特性:…...
ZQC的游戏 题解
前言 这题题意描述不是很清楚啊,所以我找了个有权限的人把题面改了改,应该还是比较清楚了。 感觉这道题挺妙的,就来写一篇题解。 思路 首先,根据贪心思想,我们会将 1 1 1 号点半径以内能吃的都吃了,假…...
24考研数据结构-第一章 绪论
数据结构 引用文章第一章:绪论1.0 数据结构在学什么1.1 数据结构的基本概念1.2 数据结构的三要素1.3 算法的基本概念1.4 算法的时间复杂度1.4.1 渐近时间复杂度1.4.2 常对幂指阶1.4.3 时间复杂度的计算1.4.4 最好与最坏时间复杂度 1.5 算法的空间复杂度1.5.1 空间复…...
Gitlab 备份与恢复
备份 1、备份数据(手动备份) gitlab-rake gitlab:backup:create2、备份数据(定时任务备份) [rootlocalhost ]# crontab -l 00 1 * * * /opt/gitlab/bin/gitlab-rake gitlab:backup:create 说明:每天凌晨1点备份数据…...
数据库—用户权限管理(三十三)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、概述 二、用户权限类型 三、用户赋权 四、权限删除 五、用户删除 前言 数据库用户权限管理是指对数据库用户的权限进行控制和管理,确保用户只能执…...
C语言【怎么定义变量?】
变量定义的目的是向编译器说明在哪里创建变量的存储,并指明如何创建变量的存储方式。变量定义会明确指定一个数据类型,并包含一个或多个变量的列表。例如: type variable_list; 在这里,"type"必须是一个合法的C数据类…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
python爬虫——气象数据爬取
一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用: 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests:发送 …...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
