当前位置: 首页 > article >正文

day09_实时类标签/指标

文章目录

  • day09_实时类标签/指标
  • 一、日志数据实时采集
    • 2、Flume简介
      • 2.3 项目日志数据采集Flume配置
        • 2.3.1 涉及的Flume组件和参数
        • 2.3.2 Nginx日志采集
        • 2.3.3 用户行为日志采集
  • 二、Nginx日志数据统计
    • 1、日志格式说明
    • 2、数据ETL
      • 2.1 日志抽取
        • 2.1.1 正则表达式
        • 2.1.2 基于Spark实现Nginx数据匹配
      • 2.2 字段解析
        • 2.2.1 日期格式转换
        • 2.2.2 IP解析地理位置(了解)
        • 2.2.3 UA解析
      • 2.3 完整代码
      • 2.4 使用Hive读取HDFS数据
    • 3、指标统计
    • 1、尝试进行用户行为日志的数据ETL、指标统计

day09_实时类标签/指标

在这里插入图片描述
在这里插入图片描述

一、日志数据实时采集

2、Flume简介

2.3 项目日志数据采集Flume配置

zookeeper、Kafka的启动命令

启动zookeeper(没有启动的,才需要执行)
/export/server/zookeeper/bin/zkServer.sh start启动Kafka
cd /export/server/kafka/bin
nohup ./kafka-server-start.sh ../config/server.sql 2>&1 &Kafka其他的相关命令
cd /export/server/kafka/bin
查看当前集群有哪些Topic
./kafka-topics.sh --list --bootstrap-server up01:9092
新建Topic(分区数没要求,副本数<=broker节点个数)
./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log
参看Topic的详细信息
./kafka-topics.sh --describe --bootstrap-server up01:9092 --topic xtzg_nginx_log注意: 要提前创建好Kafka的Topic
2.3.1 涉及的Flume组件和参数
  • source
type: 类型,固定值TAILDIR。能同时监控一个目录或者多个文件,也能动态监控每个文件的变化,还支持断点续传,不会出现重复消费问题。
fiilegroups:  以空格分隔的文件组列表。每个文件组表示一组要跟踪的文件。
filegroups.<filegroupName>: 文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名。
positionFile: JSON格式的文件,记录每个文件的inode、绝对路径和最后位置。注意: type的TAILDIR大小写不能随便写
  • channel
type: 类型,固定值 org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers: Kafka集群中的broker列表。格式:hostname:port,多个用逗号隔开。
kafka.topic: channel要用的topic
parseAsFlumeEvent: 是否需要对采集到的数据解析为Event对象,然后在内容前面增加topic前缀,会导致后续的内容会有部分缺失的情况。一般是false

补充:

如果采集到的数据最终想要输出到Kafka中,可以直接选择使用Kafka Channel。
注意: Kafka Channel和Kafka Sink,虽然都是将数据输出到Kafka中,但是两者的配置参数有区别
2.3.2 Nginx日志采集

在这里插入图片描述

  • 创建nginx_to_kafka.conf文件

在这里插入图片描述

  • nginx_to_kafka.conf配置文件内容如下
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/workspace/user_profile/log_generate/datacollection/source_data/access-nginx.*
a1.sources.r1.positionFile = /export/data/flume/nginx_position.json#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = up01:9092
a1.channels.c1.kafka.topic = xtzg_nginx_log
a1.channels.c1.parseAsFlumeEvent = false#组装 
a1.sources.r1.channels = c1注意: 1- a1.sources.r1.filegroups.f1该参数值要改成你自己的路径2- 文件的模糊匹配的正则表达式中写的是.*表示匹配任意内容
将上面的配置文件复制到/export/server/flume/conf
cp /export/data/workspace/user_profile/scripts/flume/nginx_to_kafka.conf /export/server/flume/conf
  • 在Kafka上创建topic(前提开启zk,kafka)
cd /export/server/kafka/bin./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log
  • 启动Flume
cd /export/server/flumebin/flume-ng agent -n a1 -c conf/ -f conf/nginx_to_kafka.conf
  • 查看Kafka中的数据
cd /export/server/kafka/bin./kafka-console-consumer.sh --bootstrap-server up01:9092 --topic xtzg_nginx_log
  • 启动

运行python中的NginxLogSimulationData.py。查看kafka中数据变化,如果看到新增数据则配置成功。确认无误后关停Flume采集任务。

2.3.3 用户行为日志采集
  • 创建user_event_to_kafka.conf文件
    在这里插入图片描述

  • user_event_to_kafka.conf配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/workspace/user_profile/log_generate/datacollection/source_data/user-event.*
