Flume1.9.0自定义Sink组件将数据发送至Mysql
需求
1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。
2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到MySQL中到user中。
3、user.log的数据格式如下:
2020-01-01 01:10:23,tom,18,beijing
2020-01-01 01:12:09,jack,20,shanghai
2020-01-01 01:13:17,jessic,15,guangzhou
4、mysql中的user表结构如下:
CREATE TABLE user (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255),age int(11),city varchar(255),create_time datetime(0),PRIMARY KEY (id)
);
实现
鉴于此,可以使用 Exec Source + File Channel + Custom Mysql Sink 来实现。官方文档如下:
Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-sourceFile Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channelCustom Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#custom-sink
https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink
创建工程
引入依赖
主要是 flume-ng-core 和 mysql-connector-java 依赖,其他可不引入。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flume-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.4</version></dependency>
<!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>-->
<!-- <artifactId>fastjson</artifactId>-->
<!-- <version>2.0.25</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>cn.hutool</groupId>-->
<!-- <artifactId>hutool-core</artifactId>-->
<!-- <version>5.8.27</version>-->
<!-- </dependency>--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies></project>
编写 Custom Sink
package com.example.flumedemo.sink;import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** 自定义Sink,实现将数据写入到mysql。* <p>* 注意:* 1、编写完成打包后,需要把当前jar包和mysql驱动包放到flume下的lib目录下。* 2、linux直接连linux上的mysql,最好不要连win上的mysql了,避坑。** @author liaorj* @date 2024/11/14*/
public class MySink extends AbstractSink implements Configurable {private static final Logger logger = LoggerFactory.getLogger(MySink.class);private String mysqlUrl;private String username;private String password;private String tableName;//表字段,逗号分割。需要和Event body中的数据对应。private String tableFields;@Overridepublic void configure(Context context) {mysqlUrl = context.getString("mysqlUrl");Preconditions.checkNotNull(mysqlUrl, "mysqlUrl required");username = context.getString("username");Preconditions.checkNotNull(username, "username required");password = context.getString("password");Preconditions.checkNotNull(password, "password required");tableName = context.getString("tableName");Preconditions.checkNotNull(tableName, "tableName required");tableFields = context.getString("tableFields");Preconditions.checkNotNull(tableFields, "tableFields required");}@Overridepublic Status process() throws EventDeliveryException {Status status = null;//开启事务Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();Event event = null;while (true) {event = ch.take();if (event != null) {break;}}Connection conn = null;PreparedStatement stmt = null;try {//获取body中的数据String body = new String(event.getBody(), Charsets.UTF_8);//如果这两个数组大小不一样,则抛异常String[] bodySplit = body.split(",");String[] fieldsSplit = tableFields.split(",");if (bodySplit.length != fieldsSplit.length) {//字段数对不上throw new Exception("the number of tableFields is incorrect");}//根据字段数生成对应的问号List<String> questionMarkList = new ArrayList<>();for (int i = 0; i < fieldsSplit.length; i++) {questionMarkList.add("?");}String questionMarks = String.join(",", questionMarkList);//生成sql并插入数据String formatSql = String.format("insert into %s(%s) values(%s)", tableName, tableFields, questionMarks);logger.info("-----formatSql={}", formatSql);logger.info("-----mysqlUrl={}, username={}, password={}", mysqlUrl, username, password);DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());conn = DriverManager.getConnection(mysqlUrl, username, password);stmt = conn.prepareStatement(formatSql);for (int i = 0; i < bodySplit.length; i++) {stmt.setString(i + 1, bodySplit[i]);}stmt.executeUpdate();txn.commit();status = Status.READY;} catch (Throwable t) {//异常则回滚txn.rollback();status = Status.BACKOFF;if (t instanceof Error) {throw (Error) t;} else {throw new EventDeliveryException(t);}} finally {//关闭事务txn.close();//关闭PrepareStatement预处理if (stmt != null) {try {stmt.close();} catch (SQLException e) {e.printStackTrace();}}//关闭Connection连接if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}return status;}
}
打包
mvn clean
mvn package
打包好后,需要把当前jar包和mysql驱动包一起上传到linux上的flume目录下的lib目录中,否则会报错驱动找不到。
配置文件
创建配置文件
然后在flume目录下的conf目录下创建配置文件:file-to-mysql.conf,内容如下,注意mysqlUrl/username/password 要修改成自己的。
# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/user.log# Describe the sink,custom sink to mysql
a1.sinks.k1.type = com.example.flumedemo.sink.MySink
a1.sinks.k1.mysqlUrl = jdbc:mysql://192.168.163.128:3306/flume_demo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
a1.sinks.k1.username = root
a1.sinks.k1.password = toor
a1.sinks.k1.tableName = user
a1.sinks.k1.tableFields = create_time,name,age,city# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/user/checkpointDir
a1.channels.c1.dataDirs = /data/user/dataDirs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
切换到flume目录,执行:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-mysql.conf -Dflume.root.logger=INFO,console
测试结果
查看flume控制台日志:
查看mysql user表,已插入数据:
相关文章:

