当前位置: 首页 > news >正文

NiFi-从部署到开发(图文详解)

NiFi简介

Apache NiFi 是一款强大的开源数据集成工具,旨在简化数据流的管理、传输和自动化。它提供了直观的用户界面和可视化工具,使用户能够轻松设计、控制和监控复杂的数据流程,NiFi 具备强大的扩展性和可靠性,可用于处理海量数据,并且能很好地应对复杂的数据转换需求,还可以设置定时调度任务

特点

  • 可视化操作:提供了图形化界面,用户可以通过拖放组件来构建数据处理流程。
  • 数据处理功能强大:能够实现数据的获取、转换、分发等操作。例如,可以从各种数据源获取数据,对数据进行格式转换、内容过滤等处理,再将数据发送到目标系统。
  • 可靠性高:在数据传输和处理过程中具有良好的容错机制,保障数据的完整性和准确性。
  • 扩展性好:可以轻松地扩展来处理大规模的数据流量和复杂的数据处理任务

NiFi的下载和安装

在本篇文章中主要讲述了四种部署方式,一种是单机部署比较简单,第二种是使用nifi自带的zookeeper部署伪分布模式,如果你的资源不足的话可以选用该模式,第三种是使用nifi自带的zookeeper部署集群模式,当然这种使用内部自带zookeeper的情况在实际开发中不常用,第三种就是比较常用的使用外部zookeeper的nifi集群模式的部署

首先我们先来看一下单机部署

单机部署

以下部署要在java环境下,若没有请事先安装jdk

下载安装包

进入Apache NiFi 官方网站来下载你所需要的版本

也可以直接点击nifi官网所有版本来下载

这里我下载的是1.13.2版本

下载完成后上传至Linux进行解压处理

解压:tar -zxvf  nifi-1.13.2-bin.tar.gz -C /opt/installs/

修改配置文件

cp nifi-1.13.2/conf/nifi.properties nifi-1.13.2/conf/nifi.properties.bak

修改ip和端口号

启动nifi

nifi-1.13.2/bin/nifi.sh

下面可以看到有启动/停止/运行/重启/状态等服务

启动nifi:nifi-1.13.2/bin/nifi.sh start

进入nifi的可视化界面

http://192.168.233.128:5800/nifi/

日志的位置

Nifi 集群部署-伪分布模式

由于nifi内置zookeeper,故我们先使用内置zookeeper进行搭建,可以在一台服务器上搭建伪分布模式,也可以在三台服务器上搭建集群模式,因为伪分布模式在一台机器上搭建,所以不同节点的相同功能端口会不同,如果搭建集群模式,IP不同,那么不同节点的相同功能端口可以相同,此处搭建集群模式

1、修改linux的⼀个安全机制

(1)进入vi /etc/selinux/config,添加SELINUX=disabled,防止后续出现一些问题

三台都需要修改,改完重启一下

(2)因为配置完单机模式后,nifi会产生许多新的数据库来存储数据,为了防止对伪分布模式有影响,这里先删除单机模式的nifi,重新解压一份

解压:tar -zxvf  nifi-1.13.2-bin.tar.gz -C /opt/installs/重命名:mv nifi-1.13.2/ nifi

rm -rf /opt/installs/nifi-1.13.2/

2、准备三个单机NIFI实例

nifi-1 nifi-2 nifi-3

3、修改配置文件

cp nifi/conf/nifi.properties nifi/conf/nifi.properties.bak

(1)修改三台服务器nifi中的zookeeper.properties

# 1节点2181,2节点2182,1节点2183clientPort=12181                                                                                                                    # 不同机器使用不同IPserver.1=bigdata01:12888:13888server.2=bigdata01:14888:15888server.3=bigdata01:16888:17888

(2)新建nifi-1/state/zookeeper,nifi-2/state/zookeeper'nifi-3/state/zookeeper在此文件夹中新建文件myid,分别对应写入1、2、3

(3)编辑节点conf/nifi.properties文件

