【大数据之Flume】七、Flume进阶之自定义Sink
(1)概述:
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink
MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
实现相应方法:
configure(Context context)//初始化 context(读取配置文件内容)
process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。
(2)需求:
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
(3)分析:

步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。
package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}
(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。
# Name the components on this agent
#组件声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port =44444# Describe the sink
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4)开启任务
bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console
(5)开启端口并发送消息
nc localhost 44444
(6)结果

相关文章:
【大数据之Flume】七、Flume进阶之自定义Sink
(1)概述: Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Chan…...
vue对于时间的处理
2023-08-05 11:25:45 假如这个就是我们要传的时间字符串 比如今天是2023-08-05(同一天):现在把这个时间字符串传入到 formatDate()这个方法,就会给你返回 11:25 比如今天是2023-08-06(前一天&a…...
Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug
近日,Apache DolphinScheduler 发布了 3.1.8 版本。此版本主要基于 3.1.7 版本进行了 bug 修复,共计修复 16 个 bug, 1 个 doc, 2 个 chore。 其中修复了以下几个较为重要的问题: 修复在构建 SeaTunnel 任务节点的参数时错误的判断条件修复 …...
科技云报道:一波未平一波又起?AI大模型再出邪恶攻击工具
AI大模型的快速向前奔跑,让我们见识到了AI的无限可能,但也展示了AI在虚假信息、深度伪造和网络攻击方面的潜在威胁。 据安全分析平台Netenrich报道,近日,一款名为FraudGPT的AI工具近期在暗网上流通,并被犯罪分子用于编…...
深度对话|如何设计合适的网络经济激励措施
近日,我们与Mysten Labs的首席经济学家Alonso de Gortari进行了对话,讨论了如何在网络运营商和参与者之间找到激励措施的平衡,以及Sui的经济如何不断发展。 是什么让您选择将自己的经济学背景应用于区块链和Web3领域? 起初&…...
opencv带GStreamer之Windows编译
目录 1、下载GStreamer和安装2. GSTReamer CMake配置3. 验证是否配置成功 1、下载GStreamer和安装 下载地址如下: gstreamer-1.0-msvc-x86_64-1.18.2.msi gstreamer-1.0-devel-msvc-x86_64-1.18.2.msi 安装目录无要求,主要是安装完设置环境变量 xxx\1…...
Java并发编程之锁的升级
Java 中的锁机制是多线程编程中的一部分。锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。 锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能…...
多核异构处理器A核与M核通信过程
多核异构处理器是指集成了不同类型或架构的CPU的系统级芯片(SoC)。 例如,有些处理器同时包含了高性能的A核(如Cortex-A)和低功耗的M核(如Cortex-M)。 这样的设计可以让不同的CPU负责不同的任务…...
面试热题(反转链表)
给你单链表的头指针 head 和两个整数 left 和 right ,其中 left < right 。请你反转从位置 left 到位置 right 的链表节点,返回 反转后的链表 。 链表的题,大部分都可以用指针或者递归可以做,指针如果做不出来的话,…...
竞赛项目 深度学习的水果识别 opencv python
文章目录 0 前言2 开发简介3 识别原理3.1 传统图像识别原理3.2 深度学习水果识别 4 数据集5 部分关键代码5.1 处理训练集的数据结构5.2 模型网络结构5.3 训练模型 6 识别效果7 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 深度学习…...
Java项目部署云windows细节
springboot项目 pom文件中必须要有这个插件(正常其实都有就是我手贱以前不小心删除了) 他的作用是查找主类 <build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-…...
软件功能测试有什么注意事项?功能测试报告起到什么作用?
软件功能测试是软件开发过程中至关重要的一环,它用于评估软件功能的质量和稳定性,并确保软件能够按照预期进行工作。然而,在进行功能测试时,有一些注意事项需要特别关注,以确保测试的准确性和有效性。 一、软件功能测…...
Kubernetes 调度 约束
调度约束 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作,保持数据同步的,每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件,向 APIServer 发送命令,在 Node 节点上面建立 Pod 和 Container。 APIServer…...
Grafana技术文档-概念-《十分钟扫盲》
Grafana官网链接 Grafana: The open observability platform | Grafana Labs 基本概念 Grafana是一个开源的度量分析和可视化套件,常用于对大量数据进行实时分析和可视化。以下是Grafana的基本概念: 数据源(Data Source)&#…...
【JavaEE进阶】Spring 更简单的读取和存储对象
文章目录 一. 存储Bean对象1. 配置扫描路径2. 添加注解存储 Bean 对象2.1 使用五大类注解存储Bean2.2 为什么要有五大类注解?2.3 有关获取Bean参数的命名规则 3. 使用方法注解储存 Bean 对象3.1 方法注解储存对象的用法3.2 Bean的重命名3.3 同⼀类型多个 Bean 报错 …...
KafKa集群搭建和知识点
一、KafKa概述 1.1 定义 KafKa是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据试试处理领域 是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统&a…...
剑指 Offer 56 - I. 数组中数字出现的次数题解
题目描述:剑指 Offer 56 - I. 数组中数字出现的次数 - 力扣(LeetCode) 一个整型数组 nums 里除两个数字之外,其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是O(n),空间复杂度是O(1)。 示…...
CSDN付费专栏写作协议
一、总则 1.1、欢迎您选用CSDN付费专栏服务(“本服务”)。以下所述条款和条件即构成您与CSDN就使用本服务所达成的协议(“本协议)。本协议被视为《CSDN用户服务条款》(链接:https://passport.csdn.net/ser…...
[保研/考研机试] KY30 进制转换-大整数转二进制 清华大学复试上机题 C++实现
描述 将一个长度最多为30位数字的十进制非负整数转换为二进制数输出。 输入描述: 多组数据,每行为一个长度不超过30位的十进制非负整数。 (注意是10进制数字的个数可能有30个,而非30bits的整数) 输出描述ÿ…...
vue3多条件搜索功能
搜索功能在后台管理页面中非常常见,本篇就着重讲一下vue3-admin-element框架中如何实现一个顶部多条件搜索功能 一、首先需要在vue页面的<template></template>中写入对应的结构 <!-- 搜索 --><div style"display: flex; justify-content…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机
这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机,因为在使用过程中发现 Airsim 对外部监控相机的描述模糊,而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置,最后在源码示例中找到了,所以感…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...
Oracle11g安装包
Oracle 11g安装包 适用于windows系统,64位 下载路径 oracle 11g 安装包...
