当前位置: 首页 > news >正文

大数据课程D5——hadoop的Sink

文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

 ▲ 本章节目的

⚪ 掌握Sink的HDFS Sink;

⚪ 掌握Sink的Logger Sink;

⚪ 掌握Sink的File Roll Sink;

⚪ 掌握Sink的Null Sink;

⚪ 掌握Sink的AVRO Sink;

⚪ 掌握Sink的Custom Sink;

一、HDFS Sink

1. 概述

1. HDFS Sink将收集到的数据写到HDFS中。

2. 在往HDFS上写的时候,支持三种文件类型:文本类型,序列类型以及压缩类型。如果不指定,那么默认使用使得序列类型。

3. 在往HDFS上写数据的时候,数据的存储文件会定时的滚动,如果不指定,那么每隔30s会滚动一次,生成一个文件,那么此时会生成大量的小文件。

2. 配置属性

属性

解释

type

必须是hdfs

hdfs.path

数据在HDFS上的存储路径

hdfs.rollInterval

指定文件的滚动的间隔时间

hdfs.fileType

指定文件的存储类型:DataSteam(文本),SequenceFile(序列),CompressedStream(压缩)

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置HDFS Sink

# 类型必须是hdfs

a1.sinks.k1.type = hdfs

# 指定数据在HDFS上的存储路径

a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata

# 指定文件的存储类型

a1.sinks.k1.hdfs.fileType = DataStream

# 指定文件滚动的间隔时间

a1.sinks.k1.hdfs.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f hdfssink.conf -

Dflume.root.logger=INFO,console

二、Logger Sink

1. 概述

1. Logger Sink是将Flume收集到的数据打印到控制台上。

2. 在打印的时候,为了防止过多的数据将屏幕占满,所以要求body部分的数据不能超过16个字节,超过的部分不打印。

3. Logger Sink在打印的时候,对中文支持不好。

2. 配置属性

属性

解释

type

必须是logger

maxBytesToLog

指定body部分打印的字节数

三、File Roll Sink

1. 概述

1. File Roll Sink将数据写到本地磁盘上。

2. 同HDFS Sink类似,File Roll Sink在往磁盘上写的时候,也有一个滚动的间隔时间,同样是30s,因此在磁盘上同样会形成大量的小文件。

2. 配置属性

属性

解释

type

必须是file_roll

sink.directory

指定数据的存储目录

sink.rollInterval

指定文件滚动的间隔时间

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置File Roll Sink

# 类型必须是file_roll

a1.sinks.k1.type = file_roll

# 指定数据在磁盘上的存储目录

a1.sinks.k1.sink.directory = /home/flumedata

# 指定文件的滚动间隔时间

a1.sinks.k1.sink.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f filerollsink.conf -

Dflume.root.logger=INFO,console

四、Null Sink

1. 概述

1. Null Sink会抛弃所有接收到的数据。

2. 配置属性

属性

解释

type

必须是null

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置Null Sink

# 类型必须是null

a1.sinks.k1.type = null

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1f

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f nullsink.conf -

Dflume.root.logger=INFO,console

五、AVRO Sink

1. 概述

1. AVRO Sink会将数据利用AVRO序列化之后写出到指定的节点的指定端口。

2. AVRO Sink结合AVRO Source实现多级、扇入、扇出流动效果。

2. 配置属性

属性

解释

type

必须是avro

hostname

数据要发往的主机的主机名或者IP

port

数据要发往的主机的接收端口

3. 多级流动

1. 第一个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop02

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 第二个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop03

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 第三个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

4. 启动Flume,启动的时候,谁接收数据,就先启动谁:

../bin/flume-ng agent -n a1 -c ../conf -f duoji.conf -

Dflume.root.logger=INFO,console

4. 扇入流动

1. 第一个和第二个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop03

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 第三个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f shanru.conf -

Dflume.root.logger=INFO,console

5. 扇出流动

1. 第一个节点:

a1.sources = s1

a1.channels = c1 c2

a1.sinks = k1 k2

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c2.type = memory

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = hadoop02

a1.sinks.k1.port = 8090

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = hadoop03

a1.sinks.k2.port = 8090

a1.sources.s1.channels = c1 c2

a1.sinks.k1.channel = c1

a1.sinks.k2.channel = c2

