Flume1.9.0自定义Sink组件将数据发送至Mysql
需求
1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。
2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到MySQL中到user中。
3、user.log的数据格式如下:
2020-01-01 01:10:23,tom,18,beijing
2020-01-01 01:12:09,jack,20,shanghai
2020-01-01 01:13:17,jessic,15,guangzhou
4、mysql中的user表结构如下:
CREATE TABLE user (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255),age int(11),city varchar(255),create_time datetime(0),PRIMARY KEY (id)
);
实现
鉴于此,可以使用 Exec Source + File Channel + Custom Mysql Sink 来实现。官方文档如下:
Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-sourceFile Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channelCustom Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#custom-sink
https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink
创建工程
引入依赖
主要是 flume-ng-core 和 mysql-connector-java 依赖,其他可不引入。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flume-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.4</version></dependency>
<!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>-->
<!-- <artifactId>fastjson</artifactId>-->
<!-- <version>2.0.25</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>cn.hutool</groupId>-->
<!-- <artifactId>hutool-core</artifactId>-->
<!-- <version>5.8.27</version>-->
<!-- </dependency>--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies></project>
编写 Custom Sink
package com.example.flumedemo.sink;import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** 自定义Sink,实现将数据写入到mysql。* <p>* 注意:* 1、编写完成打包后,需要把当前jar包和mysql驱动包放到flume下的lib目录下。* 2、linux直接连linux上的mysql,最好不要连win上的mysql了,避坑。** @author liaorj* @date 2024/11/14*/
public class MySink extends AbstractSink implements Configurable {private static final Logger logger = LoggerFactory.getLogger(MySink.class);private String mysqlUrl;private String username;private String password;private String tableName;//表字段,逗号分割。需要和Event body中的数据对应。private String tableFields;@Overridepublic void configure(Context context) {mysqlUrl = context.getString("mysqlUrl");Preconditions.checkNotNull(mysqlUrl, "mysqlUrl required");username = context.getString("username");Preconditions.checkNotNull(username, "username required");password = context.getString("password");Preconditions.checkNotNull(password, "password required");tableName = context.getString("tableName");Preconditions.checkNotNull(tableName, "tableName required");tableFields = context.getString("tableFields");Preconditions.checkNotNull(tableFields, "tableFields required");}@Overridepublic Status process() throws EventDeliveryException {Status status = null;//开启事务Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();Event event = null;while (true) {event = ch.take();if (event != null) {break;}}Connection conn = null;PreparedStatement stmt = null;try {//获取body中的数据String body = new String(event.getBody(), Charsets.UTF_8);//如果这两个数组大小不一样,则抛异常String[] bodySplit = body.split(",");String[] fieldsSplit = tableFields.split(",");if (bodySplit.length != fieldsSplit.length) {//字段数对不上throw new Exception("the number of tableFields is incorrect");}//根据字段数生成对应的问号List<String> questionMarkList = new ArrayList<>();for (int i = 0; i < fieldsSplit.length; i++) {questionMarkList.add("?");}String questionMarks = String.join(",", questionMarkList);//生成sql并插入数据String formatSql = String.format("insert into %s(%s) values(%s)", tableName, tableFields, questionMarks);logger.info("-----formatSql={}", formatSql);logger.info("-----mysqlUrl={}, username={}, password={}", mysqlUrl, username, password);DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());conn = DriverManager.getConnection(mysqlUrl, username, password);stmt = conn.prepareStatement(formatSql);for (int i = 0; i < bodySplit.length; i++) {stmt.setString(i + 1, bodySplit[i]);}stmt.executeUpdate();txn.commit();status = Status.READY;} catch (Throwable t) {//异常则回滚txn.rollback();status = Status.BACKOFF;if (t instanceof Error) {throw (Error) t;} else {throw new EventDeliveryException(t);}} finally {//关闭事务txn.close();//关闭PrepareStatement预处理if (stmt != null) {try {stmt.close();} catch (SQLException e) {e.printStackTrace();}}//关闭Connection连接if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}return status;}
}
打包
mvn clean
mvn package
打包好后,需要把当前jar包和mysql驱动包一起上传到linux上的flume目录下的lib目录中,否则会报错驱动找不到。
配置文件
创建配置文件
然后在flume目录下的conf目录下创建配置文件:file-to-mysql.conf,内容如下,注意mysqlUrl/username/password 要修改成自己的。
# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/user.log# Describe the sink,custom sink to mysql
a1.sinks.k1.type = com.example.flumedemo.sink.MySink
a1.sinks.k1.mysqlUrl = jdbc:mysql://192.168.163.128:3306/flume_demo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
a1.sinks.k1.username = root
a1.sinks.k1.password = toor
a1.sinks.k1.tableName = user
a1.sinks.k1.tableFields = create_time,name,age,city# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/user/checkpointDir
a1.channels.c1.dataDirs = /data/user/dataDirs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
切换到flume目录,执行:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-mysql.conf -Dflume.root.logger=INFO,console
测试结果
查看flume控制台日志:

