当前位置: 首页 > news >正文

【大数据之Flume】七、Flume进阶之自定义Sink

(1)概述:
  Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

  Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

  Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

  自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink

  MySink 需要继承 AbstractSink 类并实现 Configurable 接口。

  实现相应方法:
    configure(Context context)//初始化 context(读取配置文件内容)
    process()//从 Channel 读取获取数据(event),这个方法将被循环调用。

  适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。

(2)需求:
  使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

(3)分析:
在这里插入图片描述

步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>

(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。

package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event  = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}

(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。

# Name the components on this agent 
#组件声明
a1.sources = r1
a1.sinks = k1 
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port =44444# Describe the sink 
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)开启任务

bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console

(5)开启端口并发送消息

nc localhost 44444

(6)结果
在这里插入图片描述

相关文章:

【大数据之Flume】七、Flume进阶之自定义Sink

&#xff08;1&#xff09;概述&#xff1a;   Sink 不断地轮询 Channel 中的事件且批量地移除它们&#xff0c;并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前&#xff0c;每个 Sink 用 Chan…...

vue对于时间的处理

2023-08-05 11:25:45 假如这个就是我们要传的时间字符串 比如今天是2023-08-05&#xff08;同一天&#xff09;&#xff1a;现在把这个时间字符串传入到 formatDate&#xff08;&#xff09;这个方法&#xff0c;就会给你返回 11:25 比如今天是2023-08-06&#xff08;前一天&a…...

Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug

近日&#xff0c;Apache DolphinScheduler 发布了 3.1.8 版本。此版本主要基于 3.1.7 版本进行了 bug 修复&#xff0c;共计修复 16 个 bug, 1 个 doc, 2 个 chore。 其中修复了以下几个较为重要的问题&#xff1a; 修复在构建 SeaTunnel 任务节点的参数时错误的判断条件修复 …...

科技云报道:一波未平一波又起?AI大模型再出邪恶攻击工具

AI大模型的快速向前奔跑&#xff0c;让我们见识到了AI的无限可能&#xff0c;但也展示了AI在虚假信息、深度伪造和网络攻击方面的潜在威胁。 据安全分析平台Netenrich报道&#xff0c;近日&#xff0c;一款名为FraudGPT的AI工具近期在暗网上流通&#xff0c;并被犯罪分子用于编…...

深度对话|如何设计合适的网络经济激励措施

近日&#xff0c;我们与Mysten Labs的首席经济学家Alonso de Gortari进行了对话&#xff0c;讨论了如何在网络运营商和参与者之间找到激励措施的平衡&#xff0c;以及Sui的经济如何不断发展。 是什么让您选择将自己的经济学背景应用于区块链和Web3领域&#xff1f; 起初&…...

opencv带GStreamer之Windows编译

目录 1、下载GStreamer和安装2. GSTReamer CMake配置3. 验证是否配置成功 1、下载GStreamer和安装 下载地址如下&#xff1a; gstreamer-1.0-msvc-x86_64-1.18.2.msi gstreamer-1.0-devel-msvc-x86_64-1.18.2.msi 安装目录无要求&#xff0c;主要是安装完设置环境变量 xxx\1…...

Java并发编程之锁的升级

Java 中的锁机制是多线程编程中的一部分。锁一共有4种状态&#xff0c;级别从低到高依次是&#xff1a;无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态&#xff0c;这几个状态会随着竞争情况逐渐升级。 锁可以升级但不能降级&#xff0c;意味着偏向锁升级成轻量级锁后不能…...

多核异构处理器A核与M核通信过程

多核异构处理器是指集成了不同类型或架构的CPU的系统级芯片&#xff08;SoC&#xff09;。 例如&#xff0c;有些处理器同时包含了高性能的A核&#xff08;如Cortex-A&#xff09;和低功耗的M核&#xff08;如Cortex-M&#xff09;。 这样的设计可以让不同的CPU负责不同的任务…...

面试热题(反转链表)

给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的链表 。 链表的题&#xff0c;大部分都可以用指针或者递归可以做&#xff0c;指针如果做不出来的话&#xff0c;…...

竞赛项目 深度学习的水果识别 opencv python

文章目录 0 前言2 开发简介3 识别原理3.1 传统图像识别原理3.2 深度学习水果识别 4 数据集5 部分关键代码5.1 处理训练集的数据结构5.2 模型网络结构5.3 训练模型 6 识别效果7 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习…...

Java项目部署云windows细节

springboot项目 pom文件中必须要有这个插件&#xff08;正常其实都有就是我手贱以前不小心删除了&#xff09; 他的作用是查找主类 <build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-…...

软件功能测试有什么注意事项?功能测试报告起到什么作用?

软件功能测试是软件开发过程中至关重要的一环&#xff0c;它用于评估软件功能的质量和稳定性&#xff0c;并确保软件能够按照预期进行工作。然而&#xff0c;在进行功能测试时&#xff0c;有一些注意事项需要特别关注&#xff0c;以确保测试的准确性和有效性。 一、软件功能测…...

Kubernetes 调度 约束