Flume1.9.0自定义Sink组件将数据发送至Mysql
需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到…...

如何在 Ubuntu 24.04 上安装和配置 Fail2ban ?
确保你的 Ubuntu 24.04 服务器的安全是至关重要的,特别是如果它暴露在互联网上。一个常见的威胁是未经授权的访问尝试,特别是通过 SSH。Fail2ban 是一个强大的工具,可以通过自动阻止可疑活动来帮助保护您的服务器。 在本指南中,我…...

uniapp如何i18n国际化
1、正常情况下项目在代码生成的时候就已经有i18n的相关依赖,如果没有可以自行使用如下命令下载: npm install vue-i18n --save 2、创建相关文件 en文件下: zh文件下: index文件下: 3、在main.js中注册:…...

C++__day1
1、思维导图 2、如果登录失败,提示用户登录失败信息,并且提示错误几次,且重新输入;如果输入错误三次,则退出系统 #include <iostream> using namespace std;int main() {string id , pswd;string user"admi…...

Emacs进阶之插入时间信息(一百六十三)
简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【…...

Java线程池:ThreadPoolExecutor原理解析
一、线程池的基本概念 1.1 线程池的定义 线程池是一组预先创建的线程,这些线程可以重复使用来执行多个任务,避免了频繁创建和销毁线程的开销。线程池的核心思想是通过复用一组工作线程,来处理大量的并发任务,减少系统资源消耗&a…...
二叉树、哈夫曼报文大全
1、泛型链树 #include <iostream> #include<Windows.h> #include<string> #include<stack> #include<queue> using namespace std; void menu() {cout << "**********" << endl;cout << "-1.添加" <&…...

NotePad++中安装XML Tools插件
一、概述 作为开发人员,日常开发中大部的数据是标准的json格式,但是对于一些古老的应用,例如webservice接口,由于其响应结果是xml,那么我们拿到xml格式的数据后,常常会对其进行格式化,以便阅读。…...

聊天服务器(7)数据模块
目录 Mysql数据库代码封装头文件与源文件 Mysql数据库代码封装 业务层代码不要直接写数据库,因为业务层和数据层的代码逻辑也想完全区分开。万一不想存储mysql,想存redis的话,就要改动大量业务代码。解耦合就是改起来很方便。 首先需要安装m…...

VS2022编译32位OpenCV
使用环境 Visual Studio 2022 OpenCV: 4.7.0 cmake: 3.30.2一、使用CMake工具生成vs2022的openCV工程解决方案 打开cmake,选择opencv的源代码目录,创建一个文件夹,作为VS工程文件的生成目录 点击configure构建项目,弹出构建设置…...

WP网站如何增加文章/页面的自定义模板
通过Wordpress我们后台在发布文章或者页面的时候其实可以看到有些主题 他有选择使用的页面模板,可以自定义模板,但是有些主题却没有选择主题这个功能,那这个自定义模板的功能是如何实现的呢?以下分两种情况:Page页面和…...

