将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3 纯python的经济方案)
前情:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客
python脚本实现
厉害的小伙伴最终使用python脚本免费实现了这个功能
文件结构
以下是cursor对代码的评审意见:
这是一个用于下载 Azure Storage 访问日志并将其导入到 Elasticsearch 的项目。让我逐个文件分析:
1. 整体架构
项目采用了良好的模块化设计,主要包含以下组件:
- main.py: 主程序入口,协调整体流程
- azure_log_downloader.py: Azure blob 存储日志下载器
- elasticsearch_sender.py: Elasticsearch 数据发送器
- models.py: 数据模型定义
- log_config.py: 日志配置
- .env: 环境配置文件
2. 具体评审
main.py
优点:
- 支持日期范围处理
- 良好的错误处理和日志记录
- 模块化的设计,职责划分清晰
建议改进:
- 可以考虑添加命令行参数支持,使日期范围可配置
- 可以添加并发处理支持,提高处理速度
azure_log_downloader.py
优点:
- 清晰的类设计
- 良好的错误处理
建议改进:
- 可以添加重试机制
- 可以考虑支持异步操作
elasticsearch_sender.py
优点:
- 实现了批量处理
- 包含重试机制
- 良好的连接测试功能
建议改进:
- 可以考虑添加索引模板管理
- 可以添加更多的 ES 配置选项
models.py
优点:
- 使用 dataclass 简化了类定义
- 详细的字段解析逻辑
- 良好的类型注解
建议改进:
- 可以添加字段验证
- 可以考虑使用 Pydantic 进行数据验证
log_config.py
优点:
- 完善的日志配置
- 支持文件和控制台输出
建议改进:
- 可以考虑添加日志轮转
- 可以添加日志级别配置选项
3. 安全性建议
- 建议使用更安全的方式存储敏感信息(如 API keys)
- 建议添加 SSL 证书验证
- 建议添加请求速率限制
4. 性能建议
- 考虑添加并发处理
- 可以实现增量更新机制
- 考虑添加缓存机制
5. 可维护性建议
- 添加单元测试
- 添加更详细的文档
- 考虑添加监控指标
- 添加 CI/CD 配置
总体来说,这是一个设计良好的项目,代码质量较高,有良好的错误处理和日志记录。主要可以在性能优化、安全性和可维护性方面进行进一步改进。
具体脚本
1. azure_log_downloader.py:
from azure.storage.blob import BlobServiceClient
import json
from models import StorageOperationLogclass AzureLogDownloader:def __init__(self, connection_string, container_name, blob_name, logger=None):self.logger = loggerself.connection_string = connection_stringself.container_name = container_nameself.blob_name = blob_namedef _get_blob_client(self):blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)blob_client = blob_service_client.get_blob_client(container=self.container_name, blob=self.blob_name)return blob_clientdef download_and_transform(self):"""Download and transform log data from Azure storage"""try:blob_client = self._get_blob_client()if not blob_client.exists():self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")return []blob_data = blob_client.download_blob().readall().decode('utf-8')transformed_entries = []for line in blob_data.splitlines():if line.strip():try:log_entry = json.loads(line)log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)if log_obj:transformed_entries.append(log_obj)except json.JSONDecodeError as e:self.logger.error(f"Error parsing line: {str(e)}")continueself.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")return transformed_entriesexcept Exception as e:self.logger.error(f"Error downloading blob: {str(e)}")self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")self.logger.error(f"Error type: {type(e).__name__}")return []
2. elasticsearch_sender.py:
from elasticsearch import Elasticsearch, helpers
import time
import uuidclass ElasticsearchSender:def __init__(self, host, api_key=None, index_name="logs", logger=None):self.logger = loggerself.config = {'hosts': host,'timeout': 30,'retry_on_timeout': True,'max_retries': 3,'verify_certs': False,'ssl_show_warn': False,'use_ssl': True}if api_key:self.config['api_key'] = api_keyself.index_name = index_nameself.es = Elasticsearch(**self.config)def test_connection(self):"""Test Elasticsearch connection"""try:info = self.es.info()self.logger.info("\nElasticsearch Server Info:")self.logger.info(f"Version: {info['version']['number']}")self.logger.info(f"Cluster Name: {info['cluster_name']}")return Trueexcept Exception as e:self.logger.error(f"\nElasticsearch connection failed: {str(e)}")return Falsedef send_logs(self, log_entries, batch_size=500, max_retries=3):"""Send logs to Elasticsearch"""def generate_actions():for entry in log_entries:doc_data = entry.__dict__.copy()if 'time' in doc_data:doc_data['@timestamp'] = doc_data.pop('time')action = {'_index': self.index_name,'_id': str(uuid.uuid4()),'_source': doc_data}yield actionsuccess_count = 0failure_count = 0retry_count = 0while retry_count < max_retries:try:success, failed = helpers.bulk(self.es,generate_actions(),chunk_size=batch_size,raise_on_error=False,raise_on_exception=False)success_count += successfailure_count += len(failed) if failed else 0self.logger.info(f"\nBatch processing results:")self.logger.info(f"- Successfully indexed: {success_count} documents")self.logger.info(f"- Failed: {failure_count} documents")if not failed:breakretry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)except Exception as e:self.logger.error(f"\nBulk indexing error: {str(e)}")retry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)else:self.logger.info("Maximum retry attempts reached")breakreturn success_count, failure_count
3. log_config.py:
import logging
import os
from datetime import UTC, datetimedef setup_logger(target_date: datetime = None, log_prefix: str = "app"):base_dir = os.path.dirname(os.path.abspath(__file__))log_dir = os.path.join(base_dir, 'logs')if not os.path.exists(log_dir):os.makedirs(log_dir)current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')logger = logging.getLogger('AccessLog')logger.setLevel(logging.INFO)file_handler = logging.FileHandler(log_file, encoding='utf-8')file_handler.setLevel(logging.INFO)console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return logger
4. models.py:
from dataclasses import dataclass
from datetime import datetime
import re
from typing import Optional@dataclass
class StorageOperationLog:time: datetimecategory: Optional[str]operationName: Optional[str]callerIpAddress: Optional[str]location: Optional[str]uri: Optional[str]durationMs: Optional[int]referrerHeader: Optional[str]userAgentHeader: Optional[str]requestBodySize: Optional[int]responseBodySize: Optional[int]serverLatencyMs: Optional[int]objectKey: Optional[str]functionName: Optional[str]file_extension: Optional[str]@staticmethoddef parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:"""Parse objectKey to get institution_id and functionName"""try:container_match = re.search(r'container-(\d+)', object_key)parts = object_key.split('/')function_name = Noneif container_match:container_index = next((i for i, part in enumerate(parts) if 'container-' in part), None)if container_index is not None and container_index + 1 < len(parts):function_name = parts[container_index + 1]file_extension = Noneif parts and '.' in parts[-1]:file_extension = parts[-1].split('.')[-1].lower()return function_name, file_extensionexcept Exception as e:if logger:logger.error(f"Error parsing object_key {object_key}: {str(e)}")return None, None@classmethoddef from_log_entry(cls, entry: dict[str, any], logger=None) -> Optional['StorageOperationLog']:"""Create StorageOperationLog instance from raw log entry"""try:properties = entry.get('properties', {})object_key = properties.get('objectKey', '')function_name, file_extension = cls.parse_object_key(object_key)return cls(time=entry.get('time'),category=entry.get('category'),operationName=entry.get('operationName'),callerIpAddress=entry.get('callerIpAddress'),location=entry.get('location'),uri=entry.get('uri'),durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,referrerHeader=properties.get('referrerHeader'),userAgentHeader=properties.get('userAgentHeader'),requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,objectKey=object_key,functionName=function_name,file_extension=file_extension)except Exception as e:if logger:logger.error(f"Error creating StorageOperationLog: {str(e)}")return Nonedef __post_init__(self):if isinstance(self.time, str):if 'Z' in self.time:time_parts = self.time.split('.')if len(time_parts) > 1:microseconds = time_parts[1].replace('Z', '')[:6]time_str = f"{time_parts[0]}.{microseconds}Z"self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")else:self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")
5. main.py:
from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import osload_dotenv()def _get_index_name(target_date: datetime):"""Get full index name for the specified date"""return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(year=target_date.year,month=target_date.month)def _get_blob_name_list(target_date: datetime):"""Get blob paths for all hours of the specified date"""blobs = []for hour in range(24):blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(year=blob_time.year,month=blob_time.month,day=blob_time.day,hour=blob_time.hour)blobs.append(blob_name)return blobsdef main():start_date = datetime(2024, 1, 1, tzinfo=UTC)end_date = datetime(2024, 1, 2, tzinfo=UTC)current_date = start_datewhile current_date <= end_date:target_date = current_datelogger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))try:logger.info(f"\nProcessing data for {current_date.date()}")elasticsearch_index = _get_index_name(target_date)sender = ElasticsearchSender(os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),os.getenv('ELASTICSEARCH_API_KEY'),elasticsearch_index,logger)if not sender.test_connection():logger.error("Elasticsearch connection failed")current_date += timedelta(days=1)continuetotal_logs = total_success = total_failed = 0blobs = _get_blob_name_list(target_date)for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):logger.info(f"\nProcessing container: {container}")for blob_name in blobs:logger.info(f"\nProcessing blob: {blob_name}")downloader = AzureLogDownloader(os.getenv('AZURE_STORAGE_URI'),container,blob_name,logger)try:log_entries = downloader.download_and_transform()success, failed = sender.send_logs(log_entries)total_logs += len(log_entries)total_success += successtotal_failed += failedexcept Exception as e:logger.error(f"Error processing {blob_name}: {str(e)}")continuelogger.info(f"\n{current_date.date()} Processing completed:")logger.info(f"Total documents processed: {total_logs}")logger.info(f"Successfully indexed: {total_success}")logger.info(f"Failed: {total_failed}")finally:for handler in logger.handlers[:]:handler.close()logger.removeHandler(handler)current_date += timedelta(days=1)if __name__ == "__main__":main()
6. .env :
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app
前情后续:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3)-CSDN博客
相关文章:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3 纯python的经济方案)
前情: 将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客 将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客 python脚本实现 厉害的小伙伴最终使用python脚本免费…...

