当前位置: 首页 > news >正文

【大数据之Flume】七、Flume进阶之自定义Sink

(1)概述:
  Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

  Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

  Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

  自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink

  MySink 需要继承 AbstractSink 类并实现 Configurable 接口。

  实现相应方法:
    configure(Context context)//初始化 context(读取配置文件内容)
    process()//从 Channel 读取获取数据(event),这个方法将被循环调用。

  适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。

(2)需求:
  使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

(3)分析:
在这里插入图片描述

步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>

(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。

package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event  = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}

(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。

# Name the components on this agent 
#组件声明
a1.sources = r1
a1.sinks = k1 
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port =44444# Describe the sink 
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)开启任务

bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console

(5)开启端口并发送消息

nc localhost 44444

(6)结果
在这里插入图片描述

相关文章:

【大数据之Flume】七、Flume进阶之自定义Sink

&#xff08;1&#xff09;概述&#xff1a;   Sink 不断地轮询 Channel 中的事件且批量地移除它们&#xff0c;并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前&#xff0c;每个 Sink 用 Chan…...

vue对于时间的处理

2023-08-05 11:25:45 假如这个就是我们要传的时间字符串 比如今天是2023-08-05&#xff08;同一天&#xff09;&#xff1a;现在把这个时间字符串传入到 formatDate&#xff08;&#xff09;这个方法&#xff0c;就会给你返回 11:25 比如今天是2023-08-06&#xff08;前一天&a…...

Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug

近日&#xff0c;Apache DolphinScheduler 发布了 3.1.8 版本。此版本主要基于 3.1.7 版本进行了 bug 修复&#xff0c;共计修复 16 个 bug, 1 个 doc, 2 个 chore。 其中修复了以下几个较为重要的问题&#xff1a; 修复在构建 SeaTunnel 任务节点的参数时错误的判断条件修复 …...

科技云报道:一波未平一波又起?AI大模型再出邪恶攻击工具

AI大模型的快速向前奔跑&#xff0c;让我们见识到了AI的无限可能&#xff0c;但也展示了AI在虚假信息、深度伪造和网络攻击方面的潜在威胁。 据安全分析平台Netenrich报道&#xff0c;近日&#xff0c;一款名为FraudGPT的AI工具近期在暗网上流通&#xff0c;并被犯罪分子用于编…...

深度对话|如何设计合适的网络经济激励措施

近日&#xff0c;我们与Mysten Labs的首席经济学家Alonso de Gortari进行了对话&#xff0c;讨论了如何在网络运营商和参与者之间找到激励措施的平衡&#xff0c;以及Sui的经济如何不断发展。 是什么让您选择将自己的经济学背景应用于区块链和Web3领域&#xff1f; 起初&…...

opencv带GStreamer之Windows编译

目录 1、下载GStreamer和安装2. GSTReamer CMake配置3. 验证是否配置成功 1、下载GStreamer和安装 下载地址如下&#xff1a; gstreamer-1.0-msvc-x86_64-1.18.2.msi gstreamer-1.0-devel-msvc-x86_64-1.18.2.msi 安装目录无要求&#xff0c;主要是安装完设置环境变量 xxx\1…...

Java并发编程之锁的升级

Java 中的锁机制是多线程编程中的一部分。锁一共有4种状态&#xff0c;级别从低到高依次是&#xff1a;无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态&#xff0c;这几个状态会随着竞争情况逐渐升级。 锁可以升级但不能降级&#xff0c;意味着偏向锁升级成轻量级锁后不能…...

多核异构处理器A核与M核通信过程

多核异构处理器是指集成了不同类型或架构的CPU的系统级芯片&#xff08;SoC&#xff09;。 例如&#xff0c;有些处理器同时包含了高性能的A核&#xff08;如Cortex-A&#xff09;和低功耗的M核&#xff08;如Cortex-M&#xff09;。 这样的设计可以让不同的CPU负责不同的任务…...

面试热题(反转链表)

给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的链表 。 链表的题&#xff0c;大部分都可以用指针或者递归可以做&#xff0c;指针如果做不出来的话&#xff0c;…...

竞赛项目 深度学习的水果识别 opencv python