1 ####################2 # State Management #                                                                                                 3 ####################4 nifi.state.management.configuration.file=./conf/state-management.xml                                             5 nifi.state.management.provider.local=local-provider  6 nifi.state.management.provider.cluster=zk-provider7 #  指定此NiFi实例是否应运行嵌入式ZooKeeper服务器,默认是false                          8 nifi.state.management.embedded.zookeeper.start=true                                                                9 nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties 
10 
11 # web properties #                                                 
12 nifi.web.war.directory=./lib    
13 # HTTP主机。默认为空白                                               
14 nifi.web.http.host=bigdata01
15 # HTTP端口。默认值为8080
16 nifi.web.http.port=18001
17 
18 # cluster node properties (only configure for cluster nodes) #   
19 # 如果实例是群集中的节点,请将此设置为true。默认值为false
20 nifi.cluster.is.node=true 
21 # 节点的完全限定地址。默认为空白
22 nifi.cluster.node.address=bigdata01
23 # 节点的协议端口。默认为空白
24 nifi.cluster.node.protocol.port=28001
25 
26 # 指定在选择Flow作为“正确”流之前等待的时间量。如果已投票的节点数等于nifi.cluster.flow.election.max.candidates属性指定的数量,则群集将不会等待这么长时间。默认值为5 mins
27 nifi.cluster.flow.election.max.wait.time=1 mins 
28 # 指定群集中所需的节点数,以便提前选择流。这允许群集中的节点避免在开始处理之前等待很长时间,如果我们至少达到群集中的此数量的节点
29 nifi.cluster.flow.election.max.candidates=1
30 
31 # cluster load balancing properties #  
32 nifi.cluster.load.balance.host=
33 nifi.cluster.load.balance.port=6342
34 
35 # zookeeper properties, used for cluster management # 
36 # 连接到Apache ZooKeeper所需的连接字符串。这是一个以逗号分隔的hostname:port对列表
37 nifi.zookeeper.connect.string=bigdata01:12181,bigdata01:12182,bigdata01:12183
38 nifi.zookeeper.connect.timeout=3 secs                                                      
39 nifi.zookeeper.session.timeout=3 secs                                                   
40 nifi.zookeeper.root.node=/nifi
bigdata01
143行:nifi.web.http.host=bigdata01
144行:nifi.web.http.port=18001
241行:nifi.cluster.is.node=true
242行:nifi.cluster.node.address=bigdata01
243行:nifi.cluster.node.protocol.port=28001
256行:nifi.cluster.load.balance.port=6342
262行:nifi.zookeeper.connect.string=bigdata01:12181,bigdata01:12182,bigdata01:12183

节点2,节点3内容跟节点1相同,只是nifi.web.http.port,nifi.cluster.node.protocol.port,nifi.cluster.load.balance.port,这三个端口区分开来,避免端口重复

(4)修改conf/state-management.xml文件

61行:<property name="Connect String">bigdata01:12181,bigdata01:12182,bigdata01:12183</property>

4、启动三个实例,启动完成后进入可视化界面

启动:nifi-1/bin/nifi.sh start
启动:nifi-2/bin/nifi.sh start
启动:nifi-3/bin/nifi.sh start
可视化界面:bigdata01:18001

Nifi 集群部署-内置Zookeeper

因为配置完单机模式后,nifi会产生许多新的数据库来存储数据,为了防止对集群模式有影响,这里先删除单机模式的nifi,重新解压一份,重命名为nifi

1、分发nifi至三台服务器

xsync nifi/

2、修改配置文件

(1)修改state-management.xml

<cluster-provider><id>zk-provider</id><class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class><property name="Connect String">bigdata01:2181,bigdata02:2181,bigdata03:2181</property><property name="Root Node">/nifi</property><property name="Session Timeout">10 seconds</property><property name="Access Control">Open</property></cluster-provider>

进行分发

xsync /opt/installs/nifi/conf/state-management.xml

(2)修改nifi.properties

bigdata01
51行:nifi.state.management.embedded.zookeeper.start=true(默认为false,如果使用外部的zookeeper集群为false)
143行:nifi.web.http.host=bigdata01
144行:nifi.web.http.port=18001
241行:nifi.cluster.is.node=true
143行:nifi.cluster.node.address=bigdata01
243行:nifi.cluster.node.protocol.port=28001
262行:nifi.zookeeper.connect.string=bigdata01:2181,bigdata02:2181,bigdata03:2181