调度约束 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作&#xff0c;保持数据同步的&#xff0c;每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件&#xff0c;向 APIServer 发送命令&#xff0c;在 Node 节点上面建立 Pod 和 Container。 APIServer…...

Grafana技术文档-概念-《十分钟扫盲》

Grafana官网链接 Grafana: The open observability platform | Grafana Labs 基本概念 Grafana是一个开源的度量分析和可视化套件&#xff0c;常用于对大量数据进行实时分析和可视化。以下是Grafana的基本概念&#xff1a; 数据源&#xff08;Data Source&#xff09;&#…...

【JavaEE进阶】Spring 更简单的读取和存储对象

文章目录 一. 存储Bean对象1. 配置扫描路径2. 添加注解存储 Bean 对象2.1 使用五大类注解存储Bean2.2 为什么要有五大类注解&#xff1f;2.3 有关获取Bean参数的命名规则 3. 使用方法注解储存 Bean 对象3.1 方法注解储存对象的用法3.2 Bean的重命名3.3 同⼀类型多个 Bean 报错 …...

KafKa集群搭建和知识点

一、KafKa概述 1.1 定义 KafKa是一个分布式的基于发布/订阅模式的消息队列&#xff0c;主要应用于大数据试试处理领域 是一个分布式、支持分区的&#xff08;partition&#xff09;、多副本的&#xff08;replica&#xff09;&#xff0c;基于zookeeper协调的分布式消息系统&a…...

剑指 Offer 56 - I. 数组中数字出现的次数题解

题目描述&#xff1a;剑指 Offer 56 - I. 数组中数字出现的次数 - 力扣&#xff08;LeetCode&#xff09; 一个整型数组 nums 里除两个数字之外&#xff0c;其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是O(n)&#xff0c;空间复杂度是O(1)。 示…...

CSDN付费专栏写作协议

一、总则 1.1、欢迎您选用CSDN付费专栏服务&#xff08;“本服务”&#xff09;。以下所述条款和条件即构成您与CSDN就使用本服务所达成的协议&#xff08;“本协议&#xff09;。本协议被视为《CSDN用户服务条款》&#xff08;链接&#xff1a;https://passport.csdn.net/ser…...

[保研/考研机试] KY30 进制转换-大整数转二进制 清华大学复试上机题 C++实现

描述 将一个长度最多为30位数字的十进制非负整数转换为二进制数输出。 输入描述&#xff1a; 多组数据&#xff0c;每行为一个长度不超过30位的十进制非负整数。 &#xff08;注意是10进制数字的个数可能有30个&#xff0c;而非30bits的整数&#xff09; 输出描述&#xff…...

vue3多条件搜索功能

搜索功能在后台管理页面中非常常见&#xff0c;本篇就着重讲一下vue3-admin-element框架中如何实现一个顶部多条件搜索功能 一、首先需要在vue页面的<template></template>中写入对应的结构 <!-- 搜索 --><div style"display: flex; justify-content…...

(十)学生端搭建

本次旨在将之前的已完成的部分功能进行拼装到学生端&#xff0c;同时完善学生端的构建。本次工作主要包括&#xff1a; 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能

1. 开发环境准备 ​​安装DevEco Studio 3.1​​&#xff1a; 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK ​​项目配置​​&#xff1a; // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...

什么是VR全景技术

VR全景技术&#xff0c;全称为虚拟现实全景技术&#xff0c;是通过计算机图像模拟生成三维空间中的虚拟世界&#xff0c;使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验&#xff0c;结合图文、3D、音视频等多媒体元素…...

一些实用的chrome扩展0x01

简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序&#xff0c;无论是测试应用程序、搜寻漏洞还是收集情报&#xff0c;它们都能提升工作流程。 FoxyProxy 代理管理工具&#xff0c;此扩展简化了使用代理&#xff08;如 Burp…...

大数据驱动企业决策智能化的路径与实践

&#x1f4dd;个人主页&#x1f339;&#xff1a;慌ZHANG-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、引言&#xff1a;数据驱动的企业竞争力重构 在这个瞬息万变的商业时代&#xff0c;“快者胜”的竞争逻辑愈发明显。企业如何在复杂环…...

npm安装electron下载太慢,导致报错

npm安装electron下载太慢&#xff0c;导致报错 背景 想学习electron框架做个桌面应用&#xff0c;卡在了安装依赖&#xff08;无语了&#xff09;。。。一开始以为node版本或者npm版本太低问题&#xff0c;调整版本后还是报错。偶尔执行install命令后&#xff0c;可以开始下载…...

window 显示驱动开发-如何查询视频处理功能(三)

​D3DDDICAPS_GETPROCAMPRANGE请求类型 UMD 返回指向 DXVADDI_VALUERANGE 结构的指针&#xff0c;该结构包含特定视频流上特定 ProcAmp 控件属性允许的值范围。 Direct3D 运行时在D3DDDIARG_GETCAPS的 pInfo 成员指向的变量中为特定视频流的 ProcAmp 控件属性指定DXVADDI_QUER…...