NIFI实现JSON转SQL并插入到数据库表中
说明
本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI
需求背景
现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库建表语句。
json数据
[{"name": "张三","age": 23,"gender": 1},{"name": "李四","age": 24,"gender": 1},{"name": "小红","age": 18,"gender": 0}
]
建表语句
CREATE TABLE `sys_user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',`age` int NOT NULL DEFAULT 0 COMMENT '年龄',`gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';
json数据中的属性名和数据库字段的名要一一对应,要不然后期还得做转换,比较麻烦
创建文件流
添加处理器:GetFile
点击工具栏的Processor,拖拽到画布中
筛选GetFile,点击ADD添加到画布中
配置GetFile处理器
双击添加的处理器,弹出对应的配置界面
(可选操作)点击SETTINGS选项,在Name中输入处理器的名称:获取文件内容
点击SCHEDULING,在Run Schedule中输入定时器的时间,这里设置每10秒运行一次,如果不设置后面运行处理器的时候会无限循环运行
点击PROPERTIES选项
配置PROPERTIES,分别填写Input Directory、File Filter、Keep Source File,其他选项默认即可。
说明:博主的NIFI是使用docker安装的,容器的数据全部挂载到了宿主机中,NIFI的HOME默认是在/opt/nifi/nifi-current,挂载到宿主机的路径为:/root/data/nifi/nifi-current。所以Input Directory中填写的路径/opt/nifi/nifi-current/mydata/file 实际对应宿主机路径为:/root/data/nifi/nifi-current/mydata/file,到时候把测试文件放到宿主机的/root/data/nifi/nifi-current/mydata/file下面即可
将文件放到对应的目录下
说明:mydata/file中的所有文件需要有读写的权限,否则后面读取文件会报错
修改权限:
chmod +777 /root/data/nifi/nifi-current/mydata/file
(可选操作)测试处理器配置是否成功
添加LogAttribute处理器
连接处理器
将鼠标放到第一个处理器上,然后点击出现的箭头,将其拖拽到第二个处理器中,等待线条由红色变为绿色后,松开鼠标即可。
在弹出的界面中勾选success,然后点击ADD
第一个显示红色方框的代表当前处理器可以正常使用;第二个出现黄色三角感叹号的代表当前处理器有问题,双击第二个处理器。
在弹出的界面选择RELATIONSHIPS选项卡,在success下勾选terminate,最后点击APPLY
说明:success下面的两个选项:terminate和retry分别代表着当前处理器执行成功的操作
terminate代表成功后终止,retry代表成功后继续尝试
可以看到黄色的三角变成了红色的方框,表示当前处理器没问题了。
运行处理器
运行处理器有两种方式,第一种是一个一个单独运行另一种是直接运行全部
第一种
鼠标放到第一个处理器中然后右键,可以看到有一堆选项,这里运行处理器可以选择Sart或者Run Once,为了方便调试,这里选择Run Once即只运行一次
点击Run Once之后可以看到,在处理器的右上角多了一个标志,这个代表当前有几个线程在运行中
当处理器的任务执行结束后可以看到两个处理器的连接处会显示当前有几个队列,以及队列数据总的大小
将鼠标放到两个处理器的连接处,鼠标右键,选择List queue
在弹出的界面中可以看到等待中的队列列表
选择其中一个队列,点击左上角的提示,可以看到上一个处理器(GetFile)的一些信息,包括一些属性啊什么的,这个可以自己去看,这里不再仔细说明。点击OK可以关闭当前的弹框
点击某一个队列的右上角,第一个可以下载当前的内容,中间的小眼睛可以查看队列中的数据
点击小眼睛,可以看到文件中的内容显示在了页面中,默认是original,也可以选择formatted和hex
运行第二个处理器(LogAttribute),同样的鼠标放到处理器上,然后选择Run Once即可
然后可以在nifi的日志中看到打印了一些日志,主要包括了处理器的属性和内容
说明:如果要想打印出文件的内容,LogAttribute处理器需要选择以下内容
正常打印数据说明GetFile处理器配置的没问题
Json数组分隔
添加处理器:SplitJson
配置SplitJson处理器
双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容
JsonPath Expression(JSON 路径表达式):指定要提取的 JSON 对象的路径。例如,如果要提取根级别的 JSON 对象,可以将路径设置为 $
。
连接处理器
将GetFile处理器和SplitJson处理器连接起来,勾选For Relationships,然后选择ADD
(可选操作)测试处理器配置是否成功
将SplitJson处理器和LogAttribute处理器连接,连接处理器中的For Relationships选择split
此时发现SplitJson处理器还在告警,双击SplitJson处理器,选择RELATIONSHIPS,按照如图勾选
此时所有的处理器已正常显示
开启所有的处理器(在画布空白处鼠标右键,点击Start),查看nifi容器的日志,可以看到此时日志打印出来的不再是整个文件的内容,而是单独一条一条json数据
停止所有处理器(画布空白处鼠标右键,选择Stop),清空队列中的数据,在连接处鼠标右键,选择Empty queue
Json转为SQL
添加处理器:ConvertJSONToSQL
配置ConvertJSONToSQL处理器
双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容
配置JDBC Connection Pool
Value下面点击,选择Create new service
根据自己的情况选择对应的services,我这里选择的是默认的
点击最后面的右箭头
点击右侧的小齿轮
切换到SETTINGS选项卡,给驱动起个名字,方便以后识别
切换到PROPERTIES选项卡 ,配置数据库相关参数,其他按照默认的即可
校验参数配置是否正确,点击右上角的对号
校验通过会出现绿色对钩,如果配置不对会有对应提示,最后点击APPLY
开启JDBC的配置,点击闪电符号,在弹出的界面点击ENABLE,最后点击CLOSE
最后可以看到state已经变为Enabled,点击右上角的X关闭
到此JDBC的配置结束
配置Statement Type
再次双击处理器,配置Statement Type,选择INSERT,代表生成的是INSERT语句
配置Table Name
校验配置是否正确
最后点击APPLY
连接处理器
将SplitJson处理器和ConvertJSONToSQL处理器进行连接,Relationships选择split
(可选操作)测试处理器配置是否成功
这里跳过测试,如果需要测试自己的配置是否正确的,可以自行将处理器和LogAttribute处理器进行连接进行测试,以下是博主自己的测试结果,做个参考,最后面会打印生成的SQL语句
执行生成的SQL
添加处理器:PutSQL
配置PutSQL处理器
双击处理器,在PROPERTIES选项卡中配置以下内容,其他内容默认即可
校验配置是否正确
最后点击APPLY
连接处理器
将ConvertJSONToSQL处理器和PutSQL处理器进行连接,Relationships选择sql
处理PutSQL处理器的告警
双击处理器,在RELATIONSHIPS选项卡配置勾选以下内容
完整的配置结果
包含四个处理器,依次为GetFile=>SplitJson=>ConvertJSONToSQL=>PutSQL
开启所有的处理器
数据库是否有数据
可以看到现在的数据库里面还是没有数据的
开启处理器
在画布的空白位置,鼠标右键选择Start
开启后可以看到所有的处理器左上角都显示为绿色三角,表示处理器已经启动了,过十几秒再看处理器,发现已经有数据流入
查看数据库数据
此时数据库已经有数据插入,重复数据是因为每隔10秒执行一次任务,就会读取一次文件,然后重复往数据库插入数据,如果不想让数据不停插入数据库,可以将GetFile中的PROPERTIES下的Keep Source File设置为false即可(此操作需要停止处理器才能够设置)
结束语
NIFI学习需要花费一定的时间去仔细研究,它里面内置了大概300多个处理器,每个处理器实现的功能都不一样,配置也都不同。博主也正在不断地学习中,后续也会不断分享关于NIFI的内容,如果有什么疑问欢迎评论区进行评论。
相关文章:

NIFI实现JSON转SQL并插入到数据库表中
说明 本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI 需求背景 现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库…...

【canal系】canal集群异常Could not find first log file name in binary log index file
这里先说明下这边使用的canal版本号为1.1.5 在描述这个问题之前,首先需要简单对于canal架构有个基本的了解 canal工作原理 canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议MySQL master 收到 dum…...

ESP32C3 PWM输出
目前对于遥控双发差速小飞机计划采用如下架构: ESP32C3做主控,兼具遥控收发和飞行控制锂电池供电,带电量检测双发,720空心杯电机,55mm桨,带电流检测MPU6050加速度计和陀螺仪预留4个控制信号输出 马达控制要…...
二、GoLang输出HelloWorld、基本数据类型、变量常量定义、基本类型转换
一、输入Hello World go语言中,想要输出内容到控制台,package必须是main,包括方法名也必须是main, go语言输出的语法是 fmt 库。 package mainimport "fmt"func main() {// go语言中 输出内容 使用的库是 fmt库fmt.Pr…...

mojo初体验
目录标题 mojo初体验试用地址变量定义参数可变性和所有权Structures后续 mojo初体验 试用地址 https://www.modular.com/get-started 与python基础语法很相似。 变量定义 let定义不可变变量var定义可变变量 参数可变性和所有权 下面是一个基本的函数: fn add…...
python3 重启docker方法
一、工作中的问题 工作中进行测试时,需要修改nacos配置,修改完成后再重启对应的docker容器,让配置生效,研究了下,使用docker库可以做到。 如何修改nacos配置可以参见我的另一篇文章,传送门 python3 修改…...
【js】js中深拷贝与浅拷贝:
文章目录 一、浅拷贝(修改新对象会改到原对象)【1】通过 直接赋值【2】Object.assign()方法 二、深拷贝(修改新对象不会改到原对象)【1】通过JSON对象来实现深拷贝【2】 Object.create(…...

大文件上传demo,前端基于Uppy,后端基于koa
前言 文件上传基本上所有的管理系统之类的项目都有这么一个功能。因为使用了Element,可以方便的使用 其提供的Upload组件,对于普通上传来说基本上就够用了。但是有时候会涉及到大文件上传的需求,这时就会面临一些问题:比如文件上…...
typeScript--[函数定义]
一.TypesScript 函数的定义 函数的定义包括两种类型:函数声明和函数表达式。 1.函数声明 function hello(): string {return "hello" } 2.函数表达式 var hello1 function (): string {return "hello" } 二.函数之可选参数 参数后面的限…...

Spring初始化项目
1、官网用法 访问地址:https://start.spring.io idea配置:https://start.spring.io 2、阿里巴巴加速 访问地址:https://start.aliyun.com/bootstrap.html idea配置:https://start.aliyun.com 3、区别 官网阿里巴巴版本最新稍…...

Opencv 图像金字塔----高斯和拉普拉斯
原文:图像金字塔----高斯和拉普拉斯 图像金字塔是图像中多尺度表达的一种,最初用于机器视觉和图像压缩,最主要用于图像的分割、融合。 高斯金字塔 ( Gaussian pyramid): 高斯金字塔是由底部的最大分辨率图像逐次向下采样得到的一系列图像…...

gitLab(git)误提交命令
1.先使用下面命令查看一下分支上已提交的信息 git log 2.回退到之前的版本 git reset —hard 你要删除的提交哈希码(一般是离这个命令最近的一串数字) 3.覆盖掉远端的版本信息,使远端的仓库也回退到相应的版本 注意:切换到你提…...
Rust个人学习笔记2
一定要牢记Rust是基于表达式的语言,除了声明语句和表达式语句外,其他的都是表达式。所以if也是表达式,它可以被用作右值。 条件控制 if-else。cpp和python得结合。 fn main() {let number 34;if number < 0 {println!("number &l…...

深入浅出Android同步屏障机制
原文链接 Android Sync Barrier机制 诡异的假死问题 前段时间,项目上遇到了一个假死问题,随机出现,无固定复现规律,大量频繁随机操作后,便会出现假死,整个应用无法操作,不会响应事件ÿ…...

工程管理系统简介 工程管理系统源码 java工程管理系统 工程管理系统功能设计
鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展,企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性,公司对内部工程管…...
Python 专栏目录索引
文章目录 Python 环境搭建Python 语法 变量、print、注释和运算符Python 的基本结构Python 中的文件和文件夹操作Python 中常用库Python 常见问题及解决方案Python 应用实例 Python 环境搭建 vscode搭建Python环境 Python 语法 变量、print、注释和运算符 python语法 变量、…...
SSM学习
技术架构 crm的技术架构: 视图层(view):展示数据,跟用户交互。 html, css,js,jquery,bootstrap(ext / easyUI),jsp控制层(Controller):控制业客处理流程(接收请求,接收参数,封装参数;根据不同的请求调用业务 (servlet, ) springMVC ( , webwork,strutsl,struts2)业…...
.net项目部署Docker
1、项目生成的bin目录下创建Dockerfile文件 #运行环境描述,此处是用的Net5构建镜像 FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build #复制文件到 docker容器中的app文件夹中 COPY . /app #设置工作目录为 app 文件夹,要和上面一致哦 WORKDIR /app #设…...
Ubuntu无法加载exfat的USB存储设备
当接入设备USB存储设备提示: 不能挂在63GB 卷 Error mounting /dev/sdb1 at /media/ubuntu/83C9-26F4: Command-line mount -t "exfat" -o "uhelperudisks2,nodev,nosuid,uid1000,gid1000,iocharsetutf8,namecase0,errorsremount-ro,umask0077"…...
【计算机网络】网络编程接口 Socket API 解读(2)
Socket 是网络协议栈暴露给编程人员的 API,相比复杂的计算机网络协议,API 对关键操作和配置数据进行了抽象,简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具,和其他平台(比如 os-x …...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...

循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...

华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...

HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...

Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...

通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器
拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件: 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...
Python实现简单音频数据压缩与解压算法
Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中,压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言,提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...

热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁
赛门铁克威胁猎手团队最新报告披露,数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据,严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能,但SEMR…...