查看mysql user表,已插入数据:

相关文章:
Flume1.9.0自定义Sink组件将数据发送至Mysql
需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到…...
如何在 Ubuntu 24.04 上安装和配置 Fail2ban ?
确保你的 Ubuntu 24.04 服务器的安全是至关重要的,特别是如果它暴露在互联网上。一个常见的威胁是未经授权的访问尝试,特别是通过 SSH。Fail2ban 是一个强大的工具,可以通过自动阻止可疑活动来帮助保护您的服务器。 在本指南中,我…...
uniapp如何i18n国际化
1、正常情况下项目在代码生成的时候就已经有i18n的相关依赖,如果没有可以自行使用如下命令下载: npm install vue-i18n --save 2、创建相关文件 en文件下: zh文件下: index文件下: 3、在main.js中注册:…...
C++__day1
1、思维导图 2、如果登录失败,提示用户登录失败信息,并且提示错误几次,且重新输入;如果输入错误三次,则退出系统 #include <iostream> using namespace std;int main() {string id , pswd;string user"admi…...
Emacs进阶之插入时间信息(一百六十三)
简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【…...
Java线程池:ThreadPoolExecutor原理解析
一、线程池的基本概念 1.1 线程池的定义 线程池是一组预先创建的线程,这些线程可以重复使用来执行多个任务,避免了频繁创建和销毁线程的开销。线程池的核心思想是通过复用一组工作线程,来处理大量的并发任务,减少系统资源消耗&a…...
二叉树、哈夫曼报文大全
1、泛型链树 #include <iostream> #include<Windows.h> #include<string> #include<stack> #include<queue> using namespace std; void menu() {cout << "**********" << endl;cout << "-1.添加" <&…...
NotePad++中安装XML Tools插件
一、概述 作为开发人员,日常开发中大部的数据是标准的json格式,但是对于一些古老的应用,例如webservice接口,由于其响应结果是xml,那么我们拿到xml格式的数据后,常常会对其进行格式化,以便阅读。…...
聊天服务器(7)数据模块
目录 Mysql数据库代码封装头文件与源文件 Mysql数据库代码封装 业务层代码不要直接写数据库,因为业务层和数据层的代码逻辑也想完全区分开。万一不想存储mysql,想存redis的话,就要改动大量业务代码。解耦合就是改起来很方便。 首先需要安装m…...
VS2022编译32位OpenCV
使用环境 Visual Studio 2022 OpenCV: 4.7.0 cmake: 3.30.2一、使用CMake工具生成vs2022的openCV工程解决方案 打开cmake,选择opencv的源代码目录,创建一个文件夹,作为VS工程文件的生成目录 点击configure构建项目,弹出构建设置…...
WP网站如何增加文章/页面的自定义模板
通过Wordpress我们后台在发布文章或者页面的时候其实可以看到有些主题 他有选择使用的页面模板,可以自定义模板,但是有些主题却没有选择主题这个功能,那这个自定义模板的功能是如何实现的呢?以下分两种情况:Page页面和…...
【Linux网络编程】简单的UDP网络程序
目录 一,socket编程的相关说明 1-1,sockaddr结构体 1-2,Socket API 二,基于Udp协议的简单通信 一,socket编程的相关说明 Socket编程是一种网络通信编程技术,它允许两个或多个程序在网络上相互通信&…...
LabVIEW中坐标排序与旋转 参见附件snippet程序
LabVIEW中坐标排序与旋转 参见附件snippet程序LabVIEW中坐标排序与旋转 参见附件snippet程序 - 北京瀚文网星科技有限公司 在LabVIEW中处理坐标排序的过程,尤其是按顺时针或逆时针排列坐标点,常见的应用包括处理几何形状、路径规划等任务。下面我将为您…...
SPIRiT-Diffusion:基于自一致性驱动的加速MRI扩散模型|文献速递-基于深度学习的病灶分割与数据超分辨率
Title 题目 SPIRiT-Diffusion: Self-Consistency Driven Diffusion Model for Accelerated MRI SPIRiT-Diffusion:基于自一致性驱动的加速MRI扩散模型 01 文献速递介绍 磁共振成像(MRI) 在临床和研究领域被广泛应用。然而,其…...
jwt封装教程
使用步骤: 1.导入jwt相关依赖 2.创建jwt工具类方便使用 3.通过工具类提供的方法进行生成jwt 4.通过工具类解析jwt令牌获取封装的数据 5.设定拦截器,每次执行请求的时候都需要验证token 6.注册拦截器 1.jwt依赖 <dependency><groupId>io.json…...
postman变量和脚本功能介绍
1、基本概念——global、collection、environment 在postman中,为了更好的管理各类变量、测试环境以及脚本等,创建了一些概念,包括:globals、collection、environment。其实在postman中,最上层还有一个Workspaces的概…...
【AI新领域应用】AlphaFold 2,原子级别精度的蛋白质3D结构预测,李沐论文精读(2021Nature封面,2024诺贝尔奖)
文章目录 AlphaFold 2 —— 原子级别精度的蛋白质3D结构预测背景(2024诺奖与AI学习资料)1、摘要、导论、写作技巧2、方案:模型,编码器,解码器3、实验:数据集,训练,结果 AlphaFold 2 …...
Figma汉化:提升设计效率,降低沟通成本
在UI设计领域,Figma因其强大的功能而广受欢迎,但全英文界面对于国内设计师来说是一个不小的挑战。幸运的是,通过Figma汉化插件,我们可以克服语言障碍。以下是两种获取和安装Figma汉化插件的方法,旨在帮助国内的UI设计师…...
前端知识点---this的用法 , this动态绑定(Javascript)
文章目录 this动态绑定 , this的用法01. 全局作用域下的 this02. 函数中的 this2.1 普通函数调用2.2 构造函数调用2.3 箭头函数中的 this 03对象方法调用04. 事件处理中的 this05. 动态绑定的方式5.1 call 方法5.2 apply 方法5.3 bind 方法 06类中的 this07. 总结 this动态绑定…...
web——upload-labs——第五关——大小写绕过绕过
先上传一个 先尝试直接上传一个普通的一句话木马 不行 可以看到,.htaccess文件也被过滤了,我们来查看一下源码 第五关的源码没有把字符强制转换为小写的语句: $file_ext strtolower($file_ext); //转换为小写 直接通过Burpsuite抓包修改文…...
别再手动刷新了!SAP ALV中利用change事件与modify_cell实现智能数据同步
SAP ALV开发进阶:巧用change事件与modify_cell构建智能数据联动体系 在SAP前端开发领域,ALV(ABAP List Viewer)作为最常用的数据展示控件,其交互体验直接影响用户操作效率。传统开发模式中,当用户修改某个单…...
从零开始:Gemma-3-12B-IT WebUI在A10/A100/V100上的部署实践
从零开始:Gemma-3-12B-IT WebUI在A10/A100/V100上的部署实践 1. 项目简介:为什么选择Gemma-3-12B-IT? 如果你正在寻找一个性能强劲、部署友好,又不需要天价硬件支持的大语言模型,那么Gemma-3-12B-IT可能就是你的理想选…...
别再只测烟雾了!用STM32CubeMX+MQ-2传感器,做个厨房燃气泄漏+烟雾双检测器(附完整代码)
厨房安全卫士:基于STM32CubeMX与MQ-2的燃气烟雾双模检测系统 厨房是家庭安全事故的高发区域,燃气泄漏和烟雾积聚都可能引发严重后果。传统烟雾报警器功能单一,而市面上的复合型安防设备价格昂贵。本文将带你用STM32单片机和MQ-2气敏传感器&am…...
FlexASIO:打破专业音频门槛,让普通设备也能拥有专业级ASIO体验
FlexASIO:打破专业音频门槛,让普通设备也能拥有专业级ASIO体验 【免费下载链接】FlexASIO A flexible universal ASIO driver that uses the PortAudio sound I/O library. Supports WASAPI (shared and exclusive), KS, DirectSound and MME. 项目地址…...
别再踩坑了!Jetson Nano/Xavier NX上PyTorch和torchvision版本匹配保姆级指南(含JetPack 5/6)
Jetson设备PyTorch环境配置终极避坑手册:从版本匹配到性能调优 刚拿到Jetson Nano或Xavier NX的开发者们,十个里有九个会在PyTorch环境配置上栽跟头。不是torchvision报错就是CUDA不可用,最崩溃的是好不容易装好了却发现性能还不如树莓派。本…...
快捷键冲突终结者:Hotkey Detective全方位排障指南
快捷键冲突终结者:Hotkey Detective全方位排障指南 【免费下载链接】hotkey-detective A small program for investigating stolen hotkeys under Windows 8 项目地址: https://gitcode.com/gh_mirrors/ho/hotkey-detective 问题诊断:你的快捷键为…...
Emacs verilog-mode实战:5分钟搞定AUTOARG自动参数生成(附避坑指南)
Emacs verilog-mode实战:5分钟掌握AUTOARG高效参数生成技巧 在数字电路设计领域,Verilog作为主流硬件描述语言,其模块化开发方式虽然提高了代码复用性,却也带来了大量重复性工作。模块接口定义中的参数列表维护就是典型痛点——每…...
从零构建树莓派人脸识别门禁:硬件选型、环境部署与实战避坑
1. 硬件选型与采购清单 第一次玩树莓派人脸识别项目时,我在淘宝上花了整整三天对比各种硬件参数。当时最纠结的就是摄像头模块——普通USB摄像头才30块钱,而官方推荐的Raspberry Pi Camera Module V2要200多。后来实测发现,这差价真不能省。 …...
QT6.5串口编程第一步:用CMakeLists.txt引入SerialPort模块的避坑指南
QT6.5串口编程避坑指南:CMakeLists.txt配置全解析 当你满怀期待地在QT6.5项目中引入串口通信功能,却在编译时遭遇"找不到QtSerialPort"的红色错误提示,这种挫败感我深有体会。作为一位经历过无数次类似"战斗"的开发者&am…...
OFA模型在零售行业的视觉问答应用案例
OFA模型在零售行业的视觉问答应用案例 1. 引言 走进任何一家现代零售商店,你都会看到成千上万的商品整齐地陈列在货架上。但对于店员来说,要快速准确地回答"这个品牌的洗发水有没有无硅油版本?"或者"这款饼干是否含有坚果成…...