进行分发并修改主机名

xsync /opt/installs/nifi/conf/nifi.properties
修改143行、143行主机名为相应的另外两台服务器的名字即可

(3)修改zookeeper.properties

clientPort=2181                                                                                                                    
server.1=bigdata01:2888:3888;2181
server.2=bigdata02:2888:3888;2181
server.3=bigdata03:2888:3888;2181

进行分发

xsync /opt/installs/nifi/conf/nifi.properties

(4)新建nifi/state/zookeeper,在此文件夹中新建文件myid,且在三台虚拟机中依次写入1、2、3

3、启动nifi并产看web界面

cd /opt/installs/nifibin/nifi.sh start #三台都要启动bigdata01:18001

Nifi 集群部署-外部Zookeeper

修改使用内置zookeeper状态下的nifi.properties中的第51行即可

web界面简介以及简单读取文件案例

选择自己想使用的处理器

填写文件夹路径(三台服务器都要有)

打勾了说明如果成功了就会停下来

把上述success打的勾取消,再拖拽入一个output类型的处理器进行连接,点击运行,发现getFile运行中,nifi的强大之处,只要有一个处理器能运行就会执行,此时我们点击运行,并往上述文件夹中发送数据,数据会先存储到管道中,数据会管道把数据存到本地磁盘,如果后续处理器发生错误也不会影响前面处理器的运行

点击小眼睛或者感叹号都可以查看内容

接下来配置输出路径,会发现数据传到了输出路径中,管道变成了0字节

从本地文件读取数据到MySQL

1、添加处理组file_to_mysql

开启hdfs服务:start-dfs.sh

2、填写hadoop中core-site-xml和hdfs-site.xml的路径

/opt/installs/hadoop/etc/hadoop/core-site.xml,/opt/installs/hadoop/etc/hadoop/hdfs-site.xml

其他参数可以先不配置

因为默认会加载你hadoop的配置文件

点击运行,往输入路径中写入文件

cp wcc.txt /home/data

离线同步MySQL数据到hdfs

类似于datax的功能

新建一个处理组mysql_to_hdfs

添加mysql端

添加驱动以及驱动地址

jdbc:mysql://bigdata01:3306/mydb01
com.mysql.jdbc.Driver
/opt/installs/hive/lib/mysql-connector-java-8.0.26.jar

这是因为没有填写账号密码

再次启动即可

这时发现文件传输速度特别快,可以修改调度时间,因为我们的离线数仓是一天调度一次,可以改成86400秒

查看hdfs的数据发现是乱码的

这时我们可以添加一个中间处理器将avro格式转换为json格式

添加之后再次启动即可

可以看到我们每次运行都把数据放入了同一个文件夹,这样是不行的,我们之前用datax导数据时放入了不同的文件夹

修改动态目录

添加dt后仍然无法识别,需要再添加一个处理器UpdateAttribute

给前三个处理器执行一下,使数据在第三个管道内

再运行第四个处理器,在hdfs上查看

但是还是要手动去写,能不能调用一些函数呢

双击可查看获取当前时间的方法

这里我们选用${now():format('yyyy')}并改成年月日的形式${now():format('yyyy-MM-dd')}

这样每天都会创建新的文件夹,但是打开文件夹后发现里面的文件名是用UUID命名的,虽然使文件名不重复但是不便于观看

以时间戳和后缀作为文件名,这样可以控制文件的滚动

再次运行可以看到文件名得到了修改

还可以修改成每个小时生成一个新文件

实时监控kafka数据到hdfs

启动kafka集群,添加kafka消费者,选择对应的版本

修改配置

复制一份puthdfs,修改输出路径

和上面一样通过添加日期函数使一个小时生成一个文件夹,这样可以解决小文件问题

相关文章:

NiFi-从部署到开发(图文详解)

