当前位置: 首页 > news >正文

NIFI实现JSON转SQL并插入到数据库表中

说明

本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI

需求背景

现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库建表语句。

json数据

[{"name": "张三","age": 23,"gender": 1},{"name": "李四","age": 24,"gender": 1},{"name": "小红","age": 18,"gender": 0}
]

建表语句

CREATE TABLE `sys_user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',`age`  int NOT NULL DEFAULT 0 COMMENT '年龄',`gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

json数据中的属性名和数据库字段的名要一一对应,要不然后期还得做转换,比较麻烦

创建文件流

添加处理器:GetFile

点击工具栏的Processor,拖拽到画布中

筛选GetFile,点击ADD添加到画布中

配置GetFile处理器

双击添加的处理器,弹出对应的配置界面

可选操作)点击SETTINGS选项,在Name中输入处理器的名称:获取文件内容

点击SCHEDULING,在Run Schedule中输入定时器的时间,这里设置每10秒运行一次,如果不设置后面运行处理器的时候会无限循环运行

 点击PROPERTIES选项

 配置PROPERTIES,分别填写Input DirectoryFile FilterKeep Source File,其他选项默认即可。

说明:博主的NIFI是使用docker安装的,容器的数据全部挂载到了宿主机中,NIFI的HOME默认是在/opt/nifi/nifi-current,挂载到宿主机的路径为:/root/data/nifi/nifi-current。所以Input Directory中填写的路径/opt/nifi/nifi-current/mydata/file 实际对应宿主机路径为:/root/data/nifi/nifi-current/mydata/file,到时候把测试文件放到宿主机的/root/data/nifi/nifi-current/mydata/file下面即可

将文件放到对应的目录下

说明:mydata/file中的所有文件需要有读写的权限,否则后面读取文件会报错

修改权限:

chmod +777 /root/data/nifi/nifi-current/mydata/file

(可选操作)测试处理器配置是否成功

 添加LogAttribute处理器

连接处理器

将鼠标放到第一个处理器上,然后点击出现的箭头,将其拖拽到第二个处理器中,等待线条由红色变为绿色后,松开鼠标即可。

在弹出的界面中勾选success,然后点击ADD

 第一个显示红色方框的代表当前处理器可以正常使用;第二个出现黄色三角感叹号的代表当前处理器有问题,双击第二个处理器。

在弹出的界面选择RELATIONSHIPS选项卡,在success下勾选terminate,最后点击APPLY

说明:success下面的两个选项:terminate和retry分别代表着当前处理器执行成功的操作

terminate代表成功后终止,retry代表成功后继续尝试

可以看到黄色的三角变成了红色的方框,表示当前处理器没问题了。

运行处理器

运行处理器有两种方式,第一种是一个一个单独运行另一种是直接运行全部

第一种

鼠标放到第一个处理器中然后右键,可以看到有一堆选项,这里运行处理器可以选择Sart或者Run Once,为了方便调试,这里选择Run Once即只运行一次

点击Run Once之后可以看到,在处理器的右上角多了一个标志,这个代表当前有几个线程在运行中

 当处理器的任务执行结束后可以看到两个处理器的连接处会显示当前有几个队列,以及队列数据总的大小

将鼠标放到两个处理器的连接处,鼠标右键,选择List queue

 在弹出的界面中可以看到等待中的队列列表

选择其中一个队列,点击左上角的提示,可以看到上一个处理器(GetFile)的一些信息,包括一些属性啊什么的,这个可以自己去看,这里不再仔细说明。点击OK可以关闭当前的弹框

 点击某一个队列的右上角,第一个可以下载当前的内容,中间的小眼睛可以查看队列中的数据

 点击小眼睛,可以看到文件中的内容显示在了页面中,默认是original,也可以选择formatted和hex

 运行第二个处理器(LogAttribute),同样的鼠标放到处理器上,然后选择Run Once即可

然后可以在nifi的日志中看到打印了一些日志,主要包括了处理器的属性和内容

说明:如果要想打印出文件的内容,LogAttribute处理器需要选择以下内容

正常打印数据说明GetFile处理器配置的没问题

Json数组分隔

添加处理器:SplitJson

 配置SplitJson处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

JsonPath Expression(JSON 路径表达式):指定要提取的 JSON 对象的路径。例如,如果要提取根级别的 JSON 对象,可以将路径设置为 $

连接处理器

将GetFile处理器和SplitJson处理器连接起来,勾选For Relationships,然后选择ADD

可选操作)测试处理器配置是否成功

将SplitJson处理器和LogAttribute处理器连接,连接处理器中的For Relationships选择split

 此时发现SplitJson处理器还在告警,双击SplitJson处理器,选择RELATIONSHIPS,按照如图勾选

此时所有的处理器已正常显示

