大数据项目实战之数据仓库:用户行为采集平台——第4章 用户行为数据采集模块
第4章 用户行为数据采集模块
4.1 数据通道

4.2 环境准备
4.2.1 集群所有进程查看脚本
1)在/home/atguigu/bin目录下创建脚本xcall
[atguigu@hadoop102 bin]$ vim xcall
2)在脚本中编写如下内容
#! /bin/bashfor i in hadoop102 hadoop103 hadoop104
doecho --------- $i ----------ssh $i "$*"
done
3)修改脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 xcall
4)启动脚本
[atguigu@hadoop102 bin]$ xcall jps
4.2.2 Hadoop安装
1)安装步骤
详见:尚硅谷大数据技术之Hadoop(入门)
(1)集群规划
| hadoop102 | hadoop103 | hadoop104 | |
|---|---|---|---|
| HDFS | NameNode DataNode | DataNode | DataNode SecondaryNameNode |
| YARN | NodeManager | Resourcemanager NodeManager | NodeManager |
注意:尽量使用离线方式安装
2)项目经验
(1)项目经验之HDFS存储多目录
①生产环境服务器磁盘情况

②在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题
HDFS的DataNode节点保存数据的路径由dfs.datanode.data.dir参数决定,其默认值为file://${hadoop.tmp.dir}/dfs/data,若服务器有多个磁盘,必须对该参数进行修改。如服务器磁盘如上图所示,则该参数应修改为如下的值。
<property><name>dfs.datanode.data.dir</name><value>file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
</property>
注意:因为每台服务器节点的磁盘情况不同,所以这个配置配完之后,不需要分发
(2)项目经验之集群数据均衡
①节点间数据均衡
开启数据均衡命令:
start-balancer.sh -threshold 10
对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。
停止数据均衡命令
stop-balancer.sh
注意:于HDFS需要启动单独的Rebalance Server来执行Rebalance操作,所以尽量不要在NameNode上执行start-balancer.sh,而是找一台比较空闲的机器。
②磁盘间数据均衡
生成均衡计划(我们只有一块磁盘,不会生成计划)
hdfs diskbalancer -plan hadoop103
执行均衡计划
hdfs diskbalancer -execute hadoop103.plan.json
查看当前均衡任务的执行情况
hdfs diskbalancer -query hadoop103
取消均衡任务
hdfs diskbalancer -cancel hadoop103.plan.json
(3)项目经验之Hadoop参数调优
①HDFS参数调优hdfs-site.xml
The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。
对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。
<property><name>dfs.namenode.handler.count</name><value>10</value>
</property>
dfs.namenode.handler.count=20×〖log〗_e^(Cluster Size),比如集群规模为8台时,此参数设置为41。可通过简单的python代码计算该值,代码如下。
[atguigu@hadoop102 ~]$ python
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import math
>>> print int(20*math.log(8))
41
>>> quit()
②YARN参数调优yarn-site.xml
情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive
面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
解决办法:
内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
(a)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(b)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB)。
4.2.3 Zookeeper安装
1)安装步骤
详见:尚硅谷大数据技术之Zookeeper
4.2.4 Kafka安装
1)安装步骤
详见:尚硅谷大数据技术之Kafka
4.2.5 Flume安装
按照采集通道规划,需在hadoop102,hadoop103,hadoop104三台节点分别部署一个Flume。可参照以下步骤先在hadoop102安装,然后再进行分发。
1)安装步骤
详见:尚硅谷大数据技术之Flume
2)分发Flume
[atguigu@hadoop102 ~]$ xsync /opt/module/flume/
3)项目经验
(1)堆内存调整
Flume堆内存通常设置为4G或更高,配置方式如下:
修改/opt/module/flume/conf/flume-env.sh文件,配置如下参数**(虚拟机环境暂不配置)**
export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
注:
-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;
-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。
4.3 日志采集Flume
4.3.1 日志采集Flume配置概述
按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。
选择TailDirSource和KafkaChannel的原因如下:
1)TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
2)KafkaChannel
采用Kafka Channel,省去了Sink,提高了效率。
日志采集Flume关键配置如下:

4.3.2 日志采集Flume配置实操
1)创建Flume配置文件
在hadoop102节点的Flume的job目录下创建file_to_kafka.conf
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ vim job/file_to_kafka.conf
2)配置文件内容如下
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1
3)编写Flume拦截器
(1)创建Maven工程flume-interceptor
(2)创建包:com.atguigu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
</dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
(4)在com.atguigu.gmall.flume.utils包下创建JSONUtil类
package com.atguigu.gmall.flume.utils;import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true 不是:返回false
* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}
(5)在com.atguigu.gmall.flume.interceptor包下创建ETLInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
}
(6)打包

(7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。
4.3.3 日志采集Flume测试
1)启动Zookeeper、Kafka集群
2)启动hadoop102的日志采集Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
3)启动一个Kafka的Console-Consumer
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh
5)观察Kafka消费者是否能消费到数据
4.3.4 日志采集Flume启停脚本
1)分发日志采集Flume配置文件和拦截器
若上述测试通过,需将hadoop102节点的Flume的配置文件和拦截器jar包,向另一台日志服务器发送一份。
[atguigu@hadoop102 flume]$ scp -r job hadoop103:/opt/module/flume/
[atguigu@hadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/
2)方便起见,此处编写一个日志采集Flume进程的启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh
[atguigu@hadoop102 bin]$ vim f1.sh
在脚本中填写如下内容
#!/bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"done
};;
"stop"){for i in hadoop102 hadoop103doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done};;
esac
3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f1.sh
4)f1启动
[atguigu@hadoop102 module]$ f1.sh start
5)f2停止
[atguigu@hadoop102 module]$ f1.sh stop
相关文章:
大数据项目实战之数据仓库:用户行为采集平台——第4章 用户行为数据采集模块
第4章 用户行为数据采集模块 4.1 数据通道 4.2 环境准备 4.2.1 集群所有进程查看脚本 1)在/home/atguigu/bin目录下创建脚本xcall [atguiguhadoop102 bin]$ vim xcall2)在脚本中编写如下内容 #! /bin/bashfor i in hadoop102 hadoop103 hadoop104 d…...
《统计学习方法》(李航)——学习笔记
第一章 概论统计学习,又称统计机器学习(机器学习),现在提到的 机器学习 往往指的就是 统计机器学习。统计学习研究的对象是数据,其对数据的基本假设是同类数据存在一定的统计规律性,因此可以用概率统计方法…...
阿里云EMR集群搭建及使用
目录 1.简介 1.什么是EMR 2.组成 3.与自建hadoop集群对比 4.产品架构 2.使用 1.创建EMR集群 1.登录EMR on ECS控制台 2.软件设置 3.硬件设置 3.基础配置 2.配置 1.组件配置 2.用户管理 3.安全组 4.Gateway 3.组件UI 1.简介 1.什么是EMR EMR是运行在阿里云平台…...
学习streamlit-4
st.slider 今天学习st.slider滑块组件的使用。 st.slider滑块组件通常被用来作为应用的输入,支持整数、浮点数、日期、时间和日期时间。 下面的示例程序包含以下简单功能,以演示st.slider滑块组件: 用户通过调整滑块选择值应用打印出所选…...
高级Oracle DBA面试题及答案
作为高级 Oracle DBA,您将负责 Oracle 数据库基础架构的设计、安装、配置、监控和维护。您还将负责制定和实施备份和恢复计划,并确保数据的安全性和完整性。要成功担任此职位,您需要对 Oracle 数据库架构有深入的了解,并能够有效地…...
程序员成长路线
程序员在成长的过程中,不同的阶段,需要关注的问题点一会都会有所不同,今天给大家分享下自己的感受。 0-1年,入门,掌握语言基础、提高工具的使用熟练度。 工作第一年,主要围绕ssm三件套、mysql、red…...
【Galois工具开发之路】关于类的重新装载思路
思路 当一个java的类文件发生变更,如果动态的热更新这个新的类文件?目前来说,有两种可能的方式 新增一个自定义ClassLoader,名为NC,让NC去load这个新的类文件,这样就完成了新的类定义的替换 但目前Java有…...
哪款蓝牙耳机音质好?内行推荐四款高音质蓝牙耳机
蓝牙耳机经过近几年的快速发展,在音质上的表现也越来越好。哪款蓝牙耳机音质好?最近看到很多人问。接下来,我来给大家推荐四款高音质蓝牙耳机,可以当个参考。 一、南卡小音舱蓝牙耳机 参考价:246 发声单元ÿ…...
Android程序自动在线升级安装
安卓小白分享: Android程序自动在线升级安装.(通过GetSharedDownloadsPath方法) 1>.修改AndroidManifest.template.xml ( 此文件在你DELPHI项目的目录中,如找不到就文件查找吧) 最好把此文件拖到DELPHI, 用DELPHI打开,(这样,它会一行一行格式清楚) 找到文字<%u…...
JS的BroadcastChannel与MessageChannel
BroadcastChannel与MessageChannel BroadcastChannel BroadcastChannel以广播的形式进行通信 BroadcastChannel用于创建浏览器标签页之间的通信 使用BroadcastChannel的浏览器标签页面必须要遵循同源策略 页面1使用BroadcastChannel创建一个频道,页面2使用Broadc…...
nextjs开发 + vercel 部署 ssr ssg
前言 最近想实践下ssr 就打算用nextjs 做一个人博客 , vercel 部署 提供免费域名,来学习实践下ssr ssg nextjs 一个轻量级的react服务端渲染框架 vercel 由 Next.js 的创建者制作 支持nextjs 部署 免费静态网站托管 初始化项目 npx create-next-app p…...
Good Idea, 利用MySQL JSON特性优化千万级文库表
👳我亲爱的各位大佬们好😘😘😘 ♨️本篇文章记录的为 利用MySQL JSON特性优化千万级文库表 相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉…...
【python游戏制作】快来跟愤怒的小鸟一起攻击肥猪们的堡垒吧
前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 为了防止/报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器, 仿佛炮弹一样去攻击肥猪们的堡垒,保卫自己的鸟蛋 这个游戏大家没玩过的想必也听说过~ 今天就给大家分享一下用python写的愤怒的…...
ARM 学习(一)
ARM 处理器的运行模式ARM处理器共有7种运行模式,如下表所示:处理器模式描述用户模式(User)正常程序运行模式中断模式(IRQ)用于通常的中断处理快速中断模式(FIQ)用于高速传输和通道处…...
深入分析Java的序列化与反序列化
序列化是一种对象持久化的手段。普遍应用在网络传输、RMI等场景中。本文通过分析ArrayList的序列化来介绍Java序列化的相关内容。主要涉及到以下几个问题: 怎么实现Java的序列化 为什么实现了java.io.Serializable接口才能被序列化 transient的作用是什么 怎么自…...
、Tomcat源码分析-类加载器
接下来,我们再来看下 tomcat 是如何创建 common 类加载器的。关键代码如下所示,在创建类加载器时,会读取相关的路径配置,并把路径封装成 Repository 对象,然后交给 ClassLoaderFactory 创建类加载器。 Bootstrap.java…...
反转链表相关的练习(下)
目录 一、回文链表 二、 重排链表 三、旋转链表 一、回文链表 给你一个单链表的头节点 head ,请你判断该链表是否为回文链表。如果是,返回 true ;否则,返回 false 。 示例 1: 输入:head [1,2,2,1] 输…...
2.进程和线程
1.进程1.1 终止正常退出(自愿)出错退出(自愿)严重错误(非自愿)被其他进程杀死(非自愿)1.2 状态就绪态:可运行,但因为其他进程正在运行而暂时停止阻塞态:除非某种外部事件发生,否则进程不能运行1.3 实现一个进程在执行过程中可能被…...
C++回顾(十四)—— 函数模板
14.1 概述 所谓函数模板(function template),实际上是建立一个通用函数,其函数类型和形参类型不具体指定,用一个虚拟的类型来代表。这个通用函数就称为函数模板。凡是函数体相同的函数都可以用这个模板来代替,不必定义多个函数&a…...
如何做好项目各干系人的管理及应对?
如何更好地识别、分析和管理项目关系人?主要有以下几个方面: 1、项目干系人的分析 一般对项目干系人的分析有2种方法, 方法一:权利(影响),即对项目可以产生影响的人; 方法二…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...
