当前位置: 首页 > article >正文

08-MLOps与工程落地——特征存储:Feast

特征存储Feast在线/离线特征存储、特征复用、训练服务一致性一、Feast概述1.1 什么是特征存储importmatplotlib.pyplotaspltfrommatplotlib.patchesimportRectangle,FancyBboxPatchimportwarnings warnings.filterwarnings(ignore)print(*60)print(Feast特征存储系统)print(*60)# Feast架构图fig,axplt.subplots(figsize(14,8))ax.axis(off)# 组件components{离线存储:(0.2,0.7),在线存储:(0.5,0.7),注册表:(0.8,0.7),特征定义:(0.2,0.4),特征服务:(0.5,0.4),训练数据:(0.8,0.4),}forname,(x,y)incomponents.items():circleplt.Circle((x,y),0.08,colorlightblue,ecblack)ax.add_patch(circle)ax.text(x,y,name,hacenter,vacenter,fontsize7)# 连接ax.annotate(,xy(0.5,0.62),xytext(0.28,0.7),arrowpropsdict(arrowstyle-,lw1))ax.annotate(,xy(0.72,0.7),xytext(0.58,0.7),arrowpropsdict(arrowstyle-,lw1))ax.annotate(,xy(0.5,0.32),xytext(0.5,0.48),arrowpropsdict(arrowstyle-,lw1))ax.set_xlim(0,1)ax.set_ylim(0,1)ax.set_title(Feast架构,fontsize14)plt.tight_layout()plt.show()print(\n Feast核心价值:)print( - 训练/服务特征一致性)print( - 特征复用和共享)print( - 低延迟在线特征获取)print( - 大规模离线特征处理)print( - 特征血缘追踪)二、Feast安装与配置2.1 安装配置deffeast_setup():Feast安装配置print(\n*60)print(Feast安装配置)print(*60)code # 1. 安装Feast # pip install feast # 2. 安装特定存储后端 # pip install feast[redis] # Redis在线存储 # pip install feast[aws] # AWS S3/Redshift # pip install feast[gcp] # GCP BigQuery/GCS # pip install feast[postgres] # PostgreSQL # 3. 初始化Feast仓库 # feast init my_feature_repo # cd my_feature_repo # 4. 项目结构 # my_feature_repo/ # ├── feature_store.yaml # ├── features.py # └── data/ # └── driver_stats.parquet # 5. feature_store.yaml配置 project: my_feature_repo registry: data/registry.db provider: local online_store: type: redis connection_string: localhost:6379 offline_store: type: file print(code)feast_setup()三、特征定义3.1 定义特征deffeature_definition():特征定义print(\n*60)print(特征定义)print(*60)code # features.py from feast import Entity, FeatureView, Feature, FileSource, ValueType from datetime import timedelta # 1. 定义实体 user Entity( nameuser_id, value_typeValueType.INT64, descriptionUser identifier, labels{team: marketing} ) item Entity( nameitem_id, value_typeValueType.INT64, descriptionItem identifier ) # 2. 定义数据源 user_features_source FileSource( pathdata/user_features.parquet, event_timestamp_columnevent_timestamp, created_timestamp_columncreated_timestamp, date_partition_columndate, descriptionUser features data source ) item_features_source FileSource( pathdata/item_features.parquet, event_timestamp_columnevent_timestamp ) # 3. 定义特征视图 user_features FeatureView( nameuser_features, entities[user_id], ttltimedelta(days30), features[ Feature(nameage, dtypeValueType.INT64), Feature(namegender, dtypeValueType.STRING), Feature(namecity, dtypeValueType.STRING), Feature(nameuser_activity_score, dtypeValueType.FLOAT), Feature(nameuser_engagement_level, dtypeValueType.INT64), Feature(nameregistration_days, dtypeValueType.INT64), ], batch_sourceuser_features_source, onlineTrue, tags{team: marketing}, descriptionUser demographic and behavioral features ) item_features FeatureView( nameitem_features, entities[item_id], ttltimedelta(days7), features[ Feature(namecategory, dtypeValueType.STRING), Feature(nameprice, dtypeValueType.FLOAT), Feature(nameitem_rating, dtypeValueType.FLOAT), Feature(nameinventory_count, dtypeValueType.INT64), Feature(namepopularity_score, dtypeValueType.FLOAT), ], batch_sourceitem_features_source, onlineTrue ) # 4. 定义特征服务 feature_service FeatureService( namerecommendation_service, features[ user_features[[age, gender, user_activity_score]], item_features[[category, price, item_rating]] ], tags{team: recommendation}, descriptionFeatures for recommendation service ) # 5. 高级特征请求时特征 from feast import RequestSource request_source RequestSource( namerequest_features, schema{ current_timestamp: ValueType.INT64, user_location: ValueType.STRING } ) # 6. 特征变换On-demand from feast import on_demand_feature_view from feast.types import Float32, Int64 on_demand_feature_view( namederived_features, sources[user_features, request_source], features[ Feature(nameage_squared, dtypeInt64), Feature(namenormalized_score, dtypeFloat32) ] ) def derived_features(inputs: dict): return { age_squared: inputs[user_features][age] ** 2, normalized_score: inputs[user_features][user_activity_score] / 100.0 } print(code)feature_definition()四、数据导入4.1 离线数据导入defdata_ingestion():数据导入print(\n*60)print(数据导入)print(*60)code import pandas as pd import numpy as np from datetime import datetime, timedelta # 1. 生成示例数据 def generate_sample_data(): np.random.seed(42) n_samples 10000 data pd.DataFrame({ user_id: np.random.randint(1, 1001, n_samples), age: np.random.randint(18, 70, n_samples), gender: np.random.choice([M, F], n_samples), city: np.random.choice([北京, 上海, 广州, 深圳], n_samples), user_activity_score: np.random.uniform(0, 100, n_samples), event_timestamp: [datetime.now() - timedelta(daysnp.random.randint(0, 30)) for _ in range(n_samples)] }) return data # 2. 保存数据 data generate_sample_data() data.to_parquet(data/user_features.parquet, indexFalse) # 3. 应用特征定义 from feast import FeatureStore store FeatureStore(repo_path.) # 4. 应用特征定义到注册表 store.apply() # 5. 导入历史数据离线存储 store.materialize( start_datedatetime.now() - timedelta(days30), end_datedatetime.now() ) # 6. 导入到在线存储 store.materialize_incremental(end_datedatetime.now()) print(code)data_ingestion()五、特征获取5.1 训练数据获取deftraining_data_retrieval():训练数据获取print(\n*60)print(训练数据获取)print(*60)code from feast import FeatureStore import pandas as pd # 初始化 store FeatureStore(repo_path.) # 1. 获取历史特征训练数据 entity_df pd.DataFrame({ user_id: [1, 2, 3, 4, 5], item_id: [100, 200, 300, 400, 500], event_timestamp: pd.date_range(start2024-01-01, periods5, freqD) }) training_features store.get_historical_features( entity_dfentity_df, features[ user_features:age, user_features:gender, user_features:user_activity_score, item_features:category, item_features:price, item_features:item_rating ] ).to_df() print(training_features.head()) # 2. 获取特征服务 feature_service store.get_feature_service(recommendation_service) training_data store.get_historical_features( entity_dfentity_df, featuresfeature_service ).to_df() # 3. 获取训练数据带标签 entity_df_with_label pd.DataFrame({ user_id: [1, 2, 3, 4, 5], item_id: [100, 200, 300, 400, 500], event_timestamp: pd.date_range(start2024-01-01, periods5, freqD), label: [1, 0, 1, 0, 1] # 点击标签 }) training_df store.get_historical_features( entity_dfentity_df_with_label, featuresfeature_service ).to_df() # 4. 特征验证 from feast.infra.offline_stores.file import FileOfflineStoreConfig # 检查特征完整性 missing_features training_df.isnull().sum() print(fMissing features:\\n{missing_features}) # 特征统计 feature_stats training_df.describe() print(fFeature statistics:\\n{feature_stats}) print(code)training_data_retrieval()5.2 在线特征获取defonline_features():在线特征获取print(\n*60)print(在线特征获取)print(*60)code from feast import FeatureStore import time store FeatureStore(repo_path.) # 1. 单个实体特征 features store.get_online_features( features[ user_features:age, user_features:gender, user_features:user_activity_score, item_features:price, item_features:item_rating ], entity_rows[ {user_id: 123, item_id: 456}, ] ).to_dict() print(fOnline features: {features}) # 2. 批量获取 entity_rows [ {user_id: 123, item_id: 456}, {user_id: 124, item_id: 457}, {user_id: 125, item_id: 458}, ] batch_features store.get_online_features( featuresfeature_service, entity_rowsentity_rows ).to_df() print(fBatch features shape: {batch_features.shape}) # 3. 性能测试 def benchmark_online_features(n_requests100): start time.time() for i in range(n_requests): features store.get_online_features( features[user_features:age], entity_rows[{user_id: i}] ) elapsed time.time() - start print(fAverage latency: {elapsed/n_requests*1000:.2f}ms) print(fThroughput: {n_requests/elapsed:.2f} req/s) benchmark_online_features() # 4. 缓存策略 from functools import lru_cache lru_cache(maxsize1000) def get_cached_features(user_id, item_id): 缓存特征结果 return store.get_online_features( featuresfeature_service, entity_rows[{user_id: user_id, item_id: item_id}] ).to_dict() print(code)online_features()六、特征服务部署6.1 特征服务deffeature_server():特征服务部署print(\n*60)print(特征服务部署)print(*60)code # 1. 启动特征服务 # feast serve -h 0.0.0.0 -p 6566 # 2. gRPC客户端 import grpc from feast.serving import ServingService_pb2, ServingService_pb2_grpc channel grpc.insecure_channel(localhost:6566) stub ServingService_pb2_grpc.ServingServiceStub(channel) # 构建请求 request ServingService_pb2.GetOnlineFeaturesRequest( features[user_features:age, user_features:gender], entities[ ServingService_pb2.GetOnlineFeaturesRequest.EntityRow( fields{user_id: grpc.Value(int64_value123)} ) ] ) response stub.GetOnlineFeatures(request) # 3. HTTP/REST API import requests response requests.post( http://localhost:6566/api/v1/features, json{ features: [user_features:age, user_features:gender], entities: [{user_id: 123}] } ) print(response.json()) # 4. Docker部署 # Dockerfile FROM feastdev/feature-server:latest COPY feature_store.yaml /feature_store.yaml # 启动服务 CMD [feast, serve, -h, 0.0.0.0, -p, 6566] # docker build -t feature-server . # docker run -p 6566:6566 feature-server # 5. Kubernetes部署 apiVersion: apps/v1 kind: Deployment metadata: name: feature-server spec: replicas: 3 selector: matchLabels: app: feature-server template: metadata: labels: app: feature-server spec: containers: - name: feature-server image: feature-server:latest ports: - containerPort: 6566 env: - name: REDIS_HOST value: redis-service - name: REDIS_PORT value: 6379 resources: requests: memory: 512Mi cpu: 250m limits: memory: 1Gi cpu: 500m --- apiVersion: v1 kind: Service metadata: name: feature-server spec: selector: app: feature-server ports: - port: 6566 targetPort: 6566 type: LoadBalancer print(code)feature_server()七、完整工作流7.1 端到端示例defcomplete_workflow():完整工作流print(\n*60)print(完整工作流示例)print(*60)code from feast import FeatureStore import pandas as pd from sklearn.ensemble import RandomForestClassifier import mlflow # 1. 初始化 store FeatureStore(repo_path.) # 2. 定义训练实体 entity_df pd.DataFrame({ user_id: [1, 2, 3, 4, 5], item_id: [100, 200, 300, 400, 500], event_timestamp: pd.date_range(start2024-01-01, periods5, freqD), label: [1, 0, 1, 0, 1] }) # 3. 获取特征 feature_service store.get_feature_service(recommendation_service) training_df store.get_historical_features( entity_dfentity_df, featuresfeature_service ).to_df() # 4. 准备训练数据 X training_df.drop([label, event_timestamp, user_id, item_id], axis1) y training_df[label] # 5. 训练模型 model RandomForestClassifier(n_estimators100) model.fit(X, y) # 6. 保存模型 with mlflow.start_run(): mlflow.log_param(model_type, random_forest) mlflow.log_metric(accuracy, model.score(X, y)) mlflow.sklearn.log_model(model, model) # 7. 在线推理 def predict(user_id, item_id): # 获取在线特征 features store.get_online_features( featuresfeature_service, entity_rows[{user_id: user_id, item_id: item_id}] ).to_df() # 预测 X_pred features.drop([user_id, item_id], axis1) prediction model.predict(X_pred) return prediction[0] # 8. 推理示例 result predict(123, 456) print(fPrediction: {result}) print(code)complete_workflow()八、总结组件存储类型用途离线存储数据湖/仓库批量特征处理在线存储Redis/DynamoDB低延迟特征服务注册表数据库特征元数据管理Feast最佳实践合理设置TTL避免数据过期使用特征服务管理特征组实施特征监控和验证优化在线存储性能定期清理过期特征