a1.sources.r1.positionFile = /export/data/flume/user_event_position.json#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = up01:9092
a1.channels.c1.kafka.topic = xtzg_user_event
a1.channels.c1.parseAsFlumeEvent = false#组装 
a1.sources.r1.channels = c1
  • 在Kafka上创建topic(前提开启zk,kafka)
cd /export/server/kafkabin/kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_user_event --partitions 1 --replication-factor 1
  • 启动Flume
cd /export/server/flumebin/flume-ng agent -n a1 -c conf/ -f conf/user_event_to_kafka.conf
  • 查看Kafka中的数据
cd /export/server/kafkabin/kafka-console-consumer.sh --bootstrap-server up01:9092 --from-beginning --topic xtzg_user_event
  • 启动

运行python中的EventSimulationJsonData.py。查看kafka中数据变化,如果看到新增数据则配置成功。确认无误后关停Flume采集任务。

二、Nginx日志数据统计

1、日志格式说明

​ Nginx(发音 恩几可使)是异步框架的网页服务器,也可以用作反向代理、负载平衡器和HTTP缓存。该软件由俄罗斯程序员伊戈尔·赛索耶夫(Игорь Сысоев)开发并于2004年首次公开发布

  • Nginx日志包含access_logerror_log两种类型日志数据。项目中分析的数据为:access_log
  • Nginx开源官网:https://nginx.org/
  • 项目采集Nginx数据格式。以下为一条Nginx日志:
116.85.48.25 - - [12/Nov/2024:11:36:46 +0800] "GET /login.html HTTP/1.1" 404 729 "https://xtx.itcast.cn/referAFriend.html" "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.18(0x17001233) NetType/WIFI Language/zh_CN" "-"Nginx日志格式说明:116.85.48.25: 用户访问IP地址- - : 用户标识(cookie信息)[14/Jul/2022:17:40:41 +0800]:  访问时间 + 时区GET : 请求方式/css/40.30d6d2b.css: 请求资源HTTP/1.1 : 请求的协议500 : 请求的状态码 (500 服务器错误,  200 成功  302 重定向  404 访问到未知资源)951 : 响应返回的字节大小"https://www.htv.com/official/component?WT.mc_id=3" : 来源的URL(从那个地方跳转到此页面)"Mozilla/5......:  浏览器标识

2、数据ETL

2.1 日志抽取

2.1.1 正则表达式
Java版本:
(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]Python版本:
(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)" "(?P<proxy_address>.*)"
2.1.2 基于Spark实现Nginx数据匹配

代码实现:

from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName("nginx_etl")\.master("local[*]")\.config("spark.sql.shuffle.partitions",2)\.getOrCreate()# 2- 数据输入:读取Kafka中的数据""""startingOffsets","earliest":该配置,在实际工作中一般不需要配置。这里是为了开发代码方便"""init_df = spark.readStream.format("kafka")\.option("kafka.bootstrap.servers","192.168.88.166:9092")\.option("subscribe","xtzg_nginx_log")\.option("startingOffsets","earliest")\.load()# 结构化流中不能以show()方式打印数据数据内容# init_df.show()# 3- 数据ETL处理# 3.1- value字段解码的操作"""cast(StringType()):将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)下面两种方式都可以,推荐使用第一种,因为性能更好"""# type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))type_cast_df = init_df.selectExpr("cast(value as string) as value")# 3.2- 通过正则表达式提取Nginx的字段pattern = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'regexp_df = type_cast_df.select(F.regexp_extract("value",pattern,1).alias("ip"),F.regexp_extract("value",pattern,3).alias("datetime"),F.regexp_extract("value",pattern,4).alias("t1"),F.regexp_extract("value",pattern,5).alias("request"),F.regexp_extract("value",pattern,6).alias("url"),F.regexp_extract("value",pattern,7).alias("protocol"),F.regexp_extract("value",pattern,8).alias("code"),F.regexp_extract("value",pattern,9).alias("sendbytes"),F.regexp_extract("value",pattern,10).alias("refferer"),F.regexp_extract("value",pattern,11).alias("useragent"),F.regexp_extract("value",pattern,12).alias("proxyaddr"))# 4- 数据输出,启动流式任务regexp_df.writeStream.format("console").outputMode("append").start().awaitTermination()

运行结果截图:
在这里插入图片描述

可能遇到的错误:
在这里插入图片描述

原因: regexp_extract函数只能传递Java版的正则表达式,不能用Python的

2.2 字段解析