2. 第二个和第三个节点::

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f shanchu.conf -

Dflume.root.logger=INFO,console

六、Custom Sink

1. 概述

1. 定义一个类实现Sink接口,考虑到需要获取配置属性,所以同样需要实现Configurable接口。

2. 不同于自定义Source,自定Sink需要考虑事务问题。

2. 事务

1. Source收集到数据之后,会通过doPut操作将树放到队列PutList(本质上是一个阻塞式队列)中。

2. PutList会试图将数据推送到Channel中。如果PutList成功将数据放到了Channel中,那么执行doCommit操作;反之执行doRollback操作。

3. Channel有了数据之后,会将数据通过doTake操作推送到TakeList中。

4. TakeList会将数据推送给Sink,如果Sink写出成功,那么执行doCommit;反之执行doRollvack。

3. 自定义Sink步骤

1. 构建Maven工程,导入对应的POM依赖。

2. 定义一个类继承AbstractSink,实现Sink接口和Configurable接口,覆盖configure,start,process和stop方法。

3. 完成之后打成jar包放到Flume安装目录的lib目录下。

4. 编写格式文件:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置自定义Sink

# 类型必须是类的全路径名

a1.sinks.k1.type = cn.tedu.flume.sink.AuthSink

# 指定文件的存储路径

a1.sinks.k1.path = /home/flumedata

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

5. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f authsink.conf -

Dflume.root.logger=INFO,console

相关文章:

大数据课程D5——hadoop的Sink

文章作者邮箱:yugongshiyesina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Sink的HDFS Sink; ⚪ 掌握Sink的Logger Sink; ⚪ 掌握Sink的File Roll Sink; ⚪ 掌握Sink的Null Sink; ⚪ 掌握Si…...

【数据结构】27.移除元素

💐 🌸 🌷 🍀 🌹 🌻 🌺 🍁 🍃 🍂 🌿 🍄🍝 🍛 🍤 📃个人主页 :阿然成长日记 …...

机器学习分布式框架ray运行xgboost实例

Ray是一个开源的分布式计算框架,专门用于构建高性能的机器学习和深度学习应用程序。它的目标是简化分布式计算的复杂性,使得用户能够轻松地将任务并行化并在多台机器上运行,以加速训练和推理的速度。Ray的主要特点包括支持分布式任务执行、Ac…...

C++设计模式笔记

设计模式 如何解决复杂性? 分解 核心思想:分而治之,将大问题分解为多个小问题,将复杂问题分解为多个简单的问题。 抽象 核心思想:从高层次角度讲,人们处理复杂性有一个通用的技术,及抽象。…...

简单聊聊创新与创造力

文章目录 前言一、大脑运行的两种方式1、聚焦模式2、发散模式3、影响想法的因素a、背景知识b、兴趣c、天赋 4、思维固化 二、想法的不可靠1、对想法进行验证2、颠覆性创新,挤牙膏式创新3、为什么模仿这么多 三、更多更多的idea1、个人的方面a、积累不同的背景知识b、…...

使用TensorFlow训练深度学习模型实战(上)

大家好,尽管大多数关于神经网络的文章都强调数学,而TensorFlow文档则强调使用现成数据集进行快速实现,但将这些资源应用于真实世界数据集是很有挑战性的,很难将数学概念和现成数据集与我的具体用例联系起来。本文旨在提供一个实用…...

【Spring】什么是Bean的生命周期及作用域,什么是Spring的执行流程?

博主简介:想进大厂的打工人博主主页:xyk:所属专栏: JavaEE进阶 在前面的播客中讲解了如何从Spring中存取Bean对象,那么本篇我们来讲解Bean对象的生命周期是什么,Bean对象的6种作用域分别是什么,都有哪些区别&#xff…...

立创EDA学习

学习树莓派3B的板子发现有个扩展板比较好,自己最好画一个,反正免费。 学习视频:立创EDA(专业版)电路设计与制作快速入门。 下载专业版,并激活。【分专业版和标准版,专业版也是免费的】 手机…...

清风学习笔记—层次分析法—matlab对判断矩阵的一致性检验

