离线数仓同步数据1
用户行为表数据同步
- 2.1.4 日志消费Flume测试
[gpb@hadoop104 ~]$ cd /opt/module/flume/
[gpb@hadoop104 flume]$ cd job/
[gpb@hadoop104 job]$ rm file_to_kafka.conf
com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 3#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.1.3 日志消费Flume配置实操
1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf
2)配置文件内容如下#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置优化
1)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2)HDFS Sink优化
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
3)编写Flume拦截器
(1)数据漂移问题(2)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}
(3)重新打包(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。
2.1.4 日志消费Flume测试
1)启动Zookeeper、Kafka集群
2)启动日志采集Flume
[atguigu@hadoop102 ~]$ f1.sh start
3)启动hadoop104的日志消费Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh
5)观察HDFS是否出现数据
2.1.5 日志消费Flume启停脚本
若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
[atguigu@hadoop102 bin]$ vim f2.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop104 日志数据flume-------"ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 hadoop104 日志数据flume-------"ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f2.sh
3)f2启动
[atguigu@hadoop102 module]$ f2.sh start
4)f2停止
[atguigu@hadoop102 module]$ f2.sh stop
相关文章:
离线数仓同步数据1
用户行为表数据同步 2.1.4 日志消费Flume测试 [gpbhadoop104 ~]$ cd /opt/module/flume/ [gpbhadoop104 flume]$ cd job/ [gpbhadoop104 job]$ rm file_to_kafka.confcom.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder #定义组件 a1.sourcesr1 a1.channelsc1…...

c语言开篇---跟着视频学C语言
标识符 标识符必须声明定义,可以是变量、函数或其他实体。 Int是标识符吗? 不是,int是c语言关键词,不是随意命名的 C语言关键词如下: 常量 不需要被声明,不能赋值更改。 printf函数 printf是由print打印…...
本地yum源-如学
学不学? 如学~ 到底学不学? 如学~ 学? 如学~ 配置本地的镜像yum 使用到的 rpm 包 是根据centos8 里面自带的 在 /dev/cdrom 中包含着 一些系统自带的 rpm # 先将 /dev/cdrom 设备进行挂载 mkdir /up # 在…...

【实训】“宅急送”订餐管理系统(程序设计综合能力实训)
👀樊梓慕:个人主页 🎥个人专栏:《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 🌝每一个不曾起舞的日子,都是对生命的辜负 前言 大一小学期,我迎来了人生中的第一次实训…...

openeuler上安装polarismesh集群
1、安装MySQL数据库 数据库连接地址10.10.10.168 用户root 密码123456 MySQL安装参考搭建DSS环境(六)之安装基础环境MySQL_linux安装dss_青春不流名的博客-CSDN博客 2、安装Redis集群 IPResid PORTSentinel PORTPASSWORDCluster NAME10.10.10.110637…...
Java基础——stream
流 stream是什么?stream优点stream和集合的区别stream的创建steam的操作从steam中取值 stream是什么? stream可以简化对集合的操作,具体操作由流内部实现,而无需用户自行实现过程 stream优点 对于以下ArrayList List<Strin…...
Spring Quartz 持久化解决方案
Quartz是实现了序列化接口的,包括接口,所以可以使用标准方式序列化到数据库。 而Spring2.5.6在集成Quartz时却未能考虑持久化问题。 Spring对JobDetail进行了封装,却未实现序列化接口,所以持久化的时候会产生NotSerializable问题&…...

基于Java+SpringBoot+Vue前后端分离火锅店管理系统设计和实现
博主介绍:✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…...

Unity——导航系统补充说明
一、导航系统补充说明 1、导航与动画 我们可以通过设置动画状态机的变量,让动画匹配由玩家直接控制的角色的移动。那么自动导航的角色如何与动画系统结合呢? 有两个常用的属性可以获得导航代理当前的状态: 一是agent.velocity,…...
nginx实现负载均衡load balance
目录 nginx实现负载均衡load balance相关算法负载均衡https的访问后端的real server是否知道真正访问的用户的IP地址健康检查提升负载均衡的并发数量七层负载均衡和四层负载均衡七层负载均衡四层负载均衡四层和七层的区别502错误 nginx实现负载均衡load balance 准备ÿ…...
淘宝订单接口:连接消费者与商家的桥梁
当我们谈论淘宝订单接口时,我们谈论的是淘宝网为卖家和买家提供的一个用于处理订单的核心系统。通过这个接口,卖家可以接收订单、处理订单状态,并更新买家和平台的状态信息;买家则可以实时追踪自己的订单状态,更好地掌…...

数据结构-第一期——数组(Python)
目录 00、前言: 01、一维数组 一维数组的定义和初始化 一维变长数组 一维正向遍历 一维反向遍历 一维数组的区间操作 竞赛小技巧:不用从a[0]开始,从a[1]开始 蓝桥杯真题练习1 读入一维数组 例题一 例题二 例题三 实战训…...

八 动手学深度学习v2 ——卷积神经网络之卷积+填充步幅+池化+LeNet
目录 1. 图像卷积总结2. 填充和步幅 padding和stride3. 多输入多输出通道4. 池化层5. LeNet 1. 图像卷积总结 二维卷积层的核心计算是二维互相关运算。最简单的形式是,对二维输入数据和卷积核执行互相关操作,然后添加一个偏置。核矩阵和偏移是可学习的参…...

SparkCore
第1章 RDD概述 1.1 什么是RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 RDD类比工厂生产。 …...

配置 Windows 系统环境变量
直接按键盘上面的 WINS 打开 Windows 搜索 搜索“编辑系统环境变量” 也可以右键此电脑->属性->高级系统设置打开相同的界面 点击环境变量 一般添加就是添加在框出的 Path 里面,双击可以看到现有的环境变量并进行编辑 例如我在博客中写把 Java 的 jdk 解压好…...
【计算机视觉】图片文件格式的讲解
文章目录 一、图片的压缩二、计算机表示颜色三、JPG和PNG3.1 JPG3.2 PNG 一、图片的压缩 图片文件格式有可能会对图片的文件大小进行不同程度的压缩,图片的压缩分为有损压缩和无损压缩两种。 有损压缩。指在压缩文件大小的过程中,损失了一部分图片的信…...

2023最全的性能测试种类介绍,这6个种类特别重要!
系统的性能是一个很大的概念,覆盖面非常广泛,包括执行效率、资源占用、系统稳定性、安全性、兼容性、可靠性、可扩展性等,性能测试就是描述测试对象与性能相关的特征并对其进行评价而实施的一类测试。 性能测试是一个统称,它其实包…...
代码随想录算法训练营19期第43天
1049. 最后一块石头的重量 II 视频讲解:动态规划之背包问题,这个背包最多能装多少?LeetCode:1049.最后一块石头的重量II_哔哩哔哩_bilibili 代码随想录 初步思路:动态规划。 总结:套用01背包 dp[j…...
微信小程序wx.previewImage实现图片预览
在微信小程序中,wx.previewImage函数用于预览图片,可以将一组图片以轮播的方式展示给用户,并支持用户手势操作进行切换。 使用wx.previewImage函数需要传入一个参数对象,该对象包含以下属性: current: String&#x…...

Java实现Modbus读写数据
背景 由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题: 关闭连接NioEventLoop没有释放导致oom设计思想是一个设备一个连接,而不是一个网关一个连接连接断开后客户端无从感知 前两个问题解…...

Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...

Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...