当前位置: 首页 > news >正文

大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,consoleflume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
</dependencies>

编写代码

package icu.wzk;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class CustomerInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 这里是逐条处理String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);// 获取Event的HeaderMap<String, String> headerMap = event.getHeaders();// 解析Body获取JSON字符串String[] bodyArr = eventBody.split("\\s+");try {String jsonStr = bodyArr[6];// 解析JSON字符串获取时间戳JSONObject jsonObject = JSON.parseObject(jsonStr);String timestampStr = jsonObject.getJSONObject("app_active").getString("time");// 将时间戳转换字符串 yyyy-MM-dd// 将字符串转换为Longlong timestampLong = Long.parseLong(timestampStr);DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");Instant instant = Instant.ofEpochMilli(timestampLong);LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());String date = formatter.format(localDateTime);// 将转换后的字符串放置header中headerMap.put("logtime", date);event.setHeaders(headerMap);} catch (Exception e) {headerMap.put("logtime", "Unknown");event.setHeaders(headerMap);}return event;}@Overridepublic List<Event> intercept(List<Event> list) {List<Event> lstEvent = new ArrayList<>();for (Event event : list) {Event outEvent = intercept(event);if (outEvent != null) {lstEvent.add(outEvent);}}return lstEvent;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomerInterceptor();}@Overridepublic void configure(Context context) {}}}

相关文章:

大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…...

idea maven 重新构建索引

当设置maven仓库为离线模式的时候&#xff0c;会出现一些问题。 比如本地的仓库被各种方式手动更新之后&#xff0c; 举例&#xff1a;我需要一个spring的包&#xff0c;在pmo文件中写好了引入包的代码 但是由于是离线模式没有办法触发自动下载&#xff0c;那么这个时候我可以…...

C#桌面应用制作计算器

C#桌面应用制作简易计算器&#xff0c;可实现数字之间的加减乘除、AC按键清屏、Del按键清除末尾数字、/-按键取数字相反数、%按键使数字缩小100倍、按键显示运算结果等...... 页面实现效果 功能实现 布局 计算器主体使用Panel容器&#xff0c;然后将button控件排列放置Pane…...

细说STM32单片机DMA中断收发RTC实时时间并改善其鲁棒性的方法

目录 一、DMA基础知识 1、DMA简介 (1)DMA控制器 (2)DMA流 (3)DMA请求 (4)仲裁器 (5)DMA传输属性 2、源地址和目标地址 3、DMA传输模式 4、传输数据量的大小 5、数据宽度 6、地址指针递增 7、DMA工作模式 8、DMA流的优先级别 9、FIFO或直接模式 10、单次传输或突…...

【Unity/Animator动画系统】多层动画状态机实现角色的基本移动

文章目录 前言实现顶层地面状态四方向混合树计算动画所需参数 空中状态分层动画 前言 最近打算做个Rougelike RPG 塔科夫 混搭风格的冒险游戏。暂且就当是一个有随机元素&#xff0c;有基地&#xff0c;死亡会掉落物品的近战塔科夫罢。 花了三天时间&#xff0c;整合了Mixa…...

每日算法一练:剑指offer——栈与队列篇(1)

1.图书整理II 读者来到图书馆排队借还书&#xff0c;图书管理员使用两个书车来完成整理借还书的任务。书车中的书从下往上叠加存放&#xff0c;图书管理员每次只能拿取书车顶部的书。排队的读者会有两种操作&#xff1a; push(bookID)&#xff1a;把借阅的书籍还到图书馆。pop…...

【Java】ArrayList与LinkedList详解!!!

目录 一&#x1f31e;、List 1&#x1f345;.什么是List&#xff1f; 2&#x1f345;.List中的常用方法 二&#x1f31e;、ArrayList 1&#x1f34d;.什么是ArrayList? 2&#x1f34d;.ArrayList的实例化 3&#x1f34d;.ArrayList的使用 4&#x1f34d;.ArrayList的遍…...

怎么用VIM查看UVM源码

利用ctags工具可以建立源码的索引表&#xff0c;在使用VIM或其他文本编辑器时&#xff0c;就可以跳转查看所调用的UVM或VIP的funtcion/task/class等源码了。 首先需要确认ctags安装&#xff0c;一般安装VIM后都有&#xff0c;如果没有可以手动安装。在VIM中可以输入:help ctag…...

数据结构C语言描述3(图文结合)--双链表、循环链表、约瑟夫环问题

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…...

第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令

