Flume 之自定义Sink
1、简介
前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。
2、自定义Source
自定义Source参考前文:https://blog.csdn.net/zwl2220943286/article/details/135633120
3、自定义Sink
本文将Sink定义为mysql。
3.1、引入依赖
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.11.0</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>
3.2、自定义Sink
3.2.1、Sink代码
import com.weilong.flumeselfdefinition.util.MysqlConfig;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {private final static Logger log = LoggerFactory.getLogger(MySink.class);private String url;private String username;private String password;@Overridepublic Status process() throws EventDeliveryException {Status status = null;Channel channel = getChannel();// channel 支持事务Transaction thx = channel.getTransaction();thx.begin();try {Event event = channel.take();String name = new String(event.getBody());int i = MysqlConfig.insertData(this.url, this.username, this.password, name);if (i > 0){log.info("==插入数据库成功==");}thx.commit();status = Status.READY;} catch (Exception ex){ex.printStackTrace();}return status;}@Overridepublic void configure(Context context) {String url = context.getString("url");String username = context.getString("username");String password = context.getString("password");this.url = url;this.username = username;this.password = password;}
}
3.2.2、数据库连接配置:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;public class MysqlConfig {private MysqlConfig(){}static {try {Class.forName("com.mysql.cj.jdbc.Driver");}catch (Exception ex){ex.printStackTrace();}}public static Connection getConnection(String url, String username, String password) throws SQLException {Connection connection = DriverManager.getConnection(url, username, password);return connection;}public static int insertData(String url, String username,String password, String name){Connection connection = null;try{connection = getConnection(url, username, password);PreparedStatement preparedStatement = connection.prepareStatement("insert into test(`name`) values( '" + name + "')");boolean res = preparedStatement.execute();if (res){return 1;}return 0;}catch (Exception ex){ex.printStackTrace();}finally {if (connection != null){try {connection.close();}catch (Exception ex){ex.printStackTrace();}}}return 0;}
}
3.3、Flume 配置文件
vim flume-self-source-sink.conf
a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource
# 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.3:8088/hello
# 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 自定义Sink
a1.sinks.k1.type = com.weilong.flumeselfdefinition.MySink
a1.sinks.k1.url = jdbc:mysql://192.168.30.3:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
a1.sinks.k1.username = root
a1.sinks.k1.password = 146815
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、将jar包放入lib目录
4.1、将自定义jar包放入lib目录

4.2、将数据库驱动jar包放入lib目录
驱动jar包下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

注:mysql 驱动jar包不放进lib,会出现驱动类找不到。
5、启动 Flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source-sink.conf -Dflume.root.logger=INFO,console
注:启动Flume 之前,自定义 web 服务也要启动。

6、结果
成功保存进数据库。

7、总结
本文结合前文完成 Flume 的 Source 和 Sink 的自定义,帮助大家能够完成各种场景下的Flume的使用。关于更高级Flume的知识,关注下面公众号。
本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

相关文章:
Flume 之自定义Sink
1、简介 前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。 2、自定…...
【1】SM4 CBC-MAC 机制
0x01 题目 MSG1: e55e3e24a3ae7797808fdca05a16ac15eb5fa2e6185c23a814a35ba32b4637c2 MAC1: 0712c867aa6ec7c1bb2b66312367b2c8 ----------------------------------------------------- MSG2: d8d94f33797e1f41cab9217793b2d0f02b93d46c2ead104dce4bfec453767719 MAC2: 4366…...
响应式编程Reactor API大全(下)
Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例: pom依赖 <dependencyMan…...
【STM32】HAL库的STOP低功耗模式UART串口唤醒,解决首字节出错的问题(全网第一解决方案)
【STM32】HAL库的STOP低功耗模式UART串口唤醒,解决首字节出错的问题(全网第一解决方案) 前文: 【STM32】HAL库的STOP低功耗模式UART串口唤醒,第一个接收字节出错的问题(疑难杂症) 目前已解决 …...
Python 语法糖
一、基本概念 语法糖,可以理解为:“甜蜜” 的便捷语法。 它是编程语言为程序提供的更简洁、更易读的语法实现的语法结构,它并不影响语言的功能,仅仅是一种更便捷的书写方式。 这就像你制作蛋糕时,使用现代烤箱而不是…...
一个小程序跳转到另一个小程序中如何实现
小程序 保证两个小程序是一样的主体才可以跳转。怎么知道是不是同样的主体呢? 小程序的后台管理-设置-基本设置-基本信息。查看主体信息。 跳转 <button clicktoOtherMini()>跳转到另一个小程序</button> function toOtherMini(){wx.navigateToMini…...
STM32+HAL库驱动ADXL345传感器(SPI协议)
STM32HAL库驱动ADXL345传感器(SPI协议) ADXL345传感器简介实物STM32CubeMX配置SPI配置片选引脚配置串口配置 特别注意(重点部分)核心代码效果展示 ADXL345传感器简介 ADXL345 是 ADI 公司推出的基于 iMEMS 技术的 3 轴、数字输出加…...
Redis实现全局唯一Id
一、全局唯一ID 每个店铺都可以发布优惠券: 当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题: id的规律性太明显 受单表数据量的限制 场景分析:如果我们的…...
【J-Flash基本使用总结】
【J-Flash基本使用总结】 VX:hao541022348 ■ 烧录文件■ 创建新的工程■ 烧录模式-SWD模式■ J-Flash下载程序到单片机 ■ J-Flash拼接多个hex或bin文件■ J-Flash读单片机的option byte■ J-Flash读单片机Flash数据■ 将读出来的文件用jflash烧录到其他的芯片■ 设…...
宝塔发布网站问题汇总和记录
1、添加网站站点后打不开 解决办法,关闭防跨站攻击2 2、laravel项目部署到linux的时候出现The stream or file "/home/www/storage/logs/laravel.log" could not be opened in append mode 给目录加权限 chmod -R 777 storage 3、Class "Redis"…...
决战排序之巅(二)
决战排序之巅(二) 排序测试函数 void verify(int* arr, int n) 归并排序递归方案代码可行性测试 非递归方案代码可行性测试 特点分析 计数排序代码实现代码可行性测试 特点分析 归并排序 VS 计数排序(Release版本)说明1w rand( ) …...
自动化网络监控:每分钟自动检测网站可用性
🧙♂️ 诸位好,吾乃诸葛妙计,编程界之翘楚,代码之大师。算法如流水,逻辑如棋局。 📜 吾之笔记,内含诸般技术之秘诀。吾欲以此笔记,传授编程之道,助汝解技术难题。 &…...
Asp .Net Core 系列:集成 Ocelot+Consul实现网关、服务注册、服务发现
什么是Ocelot? Ocelot是一个开源的ASP.NET Core微服务网关,它提供了API网关所需的所有功能,如路由、认证、限流、监控等。 Ocelot是一个简单、灵活且功能强大的API网关,它可以与现有的服务集成,并帮助您保护、监控和扩展您的微…...
MSSQL行转列、列转行
行转列 SELECT * FROM student PIVOT ( SUM(score) FOR subject IN (语文, 数学, 英语) ) AS PivotedData; 列转行 SELECT * FROM student1 UNPIVOT ( score FOR subject IN ("语文","数学","英语") )AS PivotedData;...
【MySQL】创建和管理表
文章目录 前置 标识符命名规则一、MySQL数据类型二、创建和管理数据库2.1 创建数据库2.2 使用数据库2.3 修改数据库2.4 删除数据库 三、创建表3.1 创建方式一3.2 创建方式二3.3 查看数据表结构 四、修改表4.1 增加一个列4.2 修改一个列4.3 重命名一个列4.4 删除一个列 五、重命…...
缓存和数据库一致性
前言: 项目的难点是如何保证缓存和数据库的一致性。无论我们是先更新数据库,后更新缓存还是先更新数据库,然后删除缓存,在并发场景之下,仍然会存在数据不一致的情况(也存在删除失败的情况,删除…...
iOS UI掉帧和卡顿优化解决方案记录
UI卡顿原理 在 VSync 信号到来后,系统图形服务会通过 CADisplayLink 等机制通知 App,App 主线程开始在 CPU 中计算显示内容,比如视图的创建、布局计算、图片解码、文本绘制等。随后 CPU 会将计算好的内容提交到 GPU 去,由 GPU 进行…...
transbigdata 笔记: 轨迹密集化/稀疏化 轨迹平滑
1 密集化 transbigdata.traj_densify(data, col[Vehicleid, Time, Lng, Lat], timegap15) 轨迹致密化,保证至多每隔timegap秒都有一个轨迹点 这边插补使用的是pandas的interpolate,method设置的是index 1.1 举例 transbigdata 笔记: 官方…...
反向代理的本质是什么?
反向代理是一种网络架构模式,通常用于提供静态内容、处理安全、负载均衡和缓存等任务。在这种架构中,客户端发送的请求首先到达反向代理服务器,然后由反向代理服务器将请求转发给后端的实际服务器。反向代理服务器可以处理和修改请求和响应&a…...
Kali Linux保姆级教程|零基础从入门到精通,看完这一篇就够了!(附工具包)
作为一名从事网络安全的技术人员,不懂Kali Linux的话,连脚本小子都算不上。 Kali Linux预装了数百种享誉盛名的渗透工具,使你可以更轻松地测试、破解以及进行与数字取证相关的任何其他工作。 今天给大家分享一套Kali Linux资料合集…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...
Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
安卓基础(aar)
重新设置java21的环境,临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的: MyApp/ ├── app/ …...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
《C++ 模板》
目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板,就像一个模具,里面可以将不同类型的材料做成一个形状,其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式:templa…...
