当前位置: 首页 > news >正文

离线数仓同步数据1

用户行为表数据同步

  • 2.1.4 日志消费Flume测试

[gpb@hadoop104 ~]$ cd /opt/module/flume/
[gpb@hadoop104 flume]$ cd job/
[gpb@hadoop104 job]$ rm file_to_kafka.conf

com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 3#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.1.3 日志消费Flume配置实操
1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf 
2)配置文件内容如下#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置优化
1)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2HDFS Sink优化
(1HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。(2HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
3)编写Flume拦截器
(1)数据漂移问题(2)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}3)重新打包(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。

2.1.4 日志消费Flume测试

1)启动Zookeeper、Kafka集群
2)启动日志采集Flume
[atguigu@hadoop102 ~]$ f1.sh start
3)启动hadoop104的日志消费Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh 
5)观察HDFS是否出现数据
2.1.5 日志消费Flume启停脚本
若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
[atguigu@hadoop102 bin]$ vim f2.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop104 日志数据flume-------"ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 hadoop104 日志数据flume-------"ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f2.sh
3)f2启动
[atguigu@hadoop102 module]$ f2.sh start
4)f2停止
[atguigu@hadoop102 module]$ f2.sh stop

相关文章:

离线数仓同步数据1

用户行为表数据同步 2.1.4 日志消费Flume测试 [gpbhadoop104 ~]$ cd /opt/module/flume/ [gpbhadoop104 flume]$ cd job/ [gpbhadoop104 job]$ rm file_to_kafka.confcom.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder #定义组件 a1.sourcesr1 a1.channelsc1…...

c语言开篇---跟着视频学C语言

标识符 标识符必须声明定义&#xff0c;可以是变量、函数或其他实体。 Int是标识符吗&#xff1f; 不是&#xff0c;int是c语言关键词&#xff0c;不是随意命名的 C语言关键词如下&#xff1a; 常量 不需要被声明&#xff0c;不能赋值更改。 printf函数 printf是由print打印…...

本地yum源-如学

学不学&#xff1f; 如学&#xff5e; 到底学不学&#xff1f; 如学&#xff5e; 学&#xff1f; 如学&#xff5e; 配置本地的镜像yum 使用到的 rpm 包 是根据centos8 里面自带的 在 /dev/cdrom 中包含着 一些系统自带的 rpm # 先将 /dev/cdrom 设备进行挂载 mkdir /up # 在…...

【实训】“宅急送”订餐管理系统(程序设计综合能力实训)

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 前言 大一小学期&#xff0c;我迎来了人生中的第一次实训…...

openeuler上安装polarismesh集群

1、安装MySQL数据库 数据库连接地址10.10.10.168 用户root 密码123456 MySQL安装参考搭建DSS环境&#xff08;六&#xff09;之安装基础环境MySQL_linux安装dss_青春不流名的博客-CSDN博客 2、安装Redis集群 IPResid PORTSentinel PORTPASSWORDCluster NAME10.10.10.110637…...

Java基础——stream

流 stream是什么&#xff1f;stream优点stream和集合的区别stream的创建steam的操作从steam中取值 stream是什么&#xff1f; stream可以简化对集合的操作&#xff0c;具体操作由流内部实现&#xff0c;而无需用户自行实现过程 stream优点 对于以下ArrayList List<Strin…...

Spring Quartz 持久化解决方案

Quartz是实现了序列化接口的&#xff0c;包括接口&#xff0c;所以可以使用标准方式序列化到数据库。 而Spring2.5.6在集成Quartz时却未能考虑持久化问题。 Spring对JobDetail进行了封装&#xff0c;却未实现序列化接口&#xff0c;所以持久化的时候会产生NotSerializable问题&…...

基于Java+SpringBoot+Vue前后端分离火锅店管理系统设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…...

Unity——导航系统补充说明

一、导航系统补充说明 1、导航与动画 我们可以通过设置动画状态机的变量&#xff0c;让动画匹配由玩家直接控制的角色的移动。那么自动导航的角色如何与动画系统结合呢&#xff1f; 有两个常用的属性可以获得导航代理当前的状态&#xff1a; 一是agent.velocity&#xff0c;…...

nginx实现负载均衡load balance

目录 nginx实现负载均衡load balance相关算法负载均衡https的访问后端的real server是否知道真正访问的用户的IP地址健康检查提升负载均衡的并发数量七层负载均衡和四层负载均衡七层负载均衡四层负载均衡四层和七层的区别502错误 nginx实现负载均衡load balance 准备&#xff…...

淘宝订单接口:连接消费者与商家的桥梁

