当前位置: 首页 > news >正文

离线数仓同步数据3

业务数据_增量表数据同步

  • 1)Flume配置概述
  • 2)Flume配置实操
  • 3)通道测试
  • 4)编写Flume启停脚本

在这里插入图片描述

1)Flume配置概述

Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。
需要注意的是, HDFSSink需要将不同mysql业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:

2)Flume配置实操

1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf
[atguigu@hadoop104 flume]$ mkdir job
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_db.conf 
(2)配置文件内容如下
a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Buildera1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1(3)编写Flume拦截器
新建一个Maven项目,并在pom.xml文件中加入如下配置
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
</dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>在com.atguigu.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类
package com.atguigu.gmall.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimestampAndTableNameInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);Long ts = jsonObject.getLong("ts");//Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒String timeMills = String.valueOf(ts * 1000);String tableName = jsonObject.getString("table");headers.put("timestamp", timeMills);headers.put("tableName", tableName);return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor ();}@Overridepublic void configure(Context context) {}}
}重新打包
将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下
[atguigu@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

3)通道测试

1)启动Zookeeper、Kafka集群
(2)启动hadoop104的Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=info,console
(3)生成模拟数据
[atguigu@hadoop102 bin]$ cd /opt/module/db_log/
[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 
(4)观察HDFS上的目标路径是否有数据出现
若HDFS上的目标路径已有增量表的数据出现了,就证明数据通道已经打通。
(5)数据目标路径的日期说明
仔细观察,会发现目标路径中的日期,并非模拟数据的业务日期,而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值,是数据的变动日期。而真实场景下,数据的业务日期与变动日期应当是一致的。

4)编写Flume启停脚本

为方便使用,此处编写一个Flume的启停脚本
(1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh
[atguigu@hadoop102 bin]$ vim f3.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop104 业务数据flume-------"ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 hadoop104 业务数据flume-------"ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
(2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f3.sh
(3)f3启动
[atguigu@hadoop102 module]$ f3.sh start
(4)f3停止
[atguigu@hadoop102 module]$ f3.sh stop
2.2.6.3 Maxwell配置
1)Maxwell时间戳问题此处为了模拟真实环境,对Maxwell源码进行了改动,增加了一个参数mock_date,该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期,接下来进行测试。
修改Maxwell配置文件config.properties,增加mock_date参数,如下
log_level=infoproducer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092#kafka topic配置
kafka_topic=topic_db#注:该参数仅在maxwell教学版中存在,修改该参数后重启Maxwell才可生效
mock_date=2020-06-14# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
注:该参数仅供学习使用,修改该参数后重启Maxwell才可生效。
重启Maxwell
[atguigu@hadoop102 bin]$ mxw.sh restart
重新生成模拟数据
[atguigu@hadoop102 bin]$ cd /opt/module/db_log/
[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 
观察HDFS目标路径日期是否正常

2.2.6.4 增量表首日全量同步

通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
1)在~/bin目录创建mysql_to_kafka_inc_init.sh
[atguigu@hadoop102 bin]$ vim mysql_to_kafka_inc_init.sh
脚本内容如下
#!/bin/bash# 该脚本的作用是初始化所有的增量表,只需执行一次MAXWELL_HOME=/opt/module/maxwellimport_data() {$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}case $1 in
"cart_info")import_data cart_info;;
"comment_info")import_data comment_info;;
"coupon_use")import_data coupon_use;;
"favor_info")import_data favor_info;;
"order_detail")import_data order_detail;;
"order_detail_activity")import_data order_detail_activity;;
"order_detail_coupon")import_data order_detail_coupon;;
"order_info")import_data order_info;;
"order_refund_info")import_data order_refund_info;;
"order_status_log")import_data order_status_log;;
"payment_info")import_data payment_info;;
"refund_payment")import_data refund_payment;;
"user_info")import_data user_info;;
"all")import_data cart_infoimport_data comment_infoimport_data coupon_useimport_data favor_infoimport_data order_detailimport_data order_detail_activityimport_data order_detail_couponimport_data order_infoimport_data order_refund_infoimport_data order_status_logimport_data payment_infoimport_data refund_paymentimport_data user_info;;
esac
2)为mysql_to_kafka_inc_init.sh增加执行权限
[atguigu@hadoop102 bin]$ chmod 777 ~/bin/mysql_to_kafka_inc_init.sh

3)测试同步脚本
(1)清理历史数据
为方便查看结果,现将HDFS上之前同步的增量表数据删除
[atguigu@hadoop102 ~]$ hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print KaTeX parse error: Expected 'EOF', got '}' at position 2: 8}̲' | xargs hadoo… mysql_to_kafka_inc_init.sh all
4)检查同步结果
观察HDFS上是否重新出现增量表数据。