文章目录 第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令TCP 设备的 READ 命令READ 修改 $ZA 和 $ZB$ZA 和 READ 命令 第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令 TCP 设备的 READ 命令 从服务器或客户端发出 READ 命令以读取客户端或服务器设置的…...

【C++】哈希表的实现详解

哈希表的实现详解 一、哈希常识1.1、哈希概念1.2、哈希冲突1.3、哈希函数&#xff08;直接定执 除留余数&#xff09;1.4、哈希冲突解决闭散列&#xff08;线性探测 二次探测&#xff09;开散列 二、闭散列哈希表的模拟实现2.1、框架2.2、哈希节点状态的类2.3、哈希表的扩容2…...

高阶C语言之五:(数据)文件

目录 文件名 文件类型 文件指针 文件的打开和关闭 文件打开模式 文件操作函数&#xff08;顺序&#xff09; 0、“流” 1、字符输出函数fputc 2、字符输入函数fgetc 3、字符串输出函数fputs 4、 字符串输入函数fgets 5、格式化输入函数fscanf 6、格式化输出函数fpr…...

服务器上部署并启动 Go 语言框架 **GoZero** 的项目

要在服务器上部署并启动 Go 语言框架 **GoZero** 的项目&#xff0c;下面是一步步的操作指南&#xff1a; ### 1. 安装 Go 语言环境 首先&#xff0c;确保你的服务器上已安装 Go 语言。如果还没有安装&#xff0c;可以通过以下步骤进行安装&#xff1a; #### 1.1 安装 Go 语…...

【Java SE 】继承 与 多态 详解

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 继承 1.1 继承的原因 1.2 继承的概念 1.3 继承的语法 2. 子类访问父类 2.1 子类访问父类成员变量 2.1.1 子类与父类不存在同名成员变量 2.1.2 子类…...

【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法

【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法 目录 文章目录 【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法目录摘要&#xff1a;研究背景&#xff1a;问题与挑战&#xff1a;如何解…...

秋招大概到此结束了

1、背景 学院本&#xff0c;软工&#xff0c;秋招只有同程&#xff0c;快手和网易面试&#xff0c;后两家kpi&#xff08;因为面试就很水&#xff09;&#xff0c;秋招情况&#xff1a;哈啰&#xff08;实习转正ing&#xff09;&#xff0c;同程测开offer。 2、走测开的原因 很…...

华为OD机试真题---字符串化繁为简

华为OD机试真题中的“字符串化繁为简”题目是一个涉及字符串处理和等效关系传递的问题。以下是对该题目的详细解析&#xff1a; 一、题目描述 给定一个输入字符串&#xff0c;字符串只可能由英文字母&#xff08;a~z、A~Z&#xff09;和左右小括号&#xff08;(、)&#xff0…...

概念解读|K8s/容器云/裸金属/云原生...这些都有什么区别?

随着容器技术的日渐成熟&#xff0c;不少企业用户都对应用系统开展了容器化改造。而在容器基础架构层面&#xff0c;很多运维人员都更熟悉虚拟化环境&#xff0c;对“容器圈”的各种概念容易混淆&#xff1a;容器就是 Kubernetes 吗&#xff1f;容器云又是什么&#xff1f;容器…...

初识Arkts

创建对象&#xff1a; 类&#xff1a; 类声明引入一个新类型&#xff0c;并定义其字段、方法和构造函数。 定义类后&#xff0c;可以使用关键字new创建实例 可以使用对象字面量创建实例 在以下示例中&#xff0c;定义了Person类&#xff0c;该类具有字段name和surname、构造函…...

基本的SELECT语句

1.SQL概述 SQL&#xff08;Structured Query Language&#xff09;是一种用于管理和操作关系数据库的编程语言。它是一种标准化的语言&#xff0c;用于执行各种数据库操作&#xff0c;包括创建、查询、插入、更新和删除数据等。 SQL语言具有简单、易学、高效的特点&#xff0c;…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

【 java 虚拟机知识 第一篇 】

目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...

Linux 下 DMA 内存映射浅析

序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存&#xff0c;但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程&#xff0c;可以参考这篇文章&#xff0c;我觉得写的非常…...

基于stm32F10x 系列微控制器的智能电子琴(附完整项目源码、详细接线及讲解视频)

注&#xff1a;文章末尾网盘链接中自取成品使用演示视频、项目源码、项目文档 所用硬件&#xff1a;STM32F103C8T6、无源蜂鸣器、44矩阵键盘、flash存储模块、OLED显示屏、RGB三色灯、面包板、杜邦线、usb转ttl串口 stm32f103c8t6 面包板 …...