当前位置: 首页 > news >正文

Flume的安装和使用

一、安装Flume

1. 下载flume-1.7.0

http://mirrors.shu.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2. 解压改名

tar xvf apache-flume-1.7.0-bin.tar.gz

mv apache-flume-1.7.0-bin flume

二、配置Flume

1. 配置sh文件

cp conf/flume-env.sh.template  conf/flume-env.sh

2. 配置conf文件

cp conf/flume-conf.properties.template conf/flume.conf

三、将文件输出到日志

1、配置源是那里,目标是那里

# source: avro, channel: memory, sink: log

# Define a memory channel called ch1 on agent1

agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it

# to bind to 0.0.0.0:41414. Connect it to channel ch1.

agent1.sources.avro-source1.channels = ch1

agent1.sources.avro-source1.type = avro

agent1.sources.avro-source1.bind = 0.0.0.0

agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives

# and connect it to the other end of the same channel.

agent1.sinks.log-sink1.channel = ch1

agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell

# agent1 which ones we want to activate.

agent1.channels = ch1

agent1.sources = avro-source1

agent1.sinks = log-sink1

2、启动Flume agent

bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1

3、启动Flume Client

echo hello, world! > /usr/local/flume-test.log

echo hello, hadoop! >> /usr/local/flume-test.log

echo hello, flume! >> /usr/local/flume-test.log

echo hello, spark! >> /usr/local/flume-test.log

bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /usr/local/flume-test.log -Dflume.root.logger=DEBUG,console

4、client启动结果

2018-05-11 11:24:05,624 (main) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:498)] Batch size string = 5

2018-05-11 11:24:05,644 (main) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers

2018-05-11 11:24:06,100 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:233)] Finished

2018-05-11 11:24:06,100 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:236)] Closing reader

2018-05-11 11:24:06,101 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:240)] Closing RPC client

2018-05-11 11:24:06,107 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:84)] Exiting

5、Flume Server结果

2018-05-11 11:24:05,772 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 => /127.0.0.1:41414] OPEN

2018-05-11 11:24:05,773 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414

2018-05-11 11:24:05,773 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:52790

2018-05-11 11:24:06,076 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:378)] Avro source avro-source1: Received avro event batch of 4 events.

2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 :> /127.0.0.1:41414] DISCONNECTED

2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 :> /127.0.0.1:41414] UNBOUND

2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 :> /127.0.0.1:41414] CLOSED

2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:52790 disconnected.

2018-05-11 11:24:08,870 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 77 6F 72 6C 64 21          hello, world! }

2018-05-11 11:24:08,870 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 68 61 64 6F 6F 70 21       hello, hadoop! }

2018-05-11 11:24:08,870 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 66 6C 75 6D 65 21          hello, flume! }

2018-05-11 11:24:08,871 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 73 70 61 72 6B 21          hello, spark! }

可以看到server端收到了client写入的数据

四、用Flume监控目录将新文件上传到HDFS

1、配置方式

# 将/usr/local/flume/tmpData下的文本文件上传到hdfs://node10:9000/test/

# source: spooldir, channel: memory, sink: hdfs

# Define a memory channel called ch1 on agent1

agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it

# to bind to 0.0.0.0:41414. Connect it to channel ch1.

agent1.sources.spooldir-source1.channels = ch1

agent1.sources.spooldir-source1.type = spooldir

agent1.sources.spooldir-source1.spoolDir=/usr/local/flume/tmpData

# Event的headers增加文件名如:headers:{basename=a.txt}

agent1.sources.spooldir-source1.basenameHeader = true

# Define a logger sink that simply logs all events it receives

# and connect it to the other end of the same channel.

# 文件名是:a.txt.1536559992413

agent1.sinks.hdfs-sink1.channel = ch1

agent1.sinks.hdfs-sink1.type = hdfs

agent1.sinks.hdfs-sink1.hdfs.path = hdfs://node10:9000/test