NiFi简介 Apache NiFi 是一款强大的开源数据集成工具&#xff0c;旨在简化数据流的管理、传输和自动化。它提供了直观的用户界面和可视化工具&#xff0c;使用户能够轻松设计、控制和监控复杂的数据流程&#xff0c;NiFi 具备强大的扩展性和可靠性&#xff0c;可用于处理海量数…...

Scala的条件匹配

条件匹配 在 Scala 中&#xff0c;条件匹配主要通过match表达式来实现&#xff0c;它类似于其他语言中的switch语句&#xff0c;但功能更强。 基本语法&#xff1a;match表达式通常与case关键字一起使用。语法格式如下&#xff1a; 输入一段数字&#xff0c;判断属于那个范围…...

如何手搓一个智能激光逗猫棒

背景 最近家里的猫胖了&#xff0c;所以我就想做个逗猫棒。找了一圈市场上的智能逗猫棒&#xff0c;运行轨迹比较单一&#xff0c;互动性不足。 轨迹单一&#xff0c;活动范围有限 而我希望后续可以结合人工智能物联网&#xff0c;通过摄像头来捕捉猫的位置&#xff0c;让小…...

leetcode LCP 开幕式焰火

LCP 44. 开幕式焰火 - 力扣&#xff08;LeetCode&#xff09; 「力扣挑战赛」开幕式开始了&#xff0c;空中绽放了一颗二叉树形的巨型焰火。 给定一棵二叉树 root 代表焰火&#xff0c;节点值表示巨型焰火这一位置的颜色种类。请帮小扣计算巨型焰火有多少种不同的颜色。 示例…...

使用GDI对象绘制UI时需要注意的若干细节问题总结

目录 1、一个bitmap不能同时被选进两个dc中 2、CreateCompatibleDC和CreateCompatibleBitmap要使用同一个dc作为参数 3、不能删除已经被选入DC中的GDI对象 4、使用完的GDI对象&#xff0c;要将之释放掉&#xff0c;否则会导致GDI对象泄漏 5、CreateCompatibleBitmap返回错…...

51单片机(STC89C52RC版本)学习笔记(更新中...)

文章目录 参考资料1. 准备工作1.1 win10配置51单片机开发环境1.1 Ubuntu配置51单片机开发环境问题1&#xff1a;mcs51/8051.h依赖于mcs51/lint.h问题2&#xff1a;提示找不到头文件mcs51/8051.h 2. 认识51单片机2.1 STC89C52单片机2.2 管脚图2.3 原理图2.4 按键抖动2.5 头文件说…...

七:仪表盘安装-controller node

一&#xff1a;工具、环境准备-controller node 二&#xff1a;OpenStack环境准备-controller node 三&#xff1a;安装服务-controller node 四&#xff1a;工具、环境准备-compute node 五&#xff1a;OpenStack环境准备-compute node 六&#xff1a;安装服务-compute node 七…...

C++设计模式之外观模式

动机 下图中左边方案的问题在于组件的客户和组件中各种复杂的子系统有了过多的耦合&#xff0c;随着外部客户程序和各子系统的演化&#xff0c;这种过多的耦合面临很多变化的挑战。 如何简化外部客户程序和系统间的交互接口&#xff1f;如何将外部客户程序的演化和内部子系统…...

显卡(Graphics Processing Unit,GPU)比特币挖矿

1. 比特币挖矿基本原理 比特币挖矿是通过参与比特币网络的共识机制——工作量证明&#xff08;Proof of Work, PoW&#xff09; 来完成的。具体来说&#xff0c;矿工通过不断尝试不同的哈希值&#xff0c;以解决一个难度逐渐增大的数学问题&#xff0c;从而验证交易并获得比特…...

【SARL】单智能体强化学习(Single-Agent Reinforcement Learning)《纲要》

&#x1f4e2;本篇文章是博主强化学习&#xff08;RL&#xff09;领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对相关等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅…...

CSS 动画效果实现:图片展示与交互

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Css篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来Css篇专栏内容:CSS 动画效果实现&#xff1a;图片展示与交互 前言 在现代网页设计中&#xff0c;动态效果能够显著…...

