当前位置: 首页 > news >正文

【大数据之Flume】七、Flume进阶之自定义Sink

(1)概述:
  Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

  Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

  Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

  自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink

  MySink 需要继承 AbstractSink 类并实现 Configurable 接口。

  实现相应方法:
    configure(Context context)//初始化 context(读取配置文件内容)
    process()//从 Channel 读取获取数据(event),这个方法将被循环调用。

  适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。

(2)需求:
  使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

(3)分析:
在这里插入图片描述

步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>

(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。

package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event  = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}

(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。

# Name the components on this agent 
#组件声明
a1.sources = r1
a1.sinks = k1 
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port =44444# Describe the sink 
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)开启任务

bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console

(5)开启端口并发送消息

nc localhost 44444

(6)结果
在这里插入图片描述

相关文章:

【大数据之Flume】七、Flume进阶之自定义Sink

&#xff08;1&#xff09;概述&#xff1a;   Sink 不断地轮询 Channel 中的事件且批量地移除它们&#xff0c;并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前&#xff0c;每个 Sink 用 Chan…...

vue对于时间的处理

2023-08-05 11:25:45 假如这个就是我们要传的时间字符串 比如今天是2023-08-05&#xff08;同一天&#xff09;&#xff1a;现在把这个时间字符串传入到 formatDate&#xff08;&#xff09;这个方法&#xff0c;就会给你返回 11:25 比如今天是2023-08-06&#xff08;前一天&a…...

Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug

近日&#xff0c;Apache DolphinScheduler 发布了 3.1.8 版本。此版本主要基于 3.1.7 版本进行了 bug 修复&#xff0c;共计修复 16 个 bug, 1 个 doc, 2 个 chore。 其中修复了以下几个较为重要的问题&#xff1a; 修复在构建 SeaTunnel 任务节点的参数时错误的判断条件修复 …...

科技云报道:一波未平一波又起?AI大模型再出邪恶攻击工具

AI大模型的快速向前奔跑&#xff0c;让我们见识到了AI的无限可能&#xff0c;但也展示了AI在虚假信息、深度伪造和网络攻击方面的潜在威胁。 据安全分析平台Netenrich报道&#xff0c;近日&#xff0c;一款名为FraudGPT的AI工具近期在暗网上流通&#xff0c;并被犯罪分子用于编…...

深度对话|如何设计合适的网络经济激励措施

近日&#xff0c;我们与Mysten Labs的首席经济学家Alonso de Gortari进行了对话&#xff0c;讨论了如何在网络运营商和参与者之间找到激励措施的平衡&#xff0c;以及Sui的经济如何不断发展。 是什么让您选择将自己的经济学背景应用于区块链和Web3领域&#xff1f; 起初&…...

opencv带GStreamer之Windows编译

目录 1、下载GStreamer和安装2. GSTReamer CMake配置3. 验证是否配置成功 1、下载GStreamer和安装 下载地址如下&#xff1a; gstreamer-1.0-msvc-x86_64-1.18.2.msi gstreamer-1.0-devel-msvc-x86_64-1.18.2.msi 安装目录无要求&#xff0c;主要是安装完设置环境变量 xxx\1…...

Java并发编程之锁的升级

Java 中的锁机制是多线程编程中的一部分。锁一共有4种状态&#xff0c;级别从低到高依次是&#xff1a;无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态&#xff0c;这几个状态会随着竞争情况逐渐升级。 锁可以升级但不能降级&#xff0c;意味着偏向锁升级成轻量级锁后不能…...

多核异构处理器A核与M核通信过程

多核异构处理器是指集成了不同类型或架构的CPU的系统级芯片&#xff08;SoC&#xff09;。 例如&#xff0c;有些处理器同时包含了高性能的A核&#xff08;如Cortex-A&#xff09;和低功耗的M核&#xff08;如Cortex-M&#xff09;。 这样的设计可以让不同的CPU负责不同的任务…...

面试热题(反转链表)

给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的链表 。 链表的题&#xff0c;大部分都可以用指针或者递归可以做&#xff0c;指针如果做不出来的话&#xff0c;…...

竞赛项目 深度学习的水果识别 opencv python

