将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3 纯python的经济方案)
前情:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客
python脚本实现
厉害的小伙伴最终使用python脚本免费实现了这个功能
文件结构

以下是cursor对代码的评审意见:
这是一个用于下载 Azure Storage 访问日志并将其导入到 Elasticsearch 的项目。让我逐个文件分析:
1. 整体架构
项目采用了良好的模块化设计,主要包含以下组件:
- main.py: 主程序入口,协调整体流程
- azure_log_downloader.py: Azure blob 存储日志下载器
- elasticsearch_sender.py: Elasticsearch 数据发送器
- models.py: 数据模型定义
- log_config.py: 日志配置
- .env: 环境配置文件
2. 具体评审
main.py
优点:
- 支持日期范围处理
- 良好的错误处理和日志记录
- 模块化的设计,职责划分清晰
建议改进:
- 可以考虑添加命令行参数支持,使日期范围可配置
- 可以添加并发处理支持,提高处理速度
azure_log_downloader.py
优点:
- 清晰的类设计
- 良好的错误处理
建议改进:
- 可以添加重试机制
- 可以考虑支持异步操作
elasticsearch_sender.py
优点:
- 实现了批量处理
- 包含重试机制
- 良好的连接测试功能
建议改进:
- 可以考虑添加索引模板管理
- 可以添加更多的 ES 配置选项
models.py
优点:
- 使用 dataclass 简化了类定义
- 详细的字段解析逻辑
- 良好的类型注解
建议改进:
- 可以添加字段验证
- 可以考虑使用 Pydantic 进行数据验证
log_config.py
优点:
- 完善的日志配置
- 支持文件和控制台输出
建议改进:
- 可以考虑添加日志轮转
- 可以添加日志级别配置选项
3. 安全性建议
- 建议使用更安全的方式存储敏感信息(如 API keys)
- 建议添加 SSL 证书验证
- 建议添加请求速率限制
4. 性能建议
- 考虑添加并发处理
- 可以实现增量更新机制
- 考虑添加缓存机制
5. 可维护性建议
- 添加单元测试
- 添加更详细的文档
- 考虑添加监控指标
- 添加 CI/CD 配置
总体来说,这是一个设计良好的项目,代码质量较高,有良好的错误处理和日志记录。主要可以在性能优化、安全性和可维护性方面进行进一步改进。
具体脚本
1. azure_log_downloader.py:
from azure.storage.blob import BlobServiceClient
import json
from models import StorageOperationLogclass AzureLogDownloader:def __init__(self, connection_string, container_name, blob_name, logger=None):self.logger = loggerself.connection_string = connection_stringself.container_name = container_nameself.blob_name = blob_namedef _get_blob_client(self):blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)blob_client = blob_service_client.get_blob_client(container=self.container_name, blob=self.blob_name)return blob_clientdef download_and_transform(self):"""Download and transform log data from Azure storage"""try:blob_client = self._get_blob_client()if not blob_client.exists():self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")return []blob_data = blob_client.download_blob().readall().decode('utf-8')transformed_entries = []for line in blob_data.splitlines():if line.strip():try:log_entry = json.loads(line)log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)if log_obj:transformed_entries.append(log_obj)except json.JSONDecodeError as e:self.logger.error(f"Error parsing line: {str(e)}")continueself.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")return transformed_entriesexcept Exception as e:self.logger.error(f"Error downloading blob: {str(e)}")self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")self.logger.error(f"Error type: {type(e).__name__}")return []
2. elasticsearch_sender.py:
from elasticsearch import Elasticsearch, helpers
import time
import uuidclass ElasticsearchSender:def __init__(self, host, api_key=None, index_name="logs", logger=None):self.logger = loggerself.config = {'hosts': host,'timeout': 30,'retry_on_timeout': True,'max_retries': 3,'verify_certs': False,'ssl_show_warn': False,'use_ssl': True}if api_key:self.config['api_key'] = api_keyself.index_name = index_nameself.es = Elasticsearch(**self.config)def test_connection(self):"""Test Elasticsearch connection"""try:info = self.es.info()self.logger.info("\nElasticsearch Server Info:")self.logger.info(f"Version: {info['version']['number']}")self.logger.info(f"Cluster Name: {info['cluster_name']}")return Trueexcept Exception as e:self.logger.error(f"\nElasticsearch connection failed: {str(e)}")return Falsedef send_logs(self, log_entries, batch_size=500, max_retries=3):"""Send logs to Elasticsearch"""def generate_actions():for entry in log_entries:doc_data = entry.__dict__.copy()if 'time' in doc_data:doc_data['@timestamp'] = doc_data.pop('time')action = {'_index': self.index_name,'_id': str(uuid.uuid4()),'_source': doc_data}yield actionsuccess_count = 0failure_count = 0retry_count = 0while retry_count < max_retries:try:success, failed = helpers.bulk(self.es,generate_actions(),chunk_size=batch_size,raise_on_error=False,raise_on_exception=False)success_count += successfailure_count += len(failed) if failed else 0self.logger.info(f"\nBatch processing results:")self.logger.info(f"- Successfully indexed: {success_count} documents")self.logger.info(f"- Failed: {failure_count} documents")if not failed:breakretry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)except Exception as e:self.logger.error(f"\nBulk indexing error: {str(e)}")retry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)else:self.logger.info("Maximum retry attempts reached")breakreturn success_count, failure_count
3. log_config.py:
import logging
import os
from datetime import UTC, datetimedef setup_logger(target_date: datetime = None, log_prefix: str = "app"):base_dir = os.path.dirname(os.path.abspath(__file__))log_dir = os.path.join(base_dir, 'logs')if not os.path.exists(log_dir):os.makedirs(log_dir)current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')logger = logging.getLogger('AccessLog')logger.setLevel(logging.INFO)file_handler = logging.FileHandler(log_file, encoding='utf-8')file_handler.setLevel(logging.INFO)console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return logger
4. models.py:
from dataclasses import dataclass
from datetime import datetime
import re
from typing import Optional@dataclass
class StorageOperationLog:time: datetimecategory: Optional[str]operationName: Optional[str]callerIpAddress: Optional[str]location: Optional[str]uri: Optional[str]durationMs: Optional[int]referrerHeader: Optional[str]userAgentHeader: Optional[str]requestBodySize: Optional[int]responseBodySize: Optional[int]serverLatencyMs: Optional[int]objectKey: Optional[str]functionName: Optional[str]file_extension: Optional[str]@staticmethoddef parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:"""Parse objectKey to get institution_id and functionName"""try:container_match = re.search(r'container-(\d+)', object_key)parts = object_key.split('/')function_name = Noneif container_match:container_index = next((i for i, part in enumerate(parts) if 'container-' in part), None)if container_index is not None and container_index + 1 < len(parts):function_name = parts[container_index + 1]file_extension = Noneif parts and '.' in parts[-1]:file_extension = parts[-1].split('.')[-1].lower()return function_name, file_extensionexcept Exception as e:if logger:logger.error(f"Error parsing object_key {object_key}: {str(e)}")return None, None@classmethoddef from_log_entry(cls, entry: dict[str, any], logger=None) -> Optional['StorageOperationLog']:"""Create StorageOperationLog instance from raw log entry"""try:properties = entry.get('properties', {})object_key = properties.get('objectKey', '')function_name, file_extension = cls.parse_object_key(object_key)return cls(time=entry.get('time'),category=entry.get('category'),operationName=entry.get('operationName'),callerIpAddress=entry.get('callerIpAddress'),location=entry.get('location'),uri=entry.get('uri'),durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,referrerHeader=properties.get('referrerHeader'),userAgentHeader=properties.get('userAgentHeader'),requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,objectKey=object_key,functionName=function_name,file_extension=file_extension)except Exception as e:if logger:logger.error(f"Error creating StorageOperationLog: {str(e)}")return Nonedef __post_init__(self):if isinstance(self.time, str):if 'Z' in self.time:time_parts = self.time.split('.')if len(time_parts) > 1:microseconds = time_parts[1].replace('Z', '')[:6]time_str = f"{time_parts[0]}.{microseconds}Z"self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")else:self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")
5. main.py:
from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import osload_dotenv()def _get_index_name(target_date: datetime):"""Get full index name for the specified date"""return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(year=target_date.year,month=target_date.month)def _get_blob_name_list(target_date: datetime):"""Get blob paths for all hours of the specified date"""blobs = []for hour in range(24):blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(year=blob_time.year,month=blob_time.month,day=blob_time.day,hour=blob_time.hour)blobs.append(blob_name)return blobsdef main():start_date = datetime(2024, 1, 1, tzinfo=UTC)end_date = datetime(2024, 1, 2, tzinfo=UTC)current_date = start_datewhile current_date <= end_date:target_date = current_datelogger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))try:logger.info(f"\nProcessing data for {current_date.date()}")elasticsearch_index = _get_index_name(target_date)sender = ElasticsearchSender(os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),os.getenv('ELASTICSEARCH_API_KEY'),elasticsearch_index,logger)if not sender.test_connection():logger.error("Elasticsearch connection failed")current_date += timedelta(days=1)continuetotal_logs = total_success = total_failed = 0blobs = _get_blob_name_list(target_date)for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):logger.info(f"\nProcessing container: {container}")for blob_name in blobs:logger.info(f"\nProcessing blob: {blob_name}")downloader = AzureLogDownloader(os.getenv('AZURE_STORAGE_URI'),container,blob_name,logger)try:log_entries = downloader.download_and_transform()success, failed = sender.send_logs(log_entries)total_logs += len(log_entries)total_success += successtotal_failed += failedexcept Exception as e:logger.error(f"Error processing {blob_name}: {str(e)}")continuelogger.info(f"\n{current_date.date()} Processing completed:")logger.info(f"Total documents processed: {total_logs}")logger.info(f"Successfully indexed: {total_success}")logger.info(f"Failed: {total_failed}")finally:for handler in logger.handlers[:]:handler.close()logger.removeHandler(handler)current_date += timedelta(days=1)if __name__ == "__main__":main()
6. .env :
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app
前情后续:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3)-CSDN博客
相关文章:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3 纯python的经济方案)
前情: 将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客 将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客 python脚本实现 厉害的小伙伴最终使用python脚本免费…...
1️⃣Java中的集合体系学习汇总(List/Map/Set 详解)
目录 01. Java中的集合体系 02. 单列集合体系 1. Collection系列集合的遍历方式 (1)迭代器遍历(2)增强for遍历编辑(3)Lambda表达式遍历 03.List集合详解 04.Set集合详解 05.总结 Collection系列…...
闪豆多平台视频批量下载器
1. 视频链接获取与解析 首先,在哔哩哔哩网页中随意点击一个视频,比如你最近迷上了一个UP主的美食制作视频,想要下载下来慢慢学。点击视频后,复制视频页面的链接。复制完成后,不要急着关闭浏览器,因为接下来…...
深入解析:如何用Java爬取淘宝分类详情接口(cat_get)
一、前言 淘宝分类详情接口(cat_get)是淘宝开放平台提供的一个接口,允许开发者获取淘宝商品的分类详情,包括分类ID、分类名称、父分类等信息。这些数据对于电商分析、市场研究和商品分类管理等具有重要价值。本文将详细介绍如何使…...
语音识别的预训练模型
语音识别的预训练模型 语音识别模型 大致分为两类: 连接时序分类(Connectionist Temporal Classification, CTC):仅编码器(encoder-only)的模型,顶部带有线性分类(CTC)头序列到序列(Sequence-to-sequence, Seq2Seq):编码器-解码器(encoder-decoder)模型,编码器…...
element-ui制作多颜色选择器
将颜色存储到一个数组中去。 <template><div class"color-picker-container" style"margin-top: 10px;"><!-- 显示已选颜色 --><div class"color-selection"><divv-for"(color, index) in selectedColors"…...
JVM体系结构
目录 一. JVM 规范 二. JVM 实现 (1) HotSpot (2) JRockit (3) IBM JDK(J9 VM) (4) Azul Zing (5) OpenJ9 三. JVM 实现的选择 四. JVM 的核心组件 五.JVM总结 六.Java 虚拟机(JVM)架构概述 1.Java 虚拟机(…...
wandb使用遇到的一些问题
整合了一下使用wandb遇到的问题 1.请问下如果电脑挂了代理,应该怎么办呢?提示:Network error (ProxyError), entering retry loop. 在本地(而非服务器)运行代码时,常常因为开启代理而无法使用wandb&#…...
Java中的继承
引入继承 Java中使用类对实体进行描述,类经过实例化之后的产物对象,就可以用来表示现实中的实体,描述的事物错综复杂,事物之间可能会存在一些关联,因此我们就需要将他们共性抽取,面向对象的思想中提出了继…...
Cadence笔记--原理图导入PCB
1、以PMU6050为例,首先在原理图双击PMU6050器件,在PCB_Footprint目录填写PCB封装名称并且保存,如下图所示: 2、确保原理图命名的名称不一样,否则会出错,这里PMU6050更改了NC等名称,如下图所示&a…...
从AI生成内容到虚拟现实:娱乐体验的新边界
引言 在快速发展的科技时代,娱乐行业正经历一场前所未有的变革。传统的娱乐方式正与先进技术融合,创造出全新的沉浸式体验。从AI生成的个性化内容,到虚拟现实带来的身临其境的互动场景,科技不仅改变了我们消费娱乐的方式…...
【Linux】gdb_进程概念
📢博客主页:https://blog.csdn.net/2301_779549673 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📢本文由 JohnKi 原创,首发于 CSDN🙉 📢未来很长&#…...
安全类脚本:拒绝ssh暴力破解
要求如下: 一个小时内,连续密码错误4次。 Linux lastb 命令用于列出登入系统失败的用户相关信息。 实验过程如下: 1. 创建两个IP地址不同的干净环境,分别是:192.168.46.101 Rocky 2 和 192.168.46.120 openEuler 2. 2.…...
Android15源码编译问题处理
最近想在Raspberry Pi5上面运行自己编译的Android15镜像,参考如下链接来处理: GitHub - raspberry-vanilla/android_local_manifest GitHub - raspberry-vanilla/android_kernel_manifest 代码同步完后,编译就出问题了,总是提示: FAILED: analyzing Android.bp files and…...
图解Git——分布式Git《Pro Git》
分布式工作流程 Centralized Workflow(集中式工作流) 所有开发者都与同一个中央仓库同步代码,每个人通过拉取、提交来合作。如果两个开发者同时修改了相同的文件,后一个开发者必须在推送之前合并其他人的更改。 Integration-Mana…...
Linux内核编程(二十一)USB应用及驱动开发
一、基础知识 1. USB接口是什么? USB接口(Universal Serial Bus)是一种通用串行总线,广泛使用的接口标准,主要用于连接计算机与外围设备(如键盘、鼠标、打印机、存储设备等)之间的数据传输和电…...
什么是数据仓库?
什么是数据仓库? 数据仓库(Data Warehouse,简称DW)是一种面向分析和决策的数据存储系统,它将企业中分散的、异构的数据按照一定的主题和模型进行集成和存储,为数据分析、报表生成以及商业智能(…...
计算机网络 (48)P2P应用
前言 计算机网络中的P2P(Peer to Peer,点对点)应用是一种去中心化的网络通信模式,它允许设备(或节点)直接连接并共享资源,而无需传统的客户端-服务器模型。 一、P2P技术原理 去中心化架构&#…...
SK海力士(SK Hynix)是全球领先的半导体制造商之一,其在无锡的工厂主要生产DRAM和NAND闪存等存储器产品。
SK海力士(SK Hynix)是全球领先的半导体制造商之一,其在无锡的工厂主要生产DRAM和NAND闪存等存储器产品。以下是SK海力士的一些主要产品型号和类别: DRAM 产品 DDR4 DRAM 特点: 高速、低功耗,广泛应用于PC、服务器和移…...
FunASR 在Linux/Unix 平台编译
第一步拉取镜像并生成容器: ### 镜像启动 通过下述命令拉取并启动FunASR软件包的docker镜像: shell sudo docker pull \ registry.cn-hangzhou.aliyuncs.com/funasr_repo/funasr:funasr-runtime-sdk-online-cpu-0.1.12 mkdir -p ./funasr-runtime-…...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
