将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3 纯python的经济方案)
前情:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客
python脚本实现
厉害的小伙伴最终使用python脚本免费实现了这个功能
文件结构

以下是cursor对代码的评审意见:
这是一个用于下载 Azure Storage 访问日志并将其导入到 Elasticsearch 的项目。让我逐个文件分析:
1. 整体架构
项目采用了良好的模块化设计,主要包含以下组件:
- main.py: 主程序入口,协调整体流程
- azure_log_downloader.py: Azure blob 存储日志下载器
- elasticsearch_sender.py: Elasticsearch 数据发送器
- models.py: 数据模型定义
- log_config.py: 日志配置
- .env: 环境配置文件
2. 具体评审
main.py
优点:
- 支持日期范围处理
- 良好的错误处理和日志记录
- 模块化的设计,职责划分清晰
建议改进:
- 可以考虑添加命令行参数支持,使日期范围可配置
- 可以添加并发处理支持,提高处理速度
azure_log_downloader.py
优点:
- 清晰的类设计
- 良好的错误处理
建议改进:
- 可以添加重试机制
- 可以考虑支持异步操作
elasticsearch_sender.py
优点:
- 实现了批量处理
- 包含重试机制
- 良好的连接测试功能
建议改进:
- 可以考虑添加索引模板管理
- 可以添加更多的 ES 配置选项
models.py
优点:
- 使用 dataclass 简化了类定义
- 详细的字段解析逻辑
- 良好的类型注解
建议改进:
- 可以添加字段验证
- 可以考虑使用 Pydantic 进行数据验证
log_config.py
优点:
- 完善的日志配置
- 支持文件和控制台输出
建议改进:
- 可以考虑添加日志轮转
- 可以添加日志级别配置选项
3. 安全性建议
- 建议使用更安全的方式存储敏感信息(如 API keys)
- 建议添加 SSL 证书验证
- 建议添加请求速率限制
4. 性能建议
- 考虑添加并发处理
- 可以实现增量更新机制
- 考虑添加缓存机制
5. 可维护性建议
- 添加单元测试
- 添加更详细的文档
- 考虑添加监控指标
- 添加 CI/CD 配置
总体来说,这是一个设计良好的项目,代码质量较高,有良好的错误处理和日志记录。主要可以在性能优化、安全性和可维护性方面进行进一步改进。
具体脚本
1. azure_log_downloader.py:
from azure.storage.blob import BlobServiceClient
import json
from models import StorageOperationLogclass AzureLogDownloader:def __init__(self, connection_string, container_name, blob_name, logger=None):self.logger = loggerself.connection_string = connection_stringself.container_name = container_nameself.blob_name = blob_namedef _get_blob_client(self):blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)blob_client = blob_service_client.get_blob_client(container=self.container_name, blob=self.blob_name)return blob_clientdef download_and_transform(self):"""Download and transform log data from Azure storage"""try:blob_client = self._get_blob_client()if not blob_client.exists():self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")return []blob_data = blob_client.download_blob().readall().decode('utf-8')transformed_entries = []for line in blob_data.splitlines():if line.strip():try:log_entry = json.loads(line)log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)if log_obj:transformed_entries.append(log_obj)except json.JSONDecodeError as e:self.logger.error(f"Error parsing line: {str(e)}")continueself.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")return transformed_entriesexcept Exception as e:self.logger.error(f"Error downloading blob: {str(e)}")self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")self.logger.error(f"Error type: {type(e).__name__}")return []
2. elasticsearch_sender.py:
from elasticsearch import Elasticsearch, helpers
import time
import uuidclass ElasticsearchSender:def __init__(self, host, api_key=None, index_name="logs", logger=None):self.logger = loggerself.config = {'hosts': host,'timeout': 30,'retry_on_timeout': True,'max_retries': 3,'verify_certs': False,'ssl_show_warn': False,'use_ssl': True}if api_key:self.config['api_key'] = api_keyself.index_name = index_nameself.es = Elasticsearch(**self.config)def test_connection(self):"""Test Elasticsearch connection"""try:info = self.es.info()self.logger.info("\nElasticsearch Server Info:")self.logger.info(f"Version: {info['version']['number']}")self.logger.info(f"Cluster Name: {info['cluster_name']}")return Trueexcept Exception as e:self.logger.error(f"\nElasticsearch connection failed: {str(e)}")return Falsedef send_logs(self, log_entries, batch_size=500, max_retries=3):"""Send logs to Elasticsearch"""def generate_actions():for entry in log_entries:doc_data = entry.__dict__.copy()if 'time' in doc_data:doc_data['@timestamp'] = doc_data.pop('time')action = {'_index': self.index_name,'_id': str(uuid.uuid4()),'_source': doc_data}yield actionsuccess_count = 0failure_count = 0retry_count = 0while retry_count < max_retries:try:success, failed = helpers.bulk(self.es,generate_actions(),chunk_size=batch_size,raise_on_error=False,raise_on_exception=False)success_count += successfailure_count += len(failed) if failed else 0self.logger.info(f"\nBatch processing results:")self.logger.info(f"- Successfully indexed: {success_count} documents")self.logger.info(f"- Failed: {failure_count} documents")if not failed:breakretry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)except Exception as e:self.logger.error(f"\nBulk indexing error: {str(e)}")retry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)else:self.logger.info("Maximum retry attempts reached")breakreturn success_count, failure_count
3. log_config.py:
import logging
import os
from datetime import UTC, datetimedef setup_logger(target_date: datetime = None, log_prefix: str = "app"):base_dir = os.path.dirname(os.path.abspath(__file__))log_dir = os.path.join(base_dir, 'logs')if not os.path.exists(log_dir):os.makedirs(log_dir)current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')logger = logging.getLogger('AccessLog')logger.setLevel(logging.INFO)file_handler = logging.FileHandler(log_file, encoding='utf-8')file_handler.setLevel(logging.INFO)console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return logger
4. models.py:
from dataclasses import dataclass
from datetime import datetime
import re
from typing import Optional@dataclass
class StorageOperationLog:time: datetimecategory: Optional[str]operationName: Optional[str]callerIpAddress: Optional[str]location: Optional[str]uri: Optional[str]durationMs: Optional[int]referrerHeader: Optional[str]userAgentHeader: Optional[str]requestBodySize: Optional[int]responseBodySize: Optional[int]serverLatencyMs: Optional[int]objectKey: Optional[str]functionName: Optional[str]file_extension: Optional[str]@staticmethoddef parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:"""Parse objectKey to get institution_id and functionName"""try:container_match = re.search(r'container-(\d+)', object_key)parts = object_key.split('/')function_name = Noneif container_match:container_index = next((i for i, part in enumerate(parts) if 'container-' in part), None)if container_index is not None and container_index + 1 < len(parts):function_name = parts[container_index + 1]file_extension = Noneif parts and '.' in parts[-1]:file_extension = parts[-1].split('.')[-1].lower()return function_name, file_extensionexcept Exception as e:if logger:logger.error(f"Error parsing object_key {object_key}: {str(e)}")return None, None@classmethoddef from_log_entry(cls, entry: dict[str, any], logger=None) -> Optional['StorageOperationLog']:"""Create StorageOperationLog instance from raw log entry"""try:properties = entry.get('properties', {})object_key = properties.get('objectKey', '')function_name, file_extension = cls.parse_object_key(object_key)return cls(time=entry.get('time'),category=entry.get('category'),operationName=entry.get('operationName'),callerIpAddress=entry.get('callerIpAddress'),location=entry.get('location'),uri=entry.get('uri'),durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,referrerHeader=properties.get('referrerHeader'),userAgentHeader=properties.get('userAgentHeader'),requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,objectKey=object_key,functionName=function_name,file_extension=file_extension)except Exception as e:if logger:logger.error(f"Error creating StorageOperationLog: {str(e)}")return Nonedef __post_init__(self):if isinstance(self.time, str):if 'Z' in self.time:time_parts = self.time.split('.')if len(time_parts) > 1:microseconds = time_parts[1].replace('Z', '')[:6]time_str = f"{time_parts[0]}.{microseconds}Z"self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")else:self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")
5. main.py:
from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import osload_dotenv()def _get_index_name(target_date: datetime):"""Get full index name for the specified date"""return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(year=target_date.year,month=target_date.month)def _get_blob_name_list(target_date: datetime):"""Get blob paths for all hours of the specified date"""blobs = []for hour in range(24):blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(year=blob_time.year,month=blob_time.month,day=blob_time.day,hour=blob_time.hour)blobs.append(blob_name)return blobsdef main():start_date = datetime(2024, 1, 1, tzinfo=UTC)end_date = datetime(2024, 1, 2, tzinfo=UTC)current_date = start_datewhile current_date <= end_date:target_date = current_datelogger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))try:logger.info(f"\nProcessing data for {current_date.date()}")elasticsearch_index = _get_index_name(target_date)sender = ElasticsearchSender(os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),os.getenv('ELASTICSEARCH_API_KEY'),elasticsearch_index,logger)if not sender.test_connection():logger.error("Elasticsearch connection failed")current_date += timedelta(days=1)continuetotal_logs = total_success = total_failed = 0blobs = _get_blob_name_list(target_date)for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):logger.info(f"\nProcessing container: {container}")for blob_name in blobs:logger.info(f"\nProcessing blob: {blob_name}")downloader = AzureLogDownloader(os.getenv('AZURE_STORAGE_URI'),container,blob_name,logger)try:log_entries = downloader.download_and_transform()success, failed = sender.send_logs(log_entries)total_logs += len(log_entries)total_success += successtotal_failed += failedexcept Exception as e:logger.error(f"Error processing {blob_name}: {str(e)}")continuelogger.info(f"\n{current_date.date()} Processing completed:")logger.info(f"Total documents processed: {total_logs}")logger.info(f"Successfully indexed: {total_success}")logger.info(f"Failed: {total_failed}")finally:for handler in logger.handlers[:]:handler.close()logger.removeHandler(handler)current_date += timedelta(days=1)if __name__ == "__main__":main()
6. .env :
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app
前情后续:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3)-CSDN博客
相关文章:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3 纯python的经济方案)
前情: 将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客 将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客 python脚本实现 厉害的小伙伴最终使用python脚本免费…...
1️⃣Java中的集合体系学习汇总(List/Map/Set 详解)
目录 01. Java中的集合体系 02. 单列集合体系 1. Collection系列集合的遍历方式 (1)迭代器遍历(2)增强for遍历编辑(3)Lambda表达式遍历 03.List集合详解 04.Set集合详解 05.总结 Collection系列…...
闪豆多平台视频批量下载器
1. 视频链接获取与解析 首先,在哔哩哔哩网页中随意点击一个视频,比如你最近迷上了一个UP主的美食制作视频,想要下载下来慢慢学。点击视频后,复制视频页面的链接。复制完成后,不要急着关闭浏览器,因为接下来…...
深入解析:如何用Java爬取淘宝分类详情接口(cat_get)
一、前言 淘宝分类详情接口(cat_get)是淘宝开放平台提供的一个接口,允许开发者获取淘宝商品的分类详情,包括分类ID、分类名称、父分类等信息。这些数据对于电商分析、市场研究和商品分类管理等具有重要价值。本文将详细介绍如何使…...
语音识别的预训练模型
语音识别的预训练模型 语音识别模型 大致分为两类: 连接时序分类(Connectionist Temporal Classification, CTC):仅编码器(encoder-only)的模型,顶部带有线性分类(CTC)头序列到序列(Sequence-to-sequence, Seq2Seq):编码器-解码器(encoder-decoder)模型,编码器…...
element-ui制作多颜色选择器
将颜色存储到一个数组中去。 <template><div class"color-picker-container" style"margin-top: 10px;"><!-- 显示已选颜色 --><div class"color-selection"><divv-for"(color, index) in selectedColors"…...
JVM体系结构
目录 一. JVM 规范 二. JVM 实现 (1) HotSpot (2) JRockit (3) IBM JDK(J9 VM) (4) Azul Zing (5) OpenJ9 三. JVM 实现的选择 四. JVM 的核心组件 五.JVM总结 六.Java 虚拟机(JVM)架构概述 1.Java 虚拟机(…...
wandb使用遇到的一些问题
整合了一下使用wandb遇到的问题 1.请问下如果电脑挂了代理,应该怎么办呢?提示:Network error (ProxyError), entering retry loop. 在本地(而非服务器)运行代码时,常常因为开启代理而无法使用wandb&#…...
Java中的继承
引入继承 Java中使用类对实体进行描述,类经过实例化之后的产物对象,就可以用来表示现实中的实体,描述的事物错综复杂,事物之间可能会存在一些关联,因此我们就需要将他们共性抽取,面向对象的思想中提出了继…...
Cadence笔记--原理图导入PCB
1、以PMU6050为例,首先在原理图双击PMU6050器件,在PCB_Footprint目录填写PCB封装名称并且保存,如下图所示: 2、确保原理图命名的名称不一样,否则会出错,这里PMU6050更改了NC等名称,如下图所示&a…...
从AI生成内容到虚拟现实:娱乐体验的新边界
引言 在快速发展的科技时代,娱乐行业正经历一场前所未有的变革。传统的娱乐方式正与先进技术融合,创造出全新的沉浸式体验。从AI生成的个性化内容,到虚拟现实带来的身临其境的互动场景,科技不仅改变了我们消费娱乐的方式…...
【Linux】gdb_进程概念
📢博客主页:https://blog.csdn.net/2301_779549673 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📢本文由 JohnKi 原创,首发于 CSDN🙉 📢未来很长&#…...
安全类脚本:拒绝ssh暴力破解
要求如下: 一个小时内,连续密码错误4次。 Linux lastb 命令用于列出登入系统失败的用户相关信息。 实验过程如下: 1. 创建两个IP地址不同的干净环境,分别是:192.168.46.101 Rocky 2 和 192.168.46.120 openEuler 2. 2.…...
Android15源码编译问题处理
最近想在Raspberry Pi5上面运行自己编译的Android15镜像,参考如下链接来处理: GitHub - raspberry-vanilla/android_local_manifest GitHub - raspberry-vanilla/android_kernel_manifest 代码同步完后,编译就出问题了,总是提示: FAILED: analyzing Android.bp files and…...
图解Git——分布式Git《Pro Git》
分布式工作流程 Centralized Workflow(集中式工作流) 所有开发者都与同一个中央仓库同步代码,每个人通过拉取、提交来合作。如果两个开发者同时修改了相同的文件,后一个开发者必须在推送之前合并其他人的更改。 Integration-Mana…...
Linux内核编程(二十一)USB应用及驱动开发
一、基础知识 1. USB接口是什么? USB接口(Universal Serial Bus)是一种通用串行总线,广泛使用的接口标准,主要用于连接计算机与外围设备(如键盘、鼠标、打印机、存储设备等)之间的数据传输和电…...
什么是数据仓库?
什么是数据仓库? 数据仓库(Data Warehouse,简称DW)是一种面向分析和决策的数据存储系统,它将企业中分散的、异构的数据按照一定的主题和模型进行集成和存储,为数据分析、报表生成以及商业智能(…...
计算机网络 (48)P2P应用
前言 计算机网络中的P2P(Peer to Peer,点对点)应用是一种去中心化的网络通信模式,它允许设备(或节点)直接连接并共享资源,而无需传统的客户端-服务器模型。 一、P2P技术原理 去中心化架构&#…...
SK海力士(SK Hynix)是全球领先的半导体制造商之一,其在无锡的工厂主要生产DRAM和NAND闪存等存储器产品。
SK海力士(SK Hynix)是全球领先的半导体制造商之一,其在无锡的工厂主要生产DRAM和NAND闪存等存储器产品。以下是SK海力士的一些主要产品型号和类别: DRAM 产品 DDR4 DRAM 特点: 高速、低功耗,广泛应用于PC、服务器和移…...
FunASR 在Linux/Unix 平台编译
第一步拉取镜像并生成容器: ### 镜像启动 通过下述命令拉取并启动FunASR软件包的docker镜像: shell sudo docker pull \ registry.cn-hangzhou.aliyuncs.com/funasr_repo/funasr:funasr-runtime-sdk-online-cpu-0.1.12 mkdir -p ./funasr-runtime-…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...
ubuntu系统文件误删(/lib/x86_64-linux-gnu/libc.so.6)修复方案 [成功解决]
报错信息:libc.so.6: cannot open shared object file: No such file or directory: #ls, ln, sudo...命令都不能用 error while loading shared libraries: libc.so.6: cannot open shared object file: No such file or directory重启后报错信息&…...
基于小程序老人监护管理系统源码数据库文档
摘 要 近年来,随着我国人口老龄化问题日益严重,独居和居住养老机构的的老年人数量越来越多。而随着老年人数量的逐步增长,随之而来的是日益突出的老年人问题,尤其是老年人的健康问题,尤其是老年人产生健康问题后&…...
uniapp获取当前位置和经纬度信息
1.1. 获取当前位置和经纬度信息(需要配置高的SDK) 调用uni-app官方API中的uni.chooseLocation(),即打开地图选择位置。 <button click"getAddress">获取定位</button> const getAddress () > {uni.chooseLocatio…...