开启所有的处理器(在画布空白处鼠标右键,点击Start),查看nifi容器的日志,可以看到此时日志打印出来的不再是整个文件的内容,而是单独一条一条json数据

停止所有处理器(画布空白处鼠标右键,选择Stop),清空队列中的数据,在连接处鼠标右键,选择Empty queue

Json转为SQL

添加处理器:ConvertJSONToSQL

 配置ConvertJSONToSQL处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

配置JDBC Connection Pool

Value下面点击,选择Create new service

根据自己的情况选择对应的services,我这里选择的是默认的 

点击最后面的右箭头

点击右侧的小齿轮

切换到SETTINGS选项卡,给驱动起个名字,方便以后识别

切换到PROPERTIES选项卡 ,配置数据库相关参数,其他按照默认的即可

校验参数配置是否正确,点击右上角的对号

校验通过会出现绿色对钩,如果配置不对会有对应提示,最后点击APPLY

开启JDBC的配置,点击闪电符号,在弹出的界面点击ENABLE,最后点击CLOSE

 最后可以看到state已经变为Enabled,点击右上角的X关闭

到此JDBC的配置结束

配置Statement Type

再次双击处理器,配置Statement Type,选择INSERT,代表生成的是INSERT语句

配置Table Name

校验配置是否正确

最后点击APPLY

连接处理器

将SplitJson处理器和ConvertJSONToSQL处理器进行连接,Relationships选择split

可选操作)测试处理器配置是否成功

这里跳过测试,如果需要测试自己的配置是否正确的,可以自行将处理器和LogAttribute处理器进行连接进行测试,以下是博主自己的测试结果,做个参考,最后面会打印生成的SQL语句

执行生成的SQL

添加处理器:PutSQL

配置PutSQL处理器

双击处理器,在PROPERTIES选项卡中配置以下内容,其他内容默认即可

 

 校验配置是否正确

最后点击APPLY

连接处理器

将ConvertJSONToSQL处理器和PutSQL处理器进行连接,Relationships选择sql

 处理PutSQL处理器的告警

双击处理器,在RELATIONSHIPS选项卡配置勾选以下内容

完整的配置结果

包含四个处理器,依次为GetFile=>SplitJson=>ConvertJSONToSQL=>PutSQL

 开启所有的处理器

数据库是否有数据

可以看到现在的数据库里面还是没有数据的

 开启处理器

在画布的空白位置,鼠标右键选择Start

开启后可以看到所有的处理器左上角都显示为绿色三角,表示处理器已经启动了,过十几秒再看处理器,发现已经有数据流入

 查看数据库数据

此时数据库已经有数据插入,重复数据是因为每隔10秒执行一次任务,就会读取一次文件,然后重复往数据库插入数据,如果不想让数据不停插入数据库,可以将GetFile中的PROPERTIES下的Keep Source File设置为false即可(此操作需要停止处理器才能够设置)

结束语

NIFI学习需要花费一定的时间去仔细研究,它里面内置了大概300多个处理器,每个处理器实现的功能都不一样,配置也都不同。博主也正在不断地学习中,后续也会不断分享关于NIFI的内容,如果有什么疑问欢迎评论区进行评论。

相关文章:

NIFI实现JSON转SQL并插入到数据库表中

说明 本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI 需求背景 现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库…...

【canal系】canal集群异常Could not find first log file name in binary log index file

这里先说明下这边使用的canal版本号为1.1.5 在描述这个问题之前,首先需要简单对于canal架构有个基本的了解 canal工作原理 canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议MySQL master 收到 dum…...

ESP32C3 PWM输出

目前对于遥控双发差速小飞机计划采用如下架构: ESP32C3做主控,兼具遥控收发和飞行控制锂电池供电,带电量检测双发,720空心杯电机,55mm桨,带电流检测MPU6050加速度计和陀螺仪预留4个控制信号输出 马达控制要…...

二、GoLang输出HelloWorld、基本数据类型、变量常量定义、基本类型转换