# hdfs.filePrefix = log_%Y%m%d_%H 如:log_20151016_13.1444973768543

agent1.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}

agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true

agent1.sinks.hdfs-sink1.hdfs.round = true

agent1.sinks.hdfs-sink1.hdfs.roundValue = 10

agent1.sinks.hdfs-sink1.hdfs.rollSize = 0

agent1.sinks.hdfs-sink1.hdfs.rollCount = 0

# Finally, now that we've defined all of our components, tell

# agent1 which ones we want to activate.

agent1.channels = ch1

agent1.sources = spooldir-source1

agent1.sinks = hdfs-sink1

2、通过Flume启动agent

bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf --name agent1 -Dflume.root.logger=DEBUG,console

3、查看hdfs

http://node10(节点ip):50070

3、监控目录的文件

每个文件读取完之后,修改后缀为.COMPLETED,重启flume时忽略这些文件

五、多个Agent - 多个Collector配置

1、Flume  Agent配置

#flume-client.properties

##### source: log(node1), channel: memory, sink: avro(node2, node3)

#agent1 name

agent1.channels = c1

agent1.sources = r1

agent1.sinks = k1 k2

#set gruop

agent1.sinkgroups = g1

#set channel

agent1.channels.c1.type = memory

agent1.channels.c1.capacity = 1000

agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1

agent1.sources.r1.type = exec

agent1.sources.r1.command = tail -F /usr/local/hadoop/logs/hadoop-root-namenode-node1.log

agent1.sources.r1.interceptors = i1 i2

agent1.sources.r1.interceptors.i1.type = static

agent1.sources.r1.interceptors.i1.key = Type

agent1.sources.r1.interceptors.i1.value = LOGIN

agent1.sources.r1.interceptors.i2.type = timestamp

# set sink1

agent1.sinks.k1.channel = c1

agent1.sinks.k1.type = avro

agent1.sinks.k1.hostname = node2

agent1.sinks.k1.port = 52020

# set sink2

agent1.sinks.k2.channel = c1

agent1.sinks.k2.type = avro

agent1.sinks.k2.hostname = node3

agent1.sinks.k2.port = 52020

#set sink group

agent1.sinkgroups.g1.sinks = k1 k2

#设置失效恢复

agent1.sinkgroups.g1.processor.type = failover

agent1.sinkgroups.g1.processor.priority.k1 = 10

agent1.sinkgroups.g1.processor.priority.k2 = 1

agent1.sinkgroups.g1.processor.maxpenalty = 10000

2、Flume Collector配置

# flume-server.properties

#set Agent name

a1.sources = r1

a1.channels = c1

a1.sinks = k1

#set channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# other node,change node2 to node3

a1.sources.r1.type = avro

a1.sources.r1.bind = node2

a1.sources.r1.port = 52020

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

a1.sources.r1.interceptors.i1.key = Collector

a1.sources.r1.interceptors.i1.value = NNA

a1.sources.r1.channels = c1

#set sink to hdfs

a1.sinks.k1.type=hdfs

a1.sinks.k1.hdfs.path=/flume/logdfs

a1.sinks.k1.hdfs.fileType=DataStream

a1.sinks.k1.hdfs.writeFormat=TEXT

a1.sinks.k1.hdfs.rollInterval=1

a1.sinks.k1.channel=c1

a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

2、Flume启动Agent

flume-ng agent -n agent1 -c conf -f conf/flume-client.properties -Dflume.root.logger=DEBUG,console

3、Flume启动Collector

flume-ng agent -n a1 -c conf -f conf/flume-server.properties -Dflume.root.logger=DEBUG,console

4、Flume Failover测试

Collector1的权重比Collector2大

kill掉Collector1,Collector2工作

恢复Collector1,Collector1工作

5、Flume  LoadBalancer

# 设置负载均衡

#agent1.sinkgroups.g1.processor.type = load_balance

#agent1.sinkgroups.g1.processor.backoff = true