文章目录 0 前言2 开发简介3 识别原理3.1 传统图像识别原理3.2 深度学习水果识别 4 数据集5 部分关键代码5.1 处理训练集的数据结构5.2 模型网络结构5.3 训练模型 6 识别效果7 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习…...

Java项目部署云windows细节

springboot项目 pom文件中必须要有这个插件&#xff08;正常其实都有就是我手贱以前不小心删除了&#xff09; 他的作用是查找主类 <build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-…...

软件功能测试有什么注意事项?功能测试报告起到什么作用?

软件功能测试是软件开发过程中至关重要的一环&#xff0c;它用于评估软件功能的质量和稳定性&#xff0c;并确保软件能够按照预期进行工作。然而&#xff0c;在进行功能测试时&#xff0c;有一些注意事项需要特别关注&#xff0c;以确保测试的准确性和有效性。 一、软件功能测…...

Kubernetes 调度 约束

调度约束 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作&#xff0c;保持数据同步的&#xff0c;每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件&#xff0c;向 APIServer 发送命令&#xff0c;在 Node 节点上面建立 Pod 和 Container。 APIServer…...

Grafana技术文档-概念-《十分钟扫盲》

Grafana官网链接 Grafana: The open observability platform | Grafana Labs 基本概念 Grafana是一个开源的度量分析和可视化套件&#xff0c;常用于对大量数据进行实时分析和可视化。以下是Grafana的基本概念&#xff1a; 数据源&#xff08;Data Source&#xff09;&#…...

【JavaEE进阶】Spring 更简单的读取和存储对象

文章目录 一. 存储Bean对象1. 配置扫描路径2. 添加注解存储 Bean 对象2.1 使用五大类注解存储Bean2.2 为什么要有五大类注解&#xff1f;2.3 有关获取Bean参数的命名规则 3. 使用方法注解储存 Bean 对象3.1 方法注解储存对象的用法3.2 Bean的重命名3.3 同⼀类型多个 Bean 报错 …...

KafKa集群搭建和知识点

一、KafKa概述 1.1 定义 KafKa是一个分布式的基于发布/订阅模式的消息队列&#xff0c;主要应用于大数据试试处理领域 是一个分布式、支持分区的&#xff08;partition&#xff09;、多副本的&#xff08;replica&#xff09;&#xff0c;基于zookeeper协调的分布式消息系统&a…...

剑指 Offer 56 - I. 数组中数字出现的次数题解

题目描述&#xff1a;剑指 Offer 56 - I. 数组中数字出现的次数 - 力扣&#xff08;LeetCode&#xff09; 一个整型数组 nums 里除两个数字之外&#xff0c;其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是O(n)&#xff0c;空间复杂度是O(1)。 示…...

CSDN付费专栏写作协议

一、总则 1.1、欢迎您选用CSDN付费专栏服务&#xff08;“本服务”&#xff09;。以下所述条款和条件即构成您与CSDN就使用本服务所达成的协议&#xff08;“本协议&#xff09;。本协议被视为《CSDN用户服务条款》&#xff08;链接&#xff1a;https://passport.csdn.net/ser…...

[保研/考研机试] KY30 进制转换-大整数转二进制 清华大学复试上机题 C++实现

描述 将一个长度最多为30位数字的十进制非负整数转换为二进制数输出。 输入描述&#xff1a; 多组数据&#xff0c;每行为一个长度不超过30位的十进制非负整数。 &#xff08;注意是10进制数字的个数可能有30个&#xff0c;而非30bits的整数&#xff09; 输出描述&#xff…...

vue3多条件搜索功能

搜索功能在后台管理页面中非常常见&#xff0c;本篇就着重讲一下vue3-admin-element框架中如何实现一个顶部多条件搜索功能 一、首先需要在vue页面的<template></template>中写入对应的结构 <!-- 搜索 --><div style"display: flex; justify-content…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)

推荐 github 项目:GeminiImageApp(图片生成方向&#xff0c;可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...

Xela矩阵三轴触觉传感器的工作原理解析与应用场景

Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知&#xff0c;帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量&#xff0c;能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度&#xff0c;还为机器人、医疗设备和制造业的智…...