一、输入Hello World go语言中,想要输出内容到控制台,package必须是main,包括方法名也必须是main, go语言输出的语法是 fmt 库。 package mainimport "fmt"func main() {// go语言中 输出内容 使用的库是 fmt库fmt.Pr…...

mojo初体验

目录标题 mojo初体验试用地址变量定义参数可变性和所有权Structures后续 mojo初体验 试用地址 https://www.modular.com/get-started 与python基础语法很相似。 变量定义 let定义不可变变量var定义可变变量 参数可变性和所有权 下面是一个基本的函数: fn add…...

python3 重启docker方法

一、工作中的问题 工作中进行测试时,需要修改nacos配置,修改完成后再重启对应的docker容器,让配置生效,研究了下,使用docker库可以做到。 如何修改nacos配置可以参见我的另一篇文章,传送门 python3 修改…...

【js】js中深拷贝与浅拷贝:

文章目录 一、浅拷贝(修改新对象会改到原对象)【1】通过 直接赋值【2】Object.assign()方法 二、深拷贝(修改新对象不会改到原对象)【1】通过JSON对象来实现深拷贝【2】 Object.create(&#xf…...

大文件上传demo,前端基于Uppy,后端基于koa

前言 文件上传基本上所有的管理系统之类的项目都有这么一个功能。因为使用了Element,可以方便的使用 其提供的Upload组件,对于普通上传来说基本上就够用了。但是有时候会涉及到大文件上传的需求,这时就会面临一些问题:比如文件上…...

typeScript--[函数定义]

一.TypesScript 函数的定义 函数的定义包括两种类型:函数声明和函数表达式。 1.函数声明 function hello(): string {return "hello" } 2.函数表达式 var hello1 function (): string {return "hello" } 二.函数之可选参数 参数后面的限…...

Spring初始化项目

1、官网用法 访问地址:https://start.spring.io idea配置:https://start.spring.io 2、阿里巴巴加速 访问地址:https://start.aliyun.com/bootstrap.html idea配置:https://start.aliyun.com 3、区别 官网阿里巴巴版本最新稍…...

Opencv 图像金字塔----高斯和拉普拉斯

原文:图像金字塔----高斯和拉普拉斯 图像金字塔是图像中多尺度表达的一种,最初用于机器视觉和图像压缩,最主要用于图像的分割、融合。 高斯金字塔 ( Gaussian pyramid): 高斯金字塔是由底部的最大分辨率图像逐次向下采样得到的一系列图像…...

gitLab(git)误提交命令

1.先使用下面命令查看一下分支上已提交的信息 git log 2.回退到之前的版本 git reset —hard 你要删除的提交哈希码(一般是离这个命令最近的一串数字) 3.覆盖掉远端的版本信息,使远端的仓库也回退到相应的版本 注意:切换到你提…...

Rust个人学习笔记2

一定要牢记Rust是基于表达式的语言&#xff0c;除了声明语句和表达式语句外&#xff0c;其他的都是表达式。所以if也是表达式&#xff0c;它可以被用作右值。 条件控制 if-else。cpp和python得结合。 fn main() {let number 34;if number < 0 {println!("number &l…...

深入浅出Android同步屏障机制

原文链接 Android Sync Barrier机制 诡异的假死问题 前段时间&#xff0c;项目上遇到了一个假死问题&#xff0c;随机出现&#xff0c;无固定复现规律&#xff0c;大量频繁随机操作后&#xff0c;便会出现假死&#xff0c;整个应用无法操作&#xff0c;不会响应事件&#xff…...

工程管理系统简介 工程管理系统源码 java工程管理系统 工程管理系统功能设计

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…...

Python 专栏目录索引

文章目录 Python 环境搭建Python 语法 变量、print、注释和运算符Python 的基本结构Python 中的文件和文件夹操作Python 中常用库Python 常见问题及解决方案Python 应用实例 Python 环境搭建 vscode搭建Python环境 Python 语法 变量、print、注释和运算符 python语法 变量、…...

SSM学习

技术架构 crm的技术架构: 视图层(view):展示数据&#xff0c;跟用户交互。 html, css,js,jquery,bootstrap(ext / easyUI),jsp控制层(Controller):控制业客处理流程(接收请求,接收参数,封装参数;根据不同的请求调用业务 (servlet, ) springMVC ( , webwork,strutsl,struts2)业…...

.net项目部署Docker

1、项目生成的bin目录下创建Dockerfile文件 #运行环境描述&#xff0c;此处是用的Net5构建镜像 FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build #复制文件到 docker容器中的app文件夹中 COPY . /app #设置工作目录为 app 文件夹&#xff0c;要和上面一致哦 WORKDIR /app #设…...

Ubuntu无法加载exfat的USB存储设备

当接入设备USB存储设备提示&#xff1a; 不能挂在63GB 卷 Error mounting /dev/sdb1 at /media/ubuntu/83C9-26F4: Command-line mount -t "exfat" -o "uhelperudisks2,nodev,nosuid,uid1000,gid1000,iocharsetutf8,namecase0,errorsremount-ro,umask0077"…...

【计算机网络】网络编程接口 Socket API 解读(2)

Socket 是网络协议栈暴露给编程人员的 API&#xff0c;相比复杂的计算机网络协议&#xff0c;API 对关键操作和配置数据进行了抽象&#xff0c;简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具&#xff0c;和其他平台&#xff08;比如 os-x …...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

linux arm系统烧录

1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 &#xff08;忘了有没有这步了 估计有&#xff09; 刷机程序 和 镜像 就不提供了。要刷的时…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)

在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...