#agent1.sinkgroups.g1.processor.selector = round_robin

六、Flume 整合Kafka

#设置Kafka接收器

a1.sinks.k2.type= org.apache.flume.sink.kafka.KafkaSink

#设置Kafka的broker地址和端口号

a1.sinks.k2.kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092

#a1.sinks.k2.defaultPartitionId=0

a1.sinks.k2.producer.type=sync

#设置Kafka的Topic

a1.sinks.k2.kafka.topic=TestTopic

#设置序列化方式

a1.sinks.k2.serializer.class=kafka.serializer.StringEncoder

a1.sinks.k2.channel=c1

七、flume传递header到spark

#a1.sinks.k2.serializer.class=org.apache.kafka.common.serialization.BytesDeserializer

a1.sinks.k2.useFlumeEventFormat = true

val kafkaParams = Map[String, Object](

"bootstrap.servers" -> "192.168.60.15:9092",

"key.deserializer" -> classOf[StringDeserializer],

"value.deserializer" -> classOf[BytesDeserializer],

"group.id" -> "use_a_separate_group_id_for_each_stream",

"auto.offset.reset" -> "latest",

"enable.auto.commit" -> (false: java.lang.Boolean))

val topics = Array("sspclick")

val sspclick = KafkaUtils.createDirectStream[String, Bytes](

ssc,

PreferConsistent,

Subscribe[String, Bytes](topics, kafkaParams))

val reader = new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent])

