当前位置: 首页 > news >正文

NIFI实现JSON转SQL并插入到数据库表中

说明

本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI

需求背景

现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库建表语句。

json数据

[{"name": "张三","age": 23,"gender": 1},{"name": "李四","age": 24,"gender": 1},{"name": "小红","age": 18,"gender": 0}
]

建表语句

CREATE TABLE `sys_user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',`age`  int NOT NULL DEFAULT 0 COMMENT '年龄',`gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

json数据中的属性名和数据库字段的名要一一对应,要不然后期还得做转换,比较麻烦

创建文件流

添加处理器:GetFile

点击工具栏的Processor,拖拽到画布中

筛选GetFile,点击ADD添加到画布中

配置GetFile处理器

双击添加的处理器,弹出对应的配置界面

可选操作)点击SETTINGS选项,在Name中输入处理器的名称:获取文件内容

点击SCHEDULING,在Run Schedule中输入定时器的时间,这里设置每10秒运行一次,如果不设置后面运行处理器的时候会无限循环运行

 点击PROPERTIES选项

 配置PROPERTIES,分别填写Input DirectoryFile FilterKeep Source File,其他选项默认即可。

说明:博主的NIFI是使用docker安装的,容器的数据全部挂载到了宿主机中,NIFI的HOME默认是在/opt/nifi/nifi-current,挂载到宿主机的路径为:/root/data/nifi/nifi-current。所以Input Directory中填写的路径/opt/nifi/nifi-current/mydata/file 实际对应宿主机路径为:/root/data/nifi/nifi-current/mydata/file,到时候把测试文件放到宿主机的/root/data/nifi/nifi-current/mydata/file下面即可

将文件放到对应的目录下

说明:mydata/file中的所有文件需要有读写的权限,否则后面读取文件会报错

修改权限:

chmod +777 /root/data/nifi/nifi-current/mydata/file

(可选操作)测试处理器配置是否成功

 添加LogAttribute处理器

连接处理器

将鼠标放到第一个处理器上,然后点击出现的箭头,将其拖拽到第二个处理器中,等待线条由红色变为绿色后,松开鼠标即可。

在弹出的界面中勾选success,然后点击ADD

 第一个显示红色方框的代表当前处理器可以正常使用;第二个出现黄色三角感叹号的代表当前处理器有问题,双击第二个处理器。

在弹出的界面选择RELATIONSHIPS选项卡,在success下勾选terminate,最后点击APPLY

说明:success下面的两个选项:terminate和retry分别代表着当前处理器执行成功的操作

terminate代表成功后终止,retry代表成功后继续尝试

可以看到黄色的三角变成了红色的方框,表示当前处理器没问题了。

运行处理器

运行处理器有两种方式,第一种是一个一个单独运行另一种是直接运行全部

第一种

鼠标放到第一个处理器中然后右键,可以看到有一堆选项,这里运行处理器可以选择Sart或者Run Once,为了方便调试,这里选择Run Once即只运行一次

点击Run Once之后可以看到,在处理器的右上角多了一个标志,这个代表当前有几个线程在运行中

 当处理器的任务执行结束后可以看到两个处理器的连接处会显示当前有几个队列,以及队列数据总的大小

将鼠标放到两个处理器的连接处,鼠标右键,选择List queue

 在弹出的界面中可以看到等待中的队列列表

选择其中一个队列,点击左上角的提示,可以看到上一个处理器(GetFile)的一些信息,包括一些属性啊什么的,这个可以自己去看,这里不再仔细说明。点击OK可以关闭当前的弹框

 点击某一个队列的右上角,第一个可以下载当前的内容,中间的小眼睛可以查看队列中的数据

 点击小眼睛,可以看到文件中的内容显示在了页面中,默认是original,也可以选择formatted和hex

 运行第二个处理器(LogAttribute),同样的鼠标放到处理器上,然后选择Run Once即可

然后可以在nifi的日志中看到打印了一些日志,主要包括了处理器的属性和内容

说明:如果要想打印出文件的内容,LogAttribute处理器需要选择以下内容

正常打印数据说明GetFile处理器配置的没问题

Json数组分隔

添加处理器:SplitJson

 配置SplitJson处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

JsonPath Expression(JSON 路径表达式):指定要提取的 JSON 对象的路径。例如,如果要提取根级别的 JSON 对象,可以将路径设置为 $

连接处理器

将GetFile处理器和SplitJson处理器连接起来,勾选For Relationships,然后选择ADD

可选操作)测试处理器配置是否成功

将SplitJson处理器和LogAttribute处理器连接,连接处理器中的For Relationships选择split

 此时发现SplitJson处理器还在告警,双击SplitJson处理器,选择RELATIONSHIPS,按照如图勾选

此时所有的处理器已正常显示

开启所有的处理器(在画布空白处鼠标右键,点击Start),查看nifi容器的日志,可以看到此时日志打印出来的不再是整个文件的内容,而是单独一条一条json数据