文章目录 0 前言2 开发简介3 识别原理3.1 传统图像识别原理3.2 深度学习水果识别 4 数据集5 部分关键代码5.1 处理训练集的数据结构5.2 模型网络结构5.3 训练模型 6 识别效果7 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习…...

Java项目部署云windows细节

springboot项目 pom文件中必须要有这个插件&#xff08;正常其实都有就是我手贱以前不小心删除了&#xff09; 他的作用是查找主类 <build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-…...

软件功能测试有什么注意事项?功能测试报告起到什么作用?

软件功能测试是软件开发过程中至关重要的一环&#xff0c;它用于评估软件功能的质量和稳定性&#xff0c;并确保软件能够按照预期进行工作。然而&#xff0c;在进行功能测试时&#xff0c;有一些注意事项需要特别关注&#xff0c;以确保测试的准确性和有效性。 一、软件功能测…...

Kubernetes 调度 约束

调度约束 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作&#xff0c;保持数据同步的&#xff0c;每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件&#xff0c;向 APIServer 发送命令&#xff0c;在 Node 节点上面建立 Pod 和 Container。 APIServer…...

Grafana技术文档-概念-《十分钟扫盲》

Grafana官网链接 Grafana: The open observability platform | Grafana Labs 基本概念 Grafana是一个开源的度量分析和可视化套件&#xff0c;常用于对大量数据进行实时分析和可视化。以下是Grafana的基本概念&#xff1a; 数据源&#xff08;Data Source&#xff09;&#…...

【JavaEE进阶】Spring 更简单的读取和存储对象

文章目录 一. 存储Bean对象1. 配置扫描路径2. 添加注解存储 Bean 对象2.1 使用五大类注解存储Bean2.2 为什么要有五大类注解&#xff1f;2.3 有关获取Bean参数的命名规则 3. 使用方法注解储存 Bean 对象3.1 方法注解储存对象的用法3.2 Bean的重命名3.3 同⼀类型多个 Bean 报错 …...

KafKa集群搭建和知识点

一、KafKa概述 1.1 定义 KafKa是一个分布式的基于发布/订阅模式的消息队列&#xff0c;主要应用于大数据试试处理领域 是一个分布式、支持分区的&#xff08;partition&#xff09;、多副本的&#xff08;replica&#xff09;&#xff0c;基于zookeeper协调的分布式消息系统&a…...

剑指 Offer 56 - I. 数组中数字出现的次数题解

题目描述&#xff1a;剑指 Offer 56 - I. 数组中数字出现的次数 - 力扣&#xff08;LeetCode&#xff09; 一个整型数组 nums 里除两个数字之外&#xff0c;其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是O(n)&#xff0c;空间复杂度是O(1)。 示…...

CSDN付费专栏写作协议

一、总则 1.1、欢迎您选用CSDN付费专栏服务&#xff08;“本服务”&#xff09;。以下所述条款和条件即构成您与CSDN就使用本服务所达成的协议&#xff08;“本协议&#xff09;。本协议被视为《CSDN用户服务条款》&#xff08;链接&#xff1a;https://passport.csdn.net/ser…...

[保研/考研机试] KY30 进制转换-大整数转二进制 清华大学复试上机题 C++实现

描述 将一个长度最多为30位数字的十进制非负整数转换为二进制数输出。 输入描述&#xff1a; 多组数据&#xff0c;每行为一个长度不超过30位的十进制非负整数。 &#xff08;注意是10进制数字的个数可能有30个&#xff0c;而非30bits的整数&#xff09; 输出描述&#xff…...

vue3多条件搜索功能

搜索功能在后台管理页面中非常常见&#xff0c;本篇就着重讲一下vue3-admin-element框架中如何实现一个顶部多条件搜索功能 一、首先需要在vue页面的<template></template>中写入对应的结构 <!-- 搜索 --><div style"display: flex; justify-content…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

聊聊 Pulsar:Producer 源码解析

一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台&#xff0c;以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中&#xff0c;Producer&#xff08;生产者&#xff09; 是连接客户端应用与消息队列的第一步。生产者…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

mac:大模型系列测试

0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何&#xff0c;是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试&#xff0c;是可以跑通文章里面的代码。训练速度也是很快的。 注意…...