相关文章:

08-MLOps与工程落地——特征存储:Feast

特征存储:Feast(在线/离线特征存储、特征复用、训练服务一致性) 一、Feast概述 1.1 什么是特征存储? import matplotlib.pyplot as plt from matplotlib.patches import Rectangle, FancyBboxPatch import warnings warnings.filt…...

GoBP:轻量级Go二进制协议框架的设计、实现与微服务实践

1. 项目概述与核心价值最近在梳理团队内部微服务架构的通信层时,我重新审视了各种RPC框架的选型。我们之前主要依赖gRPC,它在性能和跨语言支持上确实不错,但面对一些特定场景——比如需要极简依赖、快速原型验证,或者是对二进制协…...

STM32 快速入门(内核架构,启动方式,开发参考资料,芯片选型)

文章目录 1、启动方式(Start up) 2、开发参考资料 2.1 STM32 中文参考手册 3、通常的芯片选型步骤 4、存储器和总线构架 4.1 系统构架 4.1.1 ICode 总线 4.1.2 DCode 总线 4.1.3 系统总线 4.1.4 DMA 总线 4.1.5 总线矩阵 4.1.6 AHB/APB 桥(APB) 4.2 存储器组织(Memory organ…...

AI账号自动化管理工具:架构设计与风控对抗实践

1. 项目概述与核心价值最近在GitHub上看到一个挺有意思的项目,叫adminlove520/AI-Account-Toolkit。光看名字,你可能会觉得这又是一个“AI工具箱”,但仔细研究它的源码和文档后,我发现它的定位非常精准:一个专注于AI账…...

如何在Kindle等电子阅读器上享受完美漫画阅读体验

如何在Kindle等电子阅读器上享受完美漫画阅读体验 【免费下载链接】kcc KCC (a.k.a. Kindle Comic Converter) is a comic and manga converter for ebook readers. 项目地址: https://gitcode.com/gh_mirrors/kc/kcc 你是否曾经下载了心仪的漫画资源,却发现…...

从URDF到控制器:深入解读ros2_control中lt;ros2_controlgt;标签的完整配置语法与最佳实践

从URDF到控制器:ros2_control核心配置语法与工程实践全解析 当你在Gazebo中看着机械臂完美执行轨迹规划时,背后是ros2_control框架在精准协调硬件与控制器。但现实往往比教程复杂——多关节协作、混合硬件类型、非标准传动比等场景会让标准配置模板瞬间失…...

告别卡顿!LVGL V8.3手表UI页面切换的三种实战方案(附代码避坑点)

LVGL V8.3手表UI页面切换的三种实战方案与性能优化 在智能手表和嵌入式设备的UI开发中,流畅的页面切换体验往往是用户感知最直接的部分。当你在STM32或ESP32这类资源有限的MCU上实现UI时,一个卡顿的页面切换动画就足以让整个产品显得廉价。LVGL作为轻量…...

Unity URP Shader迁移实战:从CG到HLSL,我踩过的那些坑(附完整代码对比)

Unity URP Shader迁移实战:从CG到HLSL的深度避坑指南 第一次把项目从Built-in管线迁移到URP时,我盯着满屏的红色报错信息足足发呆了十分钟。那些曾经在CG中习以为常的写法,现在全都变成了HLSL中的"unrecognized identifier"。如果你…...

别再死记硬背了!用这5个实战乐谱例子,彻底搞懂D.C.、D.S.、Fine和Coda

别再死记硬背了!用这5个实战乐谱例子,彻底搞懂D.C.、D.S.、Fine和Coda 第一次看到乐谱上那些神秘的意大利语标记时,我完全摸不着头脑。直到有次乐队排练,因为跳错了D.S.段落,整个合奏乱成一团,才意识到这些…...

Vim 8.1+ 内置终端真香!告别频繁切换窗口,边写代码边调试的保姆级配置指南

Vim 8.1 内置终端真香!告别频繁切换窗口,边写代码边调试的保姆级配置指南 在开发者的日常工作中,频繁在编辑器和终端之间切换几乎是不可避免的。无论是调试Python脚本、查看服务器日志,还是运行构建命令,这种上下文切换…...

应对2026海外新规:留学生英文论文降AI避坑指南(附4款实测工具)

不知道各位小伙伴发现没有,处理英文文章这件事要比处理中文难很多。之前我自己的英文摘要写好后满心欢喜去跑检测,结果你猜怎么着?手打的摘要部分AI率居然高达85%......我折腾了两三天时间,查了各种资料,这才算真正搞懂…...

【2026实测】搞定海外检测算法:英文论文降AI率避坑指南与4款工具盘点

不知道各位小伙伴发现没有,处理英文文章这件事要比处理中文难很多。之前我自己的英文摘要写好后满心欢喜去跑检测,结果你猜怎么着?手打的摘要部分AI率居然高达85%......我折腾了两三天时间,查了各种资料,这才算真正搞懂…...

Clawdentity:为AI Agent构建去中心化身份与安全通信层

1. 项目概述:Clawdentity,为AI Agent构建去中心化身份与通信层如果你正在开发AI Agent应用,或者尝试将多个独立的智能体串联起来工作,那么“如何让它们安全、可靠地相互通信”这个问题,大概率已经让你头疼过。直接暴露…...

2025届学术党必备的十大AI论文助手实测分析

Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 在当下人工智能生成内容被广泛运用的情形中,把降低AIGC痕迹变为内容创作的关键课…...

别等罚单才看!AISMM Level-3服务承诺倒计时:企业AI系统必须在Q3前完成SLA对齐

更多请点击: https://intelliparadigm.com 第一章:2026奇点智能技术大会:AISMM与服务水平 在2026奇点智能技术大会上,AISMM(Autonomous Intelligence Service Maturity Model)首次作为核心评估框架发布&am…...

炉石佣兵战记自动化脚本:解放双手的5大核心功能全解析

炉石佣兵战记自动化脚本:解放双手的5大核心功能全解析 【免费下载链接】lushi_script This script is to save your time from Mercenaries mode of Hearthstone 项目地址: https://gitcode.com/gh_mirrors/lu/lushi_script 厌倦了在《炉石传说》佣兵战记模式…...

观察在虚拟机中调用Taotoken聚合API的延迟与稳定性表现

观察在虚拟机中调用Taotoken聚合API的延迟与稳定性表现 1. 测试环境与目的说明 本次测试旨在分享在个人本地虚拟机网络环境下,通过标准HTTP请求调用Taotoken聚合API的直观体验。测试环境为一台配置中等的本地虚拟机,运行常见的Linux发行版,…...

别再只用scikit-learn了!用mlxtend给你的机器学习项目加个‘瑞士军刀’(附实战代码)

解锁机器学习效率革命:用mlxtend打造你的Python工具箱 在数据科学家的日常工作中,我们常常陷入重复造轮子的困境——花费大量时间编写那些看似简单却频繁出现的功能代码。当你在scikit-learn中实现一个决策边界可视化时,是否曾想过&#xff1…...

本地优先AI面试助手Natively:开源、隐私与实时辅助的架构实践

1. 项目概述:一个本地优先、开源的AI面试与会议助手 如果你正在寻找一个能在实时面试或会议中提供智能辅助的工具,但同时又对市面上那些昂贵的、将你的对话数据上传到云端的产品心存疑虑,那么你找对地方了。Natively 正是为了解决这个痛点而…...

别再只用高斯模糊了!OpenCV双边滤波cv2.bilateralFilter保姆级调参指南(附Python代码)

解锁OpenCV双边滤波的隐藏潜力:从参数调优到工业级应用实战 在数字图像处理领域,双边滤波就像一位技艺高超的修图师,能够在去除噪点的同时完美保留边缘细节。但很多开发者仅仅停留在函数调用的层面,未能真正发挥这个算法的全部威力…...

Arm Cortex-A78AE寄存器系统与安全关键应用优化

1. Arm Cortex-A78AE寄存器系统概述 在处理器架构设计中,寄存器是最接近计算单元的存储元件,其访问速度比主存快数个数量级。Arm Cortex-A78AE作为一款面向安全关键应用的高性能处理器,其寄存器系统经过精心设计,在保持Armv8-A架构…...

Krones推出全球首款用于容器分配的机器人系统

Krones表示,多年来在利用机器人将包装件分组堆叠托盘层方面已取得了丰硕成果。而如今,这一技术原理被首次应用于容器进入包装机前的分配环节。Krones推出了名为Robobox SynFlow的全新模块化系统,这是业内首款采用机器人技术对容器进行可靠、轻…...

技术架构深度解析:Blender到虚幻引擎Datasmith资产管道实现方案

技术架构深度解析:Blender到虚幻引擎Datasmith资产管道实现方案 【免费下载链接】bl_datasmith UE Datasmith importer/exporter for Blender 项目地址: https://gitcode.com/gh_mirrors/bl/bl_datasmith 在实时渲染与离线创作工具日益融合的现代数字内容生产…...

题解:AtCoder AT_awc0063_c Maximizing Investment

本文分享的必刷题目是从蓝桥云课、洛谷、AcWing等知名刷题平台精心挑选而来,并结合各平台提供的算法标签和难度等级进行了系统分类。题目涵盖了从基础到进阶的多种算法和数据结构,旨在为不同阶段的编程学习者提供一条清晰、平稳的学习提升路径。 欢迎大…...

仿人机器人触觉与语音技术正加速突破

仿人机器人正快速从工厂、物流场景向更广泛的通用场景拓展,甚至逐步迈入家庭,成为老年人的陪伴与助理。这一进程背后,是生成式 AI 与智能体技术的持续驱动,以及感知能力的全面升级。Cadence CEO Anirudh Devgan 在近期的一次演讲中…...

ARM处理器勘误文档解析与分类指南

1. ARM处理器勘误文档解析与分类指南在嵌入式系统开发领域,处理器勘误文档(Errata Notice)是硬件工程师和底层软件开发者的必备参考资料。这份2004年发布的ARM SY003文档虽然显示当前版本没有实际勘误项,但其结构体系为我们提供了…...

AI辅助全栈开发实战:FastAPI+Angular构建旅行警告地图

1. 项目概述与核心思路最近在折腾一个挺有意思的玩意儿,一个叫“旅行警告地图”的交互式仪表盘。简单来说,这玩意儿能实时抓取德国联邦外交部发布的全球旅行安全建议和警告,然后在一个世界地图上给你直观地标出来。哪里是绿色可以放心去&…...

验证码的实现思路

参考视频:【开源项目学习】若依前后端分离版,通俗易懂,快速上手 点击观看 文章目录页面代码在views文件夹中登录页面login生成验证码如何生成的?反向代理机制使用idea的全局搜索对应的后端代码页面代码在views文件夹中 登录页面lo…...

别再复制粘贴了!手把手教你用C语言实现CRC-16 IBM校验(附四种代码对比与性能分析)

CRC-16 IBM校验实战指南:从原理到四种高效C语言实现 在嵌入式系统和通信协议开发中,数据完整性校验是确保信息可靠传输的基石。CRC-16 IBM作为工业界广泛采用的校验算法,其独特的多项式处理和位反序特性使其在Modbus等协议中表现优异。但网上…...

Locale Remulator:彻底解决多语言软件乱码问题的终极指南

Locale Remulator:彻底解决多语言软件乱码问题的终极指南 【免费下载链接】Locale_Remulator System Region and Language Simulator. 项目地址: https://gitcode.com/gh_mirrors/lo/Locale_Remulator Locale Remulator是一款创新的系统区域和语言模拟器&…...