停止所有处理器(画布空白处鼠标右键,选择Stop),清空队列中的数据,在连接处鼠标右键,选择Empty queue

Json转为SQL

添加处理器:ConvertJSONToSQL

 配置ConvertJSONToSQL处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

配置JDBC Connection Pool

Value下面点击,选择Create new service

根据自己的情况选择对应的services,我这里选择的是默认的 

点击最后面的右箭头

点击右侧的小齿轮

切换到SETTINGS选项卡,给驱动起个名字,方便以后识别

切换到PROPERTIES选项卡 ,配置数据库相关参数,其他按照默认的即可

校验参数配置是否正确,点击右上角的对号

校验通过会出现绿色对钩,如果配置不对会有对应提示,最后点击APPLY

开启JDBC的配置,点击闪电符号,在弹出的界面点击ENABLE,最后点击CLOSE

 最后可以看到state已经变为Enabled,点击右上角的X关闭

到此JDBC的配置结束

配置Statement Type

再次双击处理器,配置Statement Type,选择INSERT,代表生成的是INSERT语句

配置Table Name

校验配置是否正确

最后点击APPLY

连接处理器

将SplitJson处理器和ConvertJSONToSQL处理器进行连接,Relationships选择split

可选操作)测试处理器配置是否成功

这里跳过测试,如果需要测试自己的配置是否正确的,可以自行将处理器和LogAttribute处理器进行连接进行测试,以下是博主自己的测试结果,做个参考,最后面会打印生成的SQL语句

执行生成的SQL

添加处理器:PutSQL

配置PutSQL处理器

双击处理器,在PROPERTIES选项卡中配置以下内容,其他内容默认即可

 

 校验配置是否正确

最后点击APPLY

连接处理器

将ConvertJSONToSQL处理器和PutSQL处理器进行连接,Relationships选择sql

 处理PutSQL处理器的告警

双击处理器,在RELATIONSHIPS选项卡配置勾选以下内容

完整的配置结果

包含四个处理器,依次为GetFile=>SplitJson=>ConvertJSONToSQL=>PutSQL

 开启所有的处理器

数据库是否有数据

可以看到现在的数据库里面还是没有数据的

 开启处理器

在画布的空白位置,鼠标右键选择Start

开启后可以看到所有的处理器左上角都显示为绿色三角,表示处理器已经启动了,过十几秒再看处理器,发现已经有数据流入

 查看数据库数据

此时数据库已经有数据插入,重复数据是因为每隔10秒执行一次任务,就会读取一次文件,然后重复往数据库插入数据,如果不想让数据不停插入数据库,可以将GetFile中的PROPERTIES下的Keep Source File设置为false即可(此操作需要停止处理器才能够设置)

结束语

NIFI学习需要花费一定的时间去仔细研究,它里面内置了大概300多个处理器,每个处理器实现的功能都不一样,配置也都不同。博主也正在不断地学习中,后续也会不断分享关于NIFI的内容,如果有什么疑问欢迎评论区进行评论。

相关文章:

NIFI实现JSON转SQL并插入到数据库表中

说明 本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI 需求背景 现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库…...

【canal系】canal集群异常Could not find first log file name in binary log index file

这里先说明下这边使用的canal版本号为1.1.5 在描述这个问题之前,首先需要简单对于canal架构有个基本的了解 canal工作原理 canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议MySQL master 收到 dum…...

ESP32C3 PWM输出

目前对于遥控双发差速小飞机计划采用如下架构: ESP32C3做主控,兼具遥控收发和飞行控制锂电池供电,带电量检测双发,720空心杯电机,55mm桨,带电流检测MPU6050加速度计和陀螺仪预留4个控制信号输出 马达控制要…...

二、GoLang输出HelloWorld、基本数据类型、变量常量定义、基本类型转换