在判断矩阵是否为正互反矩阵这块,我写了两种代码,改进前很麻烦且有错误,改进后简洁多了,改进前的代码还有错误,忽略了对角线的值必须都是1,只考虑了除开对角线的元素相乘为1。 %% 改进前代码 A[3 2 4;1/2 …...

大众安徽内推

大众汽车(安徽)有限公司是大众汽车集团在中国第一家专注于新能源汽车的合资企业,是集团在中国首家拥有全面运营管理权的合资企业,担负着产品研发及数字化研发的重任,将成为集团全球电动出行中心之一。 VW Anhui Offic…...

Meta “地平线世界”移动端应用即将上线,手机快乐元宇宙?

根据海外记者 Janko Roettgers 的报道,Meta 预计很快推出移动版的 VR 元宇宙服务 "地平线世界",这是Meta 长期开发的产品。 根据最新报道,Meta宣布正在研发“地平线世界”的移动版,并表示这一服务已经可以在Quest VR设…...

更省更快更安全的云服务器,一站式集中管理,随时随地远程——站斧云桌面

随着全球化和数字化经济的发展,越来越多的企业开始海外扩张和拓展国际市场。而云服务器作为一种高效、灵活且可靠的IT基础设施方案,已成为出海企业不可或缺的重要工具。这里就为大家介绍云服务器在出海企业中的几个使用场景。 1.全球范围内协同办公 对…...

出现 Try run Maven import with -U flag (force update snapshots) 的解决方法

目录 1. 问题所示2. 原理分析3. 解决方法1. 问题所示 在配置Maven依赖信息的时候,出现如下问题: com.alibaba.nacos:nacos‐client:pom:1.1.3 failed to transfer from http://nexus.hepengju.cn:8081/nexus/content/groups/public/ during a previous attempt. This failu…...

python多线程

目录 一.多线程的定义 A.什么是多线程? B.多线程如今遇到的挑战 C.总结 二.python中的多线程 A.python中的多线程底层原理: B.全局解释器锁导致python多线程不能实现真正的并行执行! C.总结应用场景 三.java多线程,以及…...

Spring Framework 提供缓存管理器Caffeine

说明 Spring Framework 提供了一个名为 Caffeine 的缓存管理器。Caffeine 是一个基于 Java 的高性能缓存库,被广泛用于处理大规模缓存数据。 使用 Caffeine 缓存管理器,可以轻松地在 Spring 应用程序中添加缓存功能。它提供了以下主要特性:…...

ZQC的游戏 题解

前言 这题题意描述不是很清楚啊,所以我找了个有权限的人把题面改了改,应该还是比较清楚了。 感觉这道题挺妙的,就来写一篇题解。 思路 首先,根据贪心思想,我们会将 1 1 1 号点半径以内能吃的都吃了,假…...

24考研数据结构-第一章 绪论

数据结构 引用文章第一章:绪论1.0 数据结构在学什么1.1 数据结构的基本概念1.2 数据结构的三要素1.3 算法的基本概念1.4 算法的时间复杂度1.4.1 渐近时间复杂度1.4.2 常对幂指阶1.4.3 时间复杂度的计算1.4.4 最好与最坏时间复杂度 1.5 算法的空间复杂度1.5.1 空间复…...

Gitlab 备份与恢复

备份 1、备份数据(手动备份) gitlab-rake gitlab:backup:create2、备份数据(定时任务备份) [rootlocalhost ]# crontab -l 00 1 * * * /opt/gitlab/bin/gitlab-rake gitlab:backup:create 说明:每天凌晨1点备份数据…...

数据库—用户权限管理(三十三)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、概述 二、用户权限类型 ​三、用户赋权 四、权限删除 五、用户删除 前言 数据库用户权限管理是指对数据库用户的权限进行控制和管理,确保用户只能执…...

C语言【怎么定义变量?】

变量定义的目的是向编译器说明在哪里创建变量的存储,并指明如何创建变量的存储方式。变量定义会明确指定一个数据类型,并包含一个或多个变量的列表。例如: type variable_list; 在这里,"type"必须是一个合法的C数据类…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...

安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖

在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会&#xff0…...

C++:多态机制详解

目录 一. 多态的概念 1.静态多态(编译时多态) 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1).协变 2).析构函数的重写 5.override 和 final关键字 1&#…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1:通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分,设置 Gradle JDK 方法2:通过 Settings File → Settings... (或 CtrlAltS)…...

Vue ③-生命周期 || 脚手架

生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...