当前位置: 首页 > news >正文

Flume1.9.0自定义Sink组件将数据发送至Mysql

需求

1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。

2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到MySQL中到user中。

3、user.log的数据格式如下:

2020-01-01 01:10:23,tom,18,beijing
2020-01-01 01:12:09,jack,20,shanghai
2020-01-01 01:13:17,jessic,15,guangzhou

4、mysql中的user表结构如下:

CREATE TABLE user  (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255),age int(11),city varchar(255),create_time datetime(0),PRIMARY KEY (id)
);

实现 

鉴于此,可以使用 Exec Source + File Channel + Custom Mysql Sink 来实现。官方文档如下:

Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-sourceFile Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channelCustom Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#custom-sink
https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink

创建工程

引入依赖

主要是 flume-ng-core 和 mysql-connector-java 依赖,其他可不引入。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flume-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.4</version></dependency>
<!--        <dependency>-->
<!--            <groupId>com.alibaba</groupId>-->
<!--            <artifactId>fastjson</artifactId>-->
<!--            <version>2.0.25</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>cn.hutool</groupId>-->
<!--            <artifactId>hutool-core</artifactId>-->
<!--            <version>5.8.27</version>-->
<!--        </dependency>--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies></project>

编写 Custom Sink

package com.example.flumedemo.sink;import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** 自定义Sink,实现将数据写入到mysql。* <p>* 注意:* 1、编写完成打包后,需要把当前jar包和mysql驱动包放到flume下的lib目录下。* 2、linux直接连linux上的mysql,最好不要连win上的mysql了,避坑。** @author liaorj* @date 2024/11/14*/
public class MySink extends AbstractSink implements Configurable {private static final Logger logger = LoggerFactory.getLogger(MySink.class);private String mysqlUrl;private String username;private String password;private String tableName;//表字段,逗号分割。需要和Event body中的数据对应。private String tableFields;@Overridepublic void configure(Context context) {mysqlUrl = context.getString("mysqlUrl");Preconditions.checkNotNull(mysqlUrl, "mysqlUrl required");username = context.getString("username");Preconditions.checkNotNull(username, "username required");password = context.getString("password");Preconditions.checkNotNull(password, "password required");tableName = context.getString("tableName");Preconditions.checkNotNull(tableName, "tableName required");tableFields = context.getString("tableFields");Preconditions.checkNotNull(tableFields, "tableFields required");}@Overridepublic Status process() throws EventDeliveryException {Status status = null;//开启事务Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();Event event = null;while (true) {event = ch.take();if (event != null) {break;}}Connection conn = null;PreparedStatement stmt = null;try {//获取body中的数据String body = new String(event.getBody(), Charsets.UTF_8);//如果这两个数组大小不一样,则抛异常String[] bodySplit = body.split(",");String[] fieldsSplit = tableFields.split(",");if (bodySplit.length != fieldsSplit.length) {//字段数对不上throw new Exception("the number of tableFields is incorrect");}//根据字段数生成对应的问号List<String> questionMarkList = new ArrayList<>();for (int i = 0; i < fieldsSplit.length; i++) {questionMarkList.add("?");}String questionMarks = String.join(",", questionMarkList);//生成sql并插入数据String formatSql = String.format("insert into %s(%s) values(%s)", tableName, tableFields, questionMarks);logger.info("-----formatSql={}", formatSql);logger.info("-----mysqlUrl={}, username={}, password={}", mysqlUrl, username, password);DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());conn = DriverManager.getConnection(mysqlUrl, username, password);stmt = conn.prepareStatement(formatSql);for (int i = 0; i < bodySplit.length; i++) {stmt.setString(i + 1, bodySplit[i]);}stmt.executeUpdate();txn.commit();status = Status.READY;} catch (Throwable t) {//异常则回滚txn.rollback();status = Status.BACKOFF;if (t instanceof Error) {throw (Error) t;} else {throw new EventDeliveryException(t);}} finally {//关闭事务txn.close();//关闭PrepareStatement预处理if (stmt != null) {try {stmt.close();} catch (SQLException e) {e.printStackTrace();}}//关闭Connection连接if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}return status;}
}

打包

mvn clean
mvn package
打包好后,需要把当前jar包和mysql驱动包一起上传到linux上的flume目录下的lib目录中,否则会报错驱动找不到。

配置文件

创建配置文件

然后在flume目录下的conf目录下创建配置文件:file-to-mysql.conf,内容如下,注意mysqlUrl/username/password 要修改成自己的。

# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/user.log# Describe the sink,custom sink to mysql
a1.sinks.k1.type = com.example.flumedemo.sink.MySink
a1.sinks.k1.mysqlUrl = jdbc:mysql://192.168.163.128:3306/flume_demo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
a1.sinks.k1.username = root
a1.sinks.k1.password = toor
a1.sinks.k1.tableName = user
a1.sinks.k1.tableFields = create_time,name,age,city# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/user/checkpointDir
a1.channels.c1.dataDirs = /data/user/dataDirs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

切换到flume目录,执行:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-mysql.conf -Dflume.root.logger=INFO,console

测试结果

查看flume控制台日志:

查看mysql user表,已插入数据:

相关文章:

Flume1.9.0自定义Sink组件将数据发送至Mysql

需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份&#xff0c;但是Flume目前不支持直接向MySQL中写数据&#xff0c;所以需要用到自定义Sink&#xff0c;自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中&#xff0c;使用Flume采集到…...

如何在 Ubuntu 24.04 上安装和配置 Fail2ban ?

确保你的 Ubuntu 24.04 服务器的安全是至关重要的&#xff0c;特别是如果它暴露在互联网上。一个常见的威胁是未经授权的访问尝试&#xff0c;特别是通过 SSH。Fail2ban 是一个强大的工具&#xff0c;可以通过自动阻止可疑活动来帮助保护您的服务器。 在本指南中&#xff0c;我…...

uniapp如何i18n国际化

1、正常情况下项目在代码生成的时候就已经有i18n的相关依赖&#xff0c;如果没有可以自行使用如下命令下载&#xff1a; npm install vue-i18n --save 2、创建相关文件 en文件下&#xff1a; zh文件下&#xff1a; index文件下&#xff1a; 3、在main.js中注册&#xff1a…...

C++__day1

1、思维导图 2、如果登录失败&#xff0c;提示用户登录失败信息&#xff0c;并且提示错误几次&#xff0c;且重新输入&#xff1b;如果输入错误三次&#xff0c;则退出系统 #include <iostream> using namespace std;int main() {string id , pswd;string user"admi…...

Emacs进阶之插入时间信息(一百六十三)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…...

Java线程池:ThreadPoolExecutor原理解析

一、线程池的基本概念 1.1 线程池的定义 线程池是一组预先创建的线程&#xff0c;这些线程可以重复使用来执行多个任务&#xff0c;避免了频繁创建和销毁线程的开销。线程池的核心思想是通过复用一组工作线程&#xff0c;来处理大量的并发任务&#xff0c;减少系统资源消耗&a…...

二叉树、哈夫曼报文大全

1、泛型链树 #include <iostream> #include<Windows.h> #include<string> #include<stack> #include<queue> using namespace std; void menu() {cout << "**********" << endl;cout << "-1.添加" <&…...

NotePad++中安装XML Tools插件

一、概述 作为开发人员&#xff0c;日常开发中大部的数据是标准的json格式&#xff0c;但是对于一些古老的应用&#xff0c;例如webservice接口&#xff0c;由于其响应结果是xml&#xff0c;那么我们拿到xml格式的数据后&#xff0c;常常会对其进行格式化&#xff0c;以便阅读。…...

聊天服务器(7)数据模块

目录 Mysql数据库代码封装头文件与源文件 Mysql数据库代码封装 业务层代码不要直接写数据库&#xff0c;因为业务层和数据层的代码逻辑也想完全区分开。万一不想存储mysql&#xff0c;想存redis的话&#xff0c;就要改动大量业务代码。解耦合就是改起来很方便。 首先需要安装m…...

VS2022编译32位OpenCV

使用环境 Visual Studio 2022 OpenCV: 4.7.0 cmake: 3.30.2一、使用CMake工具生成vs2022的openCV工程解决方案 打开cmake&#xff0c;选择opencv的源代码目录&#xff0c;创建一个文件夹&#xff0c;作为VS工程文件的生成目录 点击configure构建项目&#xff0c;弹出构建设置…...

WP网站如何增加文章/页面的自定义模板

通过Wordpress我们后台在发布文章或者页面的时候其实可以看到有些主题 他有选择使用的页面模板&#xff0c;可以自定义模板&#xff0c;但是有些主题却没有选择主题这个功能&#xff0c;那这个自定义模板的功能是如何实现的呢&#xff1f;以下分两种情况&#xff1a;Page页面和…...

【Linux网络编程】简单的UDP网络程序