【Linux网络编程】简单的UDP网络程序
目录 一,socket编程的相关说明 1-1,sockaddr结构体 1-2,Socket API 二,基于Udp协议的简单通信 一,socket编程的相关说明 Socket编程是一种网络通信编程技术,它允许两个或多个程序在网络上相互通信&…...

LabVIEW中坐标排序与旋转 参见附件snippet程序
LabVIEW中坐标排序与旋转 参见附件snippet程序LabVIEW中坐标排序与旋转 参见附件snippet程序 - 北京瀚文网星科技有限公司 在LabVIEW中处理坐标排序的过程,尤其是按顺时针或逆时针排列坐标点,常见的应用包括处理几何形状、路径规划等任务。下面我将为您…...

SPIRiT-Diffusion:基于自一致性驱动的加速MRI扩散模型|文献速递-基于深度学习的病灶分割与数据超分辨率
Title 题目 SPIRiT-Diffusion: Self-Consistency Driven Diffusion Model for Accelerated MRI SPIRiT-Diffusion:基于自一致性驱动的加速MRI扩散模型 01 文献速递介绍 磁共振成像(MRI) 在临床和研究领域被广泛应用。然而,其…...
jwt封装教程
使用步骤: 1.导入jwt相关依赖 2.创建jwt工具类方便使用 3.通过工具类提供的方法进行生成jwt 4.通过工具类解析jwt令牌获取封装的数据 5.设定拦截器,每次执行请求的时候都需要验证token 6.注册拦截器 1.jwt依赖 <dependency><groupId>io.json…...

postman变量和脚本功能介绍
1、基本概念——global、collection、environment 在postman中,为了更好的管理各类变量、测试环境以及脚本等,创建了一些概念,包括:globals、collection、environment。其实在postman中,最上层还有一个Workspaces的概…...

【AI新领域应用】AlphaFold 2,原子级别精度的蛋白质3D结构预测,李沐论文精读(2021Nature封面,2024诺贝尔奖)
文章目录 AlphaFold 2 —— 原子级别精度的蛋白质3D结构预测背景(2024诺奖与AI学习资料)1、摘要、导论、写作技巧2、方案:模型,编码器,解码器3、实验:数据集,训练,结果 AlphaFold 2 …...

Figma汉化:提升设计效率,降低沟通成本
在UI设计领域,Figma因其强大的功能而广受欢迎,但全英文界面对于国内设计师来说是一个不小的挑战。幸运的是,通过Figma汉化插件,我们可以克服语言障碍。以下是两种获取和安装Figma汉化插件的方法,旨在帮助国内的UI设计师…...

前端知识点---this的用法 , this动态绑定(Javascript)
文章目录 this动态绑定 , this的用法01. 全局作用域下的 this02. 函数中的 this2.1 普通函数调用2.2 构造函数调用2.3 箭头函数中的 this 03对象方法调用04. 事件处理中的 this05. 动态绑定的方式5.1 call 方法5.2 apply 方法5.3 bind 方法 06类中的 this07. 总结 this动态绑定…...

web——upload-labs——第五关——大小写绕过绕过
先上传一个 先尝试直接上传一个普通的一句话木马 不行 可以看到,.htaccess文件也被过滤了,我们来查看一下源码 第五关的源码没有把字符强制转换为小写的语句: $file_ext strtolower($file_ext); //转换为小写 直接通过Burpsuite抓包修改文…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践
6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)
引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)已成为技术领域的焦点。从智能写作到代码生成,LLM 的应用场景不断扩展,深刻改变了我们的工作和生活方式。然而,理解这些模型的内部…...

Ubuntu系统复制(U盘-电脑硬盘)
所需环境 电脑自带硬盘:1块 (1T) U盘1:Ubuntu系统引导盘(用于“U盘2”复制到“电脑自带硬盘”) U盘2:Ubuntu系统盘(1T,用于被复制) !!!建议“电脑…...

ubuntu系统文件误删(/lib/x86_64-linux-gnu/libc.so.6)修复方案 [成功解决]
报错信息:libc.so.6: cannot open shared object file: No such file or directory: #ls, ln, sudo...命令都不能用 error while loading shared libraries: libc.so.6: cannot open shared object file: No such file or directory重启后报错信息&…...