需求:根据nginx日志,ip标识唯一的用户,需要ip分组,统计得到用户访问的pv、uv、区域、状态码、终端设备的操作系统、设备品牌、浏览器、访问时间(年-月-日 时:分:秒)

2.2.1 日期格式转换

Python的datetime函数库

  • 相关函数:
    • strftime(): 把日期对象转成指定的时间格式的字符串
    • strptime(): 把指定格式的日期字符串转换为日期对象
  • 参考文档: https://docs.python.org/zh-cn/3/library/datetime.html#strftime-strptime-behavior
  • 解析格式: %d/%b/%Y:%H:%M:%S %z => %Y-%m-%d %H:%M:%S
    • 28/Jul/2022:16:22:07 +0800 => 日期对象 => 2022-07-28 16:22:07
  • 测试代码

Python方式

from datetime import datetimeif __name__ == '__main__':date_str = "11/Feb/2025:14:34:49 +0800"print(datetime.strptime(date_str, "%d/%b/%Y:%H:%M:%S %z").strftime("%Y-%m-%d %H:%M:%S"))

SparkSQL方式(重点掌握)

regexp_df.withColumn("datetime",F.from_unixtime(F.unix_timestamp("datetime","dd/MMM/yyyy:HH:mm:ss Z"),"yyyy-MM-dd HH:mm:ss"))

在这里插入图片描述

2.2.2 IP解析地理位置(了解)

根据IP解析地理位置

  • 方式一: 使用ip解析地理位置API
    • ip地址:http://opendata.baidu.com/api.php?query=117.136.12.79&co=&resource_id=6006&oe=utf8
    • 像百度地图开发平台 / 高德地图开放平台 … 都会提供IP解析的服务接口
    • 百度地图:https://lbs.baidu.com/faq/api?title=webapi/ip-api-base
    • 高德地图:https://lbs.amap.com/api/webservice/guide/api/ipconfig
    • 其他平台:https://www.nowapi.com/
  • 方式二: (了解)使用geo_ip依赖包和GeoLite2-City.mmdb库
    • 依赖包:geoip2~=4.5.0
    • 下载地址:https://gitcode.com/crownp/geolite2_demo/blob/master/src/main/resources/GeoLite2-City.mmdb
  • IP在线解析测试代码

Python的Requests库的介绍:https://requests.readthedocs.io/en/latest/

#!/usr/bin/env python
# @desc : 
__coding__ = "utf-8"
__author__ = "bytedance"import requestsdef parse_ip(ip_str):params = {"query": ip_str,"co": "","resource_id": "6006","oe": "utf8",}# 发送请求response = requests.get(url="https://opendata.baidu.com/api.php", params=params)# 解析响应内容result = response.json()status = result['status']if status == '0':# 正常try:return result['data'][0]['location'].split(" ")[0]except:return "未知区域"else:return "未知区域"if __name__ == '__main__':ip_str = "127.0.0.1"ip_str = "10.254.1.97"ip_str = "157.148.69.76"area = parse_ip(ip_str)print(area)
2.2.3 UA解析

UA说明

  • UA为useragent简称,特指用户访问系统使用的客户端信息,一般包含操作系统,浏览器,设备品牌信息等
  • UA字符串信息:http://useragentstring.com/
  • 使用,需导入UA解析依赖包:from user_agents import parse
  • UA的作用
    • 1.客户端识别:通过User-Agent,服务器能够识别客户端的类型和版本,从而提供相应的内容和服务。比如,在移动设备上展示适合屏幕大小的网页布局,或在不同浏览器上提供兼容性优化。
    • 2.统计分析:网站和应用开发者可以利用User-Agent来收集客户端的信息,进行用户行为分析和统计。这有助于了解用户使用的设备和偏好,以便进行产品和服务的改进。
    • 3.安全性:User-Agent也可以用于安全验证和防止恶意行为。通过分析User-Agent,服务器可以检测到异常或伪造的请求,并采取相应的安全措施。
  • 测试代码:
from user_agents import parseif __name__ == '__main__':ua_str = "Mozilla/5.0 (iPhone; CPU iPhone OS 13_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0 MQQBrowser/11.0.7 Mobile/15E148 Safari/604.1 QBWebViewUA/2 QBWebViewType/1 WKType/1"result = parse(ua_str)# os操作系统信息print("os----------")print(result.os.family)print(result.os.version)print(result.os.version_string)# brower浏览器信息print("browser----------")print(result.browser.family)print(result.browser.version)print(result.browser.version_string)# device设备信息print("device----------")print(result.device.family)print(result.device.model)

2.3 完整代码

需要将结果数据同时写入到Kafka和HDFS。清洗后的日志,可以用于其他业务分析,具有一定的价值。因为Kafka不能永久保存数据,所以需要把数据存储到HDFS一份。

