Flume 之自定义Sink
1、简介
前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。
2、自定义Source
自定义Source参考前文:https://blog.csdn.net/zwl2220943286/article/details/135633120
3、自定义Sink
本文将Sink定义为mysql。
3.1、引入依赖
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.11.0</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>
3.2、自定义Sink
3.2.1、Sink代码
import com.weilong.flumeselfdefinition.util.MysqlConfig;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {private final static Logger log = LoggerFactory.getLogger(MySink.class);private String url;private String username;private String password;@Overridepublic Status process() throws EventDeliveryException {Status status = null;Channel channel = getChannel();// channel 支持事务Transaction thx = channel.getTransaction();thx.begin();try {Event event = channel.take();String name = new String(event.getBody());int i = MysqlConfig.insertData(this.url, this.username, this.password, name);if (i > 0){log.info("==插入数据库成功==");}thx.commit();status = Status.READY;} catch (Exception ex){ex.printStackTrace();}return status;}@Overridepublic void configure(Context context) {String url = context.getString("url");String username = context.getString("username");String password = context.getString("password");this.url = url;this.username = username;this.password = password;}
}
3.2.2、数据库连接配置:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;public class MysqlConfig {private MysqlConfig(){}static {try {Class.forName("com.mysql.cj.jdbc.Driver");}catch (Exception ex){ex.printStackTrace();}}public static Connection getConnection(String url, String username, String password) throws SQLException {Connection connection = DriverManager.getConnection(url, username, password);return connection;}public static int insertData(String url, String username,String password, String name){Connection connection = null;try{connection = getConnection(url, username, password);PreparedStatement preparedStatement = connection.prepareStatement("insert into test(`name`) values( '" + name + "')");boolean res = preparedStatement.execute();if (res){return 1;}return 0;}catch (Exception ex){ex.printStackTrace();}finally {if (connection != null){try {connection.close();}catch (Exception ex){ex.printStackTrace();}}}return 0;}
}
3.3、Flume 配置文件
vim flume-self-source-sink.conf
a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource
# 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.3:8088/hello
# 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 自定义Sink
a1.sinks.k1.type = com.weilong.flumeselfdefinition.MySink
a1.sinks.k1.url = jdbc:mysql://192.168.30.3:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
a1.sinks.k1.username = root
a1.sinks.k1.password = 146815
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、将jar包放入lib目录
4.1、将自定义jar包放入lib目录

4.2、将数据库驱动jar包放入lib目录
驱动jar包下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

注:mysql 驱动jar包不放进lib,会出现驱动类找不到。
5、启动 Flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source-sink.conf -Dflume.root.logger=INFO,console
注:启动Flume 之前,自定义 web 服务也要启动。

6、结果
成功保存进数据库。

7、总结
本文结合前文完成 Flume 的 Source 和 Sink 的自定义,帮助大家能够完成各种场景下的Flume的使用。关于更高级Flume的知识,关注下面公众号。
本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

相关文章:
Flume 之自定义Sink
1、简介 前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。 2、自定…...
【1】SM4 CBC-MAC 机制
0x01 题目 MSG1: e55e3e24a3ae7797808fdca05a16ac15eb5fa2e6185c23a814a35ba32b4637c2 MAC1: 0712c867aa6ec7c1bb2b66312367b2c8 ----------------------------------------------------- MSG2: d8d94f33797e1f41cab9217793b2d0f02b93d46c2ead104dce4bfec453767719 MAC2: 4366…...
响应式编程Reactor API大全(下)
Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例: pom依赖 <dependencyMan…...
【STM32】HAL库的STOP低功耗模式UART串口唤醒,解决首字节出错的问题(全网第一解决方案)
【STM32】HAL库的STOP低功耗模式UART串口唤醒,解决首字节出错的问题(全网第一解决方案) 前文: 【STM32】HAL库的STOP低功耗模式UART串口唤醒,第一个接收字节出错的问题(疑难杂症) 目前已解决 …...
Python 语法糖
一、基本概念 语法糖,可以理解为:“甜蜜” 的便捷语法。 它是编程语言为程序提供的更简洁、更易读的语法实现的语法结构,它并不影响语言的功能,仅仅是一种更便捷的书写方式。 这就像你制作蛋糕时,使用现代烤箱而不是…...
一个小程序跳转到另一个小程序中如何实现
小程序 保证两个小程序是一样的主体才可以跳转。怎么知道是不是同样的主体呢? 小程序的后台管理-设置-基本设置-基本信息。查看主体信息。 跳转 <button clicktoOtherMini()>跳转到另一个小程序</button> function toOtherMini(){wx.navigateToMini…...
STM32+HAL库驱动ADXL345传感器(SPI协议)
STM32HAL库驱动ADXL345传感器(SPI协议) ADXL345传感器简介实物STM32CubeMX配置SPI配置片选引脚配置串口配置 特别注意(重点部分)核心代码效果展示 ADXL345传感器简介 ADXL345 是 ADI 公司推出的基于 iMEMS 技术的 3 轴、数字输出加…...
Redis实现全局唯一Id
一、全局唯一ID 每个店铺都可以发布优惠券: 当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题: id的规律性太明显 受单表数据量的限制 场景分析:如果我们的…...
【J-Flash基本使用总结】
【J-Flash基本使用总结】 VX:hao541022348 ■ 烧录文件■ 创建新的工程■ 烧录模式-SWD模式■ J-Flash下载程序到单片机 ■ J-Flash拼接多个hex或bin文件■ J-Flash读单片机的option byte■ J-Flash读单片机Flash数据■ 将读出来的文件用jflash烧录到其他的芯片■ 设…...
宝塔发布网站问题汇总和记录
1、添加网站站点后打不开 解决办法,关闭防跨站攻击2 2、laravel项目部署到linux的时候出现The stream or file "/home/www/storage/logs/laravel.log" could not be opened in append mode 给目录加权限 chmod -R 777 storage 3、Class "Redis"…...
决战排序之巅(二)
决战排序之巅(二) 排序测试函数 void verify(int* arr, int n) 归并排序递归方案代码可行性测试 非递归方案代码可行性测试 特点分析 计数排序代码实现代码可行性测试 特点分析 归并排序 VS 计数排序(Release版本)说明1w rand( ) …...
自动化网络监控:每分钟自动检测网站可用性
🧙♂️ 诸位好,吾乃诸葛妙计,编程界之翘楚,代码之大师。算法如流水,逻辑如棋局。 📜 吾之笔记,内含诸般技术之秘诀。吾欲以此笔记,传授编程之道,助汝解技术难题。 &…...
Asp .Net Core 系列:集成 Ocelot+Consul实现网关、服务注册、服务发现
什么是Ocelot? Ocelot是一个开源的ASP.NET Core微服务网关,它提供了API网关所需的所有功能,如路由、认证、限流、监控等。 Ocelot是一个简单、灵活且功能强大的API网关,它可以与现有的服务集成,并帮助您保护、监控和扩展您的微…...
MSSQL行转列、列转行
行转列 SELECT * FROM student PIVOT ( SUM(score) FOR subject IN (语文, 数学, 英语) ) AS PivotedData; 列转行 SELECT * FROM student1 UNPIVOT ( score FOR subject IN ("语文","数学","英语") )AS PivotedData;...
【MySQL】创建和管理表
文章目录 前置 标识符命名规则一、MySQL数据类型二、创建和管理数据库2.1 创建数据库2.2 使用数据库2.3 修改数据库2.4 删除数据库 三、创建表3.1 创建方式一3.2 创建方式二3.3 查看数据表结构 四、修改表4.1 增加一个列4.2 修改一个列4.3 重命名一个列4.4 删除一个列 五、重命…...
缓存和数据库一致性
前言: 项目的难点是如何保证缓存和数据库的一致性。无论我们是先更新数据库,后更新缓存还是先更新数据库,然后删除缓存,在并发场景之下,仍然会存在数据不一致的情况(也存在删除失败的情况,删除…...
iOS UI掉帧和卡顿优化解决方案记录
UI卡顿原理 在 VSync 信号到来后,系统图形服务会通过 CADisplayLink 等机制通知 App,App 主线程开始在 CPU 中计算显示内容,比如视图的创建、布局计算、图片解码、文本绘制等。随后 CPU 会将计算好的内容提交到 GPU 去,由 GPU 进行…...
transbigdata 笔记: 轨迹密集化/稀疏化 轨迹平滑
1 密集化 transbigdata.traj_densify(data, col[Vehicleid, Time, Lng, Lat], timegap15) 轨迹致密化,保证至多每隔timegap秒都有一个轨迹点 这边插补使用的是pandas的interpolate,method设置的是index 1.1 举例 transbigdata 笔记: 官方…...
反向代理的本质是什么?
反向代理是一种网络架构模式,通常用于提供静态内容、处理安全、负载均衡和缓存等任务。在这种架构中,客户端发送的请求首先到达反向代理服务器,然后由反向代理服务器将请求转发给后端的实际服务器。反向代理服务器可以处理和修改请求和响应&a…...
Kali Linux保姆级教程|零基础从入门到精通,看完这一篇就够了!(附工具包)
作为一名从事网络安全的技术人员,不懂Kali Linux的话,连脚本小子都算不上。 Kali Linux预装了数百种享誉盛名的渗透工具,使你可以更轻松地测试、破解以及进行与数字取证相关的任何其他工作。 今天给大家分享一套Kali Linux资料合集…...
K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
DBLP数据库是什么?
DBLP(Digital Bibliography & Library Project)Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高,数据库文献更新速度很快,很好地反映了国际计算机科学学术研…...
Linux 下 DMA 内存映射浅析
序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存,但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程,可以参考这篇文章,我觉得写的非常…...
【iOS】 Block再学习
iOS Block再学习 文章目录 iOS Block再学习前言Block的三种类型__ NSGlobalBlock____ NSMallocBlock____ NSStackBlock__小结 Block底层分析Block的结构捕获自由变量捕获全局(静态)变量捕获静态变量__block修饰符forwarding指针 Block的copy时机block作为函数返回值将block赋给…...
如何做好一份技术文档?从规划到实践的完整指南
如何做好一份技术文档?从规划到实践的完整指南 🌟 嗨,我是IRpickstars! 🌌 总有一行代码,能点亮万千星辰。 🔍 在技术的宇宙中,我愿做永不停歇的探索者。 ✨ 用代码丈量世界&…...