目录 一&#xff0c;socket编程的相关说明 1-1&#xff0c;sockaddr结构体 1-2&#xff0c;Socket API 二&#xff0c;基于Udp协议的简单通信 一&#xff0c;socket编程的相关说明 Socket编程是一种网络通信编程技术&#xff0c;它允许两个或多个程序在网络上相互通信&…...

LabVIEW中坐标排序与旋转 参见附件snippet程序

LabVIEW中坐标排序与旋转 参见附件snippet程序LabVIEW中坐标排序与旋转 参见附件snippet程序 - 北京瀚文网星科技有限公司 在LabVIEW中处理坐标排序的过程&#xff0c;尤其是按顺时针或逆时针排列坐标点&#xff0c;常见的应用包括处理几何形状、路径规划等任务。下面我将为您…...

SPIRiT-Diffusion:基于自一致性驱动的加速MRI扩散模型|文献速递-基于深度学习的病灶分割与数据超分辨率

Title 题目 SPIRiT-Diffusion: Self-Consistency Driven Diffusion Model for Accelerated MRI SPIRiT-Diffusion&#xff1a;基于自一致性驱动的加速MRI扩散模型 01 文献速递介绍 磁共振成像&#xff08;MRI&#xff09; 在临床和研究领域被广泛应用。然而&#xff0c;其…...

jwt封装教程

使用步骤&#xff1a; 1.导入jwt相关依赖 2.创建jwt工具类方便使用 3.通过工具类提供的方法进行生成jwt 4.通过工具类解析jwt令牌获取封装的数据 5.设定拦截器&#xff0c;每次执行请求的时候都需要验证token 6.注册拦截器 1.jwt依赖 <dependency><groupId>io.json…...

postman变量和脚本功能介绍

1、基本概念——global、collection、environment 在postman中&#xff0c;为了更好的管理各类变量、测试环境以及脚本等&#xff0c;创建了一些概念&#xff0c;包括&#xff1a;globals、collection、environment。其实在postman中&#xff0c;最上层还有一个Workspaces的概…...

【AI新领域应用】AlphaFold 2,原子级别精度的蛋白质3D结构预测,李沐论文精读(2021Nature封面,2024诺贝尔奖)

文章目录 AlphaFold 2 —— 原子级别精度的蛋白质3D结构预测背景&#xff08;2024诺奖与AI学习资料&#xff09;1、摘要、导论、写作技巧2、方案&#xff1a;模型&#xff0c;编码器&#xff0c;解码器3、实验&#xff1a;数据集&#xff0c;训练&#xff0c;结果 AlphaFold 2 …...

Figma汉化:提升设计效率,降低沟通成本

在UI设计领域&#xff0c;Figma因其强大的功能而广受欢迎&#xff0c;但全英文界面对于国内设计师来说是一个不小的挑战。幸运的是&#xff0c;通过Figma汉化插件&#xff0c;我们可以克服语言障碍。以下是两种获取和安装Figma汉化插件的方法&#xff0c;旨在帮助国内的UI设计师…...

前端知识点---this的用法 , this动态绑定(Javascript)

文章目录 this动态绑定 , this的用法01. 全局作用域下的 this02. 函数中的 this2.1 普通函数调用2.2 构造函数调用2.3 箭头函数中的 this 03对象方法调用04. 事件处理中的 this05. 动态绑定的方式5.1 call 方法5.2 apply 方法5.3 bind 方法 06类中的 this07. 总结 this动态绑定…...

web——upload-labs——第五关——大小写绕过绕过

先上传一个 先尝试直接上传一个普通的一句话木马 不行 可以看到&#xff0c;.htaccess文件也被过滤了&#xff0c;我们来查看一下源码 第五关的源码没有把字符强制转换为小写的语句&#xff1a; $file_ext strtolower($file_ext); //转换为小写 直接通过Burpsuite抓包修改文…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)

一、数据处理与分析实战 &#xff08;一&#xff09;实时滤波与参数调整 基础滤波操作 60Hz 工频滤波&#xff1a;勾选界面右侧 “60Hz” 复选框&#xff0c;可有效抑制电网干扰&#xff08;适用于北美地区&#xff0c;欧洲用户可调整为 50Hz&#xff09;。 平滑处理&…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案

问题描述&#xff1a;iview使用table 中type: "index",分页之后 &#xff0c;索引还是从1开始&#xff0c;试过绑定后台返回数据的id, 这种方法可行&#xff0c;就是后台返回数据的每个页面id都不完全是按照从1开始的升序&#xff0c;因此百度了下&#xff0c;找到了…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...