因为每天都有很多日志,所以需要对日志进行分区。可以通过partitionBy()方法进行分区写入到HDFS。分区的字段需要进行计算。

另外,为了减少小文件生成,可以使用trigger来指定写入的时间间隔。

  • 先创建Kafka的Topic
cd /export/server/kafka/bin
./kafka-topics.sh --create --bootstrap-server up01:9092 --topic dwd_nginx_etl_result
  • 完整代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, MapType
import requests
from user_agents import parseos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName("nginx_etl")\.master("local[*]")\.config("spark.sql.shuffle.partitions",2)\.getOrCreate()# 配置checkpointLocation路径,推荐使用HDFS路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://192.168.88.166:8020/xtzg/chk")# 2- 数据输入:读取Kafka中的数据""""startingOffsets","earliest":该配置,在实际工作中一般不需要配置。这里是为了开发代码方便"""init_df = spark.readStream.format("kafka")\.option("kafka.bootstrap.servers","192.168.88.166:9092")\.option("subscribe","xtzg_nginx_log")\.option("startingOffsets","earliest")\.load()# 结构化流中不能以show()方式打印数据数据内容# init_df.show()# 3- 数据ETL处理# 3.1- value字段解码的操作"""cast(StringType()):将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)下面两种方式都可以,推荐使用第一种,因为性能更好"""# type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))type_cast_df = init_df.selectExpr("cast(value as string) as value")# 3.2- 通过正则表达式提取Nginx的字段pattern = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'# 这里不允许使用Python正则表达式,只能使用Java正则表达式# pattern = '(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)" "(?P<proxy_address>.*)"'regexp_df = type_cast_df.select(F.regexp_extract("value",pattern,1).alias("ip"),F.regexp_extract("value",pattern,3).alias("datetime"),F.regexp_extract("value",pattern,4).alias("t1"),F.regexp_extract("value",pattern,5).alias("request"),F.regexp_extract("value",pattern,6).alias("url"),F.regexp_extract("value",pattern,7).alias("protocol"),F.regexp_extract("value",pattern,8).alias("code"),F.regexp_extract("value",pattern,9).alias("sendbytes"),F.regexp_extract("value",pattern,10).alias("refferer"),F.regexp_extract("value",pattern,11).alias("useragent"),F.regexp_extract("value",pattern,12).alias("proxyaddr"))# 3.3- 日期时间格式转换datetime_df = regexp_df.withColumn("datetime",F.from_unixtime(F.unix_timestamp("datetime","dd/MMM/yyyy:HH:mm:ss Z"),"yyyy-MM-dd HH:mm:ss"))# 3.4- IP地理位置解析@F.udf(returnType=StringType())def parse_ip(ip_str):params = {"query": ip_str,"co": "","resource_id": "6006","oe": "utf8",}# 发送请求response = requests.get(url="https://opendata.baidu.com/api.php", params=params)# 解析响应内容result = response.json()status = result['status']if status == '0':# 正常try:return result['data'][0]['location'].split(" ")[0]except:return "未知区域"else:return "未知区域"area_df = datetime_df.withColumn("area",parse_ip("ip"))# 3.5- UA解析"""为什么这里用户自定义函数推荐返回字典?方便后续取值"""@F.udf(returnType=MapType(keyType=StringType(),valueType=StringType()))def parse_ua(ua_str):result = parse(ua_str)os = result.os.familybrowser = result.browser.familydevice = result.device.modelreturn {"os":os,"browser":browser,"device":device}ua_df = area_df.withColumn("os",parse_ua("useragent")['os'])\.withColumn("browser", parse_ua("useragent")['browser'])\.withColumn("device", parse_ua("useragent")['device'])# 4- 数据输出,启动流式任务# 4.1- 输出到HDFS# 新增一个分区字段dt_df = ua_df.withColumn("dt",F.split("datetime"," ")[0])# partitionBy表示按照哪个字段进行分区dt_df.writeStream.format("orc").partitionBy("dt")\.option("path","hdfs://192.168.88.166:8020/xtzg/etl/dwd_nginx_etl_result")\.start()# 4.2- 输出到Kafka# 注意:一般将数据内容转换为JSON格式输出到Kafka中,为了后续使用方便# 注意:输出到Kafka中的字段名称只能叫valuekafka_df = ua_df.select(F.to_json(F.struct("ip","datetime","t1","request","url","protocol","code","sendbytes","refferer","useragent","proxyaddr","area","os","browser","device")).alias("value"))kafka_df.writeStream.format("kafka")\.option("kafka.bootstrap.servers","192.168.88.166:9092")\.option("topic","dwd_nginx_etl_result")\.start()# 4.3- 输出到控制台(为了测试)# awaitTermination()只能加在最后一个start()的后面dt_df.writeStream.format("console").outputMode("append").start().awaitTermination()