当我们谈论淘宝订单接口时&#xff0c;我们谈论的是淘宝网为卖家和买家提供的一个用于处理订单的核心系统。通过这个接口&#xff0c;卖家可以接收订单、处理订单状态&#xff0c;并更新买家和平台的状态信息&#xff1b;买家则可以实时追踪自己的订单状态&#xff0c;更好地掌…...

数据结构-第一期——数组(Python)

目录 00、前言&#xff1a; 01、一维数组 一维数组的定义和初始化 一维变长数组 一维正向遍历 一维反向遍历 一维数组的区间操作 竞赛小技巧&#xff1a;不用从a[0]开始&#xff0c;从a[1]开始 蓝桥杯真题练习1 读入一维数组 例题一 例题二​ 例题三 实战训…...

八 动手学深度学习v2 ——卷积神经网络之卷积+填充步幅+池化+LeNet

目录 1. 图像卷积总结2. 填充和步幅 padding和stride3. 多输入多输出通道4. 池化层5. LeNet 1. 图像卷积总结 二维卷积层的核心计算是二维互相关运算。最简单的形式是&#xff0c;对二维输入数据和卷积核执行互相关操作&#xff0c;然后添加一个偏置。核矩阵和偏移是可学习的参…...

SparkCore

第1章 RDD概述 1.1 什么是RDD RDD&#xff08;Resilient Distributed Dataset&#xff09;叫做弹性分布式数据集&#xff0c;是Spark中最基本的数据抽象。代码中是一个抽象类&#xff0c;它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 RDD类比工厂生产。 …...

配置 Windows 系统环境变量

直接按键盘上面的 WINS 打开 Windows 搜索 搜索“编辑系统环境变量” 也可以右键此电脑->属性->高级系统设置打开相同的界面 点击环境变量 一般添加就是添加在框出的 Path 里面&#xff0c;双击可以看到现有的环境变量并进行编辑 例如我在博客中写把 Java 的 jdk 解压好…...

【计算机视觉】图片文件格式的讲解

文章目录 一、图片的压缩二、计算机表示颜色三、JPG和PNG3.1 JPG3.2 PNG 一、图片的压缩 图片文件格式有可能会对图片的文件大小进行不同程度的压缩&#xff0c;图片的压缩分为有损压缩和无损压缩两种。 有损压缩。指在压缩文件大小的过程中&#xff0c;损失了一部分图片的信…...

2023最全的性能测试种类介绍,这6个种类特别重要!

系统的性能是一个很大的概念&#xff0c;覆盖面非常广泛&#xff0c;包括执行效率、资源占用、系统稳定性、安全性、兼容性、可靠性、可扩展性等&#xff0c;性能测试就是描述测试对象与性能相关的特征并对其进行评价而实施的一类测试。 性能测试是一个统称&#xff0c;它其实包…...

代码随想录算法训练营19期第43天

1049. 最后一块石头的重量 II 视频讲解&#xff1a;动态规划之背包问题&#xff0c;这个背包最多能装多少&#xff1f;LeetCode&#xff1a;1049.最后一块石头的重量II_哔哩哔哩_bilibili 代码随想录 初步思路&#xff1a;动态规划。 总结&#xff1a;套用01背包 dp[j…...

微信小程序wx.previewImage实现图片预览

在微信小程序中&#xff0c;wx.previewImage函数用于预览图片&#xff0c;可以将一组图片以轮播的方式展示给用户&#xff0c;并支持用户手势操作进行切换。 使用wx.previewImage函数需要传入一个参数对象&#xff0c;该对象包含以下属性&#xff1a; current: String&#x…...

Java实现Modbus读写数据

背景 由于当时项目周期赶&#xff0c;引入了一个PLC4X组件&#xff0c;上手快。接下来就是使用这个组件遇到的一些问题&#xff1a; 关闭连接NioEventLoop没有释放导致oom设计思想是一个设备一个连接&#xff0c;而不是一个网关一个连接连接断开后客户端无从感知 前两个问题解…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)

题目 做法 启动靶机&#xff0c;点进去 点进去 查看URL&#xff0c;有 ?fileflag.php说明存在文件包含&#xff0c;原理是php://filter 协议 当它与包含函数结合时&#xff0c;php://filter流会被当作php文件执行。 用php://filter加编码&#xff0c;能让PHP把文件内容…...

车载诊断架构 --- ZEVonUDS(J1979-3)简介第一篇

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…...

HTTPS证书一年多少钱?

HTTPS证书作为保障网站数据传输安全的重要工具&#xff0c;成为众多网站运营者的必备选择。然而&#xff0c;面对市场上种类繁多的HTTPS证书&#xff0c;其一年费用究竟是多少&#xff0c;又受哪些因素影响呢&#xff1f; 首先&#xff0c;HTTPS证书通常在PinTrust这样的专业平…...