一、输入Hello World go语言中,想要输出内容到控制台,package必须是main,包括方法名也必须是main, go语言输出的语法是 fmt 库。 package mainimport "fmt"func main() {// go语言中 输出内容 使用的库是 fmt库fmt.Pr…...

mojo初体验

目录标题 mojo初体验试用地址变量定义参数可变性和所有权Structures后续 mojo初体验 试用地址 https://www.modular.com/get-started 与python基础语法很相似。 变量定义 let定义不可变变量var定义可变变量 参数可变性和所有权 下面是一个基本的函数: fn add…...

python3 重启docker方法

一、工作中的问题 工作中进行测试时,需要修改nacos配置,修改完成后再重启对应的docker容器,让配置生效,研究了下,使用docker库可以做到。 如何修改nacos配置可以参见我的另一篇文章,传送门 python3 修改…...

【js】js中深拷贝与浅拷贝:

文章目录 一、浅拷贝(修改新对象会改到原对象)【1】通过 直接赋值【2】Object.assign()方法 二、深拷贝(修改新对象不会改到原对象)【1】通过JSON对象来实现深拷贝【2】 Object.create(&#xf…...

大文件上传demo,前端基于Uppy,后端基于koa

前言 文件上传基本上所有的管理系统之类的项目都有这么一个功能。因为使用了Element,可以方便的使用 其提供的Upload组件,对于普通上传来说基本上就够用了。但是有时候会涉及到大文件上传的需求,这时就会面临一些问题:比如文件上…...

typeScript--[函数定义]

一.TypesScript 函数的定义 函数的定义包括两种类型:函数声明和函数表达式。 1.函数声明 function hello(): string {return "hello" } 2.函数表达式 var hello1 function (): string {return "hello" } 二.函数之可选参数 参数后面的限…...

Spring初始化项目

1、官网用法 访问地址:https://start.spring.io idea配置:https://start.spring.io 2、阿里巴巴加速 访问地址:https://start.aliyun.com/bootstrap.html idea配置:https://start.aliyun.com 3、区别 官网阿里巴巴版本最新稍…...

Opencv 图像金字塔----高斯和拉普拉斯

原文:图像金字塔----高斯和拉普拉斯 图像金字塔是图像中多尺度表达的一种,最初用于机器视觉和图像压缩,最主要用于图像的分割、融合。 高斯金字塔 ( Gaussian pyramid): 高斯金字塔是由底部的最大分辨率图像逐次向下采样得到的一系列图像…...

gitLab(git)误提交命令

1.先使用下面命令查看一下分支上已提交的信息 git log 2.回退到之前的版本 git reset —hard 你要删除的提交哈希码(一般是离这个命令最近的一串数字) 3.覆盖掉远端的版本信息,使远端的仓库也回退到相应的版本 注意:切换到你提…...

Rust个人学习笔记2

一定要牢记Rust是基于表达式的语言&#xff0c;除了声明语句和表达式语句外&#xff0c;其他的都是表达式。所以if也是表达式&#xff0c;它可以被用作右值。 条件控制 if-else。cpp和python得结合。 fn main() {let number 34;if number < 0 {println!("number &l…...

深入浅出Android同步屏障机制

原文链接 Android Sync Barrier机制 诡异的假死问题 前段时间&#xff0c;项目上遇到了一个假死问题&#xff0c;随机出现&#xff0c;无固定复现规律&#xff0c;大量频繁随机操作后&#xff0c;便会出现假死&#xff0c;整个应用无法操作&#xff0c;不会响应事件&#xff…...

工程管理系统简介 工程管理系统源码 java工程管理系统 工程管理系统功能设计

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…...

Python 专栏目录索引

文章目录 Python 环境搭建Python 语法 变量、print、注释和运算符Python 的基本结构Python 中的文件和文件夹操作Python 中常用库Python 常见问题及解决方案Python 应用实例 Python 环境搭建 vscode搭建Python环境 Python 语法 变量、print、注释和运算符 python语法 变量、…...

SSM学习

技术架构 crm的技术架构: 视图层(view):展示数据&#xff0c;跟用户交互。 html, css,js,jquery,bootstrap(ext / easyUI),jsp控制层(Controller):控制业客处理流程(接收请求,接收参数,封装参数;根据不同的请求调用业务 (servlet, ) springMVC ( , webwork,strutsl,struts2)业…...

.net项目部署Docker

1、项目生成的bin目录下创建Dockerfile文件 #运行环境描述&#xff0c;此处是用的Net5构建镜像 FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build #复制文件到 docker容器中的app文件夹中 COPY . /app #设置工作目录为 app 文件夹&#xff0c;要和上面一致哦 WORKDIR /app #设…...

Ubuntu无法加载exfat的USB存储设备

当接入设备USB存储设备提示&#xff1a; 不能挂在63GB 卷 Error mounting /dev/sdb1 at /media/ubuntu/83C9-26F4: Command-line mount -t "exfat" -o "uhelperudisks2,nodev,nosuid,uid1000,gid1000,iocharsetutf8,namecase0,errorsremount-ro,umask0077"…...

【计算机网络】网络编程接口 Socket API 解读(2)

Socket 是网络协议栈暴露给编程人员的 API&#xff0c;相比复杂的计算机网络协议&#xff0c;API 对关键操作和配置数据进行了抽象&#xff0c;简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具&#xff0c;和其他平台&#xff08;比如 os-x …...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

React Native 导航系统实战(React Navigation)

导航系统实战&#xff08;React Navigation&#xff09; React Navigation 是 React Native 应用中最常用的导航库之一&#xff0c;它提供了多种导航模式&#xff0c;如堆栈导航&#xff08;Stack Navigator&#xff09;、标签导航&#xff08;Tab Navigator&#xff09;和抽屉…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

Golang dig框架与GraphQL的完美结合

将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用&#xff0c;可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器&#xff0c;能够帮助开发者更好地管理复杂的依赖关系&#xff0c;而 GraphQL 则是一种用于 API 的查询语言&#xff0c;能够提…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子&#xff1a; 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...