可能遇到的错误一:
在这里插入图片描述

原因: 结构化流中将数据输出到文件系统中,需要配置checkpointLocation

可能遇到的错误二:
在这里插入图片描述

原因: 输出到Kafka中的字段名称只能叫value

2.4 使用Hive读取HDFS数据

  • 创建表
CREATE external TABLE dwd.dwd_nginx_etl_result (ip string,`datetime` string,t1 string,request string,url string,protocol string,code string,sendbytes string,refferer string,useragent string,proxyaddr string,area string,os string,browser string,device string
)COMMENT 'nginx日志'PARTITIONED BY (dt string)STORED AS ORCLOCATION '/xtzg/etl/dwd_nginx_etl_result'TBLsql ('orc.compress' = 'SNAPPY')
;
  • 同步分区
MSCK REPAIR TABLE dwd.dwd_nginx_etl_result;

3、指标统计

  • 需求
统计实时请求总数(pv)
统计用户数(uv)
统计用户访问所在区域省(类似抖音的位置显示)
统计用户响应状态码
统计用户使用设备终端信息
统计用户操作系统信息
统计用户首次访问系统的时间
统计用户末次访问系统的时间ip: 用户访问系统的唯一地址
pv:访问系统的页面次数
uv:访问系统的用户数
area:访问系统用户来自的区域,根据ip解析出地址位置
status_code:访问系统用户请求http协议响应状态码
device_os:设备终端,从ua中提取手机或电脑的系统
device_brand:设备品牌名称,从ua中提取手机或电脑的品牌
browser_name:访问系统用户使用的浏览器名称
first_access_time:用户首次访问系统的时间
last_access_time:用户首次访问系统的时间
  • Doris建表语句

使用unique模型。

CREATE DATABASE IF NOT EXISTS log_analysis_db;
CREATE TABLE IF NOT EXISTS log_analysis_db.nginx_log_result
(ip varchar(15) comment 'ip地址',pv int comment 'pv数',uv int comment 'uv数',area varchar(50) comment '用户所在区域,根据ip解析',status_code varchar(10) comment '请求响应状态码',device_os varchar(50) comment '设备系统,从ua中提取手机或电脑使用的系统',device_brand varchar(50) comment ',从ua中提取手机或电脑的品牌',browser_name varchar(50) comment '电脑和手机,使用浏览器,记录浏览器简称',first_access_time datetime comment 'nginx日志记录首次访问时间',last_access_time datetime comment 'nginx日志记录末次访问时间'
)
UNIQUE KEY(ip)
DISTRIBUTED BY HASH(ip) BUCKETS 10
sql("replication_num" = "1");
  • 完整代码