1️⃣Java中的集合体系学习汇总(List/Map/Set 详解)
目录 01. Java中的集合体系 02. 单列集合体系 1. Collection系列集合的遍历方式 (1)迭代器遍历(2)增强for遍历编辑(3)Lambda表达式遍历 03.List集合详解 04.Set集合详解 05.总结 Collection系列…...

闪豆多平台视频批量下载器
1. 视频链接获取与解析 首先,在哔哩哔哩网页中随意点击一个视频,比如你最近迷上了一个UP主的美食制作视频,想要下载下来慢慢学。点击视频后,复制视频页面的链接。复制完成后,不要急着关闭浏览器,因为接下来…...
深入解析:如何用Java爬取淘宝分类详情接口(cat_get)
一、前言 淘宝分类详情接口(cat_get)是淘宝开放平台提供的一个接口,允许开发者获取淘宝商品的分类详情,包括分类ID、分类名称、父分类等信息。这些数据对于电商分析、市场研究和商品分类管理等具有重要价值。本文将详细介绍如何使…...
语音识别的预训练模型
语音识别的预训练模型 语音识别模型 大致分为两类: 连接时序分类(Connectionist Temporal Classification, CTC):仅编码器(encoder-only)的模型,顶部带有线性分类(CTC)头序列到序列(Sequence-to-sequence, Seq2Seq):编码器-解码器(encoder-decoder)模型,编码器…...