【机器学习】—Transformers的扩展应用:从NLP到多领域突破

好久不见&#xff01;喜欢就关注吧~ 云边有个稻草人-CSDN博客 目录 引言 一、Transformer架构解析 &#xff08;一&#xff09;、核心组件 &#xff08;二&#xff09;、架构图 二、领域扩展&#xff1a;从NLP到更多场景 1. 自然语言处理&#xff08;NLP&#xff09; 2…...

Linux权限机制深度解读:系统安全的第一道防线

文章目录 前言‼️一、Linux权限的概念‼️二、Linux权限管理❕2.1 文件访问者的分类&#xff08;人&#xff09;❕2.2 文件类型和访问权限&#xff08;事物属性&#xff09;✔️1. 文件类型✔️2. 基本权限✔️3. 权限值的表示方法 ❕2.3 文件访问权限的相关设置方法✔️1. ch…...

NineData云原生智能数据管理平台新功能发布|2024年11月版

本月发布 8 项更新&#xff0c;其中重点发布 2 项、功能优化 6 项。 重点发布 数据库 Devops - 数据生成支持多个数据源 NineData 支持在数据库中自动生成符合特定业务场景的随机数据&#xff0c;用于模拟实际生产环境中的数据情况&#xff0c;帮助用户在不使用真实数据的情况…...

Vue中控制组件的挂载位置

在 Vue 中&#xff0c;append-to-body“true” 主要用于一些第三方组件&#xff08;如 Element UI 或 Ant Design Vue 中的弹出框、下拉菜单等&#xff09;来控制组件的挂载位置。具体来说&#xff0c;当你设置 append-to-body“true” 时&#xff0c;它会将该组件的 DOM 元素插…...

查看docker容器日志

容器里面的服务运行报错了&#xff0c;要查看容器的日志 要查看 Docker 容器的日志&#xff0c;可以使用 docker logs 命令。以下是一些常见的使用方法&#xff1a; 基本用法 docker logs <container_name_or_id> 查看最近的日志 docker logs --tail 100 <contai…...

Apache Commons工具类库使用整理

文章目录 Apache Commons工具类库分类- commons-lang3字符串工具&#xff1a;StringUtils日期工具&#xff1a;DateUtils数值工具&#xff1a;NumberUtils对象工具&#xff1a;ObjectUtils数组工具&#xff1a;ArrayUtils异常工具&#xff1a;ExceptionUtils枚举工具&#xff1…...

力扣第89题 格雷编码

题目描述 格雷编码序列是一个二进制数字序列&#xff0c;其中的每两个相邻的数字只有一个二进制位不同。给定一个整数 n&#xff0c;表示格雷编码的位数&#xff0c;要求返回 n 位的格雷编码序列。 示例 1 输入&#xff1a; n 2输出&#xff1a; [0, 1, 3, 2]解释&#x…...

Linux C/C++编程中的多线程编程基本概念

【图书推荐】《Linux C与C一线开发实践&#xff08;第2版&#xff09;》_linux c与c一线开发实践pdf-CSDN博客《Linux C与C一线开发实践&#xff08;第2版&#xff09;&#xff08;Linux技术丛书&#xff09;》(朱文伟&#xff0c;李建英)【摘要 书评 试读】- 京东图书 (jd.com…...

解决Tomcat运行时错误:“Address localhost:1099 is already in use”

目录 背景: 过程&#xff1a; 报错的原因&#xff1a; 解决的方法&#xff1a; 总结&#xff1a; 直接结束Java.exe进程&#xff1a; 使用neststat -aon | findstr 1099 命令&#xff1a; 选择建议&#xff1a; 背景: 准备运行Tomcat服务器调试项目时&#xff0c;程序下…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

Rapidio门铃消息FIFO溢出机制

关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系&#xff0c;以下是深入解析&#xff1a; 门铃FIFO溢出的本质 在RapidIO系统中&#xff0c;门铃消息FIFO是硬件控制器内部的缓冲区&#xff0c;用于临时存储接收到的门铃消息&#xff08;Doorbell Message&#xff09;。…...

rnn判断string中第一次出现a的下标

# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...