from pyspark.sql import SparkSession, DataFrame
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder \.appName("nginx_analysis") \.master("local[*]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()# 2- 数据输入:读取Kafka中的数据init_df = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("subscribe", "dwd_nginx_etl_result") \.option("startingOffsets", "earliest") \.load()# 3- 数据处理# 3.1- value字段类型转换type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))# 3.2- 从JSON中提取一个个字段"""json_tuple与get_json_object的区别get_json_object:优点:同时能够解析嵌套的JSON缺点:一次只能得到一个字段json_tuple:优点:一次能得到多个字段缺点:针对嵌套JSON,只能一层层解析"""parse_json_df = type_cast_df.select(F.json_tuple("value","ip","datetime","code","area","os","browser","device")\.alias("ip","datetime","status_code","area","device_os","browser_name","device_brand"))# 3.3- 指标统计# F.lit(1)生成一列,每行的数据内容一样,全都是1。与F.col函数作用类似# 因为类似area的这些字段的数据类型是字符串,聚合函数没有太适合的,因此使用firstresult_df = parse_json_df.groupBy("ip").agg(F.count("ip").alias("pv"),F.lit(1).alias("uv"),F.first("area").alias("area"),F.first("status_code").alias("status_code"),F.first("device_os").alias("device_os"),F.first("device_brand").alias("device_brand"),F.first("browser_name").alias("browser_name"),F.min("datetime").alias("first_access_time"),F.max("datetime").alias("last_access_time"))# 4- 数据输出# 4.1- 输出到Dorisdef write_2_doris(batch_df:DataFrame, batch_id):"""将DataFrame输出到Doris中:param batch_df: 有界的DataFrame:param batch_id: 批次ID:return:"""# 注意:一般先用append。如果明确知道要怎么做,那可以再使用overwritebatch_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/log_analysis_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="nginx_log_result",mode="append",sql={ 'user' : 'root', 'password' : '123456' })result_df.writeStream.foreachBatch(write_2_doris).outputMode("update").start()# 4.2- 输出到控制台result_df.writeStream.format("console").outputMode("update").start().awaitTermination()
  • 结果数据核对
./kafka-console-producer.sh --broker-list up01:9092 --topic dwd_nginx_etl_result
{"ip":"210.27.147.62","cookie":"- - [","datetime":"2024-11-14 11:11:11","t1":"] \"","request":"GET","url":"/search.html","protocol":"HTTP/1.1","code":"401","sendbytes":"58840","refferer":"https://www.douyin.com/goods-recommend/search.html?keyword=美味\"","useragent":"Mozilla/5.0 (Linux; U; Android 9; zh-CN; MI 9 Build/PKQ1.181121.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 UCBrowser/13.1.6.1096 Mobile Safari/537.36","proxyaddr":"-","area":"广东省广州市","os":"Android","browser":"UC Browser","device":"XiaoMi MI 9","dt":"2024-11-12"}

1、尝试进行用户行为日志的数据ETL、指标统计

提示:核心是如何解析JSON格式,得到一个个独立的字段

相关文章:

day09_实时类标签/指标

文章目录 day09_实时类标签/指标一、日志数据实时采集2、Flume简介2.3 项目日志数据采集Flume配置2.3.1 涉及的Flume组件和参数2.3.2 Nginx日志采集2.3.3 用户行为日志采集 二、Nginx日志数据统计1、日志格式说明2、数据ETL2.1 日志抽取2.1.1 正则表达式2.1.2 基于Spark实现Ngi…...

排序算法的魔法世界:用C语言揭开数据排列的奥秘

当数据开始跳集体舞:排序的意义 想象你面前有一群调皮的数字精灵在开派对,7和3在跳探戈,9和1在玩捉迷藏,5和2在抢蛋糕。这时候就需要排序算法这位神奇的派对管家出场了!它像音乐指挥家一样挥动魔棒,让所有数字精灵乖乖排成整齐的队伍。在计算机的世界里,排序算法就是处…...

网页模板免费HTML源码 HTML网页设计模板

在现代网站开发中&#xff0c;拥有一个美观且功能齐全的网页模板是至关重要的。对于许多开发者和设计师来说&#xff0c;获取高质量的免费HTML源码和网页设计模板可以大大简化开发流程。本文将探讨网页模板免费HTML源码的资源、优势以及如何有效利用这些模板。 什么是网页模板…...

Python实现语音识别详细教程【2025】最新教程

文章目录 前言一、环境搭建1. 下载 Python2. 安装 Python3 使用 pip 安装必要的库 二、使用 SpeechRecognition 库进行语音识别1.识别本地音频文件2.实时语音识别3. 使用其他语音识别引擎 注意事项 前言 以下是一份较为完整的 Python 语音识别教程&#xff0c;涵盖环境搭建、使…...

与传统光伏相比 城电科技的光伏太阳花有什么优势?

相比于传统光伏&#xff0c;城电科技的光伏太阳花有以下优势&#xff1a; 一、发电效率方面 智能追踪技术&#xff1a;光伏太阳花通过内置的智能追踪系统&#xff0c;采用全球定位跟踪算法&#xff0c;能够实时调整花瓣&#xff08;即光伏板&#xff09;的角度&#xff0c;确…...

Qt——连接MySQL数据库之ODBC的方法详细总结(各版本大同小异,看这一篇就够了)

【系列专栏】:博主结合工作实践输出的,解决实际问题的专栏,朋友们看过来! 《项目案例分享》 《极客DIY开源分享》 《嵌入式通用开发实战》 《C++语言开发基础总结》 《从0到1学习嵌入式Linux开发》 《QT开发实战》 《Android开发实战》 《实用硬件方案设计》 《结构建模设…...

Python的那些事第二十二篇:基于 Python 的 Django 框架在 Web 开发中的应用研究

基于 Python 的 Django 框架在 Web 开发中的应用研究 摘要 Django 是一个基于 Python 的高级 Web 框架,以其开发效率高、安全性和可扩展性强等特点被广泛应用于现代 Web 开发。本文首先介绍了 Django 的基本架构和核心特性,然后通过一个实际的 Web 开发项目案例,展示了 Dj…...

pytest测试专题 - 1.3 测试用例发现规则

<< 返回目录 1 pytest测试专题 - 1.3 测试用例发现规则 执行pytest命令时&#xff0c;可以不输入参数&#xff0c;或者只输入文件名或者目录名&#xff0c;pytest会自己扫描测试用例。那pytest基于什么规则找到用例呢&#xff1f; 文件名&#xff1a;满足文件名称为tes…...

【Bluedroid】 BLE连接源码分析(一)

BLE链接过程分析见【Bluedroid】BLE连接过程详解-CSDN博客,本篇主要围绕HCI_LE_Create_Connection展开。基于Android14源码进行分析。在蓝牙低功耗技术中,设备之间建立连接是进行数据传输等操作的前提。HCI LE Extended Create Connection Command 提供了一种更灵活、功能更丰…...

Unity DeepSeek API 聊天接入教程(0基础教学)

Unity DeepSeek API 聊天接入教程(0基础教学) 1.DeepSeek 介绍 DeepSeek是杭州深度求索人工智能基础技术研究有限公司推出的一款大语言模型。2025年1月20日&#xff0c;DeepSeek-R1正式上线&#xff0c;和当前市面上的主流AI相比&#xff0c;它在仅有极少标注数据的情况下&am…...

【16届蓝桥杯寒假刷题营】第1期DAY4

4.可达岛屿的个数 - 蓝桥云课 题目背景 在一个神奇的魔法世界中&#xff0c;有一座古老的迷幻之城。迷幻之城被分成 n 个鸟屿&#xff0c;编号从 1 到 n&#xff0c;共有 m 座桥。迷幻之城的居民们希望能够建立起紧密的联系&#xff0c;每个岛屿上的居民都想知道自己最多能到…...

Flink提交pyflink任务

1.官方文档&#xff1a; flink1.14:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs flink1.18:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/cli/#submitting-pyflink-jobs 2.提…...

大语言模型中one-hot编码和embedding之间的区别?

1. 维度与稀疏性 One-Hot编码 定义&#xff1a;每个词被表示为一个高维稀疏向量&#xff0c;维度等于词汇表大小。例如&#xff0c;词汇表有10,000个词&#xff0c;每个词对应一个10,000维的向量&#xff0c;其中仅有一个位置为1&#xff08;表示当前词&#xff09;&#xff0…...

CAN学习记录

CAN(Controller Area Network),是ISO国际标准化的串行通信协议&#xff0c;为了满足汽车产业的“减少线束的数量”、“通过多个LAN&#xff0c;进行大量数据的高速通信”的需求 低速CAN&#xff08;ISO11519)通信速率10~125kbps&#xff0c;总线长度可达1000米 高速CAN&#…...

滑动窗口算法篇:连续子区间与子串问题

1.滑动窗口原理 那么一谈到子区间的问题&#xff0c;我们可能会想到我们可以用我们的前缀和来应用子区间问题&#xff0c;但是这里对于子区间乃至子串问题&#xff0c;我们也可以尝试往滑动窗口的思路方向去进行一个尝试&#xff0c;那么说那么半天&#xff0c;滑动窗口是什么…...

机器翻译同样的文本,是从英语翻译成日语更准确还是中文翻译成日语更准确

在大多数情况下&#xff0c;从英语翻译成日语会比从中文翻译成日语更准确&#xff0c;原因如下&#xff1a; 1. 语言结构的相似性 英语和日语的句子结构更接近&#xff0c;特别是在语法、从句使用、定语位置等方面。例如&#xff0c;日语和英语都使用 SVO 结构&#xff08;主…...

MybatisMybatisPllus公共字段填充与配置逻辑删除

Mybatis/MybatisPllus公共字段填充与配置逻辑删除 在开发过程中&#xff0c;很多时候需要处理一些公共字段&#xff0c;例如&#xff1a;创建时间、修改时间、状态字段等。这些字段通常会在插入或更新数据时进行填充&#xff0c;以便记录数据的变化和状态。同时&#xff0c;逻…...

001-监控你的文件-FSWatch-C++开源库108杰

fswatch 原理与应用简介fswatch 安装fswatch 实践应用具体应用场景与细节补充 1. 简介 有些知识&#xff0c;你知道了不算厉害&#xff0c;但你要是不知道&#xff0c;就容易出乱。 很多时候&#xff0c;程序需要及时获取磁盘上某个文件对象&#xff08;文件夹、文件&#xff0…...

SpringMVC环境搭建

文章目录 1.模块创建1.创建一个webapp的maven项目2.目录结构 2.代码1.HomeController.java2.home.jsp3.applicationContext.xml Spring配置文件4.spring-mvc.xml SpringMVC配置文件5.web.xml 配置中央控制器以及Spring和SpringMVC配置文件的路径6.index.jsp 3.配置Tomcat1.配置…...

ESXi安装【真机和虚拟机】(超详细)

项目简介&#xff1a; ESXi&#xff08;Elastic Sky X Integrated&#xff09;是VMware公司开发的一种裸机虚拟化管理程序&#xff0c;允许用户在单一物理服务器上运行多个虚拟机&#xff08;VM&#xff09;。它直接安装在服务器硬件上&#xff0c;而不是操作系统之上&#xff…...

学习笔记之debian的thonny开发(尚未验证)--从stm32裸机到linux嵌入式系统

这应该算 stm32裸机用户 转 linux嵌入式系统 的入门学习笔记。 【鲁班猫】39-vnc远程桌面连接鲁班猫_哔哩哔哩_bilibili 本集的鲁班猫的视频介绍中&#xff0c;没有清晰明确指出需要linux开发板接入网络&#xff0c;接入网络可以使用有线网口或者wifi路由&#xff0c;有些提示…...

React常用库

React 生态系统非常丰富&#xff0c;有许多常用的库可以帮助开发者更高效地构建应用。以下是一些常见的 React 库及其用途&#xff1a; --- ### 1. **状态管理** - **Redux** 最流行的全局状态管理库&#xff0c;适合中大型应用。 官网: https://redux.js.org/ - **…...

「软件设计模式」桥接模式(Bridge Pattern)

深入解析桥接模式&#xff1a;解耦抽象与实现的艺术 一、模式思想&#xff1a;正交维度的优雅解耦 桥接模式&#xff08;Bridge Pattern&#xff09;通过分离抽象&#xff08;Abstraction&#xff09;与实现&#xff08;Implementation&#xff09;&#xff0c;使二者可以独立…...

Python 用户输入和While循环(使用while 循环来处理列表和字典)

大多数程序都旨在解决最终用户的问题&#xff0c;为此通常需要从用户那里获取一些信息。例如&#xff0c;假设有人要判断自己是否到了投票的年龄&#xff0c;要编写回答这个问题的程序&#xff0c;就 需要知道用户的年龄&#xff0c;这样才能给出答案。因此&#xff0c;这种程序…...

docker 基础命令使用(ubuntu)

docker 状态查询 docker ps docker ps -adocker --version docker info docker --help docker run --help docker ps --help ...docker 操作镜像命令 docker imagesdocker rmi 镜像id/镜像名docker 操作容器命令 docker ps docker ps -adocker run 命令 # 端口映射 -p 参数…...

Jenkins 安装插件 二

Jenkins 安装插件 二 一. 打开 Dashboard 打开 Jenkins 界面&#xff0c;不管在任何界面&#xff0c;只需要点击左上角 Dashboard 按钮即可 二. 打开 Manage Jenkins 找到 Manage Jenkins -> System Configuration -> Plugins 点击 Plugins 打开界面如下 Updates&a…...

深入解析与解决 Oracle 报错:ORA-29275 部分多字节字符20250213

&#x1f6e0;️ 深入解析与解决 Oracle 报错&#xff1a;ORA-29275 部分多字节字符 引言 &#x1f31f; 在与 Oracle 数据库打交道的日常工作中&#xff0c;你是否遇到过 ORA-29275: partial multibyte character 这个令人头疼的错误&#xff1f;这个错误通常与字符编码、数…...

CI/CD(二)docker-compose安装Jenkins

1、docker-compose.yml version: 3.8services:jenkins:image: jenkins/jenkins:lts # 使用官方的 Jenkins LTS 镜像container_name: jenkinsuser: root # 如果需要以 root 用户运行ports:- "8080:8080" # Jenkins Web 界面端口- "50000:50000" # 用于 Jen…...

Windows环境安装部署minimind步骤

Windows环境安装部署minimind步骤 必要的软件环境 git git&#xff0c;可下载安装版&#xff0c;本机中下载绿色版&#xff0c;解压到本地目录下&#xff08;如&#xff1a;c:\soft\git.win64&#xff09;&#xff0c;可将此路径添加到PATH环境变量中&#xff0c;供其他程序…...

使用Node.js进行串口通信

目录 一、 安装 serialport 库二.、实现方法1.打开串口并配置参数2. 向串口传递信息3. 接收串口信息4. 处理错误5. 关闭串口6. 使用解析器7. 获取串口列表 三、 完整示例代码 一、 安装 serialport 库 首先&#xff0c;需要安装 serialport 库。可以通过 npm 安装&#xff1a;…...