element-ui制作多颜色选择器
将颜色存储到一个数组中去。 <template><div class"color-picker-container" style"margin-top: 10px;"><!-- 显示已选颜色 --><div class"color-selection"><divv-for"(color, index) in selectedColors"…...

JVM体系结构
目录 一. JVM 规范 二. JVM 实现 (1) HotSpot (2) JRockit (3) IBM JDK(J9 VM) (4) Azul Zing (5) OpenJ9 三. JVM 实现的选择 四. JVM 的核心组件 五.JVM总结 六.Java 虚拟机(JVM)架构概述 1.Java 虚拟机(…...
wandb使用遇到的一些问题
整合了一下使用wandb遇到的问题 1.请问下如果电脑挂了代理,应该怎么办呢?提示:Network error (ProxyError), entering retry loop. 在本地(而非服务器)运行代码时,常常因为开启代理而无法使用wandb&#…...

Java中的继承
引入继承 Java中使用类对实体进行描述,类经过实例化之后的产物对象,就可以用来表示现实中的实体,描述的事物错综复杂,事物之间可能会存在一些关联,因此我们就需要将他们共性抽取,面向对象的思想中提出了继…...

Cadence笔记--原理图导入PCB
1、以PMU6050为例,首先在原理图双击PMU6050器件,在PCB_Footprint目录填写PCB封装名称并且保存,如下图所示: 2、确保原理图命名的名称不一样,否则会出错,这里PMU6050更改了NC等名称,如下图所示&a…...

从AI生成内容到虚拟现实:娱乐体验的新边界
引言 在快速发展的科技时代,娱乐行业正经历一场前所未有的变革。传统的娱乐方式正与先进技术融合,创造出全新的沉浸式体验。从AI生成的个性化内容,到虚拟现实带来的身临其境的互动场景,科技不仅改变了我们消费娱乐的方式…...

【Linux】gdb_进程概念
📢博客主页:https://blog.csdn.net/2301_779549673 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📢本文由 JohnKi 原创,首发于 CSDN🙉 📢未来很长&#…...

安全类脚本:拒绝ssh暴力破解
要求如下: 一个小时内,连续密码错误4次。 Linux lastb 命令用于列出登入系统失败的用户相关信息。 实验过程如下: 1. 创建两个IP地址不同的干净环境,分别是:192.168.46.101 Rocky 2 和 192.168.46.120 openEuler 2. 2.…...
Android15源码编译问题处理
最近想在Raspberry Pi5上面运行自己编译的Android15镜像,参考如下链接来处理: GitHub - raspberry-vanilla/android_local_manifest GitHub - raspberry-vanilla/android_kernel_manifest 代码同步完后,编译就出问题了,总是提示: FAILED: analyzing Android.bp files and…...

图解Git——分布式Git《Pro Git》
分布式工作流程 Centralized Workflow(集中式工作流) 所有开发者都与同一个中央仓库同步代码,每个人通过拉取、提交来合作。如果两个开发者同时修改了相同的文件,后一个开发者必须在推送之前合并其他人的更改。 Integration-Mana…...

Linux内核编程(二十一)USB应用及驱动开发
一、基础知识 1. USB接口是什么? USB接口(Universal Serial Bus)是一种通用串行总线,广泛使用的接口标准,主要用于连接计算机与外围设备(如键盘、鼠标、打印机、存储设备等)之间的数据传输和电…...
什么是数据仓库?
什么是数据仓库? 数据仓库(Data Warehouse,简称DW)是一种面向分析和决策的数据存储系统,它将企业中分散的、异构的数据按照一定的主题和模型进行集成和存储,为数据分析、报表生成以及商业智能(…...

计算机网络 (48)P2P应用
前言 计算机网络中的P2P(Peer to Peer,点对点)应用是一种去中心化的网络通信模式,它允许设备(或节点)直接连接并共享资源,而无需传统的客户端-服务器模型。 一、P2P技术原理 去中心化架构&#…...
SK海力士(SK Hynix)是全球领先的半导体制造商之一,其在无锡的工厂主要生产DRAM和NAND闪存等存储器产品。
SK海力士(SK Hynix)是全球领先的半导体制造商之一,其在无锡的工厂主要生产DRAM和NAND闪存等存储器产品。以下是SK海力士的一些主要产品型号和类别: DRAM 产品 DDR4 DRAM 特点: 高速、低功耗,广泛应用于PC、服务器和移…...
FunASR 在Linux/Unix 平台编译
第一步拉取镜像并生成容器: ### 镜像启动 通过下述命令拉取并启动FunASR软件包的docker镜像: shell sudo docker pull \ registry.cn-hangzhou.aliyuncs.com/funasr_repo/funasr:funasr-runtime-sdk-online-cpu-0.1.12 mkdir -p ./funasr-runtime-…...

铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...

ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
力扣-35.搜索插入位置
题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
redis和redission的区别
Redis 和 Redisson 是两个密切相关但又本质不同的技术,它们扮演着完全不同的角色: Redis: 内存数据库/数据结构存储 本质: 它是一个开源的、高性能的、基于内存的 键值存储数据库。它也可以将数据持久化到磁盘。 核心功能: 提供丰…...

stm32wle5 lpuart DMA数据不接收
配置波特率9600时,需要使用外部低速晶振...