val a = sspclick.map(f => {

var body = f.value().get

var decoder = DecoderFactory.get().binaryDecoder(body, null);

var result = reader.read(null, decoder);

var hostname = result.getHeaders.get(new Utf8("hostname"))

var text = new String(result.getBody.array())

var array = text.split("\t")

相关文章:

Flume的安装和使用

一、安装Flume 1. 下载flume-1.7.0 http://mirrors.shu.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz 2. 解压改名 tar xvf apache-flume-1.7.0-bin.tar.gz mv apache-flume-1.7.0-bin flume 二、配置Flume 1. 配置sh文件 cp conf/flume-env.sh.template …...

[Hive]七 Hive 内核

1. Hive架构 Hive架构主要包括: 用户界面:命令行(CLI)和web UIThrift Server:公开了一个非常简单的客户端执行HiveQL语句的API,包括JDBC(Java)和ODBC(C)&…...

Druid密码错误重试导致数据库超慢

文章目录 密码错误重试导致数据库超慢如何避免呢? 密码错误重试导致数据库超慢 有同事把项目的数据库密码配错了,导致其他所有连接该数据库的项目全部连接都获取缓慢了,一个页面加载要花费十几秒。排查mysql连接发现很多connect命令的连接 …...

Ubuntu 24.04安装和使用WPS 2019

为Ubuntu找一款免费、功能丰富的 Microsoft Office 替代品?WPS Office是理想选择!在本文中,包含在Ubuntu上安装 WPS Office,修复初次使用出现问题的修复。 安装WPS,参考链接>>How to Install WPS Office on Ubu…...

week05_nlp大模型训练·词向量文本向量

1、词向量训练 1.1 CBOW(两边预测中间) 一、CBOW 基本概念 CBOW 是一种用于生成词向量的方法,属于神经网络语言模型的一种。其核心思想是根据上下文来预测中心词。在 CBOW 中,输入是目标词的上下文词汇,输出是该目标…...

【RabbitMQ消息队列原理与应用】

RabbitMQ消息队列原理与应用 一、消息队列概述 (一)概念 消息队列(Message Queue,简称MQ)是一种应用程序间的通信方式,它允许应用程序通过将消息放入队列中,而不是直接调用其他应用程序的接口…...

反欺诈风控体系及策略

本文详细介绍了互联网领域金融信贷行业的反欺诈策略。首先,探讨了反欺诈的定义、重要性以及在当前互联网发展背景下欺诈风险的加剧。接着,分析了反欺诈的主要手段和基础技术,包括对中介和黑产的了解、欺诈风险的具体类型和表现方式&#xff0…...

Mac 12.1安装tiger-vnc问题-routines:CRYPTO_internal:bad key length

背景:因为某些原因需要从本地mac连接远程linxu桌面查看一些内容,必须使用桌面查看,所以ssh无法满足,所以决定安装vnc客户端。 问题: 在mac上通过 brew install tiger-vnc命令安装, 但是报错如下: > D…...

【代码分析】Unet-Pytorch

1:unet_parts.py 主要包含: 【1】double conv,双层卷积 【2】down,下采样 【3】up,上采样 【4】out conv,输出卷积 """ Parts of the U-Net model """import torch im…...

【LLM入门系列】01 深度学习入门介绍

NLP Github 项目: NLP 项目实践:fasterai/nlp-project-practice 介绍:该仓库围绕着 NLP 任务模型的设计、训练、优化、部署和应用,分享大模型算法工程师的日常工作和实战经验 AI 藏经阁:https://gitee.com/fasterai/a…...

安卓系统主板_迷你安卓主板定制开发_联发科MTK安卓主板方案

安卓主板搭载联发科MT8766处理器,采用了四核Cortex-A53架构,高效能和低功耗设计。其在4G网络待机时的电流消耗仅为10-15mA/h,支持高达2.0GHz的主频。主板内置IMG GE832 GPU,运行Android 9.0系统,内存配置选项丰富&…...

关键点检测——HRNet原理详解篇

🍊作者简介:秃头小苏,致力于用最通俗的语言描述问题 🍊专栏推荐:深度学习网络原理与实战 🍊近期目标:写好专栏的每一篇文章 🍊支持小苏:点赞👍🏼、…...

25考研总结

11408确实难,25英一直接单科斩杀😭 对过去这一年多备考的经历进行复盘,以及考试期间出现的问题进行思考。 考408的人,政治英语都不能拖到最后,408会惩罚每一个偷懒的人! 政治 之所以把政治写在最开始&am…...

网络安全态势感知

一、网络安全态势感知(Cyber Situational Awareness)是一种通过收集、处理和分析网络数据来理解当前和预测未来网络安全状态的能力。它的目的是提供实时的、安全的网络全貌,帮助组织理解当前网络中发生的事情,评估风险&#xff0c…...

在K8S中,节点状态notReady如何排查?

在kubernetes集群中,当一个节点(Node)的状态变为NotReady时,意味着该节点可能无法运行Pod或不能正确相应kubernetes控制平面。排查NotReady节点通常涉及以下步骤: 1. 获取基本信息 使用kubectl命令行工具获取节点状态…...

深度学习在光学成像中是如何发挥作用的?

深度学习在光学成像中的作用主要体现在以下几个方面: 1. **图像重建和去模糊**:深度学习可以通过优化图像重建算法来处理模糊图像或降噪,改善成像质量。这涉及到从低分辨率图像生成高分辨率图像,突破传统光学系统的分辨率限制。 …...

树莓派linux内核源码编译

Raspberry Pi 内核 托管在 GitHub 上;更新滞后于上游 Linux内核,Raspberry Pi 会将 Linux 内核的长期版本整合到 Raspberry Pi 内核中。 1 构建内核 操作系统随附的默认编译器和链接器被配置为构建在该操作系统上运行的可执行文件。原生编译使用这些默…...

本地小主机安装HomeAssistant开源智能家居平台打造个人AI管家

文章目录 前言1. 添加镜像源2. 部署HomeAssistant3. HA系统初始化配置4. HA系统添加智能设备4.1 添加已发现的设备4.2 添加HACS插件安装设备 5. 安装cpolar内网穿透5.1 配置HA公网地址 6. 配置固定公网地址 前言 大家好!今天我要向大家展示如何将一台迷你的香橙派Z…...

SpringBoot返回文件让前端下载的几种方式

01 背景 在后端开发中,通常会有文件下载的需求,常用的解决方案有两种: 不通过后端应用,直接使用nginx直接转发文件地址下载(适用于一些公开的文件,因为这里不需要授权)通过后端进行下载&#…...

人工智能及深度学习的一些题目

1、一个含有2个隐藏层的多层感知机(MLP),神经元个数都为20,输入和输出节点分别由8和5个节点,这个网络有多少权重值? 答:在MLP中,权重是连接神经元的参数,每个连接都有一…...

15-利用dubbo远程服务调用

本文介绍利用apache dubbo调用远程服务的开发过程,其中利用zookeeper作为注册中心。关于zookeeper的环境搭建,可以参考我的另一篇博文:14-zookeeper环境搭建。 0、环境 jdk:1.8zookeeper:3.8.4dubbo:2.7.…...

【Rust自学】8.5. HashMap Pt.1:HashMap的定义、创建、合并与访问

8.5.0. 本章内容 第八章主要讲的是Rust中常见的集合。Rust中提供了很多集合类型的数据结构,这些集合可以包含很多值。但是第八章所讲的集合与数组和元组有所不同。 第八章中的集合是存储在堆内存上而非栈内存上的,这也意味着这些集合的数据大小无需在编…...

未来网络技术的新征程:5G、物联网与边缘计算(10/10)

一、5G 网络:引领未来通信新潮流 (一)5G 网络的特点 高速率:5G 依托良好技术架构,提供更高的网络速度,峰值要求不低于 20Gb/s,下载速度最高达 10Gbps。相比 4G 网络,5G 的基站速度…...

LLM(十二)| DeepSeek-V3 技术报告深度解读——开源模型的巅峰之作

近年来,大型语言模型(LLMs)的发展突飞猛进,逐步缩小了与通用人工智能(AGI)的差距。DeepSeek-AI 团队最新发布的 DeepSeek-V3,作为一款强大的混合专家模型(Mixture-of-Experts, MoE&a…...

Uniapp在浏览器拉起导航

Uniapp在浏览器拉起导航 最近涉及到要在浏览器中拉起导航,对目标点进行路线规划等功能,踩了一些坑,找到了使用方法。(浏览器拉起) 效果展示 可以拉起三大平台及苹果导航 点击选中某个导航,会携带经纬度跳转…...

公平联邦学习——多目标优化

前言 前段时间接触到了联邦学习(Federated Learning, FL)。涉猎了几年多目标优化的我,惊奇地发现横向联邦学习里面也有用多目标优化来做的。于是有感而发,特此写一篇博客记录记录,如有机会可以和大家多多交流。遇到不…...

奇怪的Python:为何字符串要设置成不可变的?

你好!我是老邓。今天我们来聊聊 Python 中字符串不可变这个话题。 1、问题简介: Python 中,字符串属于不可变对象。这意味着一旦字符串被创建,它的值就无法被修改。任何看似修改字符串的操作,实际上都是创建了一个新…...

Vue-Router之嵌套路由

在路由配置中,配置children import Vue from vue import VueRouter from vue-routerVue.use(VueRouter)const router new VueRouter({mode: history,base: import.meta.env.BASE_URL,routes: [{path: /,redirect: /home},{path: /home,name: home,component: () &…...

MyBatis使用的设计模式

目录 1. 工厂模式(Factory Pattern) 2. 单例模式(Singleton Pattern) 3. 代理模式(Proxy Pattern) 4. 装饰器模式(Decorator Pattern) 5. 观察者模式(Observer Patt…...

arm rk3588 升级glibc2.31到2.33

一、查看glibc版本 rootztl:~# ldd --version ldd (Ubuntu GLIBC 2.31-0ubuntu9.2) 2.31 Copyright (C) 2020 Free Software Foundation, Inc. This is free software; see the source for copying conditions. There is NO warranty; not even for MERCHANTABILITY or FITNE…...