2.3 采集通道启动/停止脚本
1)在/home/atguigu/bin目录下创建脚本cluster.sh
[atguigu@hadoop102 bin]$ vim cluster.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
"start"){echo ================== 启动 集群 ==================#启动 Zookeeper集群zk.sh start#启动 Hadoop集群hdp.sh start#启动 Kafka采集集群kf.sh start#启动采集 Flumef1.sh start#启动日志消费 Flumef2.sh start#启动业务消费 Flumef3.sh start#启动 maxwellmxw.sh start};;
"stop"){echo ================== 停止 集群 ==================#停止 Maxwellmxw.sh stop#停止 业务消费Flumef3.sh stop#停止 日志消费Flumef2.sh stop#停止 日志采集Flumef1.sh stop#停止 Kafka采集集群kf.sh stop#停止 Hadoop集群hdp.sh stop#停止 Zookeeper集群zk.sh stop};;
esac2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 cluster.sh
3)cluster集群启动脚本
[atguigu@hadoop102 module]$ cluster.sh start
4)cluster集群停止脚本
[atguigu@hadoop102 module]$ cluster.sh stop

相关文章:

离线数仓同步数据3

业务数据_增量表数据同步 1&#xff09;Flume配置概述2&#xff09;Flume配置实操3&#xff09;通道测试4&#xff09;编写Flume启停脚本 1&#xff09;Flume配置概述 Flume需要将Kafka中topic_db主题的数据传输到HDFS&#xff0c;故其需选用KafkaSource以及HDFSSink&#xff…...

Prometheus+Grafana 搭建应用监控系统

一、背景 完善的监控系统可以提高应用的可用性和可靠性&#xff0c;在提供更优质服务的前提下&#xff0c;降低运维的投入和工作量&#xff0c;为用户带来更多的商业利益和客户体验。下面就带大家彻底搞懂监控系统&#xff0c;使用Prometheus Grafana搭建完整的应用监控系统。 …...

Spring Boot整合Log4j2.xml的问题

文章目录 问题解决参考 问题 Spring Boot整合Log4j2.xml的时候返回以下错误&#xff1a; Caused by: org.apache.logging.log4j.LoggingException: log4j-slf4j-impl cannot be present with log4j-to-slf4j 进行了解决。 解决 Spring Boot整合Log4j2.xml经过以下操作&#…...

代码随想录算法训练营第五十八天 | 739. 每日温度,496.下一个更大元素 I

代码随想录算法训练营第五十八天 | 739. 每日温度&#xff0c;496.下一个更大元素 I 739. 每日温度496.下一个更大元素 I 739. 每日温度 题目链接 视频讲解 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answe…...

【动手学深度学习】--文本预处理

文章目录 文本预处理1.读取数据集2.词元化3.词表4.整合所有功能 文本预处理 学习视频&#xff1a;文本预处理【动手学深度学习v2】 官方笔记&#xff1a;文本预处理 对于序列数据处理问题&#xff0c;在【序列模型】中评估了所需的统计工具和预测时面临的挑战&#xff0c;这…...

2023年最佳研发管理平台评选:哪家表现出色?

“研发管理平台哪家好&#xff1f;以下是一些知名的研发管理软件品牌&#xff1a;Zoho Projects、JIRA、Trello、Microsoft Teams、GitLab。’” 企业需要不断创新以保持竞争力。研发是企业创新的核心&#xff0c;而研发管理平台则为企业提供了一个有效的工具来支持和管理其研发…...

轻量容器引擎Docker基础使用

轻量容器引擎Docker Docker是什么 Docker 是一个开源项目&#xff0c;诞生于 2013 年初&#xff0c;最初是 dotCloud 公司内部的一个业余项目。 它基于 Google 公司推出的 Go 语言实现&#xff0c;项目后来加入了 Linux 基金会&#xff0c;遵从了 Apache 2.0 协议&#xff0c;…...

questions

1.JDK 和 JRE 有什么区别&#xff1f; JDK&#xff1a;Java Development Kit 的简称&#xff0c;java 开发工具包&#xff0c;提供了 java 的开发环境和运行环境 JRE&#xff1a;Java Runtime Environment 的简称&#xff0c;java 运行环境&#xff0c;为 java 的运行提供了所需…...

MojoTween:使用「Burst、Jobs、Collections、Mathematics」优化实现的Unity顶级「Tween动画引擎」

MojoTween是一个令人惊叹的Tween动画引擎&#xff0c;针对C#和Unity进行了高度优化&#xff0c;使用了Burst、Jobs、Collections、Mathematics等新技术编码。 MojoTween提供了一套完整的解决方案&#xff0c;将Tween动画应用于Unity Objects的各个方面&#xff0c;并可以通过E…...

Vue3响应式源码实现

Vue3响应式源码实现 初始化项目结构 vue-proxy ├── effect.js ├── effect.ts ├── index.html ├── index.js ├── package.json ├── reactive.js ├── reactive.ts └── webpack.config.jsreactive.ts import { track, trigger } from "./effect&q…...

