离线数仓构建案例一

数据采集
日志数据(文件)到Kafka
自己写个程序模拟一些用户的行为数据,这些数据存在一个文件夹中。
接着使用flume监控采集这些文件,然后发送给kafka中待消费。
1、flume采集配置文件
监控文件将数据发给kafka的flume配置文件:
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.168.10.100:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1a1.sources.r1.channels = c1
这边设置parseAsFlumeEvent = false后,数据就不会以flume的事件event的形式传递,就没有head了,只有body数据,head虽然对这个离线案例有用,但是如果要弄实时数仓,flink也会到kafka中取数据,这时head对于实时的就没用了。所以这边设置成false,也能减少数据传输的大小。
2、拦截器过滤数据
在source和channel之间设置拦截器,做一个轻度的清洗。
编写Flume拦截器
(1)创建Maven工程flume-interceptor
(2)创建包:com.atguigu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
(4)在com.atguigu.gmall.flume.utils包下创建JSONUtil类
package com.atguigu.gmall.flume.utils;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.JSONException;public class JSONUtil {/** 通过异常判断是否是json字符串* 是:返回true 不是:返回false* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}}
(5)在com.atguigu.gmall.flume.interceptor包下创建ETLInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}// a1.sources.r1.interceptors.i1.type 的值是这个的全类名public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}}
(6)打包

(7)需要先将打好的包放入到flume的lib目录下:/opt/module/flume/lib文件夹下面。

3、启动flume采集验证
使用上面的配置文件启动flume监控,,
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
接着创建一个Kafka消费者消费topic_log主题
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.100:9092 --topic topic_log
然后往文件中追加数据看能不能消费到。

看到完整的json被消费了,不完整的json被拦截器过滤了

日志数据(文件)同步给Hadoop的hdfs
现在数据已经在Kafka了,下一步就是要将数据发给Hadoop存储,并且要按天进行分区。
按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。

1、创建flume消费者
创建flume消费者从Kafka中消费数据发给hdfs。
目前的数据位于kafka中,原本可以直接用下面的这种flume架构,但由于flume的上游将数据存到kafka的时候,只存了body,这边将数据发给hdfs中需要按照时间落盘,所以需要拦截器加上head,给每条数据在head中添加时间信息,但是拦截器需要有flume source才能生效。所以这种架构就不行。需要使用带有source的架构。

带有source的架构模式
拦截器:
// 必须在在header中添加名为timestamp字段的时间戳@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);String ts = JSONObject.parseObject(log).getString("ts");headers.put("timestamp",ts);return event;}
flume配置文件:
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.10.100:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id = topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1、数据位于kafka中source用kafka source
kafka source其实就是一个kafka消费者,定义消费者组id,防止使用默认的组id导致消费不到数据,如果有两个消费者都消费toppic_id主题,同一个消费者组id一样的只有一个消费者能消费到。
a1.sources.r1.batchSize = 2000,一次批量写入channel通道的最大消息数。
a1.sources.r1.batchDurationMillis = 2000,若没达到一批次的消息数量,达到这个时间了也将消息都发给channel通道。这时间设置成产生2000条大概花费的时间。
2、Channel用file channel,我猜测:是由于要发送给hdfs,又因为hdfs是文件系统
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,flume就可以将来自Source的数据写到不同的目录硬盘,但是这边是单机,就设置了一个,可以增大Flume吞吐量。
a1.channels.c1.maxFileSize = 2146435071,file channel数据存储在文件中, 单个日志文件的最大大小(以字节计)。
a1.channels.c1.capacity = 1000000,file channel的最大容量 1000000条
a1.channels.c1.keep-alive = 6

回滚后,source要重新到文件或者kafka中取这2000条数据

3、数据发给HDFS所以sink用hdfs sink
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
path中包含时间的转移序列,用于将不同时间的数据放到不同的路径。
基于以上hdfs.rollInterval=10:hdfs当达到10秒后滚动形成文件
hdfs.rollSize=134217728:hdfs数据当达到128M形成文件
hdfs.rollCount =0:event事件条数达到多少条形成文件
几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
还没达到形成新文件的时候,是以.tmp结尾存在的,这个时候是没用的。
2、启动flume消费者
进入flume的家目录下执行:
bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
效果:

效果分析:文件一有新的日志数据写入,就会被flume采集到kafka的topic_log主题中,就会被flume消费者发到hdfs中的路径文件中。这样会有几个问题:
- 一有数据就发给hdfs中形成一个文件,就会产生大量的小文件,上面每个文件就几百B大小。
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
数据漂移问题:
加入拦截器解决数据漂移、修改参数解决小文件问题后:

可以看到现在起码不是几十B了,因为现在时间10秒就形成新文件,到时候可以根据128M生成的时间设置。

现在这条数据链路已经打通了。
=====================
业务数据(MySQL)到HDFS
在离线数仓中,业务数据是很重要的一个来源,为后续的计算提供数据来源,离线数仓一般一天采集同步一次业务数据到离线数仓中,供后续使用(存储、计算、处理、分析)。
1、数据同步方案
同步的策略有增量同步(效率好、逻辑复杂)和全量同步(数据量大变化少时效率低、逻辑简单)。增量同步就是只将有变更的数据同步过来;而全量同步是每次都将全表同步过来,覆盖原有的数据。一般而言一个数据库中:大表变化多全量、大表变化少增量、小表都用全量。
| 同步策略 | 优点 | 缺点 |
| 全量同步 | 逻辑简单 | 在某些情况下效率较低。例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据。 |
| 增量同步 | 效率高,无需同步和存储重复数据 | 逻辑复杂,需要将每日的新增及变化数据同原来的数据进行整合,才能使用 |



全量同步通常使用DataX、Sqoop等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal等工具,下面对增量同步不同方案进行简要对比。
| 增量同步方案 | DataX/Sqoop | Maxwell/Canal |
| 对数据库的要求 | 原理是基于查询,故若想通过select查询获取新增及变化数据,就要求数据表中存在create_time、update_time字段,然后根据这些字段获取变更数据。 | 要求数据库记录变更操作,例如MySQL需开启binlog。 |
| 数据的中间状态 | 由于是离线批量同步,故若一条数据在一天中变化多次,该方案只能获取最后一个状态,中间状态无法获取。 | 由于是实时获取所有的数据变更操作,所以可以获取变更数据的所有中间状态。 |
2、各个表同步策略
一般而言一个数据库中:大表变化多全量、大表变化少增量、小表都用全量。

2.1、部署DataX全量同步数据
使用DataX全量同步数据给HDFS。
1、正常步骤需要为每个全量同步的表各自创建一个DataX任务的json文件,每个表都由公主和王子来写json文件,实在是有点麻烦,直接搞个脚本自动生成(如果报错把注释去掉):
# ecoding=utf-8
import json
import getopt
import os
import sys
import MySQLdb#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"#生成DataX配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)def get_mysql_meta(database, table):connection = get_connection()cursor = connection.cursor()sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"cursor.execute(sql, [database, table])fetchall = cursor.fetchall()cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table):return map(lambda x: x[0], get_mysql_meta(database, table))def get_hive_columns(database, table):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string"}return mappings[mysql_type]meta = get_mysql_meta(database, table)return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)def generate_json(source_database, source_table):job = {"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table),"splitPk": "","connection": [{"table": [source_table],"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"fileType": "text","path": "${targetdir}","fileName": source_table,"column": get_hive_columns(source_database, source_table),"writeMode": "append","fieldDelimiter": "\t","compress": "gzip"}}}]}}if not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:json.dump(job, f)def main(args):source_database = ""source_table = ""options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])for opt_name, opt_value in options:if opt_name in ('-d', '--sourcedb'):source_database = opt_valueif opt_name in ('-t', '--sourcetbl'):source_table = opt_valuegenerate_json(source_database, source_table)if __name__ == '__main__':main(sys.argv[1:])
注:由于目标路径包含一层日期,用于对不同天的数据加以区分,故path参数并未写死,需在提交任务时通过参数动态传入,参数名称为targetdir
2、使用方式:
安装Python Mysql驱动由于需要使用Python访问Mysql数据库,故需安装驱动,命令如下:
sudo yum install -y MySQL-python
脚本使用说明
python gen_import_config.py -d database -t table
通过-d传入数据库名,-t传入表名,执行上述命令即可生成该表的DataX同步配置文件。
3、每个表的json文件都要这样执行也比较,直接再弄个脚本为每个表生成:
#!/bin/bashpython ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info
4、测试生产的配置文件是否可用
由于DataX同步任务要求目标路径提前存在,故需手动创建路径,当前activity_info表的目标路径应为/origin_data/gmall/db/activity_info_full/2020-06-14。命令不行可以手动创建。
hadoop fs -mkdir /origin_data/gmall/db/activity_info_full/2020-06-14
执行DataX同步命令
$ python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14" /opt/module/datax/job/import/gmall.activity_info.json
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14" /opt/module/datax/job/import/gmall.activity_info.json
5、观察结果
观察HFDS目标路径是否出现数据。
6、全量同步脚本
#!/bin/bashDATAX_HOME=/opt/module/datax# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;thendo_date=$2
elsedo_date=`date -d "-1 day" +%F`
fi#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {hadoop fs -test -e $1if [[ $? -eq 1 ]]; thenecho "路径$1不存在,正在创建......"hadoop fs -mkdir -p $1elseecho "路径$1已经存在"fs_count=$(hadoop fs -count $1)content_size=$(echo $fs_count | awk '{print $3}')if [[ $content_size -eq 0 ]]; thenecho "路径$1为空"elseecho "路径$1不为空,正在清空......"hadoop fs -rm -r -f $1/*fifi
}#数据同步
import_data() {datax_config=$1target_dir=$2# 先在HDFS中创建目录handle_targetdir $target_dirpython $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}case $1 in
"activity_info")import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date;;
"activity_rule")import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date;;
"base_category1")import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date;;
"base_category2")import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date;;
"base_category3")import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date;;
"base_dic")import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date;;
"base_province")import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date;;
"base_region")import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date;;
"base_trademark")import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date;;
"cart_info")import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date;;
"coupon_info")import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date;;
"sku_attr_value")import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date;;
"sku_info")import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date;;
"sku_sale_attr_value")import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date;;
"spu_info")import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date;;
"all")import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_dateimport_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_dateimport_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date;;
esac
使用方式:
./mysql_to_hdfs_full.sh all 2023-10-25
all表示全量同步脚本设置的所有表,第二个参数是创建的文件夹时间,如果不传默认取前一天的时间,比如今天是2023年10月25日,则创建2023-10-24文件夹存放数据,生产环境中,都是凌晨1点多开始全量同步前面一天的数据。所以生产中第二个参数不传。
2.2、Maxwell增量同步数据
使用Maxwell增量同步业务数据到kafka,再由Flume采集到HDFS
1、创建一个Maxwell增量同步MySQL中需要增量同步的业务表,发送给kafka的topic_db主题

如果MySQL的端口不是3306,Maxwell的配置文件记得加上

mxw.sh start
2、由于有些表是全量同步,所以需要在MySQL的配置文件中将全量同步的表去掉bin_log
3、创建flume消费topic_db主题发送给hdfs
flume配置文件:
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.10.100:9092
a1.sources.r1.kafka.topics=topic_db
a1.sources.r1.kafka.consumer.group.id = topic_db
a1.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TableNameTimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
由于在hdfs中落盘的需要按照以上的格式,而%{tableName} 这种,hdfs会到event的头部中找tableName来解析,%Y-%m-%d会找timestamp的值解析。以下是一条event格式,但是如果不设置的话会按默认的时间。
timestamp^R^M1698288936059^R^B{"database":"gmall","table":"comment_info","type":"insert","ts":1698288933,"xid":912,"commit":true,"data":{"id":1716761825009860616,"user_id":13,"nick_name":null,"head_img":null,"sku_id":12,"spu_id":25,"order_id":null,"appraise":null,"comment_txt":"æµ<8b>è¯<95>1219","create_time":null,"operate_time":null}}^Qib<8e>)^@^@^@^@^@^?^@^@^@^_^W^M^D^@^@^@^Q©^?äi<8b>^A^@^@^Yr^?äi<8b>^A^@^@^E^M^A^@^@^@^@^?^@^@^@$^W^M^B^@^@^@^Qª^?äi<8b>^A^@
以下是Maxwell增量同步MySQL的一条数据,其中ts字段需要个性化定制Maxwell才能生成,ts是当时监控到这条数据变更的时间,可以将这个时间设置给event的头部timestamp。
{"database": "gmall","table": "comment_info","type": "insert","ts": 1698288933,"xid": 912,"commit": true,"data": {"id": 1716761825009860616,"user_id": 13,"nick_name": null,"head_img": null,"sku_id": 12,"spu_id": 25,"order_id": null,"appraise": null,"comment_txt": "æµ<8b>è¯<95>1219","create_time": null,"operate_time": null}
}
拦截器
// flume采集的每条数据 event@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();byte[] body = event.getBody();String db = new String(body, StandardCharsets.UTF_8);// 获取Maxwell输出的时间戳 单位是秒Long ts = JSONObject.parseObject(db).getLong("ts");String table = JSONObject.parseObject(db).getString("table");// flume的hdfs sink解析需要 毫秒headers.put("timestamp", String.valueOf(ts * 1000));headers.put("tableName",table);return event;}
4、增量表首日全量
通常情况下,增量表需要在首日(首次)进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
#!/bin/bash# 该脚本的作用是初始化所有的增量表,只需执行一次MAXWELL_HOME=/opt/module/maxwellimport_data() {$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}case $1 in
"cart_info")import_data cart_info;;
"comment_info")import_data comment_info;;
"coupon_use")import_data coupon_use;;
"favor_info")import_data favor_info;;
"order_detail")import_data order_detail;;
"order_detail_activity")import_data order_detail_activity;;
"order_detail_coupon")import_data order_detail_coupon;;
"order_info")import_data order_info;;
"order_refund_info")import_data order_refund_info;;
"order_status_log")import_data order_status_log;;
"payment_info")import_data payment_info;;
"refund_payment")import_data refund_payment;;
"user_info")import_data user_info;;
"all")import_data cart_infoimport_data comment_infoimport_data coupon_useimport_data favor_infoimport_data order_detailimport_data order_detail_activityimport_data order_detail_couponimport_data order_infoimport_data order_refund_infoimport_data order_status_logimport_data payment_infoimport_data refund_paymentimport_data user_info;;
esac
离线数仓环境准备
现在日志数据和业务数据都采集过来了,是位于hdfs的文件中,需要把数据加入到我们数据仓库中,第一步先加入到hive中。
1、hive安装
大数据-hive-CSDN博客
相关文章:
离线数仓构建案例一
数据采集 日志数据(文件)到Kafka 自己写个程序模拟一些用户的行为数据,这些数据存在一个文件夹中。 接着使用flume监控采集这些文件,然后发送给kafka中待消费。 1、flume采集配置文件 监控文件将数据发给kafka的flume配置文件…...
nginx优雅如何优雅的接管【跨域配置】
跨域问题太常见了,这里不做详细赘述。文章主要想说一下,如何统一管理和更好的来管理 跨域配置 跨域的常见配置有两种 后台代码设置和网关设置 1、后台代码设置 以springboot为例代码如下(水一下文章长度...) Configuration pu…...
远离危险的购买手机的渠道
今年上半年从淘宝特价版上面的官方旗舰店买了一个oppo手机,第一次买我打算不要了,所以就退了回去,过了几天我又觉得还是买一个比较好,所以就又买了一个,型号我绝不说了700-1000z这个价位的手机带个高通骁龙芯片的&…...
外包干了2个多月,技术明显有退步了。。。。。
先说一下自己的情况,本科生,19年通过校招进入武汉某软件公司,干了接近4年的功能测试,今年国庆,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…...
【Java项目管理工具】Maven
Maven 文章目录 Maven一、简介二、安装和配置三、GAVP四、IDEA Maven Java Web工程五、插件、命令、生命周期六、依赖配置七、构建配置八、依赖传递与依赖冲突九、Maven工程继承和聚合关系9.1 工程继承关系9.2 工程聚合关系 十、Maven私服10.1 Nexus下载安装10.2 Nexus上的各种…...
solidity案例详解(六)服务评价合约
有服务提供商和用户两类实体,其中服务提供商部署合约,默认诚信为true,用户负责使用智能合约接受服务及评价,服务提供商的评价信息存储在一个映射中,可以根据服务提 供商的地址来查找评价信息。用户评价信息,…...
使用kubeadm搭建高可用的K8s集群
文章目录 1. 安装要求2. 准备环境3. 所有master节点部署keepalived3.1 安装相关包和keepalived3.2配置master节点3.3 启动和检查 4. 部署haproxy4.1 安装4.2 配置4.3 启动和检查 5. 所有节点安装Docker/kubeadm/kubelet5.1 安装Docker5.2 添加阿里云YUM软件源5.3 安装kubeadm&a…...
C#图像处理OpenCV开发指南(CVStar,07)——通用滤波(Filter2D)的实例代码
1 函数定义 void Filter2D (Mat src, Mat dst, int ddepth, InputArray kernel, Point anchor Point(-1,-1), double delta 0, int borderType BORDER_DEFAULT ) 1.1 原型 #include <opencv2/imgproc.hpp> Convolves an image wit…...
c++函数模板STL详解
函数模板 函数模板语法 所谓函数模板,实际上是建立一个通用函数,其函数类型和形参类型不具体指定,用一个虚拟的类型来代表。这个通用函数就称为函数模板。 凡是函数体相同的函数都可以用这个模板来代替,不必定义多个函数…...
Java利用UDP实现简单群聊
一、创建新项目 首先新建一个新的项目,并按如下操作 二、实现代码 界面ChatFrame类 package 群聊; import javax.swing.*; import java.awt.*; import java.awt.event.*; import java.net.InetAddress; public abstract class ChatFrame extends JFrame { p…...
fastapi.templating与HTMLResponse
要声明一个模板对象,应将存储html模板的文件夹作为参数提供。在当前工作目录中,我们将创建一个 “templates “目录。 templates Jinja2Templates(directory“templates”) 我们现在要把这个页面的HTML代码渲染成HTMLResponse。让我们修改一下hello()函…...
当初为什么选择计算机这类的行业?
CSDN给了这么一个话题: 还记得当初自己为什么选择计算机? 当初你问我为什么选择计算机,我笑着回答:“因为我梦想成为神奇的码农!我想像编织魔法一样编写程序,创造出炫酷的虚拟世界!”谁知道&…...
tif文件转png、Excel
l利用gdal读取tif中的地理信息和波段数组,然后保存想要的格式即可。 from osgeo import gdal from PIL import Image import numpy as np import cv2 as cv from matplotlib import pyplot as plt# 读取.tif文件 def read_tif(file_path):dataset gdal.Open(file_…...
【PyTorch】训练过程可视化
文章目录 1. 训练过程中的可视化1.1. alive_progress1.2. rich.progress 2. 训练结束后的可视化2.1. tensorboardX2.1.1. 安装2.1.2. 使用 1. 训练过程中的可视化 主要是监控训练的进度。 1.1. alive_progress 安装 pip install alive_progress使用 from alive_progress i…...
深入理解Go语言GC机制
1、Go 1.3之前的标记-清除(mark and sweep)算法 Go 1.3之前的时候主要用的是普通的标记-清除算法,此算法主要由两个主要的步骤: 标记(Mark phase)清除(Sweep phase) 1)…...
qt-C++笔记之组件-分组框QGroupBox
qt-C笔记之组件-分组框QGroupBox code review! 文章目录 qt-C笔记之组件-分组框QGroupBox1.《Qt 6 C开发指南》p752.《Qt 官方文档》3.《Qt 5.12实战》——5.9 分组框控件 1.《Qt 6 C开发指南》p75 2.《Qt 官方文档》 中间段落翻译: 我把示例补充完整: …...
qt 定时器用法
在qt开发中,定时器是我们经常用到的。我们接下来说一下定时器的三种用法,需要注意的是定时器事件是在主线程中触发的,因此在处理耗时操作时应特别小心,以避免阻塞应用程序的事件循环。 1. 三种定时器使用 1.1 QObject的定时器 …...
用23种设计模式打造一个cocos creator的游戏框架----(九)访问者模式
1、模式标准 模式名称:访问者模式 模式分类:行为型 模式意图:将数据操作与数据结构分离,使得在不修改数据结构的前提下,可以添加或改变对数据的操作。 结构图: 适用于: 当你需要对一个复杂对…...
根文件系统初步测试
一. 简介 上一篇文章学习了向所编译生成的根文件系统中加入 lib库文件。文章地址如下: 根文件系统lib库添加与初步测试-CSDN博客 本文继上一篇文章的学习,本文对之前制作的根文件系统进行一次初步测试。 二. 根文件系统初步测试 为了方便测试&#…...
【精选】设计模式——策略设计模式-两种举例说明,具体代码实现
Java策略设计模式 简介 策略设计模式是一种行为型设计模式,它允许在运行时选择算法的行为。 在软件开发中,我们常常需要根据不同情况采取不同的行为。通常的做法是使用大量的条件语句来实现这种灵活性,但这会导致代码变得复杂、难以维护和扩…...
网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...