【RapidAI】P1 中文文本切割程序

中文文本切割程序 基本信息代码解析相关包获取 yaml 关键文件类的构造函数切分语句部分特殊处理 PDF重点切分去除数组中空字符串再度切分后长度 附录附录一&#xff1a;完整代码附录二&#xff1a;可继续思考问题 基本信息 文件名&#xff1a; chinese_text_splitter.py 文件地…...

4、QT中的网络编程

一、Linux中的网络编程 1、子网和公网的概念 子网网络&#xff1a;局域网&#xff0c;只能进行内网的通信公网网络&#xff1a;因特网&#xff0c;服务器等可以进行远程的通信 2、网络分层模型 4层模型&#xff1a;应用层、传输层、网络层、物理层 应用层&#xff1a;用户自…...

单例模式(饿汉式单例 VS 懒汉式单例)

所谓的单例模式就是保证某个类在程序中只有一个对象 一、如何控制只产生一个对象&#xff1f; 1.构造方法私有化&#xff08;保证对象的产生个数&#xff09; 创建类的对象&#xff0c;要通过构造方法产生对象 构造方法若是public权限&#xff0c;对于类的外部&#xff0c;可…...

Oracle数据库连接之TNS-12541异常

在进行数据库开发的时候&#xff0c;通常需要使用PLSQL Developer开发工具连接Oralce数据库&#xff0c;在进行连接时&#xff0c;经常性的会提示TNS-12541:TNS:no listener&#xff08;没有监听&#xff09;&#xff0c;从而导致PLSQL Developer 无法连接到数据库实例&#xf…...

sql中的排序函数dense_rank(),RANK()和row_number()

dense_rank()&#xff0c;RANK()和row_number()是SQL中的排序函数。 为方便后面的函数差异比对清晰直观&#xff0c;准备数据表如下&#xff1a; 1.dense_rank() 函数语法&#xff1a;dense_rank() over( order by 列名 【desc/asc】) DENSE_RANK()是连续排序&#xff0c;比如…...

Flask狼书笔记 | 05_数据库

文章目录 5 数据库5.1 数据库的分类5.2 ORM5.3 使用Flask_SQLAlchemy5.4 数据库操作5.5 定义关系5.6 更新数据库表5.7 数据库进阶小结 5 数据库 这一章学习如何在Python中使用DBMS&#xff08;数据库管理系统&#xff09;&#xff0c;来对数据库进行管理和操作。本书使用SQLit…...

HJ70 矩阵乘法计算量估算

Powered by:NEFU AB-IN Link 文章目录 HJ70 矩阵乘法计算量估算题意思路代码 HJ70 矩阵乘法计算量估算 题意 矩阵乘法的运算量与矩阵乘法的顺序强相关。 例如&#xff1a; A是一个5010的矩阵&#xff0c;B是1020的矩阵&#xff0c;C是205的矩阵 计算ABC有两种顺序&#xff1a;…...

Doris数据库使用记录

新建表 create table tonly_attendance(vin varchar(64),diggings_name varchar(256),area varchar(64),diggings_type varchar(256),work_time decimal(20,2),engine_run_time decimal(20,2),upload_time varchar(64))DUPLICATE KEY (vin)distributed by hash (vin)删除之…...

华为OD机试真题【篮球比赛】

1、题目描述 【篮球比赛】 一个有N个选手参加比赛&#xff0c;选手编号为1~N&#xff08;3<N<100&#xff09;&#xff0c;有M&#xff08;3<M<10&#xff09;个评委对选手进行打分。 打分规则为每个评委对选手打分&#xff0c;最高分10分&#xff0c;最低分1分。…...

sublime text 格式化json快捷键配置

以 controlcommandj 为例。 打开Sublime Text&#xff0c;依次点击左上角菜单Sublime Text->Preferences->Key Bindings&#xff0c;出现以下文件&#xff1a; 左边的是Sublime Text默认的快捷键&#xff0c;不可编辑。右边是我们自定义快捷键的地方&#xff0c;在中括号…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

3-11单元格区域边界定位(End属性)学习笔记

返回一个Range 对象&#xff0c;只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意&#xff1a;它移动的位置必须是相连的有内容的单元格…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版

7种色调职场工作汇报PPT&#xff0c;橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版&#xff1a;职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库&#xff0c;提供了高效、安全的文本格式化功能&#xff0c;是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...

面试高频问题

文章目录 &#x1f680; 消息队列核心技术揭秘&#xff1a;从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"&#xff1f;性能背后的秘密1.1 顺序写入与零拷贝&#xff1a;性能的双引擎1.2 分区并行&#xff1a;数据的"八车道高速公路"1.3 页缓存与批量处理…...