当前位置: 首页 > article >正文

《Python Web部署应知应会》No2:如何基于FastAPI 和 OLLAMA 架构实现高并发 AI 推理服务

《Python Web部署应知应会》No2:如何基于FastAPI 和 OLLAMA 架构实现高并发 AI 推理服务(上)

摘要:

在 FastAPI 和 OLLAMA 架构中实现高并发 AI 推理服务,并优化性能指标采集和缓存策略,可以充分利用 asyncio 的异步 I/O 操作来提升吞吐量和响应速度。以下是一个详细的解决方案,分为基础实现架构概述、实现步骤、性能指标采集、结合 FastAPI 和 OLLAMA、优化方案详细实现(批量推理、负载均衡、动态缓存失效、监控与告警)几个关键部分进行说明,本文章末提供了一个小型博客网站实现高并发推理的案例设计思路,本文章分为上中下三个系列组成,文章系列(中)将介绍具体网站实践,文章系列(下)将介绍高并发网站性能测试和监控的实现。


一、基础方案和优化方法:

1. 架构概述

  • FastAPI: 提供高性能的异步 Web 框架,支持异步请求处理。
  • OLLAMA: 假设它是一个 AI 推理引擎(例如基于 LLM 的推理服务),可以通过 API 或库调用进行交互。
  • Asyncio: 用于管理异步任务,确保 I/O 密集型操作(如网络请求、数据库访问等)不会阻塞主线程。
  • 缓存策略: 使用内存缓存(如 Redis 或 functools.lru_cache)存储频繁使用的推理结果,减少重复计算。
  • 性能指标采集: 利用异步任务记录性能数据(如请求耗时、错误率等),并将其汇总到监控系统。

在这里插入图片描述

2. 实现步骤

(1) 异步推理调用

使用 asyncio 实现对 OLLAMA 推理服务的异步调用。假设 OLLAMA 提供了一个 HTTP API,可以使用 httpx 库(支持异步请求)与之交互。

import httpx
from fastapi import FastAPI, Dependsapp = FastAPI()# 假设 OLLAMA 的推理服务地址
OLLAMA_API_URL = "http://ollama-service:8000/inference"async def call_ollama(prompt: str):async with httpx.AsyncClient() as client:response = await client.post(OLLAMA_API_URL,json={"prompt": prompt},timeout=30.0)response.raise_for_status()  # 确保请求成功return response.json()

(2) 缓存策略

为了减少重复推理的计算开销,可以引入缓存机制。以下是两种常见的缓存方式:

  • 内存缓存(适合小规模应用):
    使用 functools.lru_cacheaiocache 库实现简单的内存缓存。
from functools import lru_cache@lru_cache(maxsize=128)
def cached_inference(prompt: str):# 这里假设推理是同步的,如果是异步的,需要调整逻辑return call_ollama_sync(prompt)async def call_ollama_with_cache(prompt: str):# 异步包装同步缓存调用return await asyncio.to_thread(cached_inference, prompt)
  • 分布式缓存(适合大规模应用):
    使用 Redis 作为分布式缓存,利用 aioredis 库实现异步操作。
import aioredisredis = aioredis.from_url("redis://localhost")async def call_ollama_with_redis_cache(prompt: str):cache_key = f"inference:{prompt}"result = await redis.get(cache_key)if result:return result.decode("utf-8")  # 缓存命中# 缓存未命中,调用推理服务result = await call_ollama(prompt)await redis.setex(cache_key, 3600, result)  # 缓存1小时return result

(3) 性能指标采集

通过中间件或后台任务记录性能指标,例如请求耗时、成功率等。

  • 中间件记录请求耗时
    在 FastAPI 中添加一个中间件,记录每个请求的处理时间。
import time
from fastapi import Request, Response@app.middleware("http")
async def add_process_time_header(request: Request, call_next):start_time = time.time()response: Response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(process_time)return response
  • 后台任务采集指标
    使用 asyncio.create_task 定期将性能数据发送到监控系统(如 Prometheus 或 Datadog)。
import asynciometrics_queue = asyncio.Queue()async def metrics_collector():while True:metric = await metrics_queue.get()# 将 metric 发送到监控系统print(f"Collected Metric: {metric}")metrics_queue.task_done()# 启动后台任务
@app.on_event("startup")
async def startup_event():asyncio.create_task(metrics_collector())

(4) 结合 FastAPI 和 OLLAMA

将上述组件整合到 FastAPI 中,提供一个完整的高并发推理接口。

@app.post("/infer")
async def infer(prompt: str):start_time = time.time()# 调用推理服务(带缓存)try:result = await call_ollama_with_redis_cache(prompt)status = "success"except Exception as e:result = {"error": str(e)}status = "failure"process_time = time.time() - start_time# 记录性能指标await metrics_queue.put({"prompt": prompt,"status": status,"process_time": process_time})return result

3. 优化建议

(1) 批量推理

如果多个请求可以合并为一个批量推理请求(Batch Inference),可以显著提高吞吐量。例如,累积一定数量的请求后一次性发送给 OLLAMA。

(2) 负载均衡

在高并发场景下,部署多个实例并通过负载均衡器(如 Nginx 或 Kubernetes Service)分发流量。

(3) 动态缓存失效

对于时效性要求较高的数据,可以设置动态缓存失效策略。例如,根据数据更新频率自动刷新缓存。

(4) 监控与告警

结合 Prometheus 和 Grafana,实时监控服务性能,并设置告警规则(如请求失败率超过阈值)。


4. 总结

通过上述方法,我们可以实现一个高效的 FastAPI + OLLAMA 高并发推理服务:

  • 利用 asyncio 和异步库(如 httpxaioredis)提升 I/O 性能。
  • 通过缓存策略减少重复计算,降低延迟。
  • 使用中间件和后台任务采集性能指标,持续优化服务。

这种架构不仅能够满足高并发需求,还能通过缓存和性能监控进一步提升用户体验和系统稳定性。

二、完整实例代码:个人博客

以下是一个完整的个人博客 Flask 网站设计方案,结合了 FastAPI 和 OLLAMA 架构中的高并发 AI 推理服务技术(包括批量推理、负载均衡、动态缓存失效、监控与告警)。这个方案将分为以下几个部分:


1. 项目结构

项目的文件框架如下:

personal_blog/
├── app/                     # Flask 应用代码
│   ├── __init__.py          # 初始化 Flask 应用
│   ├── routes.py            # 博客路由
│   ├── models.py            # 数据库模型
│   ├── ai_service.py        # AI 推理服务集成
│   ├── cache.py             # 缓存逻辑
│   └── metrics.py           # 性能指标采集
├── static/                  # 静态资源
│   ├── css/                 # 样式表
│   ├── js/                  # JavaScript 文件
│   └── images/              # 图片资源
├── templates/               # 模板文件
│   ├── base.html            # 基础模板
│   ├── index.html           # 首页
│   ├── post.html            # 博客文章页面
│   └── ai_response.html     # AI 推理结果页面
├── migrations/              # 数据库迁移文件
├── requirements.txt         # Python 依赖
├── run.py                   # 启动脚本
└── README.md                # 项目说明文档

2. 技术栈

  • 前端: HTML + CSS + JavaScript(Bootstrap 或 TailwindCSS 可选)
  • 后端: Flask(主框架) + FastAPI(AI 推理服务)
  • 数据库: SQLite(小型项目适用)或 PostgreSQL(生产环境推荐)
  • 缓存: Redis(动态缓存失效)
  • 监控: Prometheus + Grafana
  • 负载均衡: Nginx 或 Kubernetes Service(可选)

3. 详细实现

(1) Flask 应用初始化

app/__init__.py 中初始化 Flask 应用,并集成数据库和缓存。

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import aioredis# 初始化 Flask 应用
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False# 初始化数据库
db = SQLAlchemy(app)# 初始化 Redis 缓存
redis = aioredis.from_url("redis://localhost")# 导入路由
from app.routes import *# 创建数据库表
with app.app_context():db.create_all()

(2) 数据库模型

app/models.py 中定义博客文章的数据库模型。

from . import dbclass Post(db.Model):id = db.Column(db.Integer, primary_key=True)title = db.Column(db.String(100), nullable=False)content = db.Column(db.Text, nullable=False)created_at = db.Column(db.DateTime, default=db.func.current_timestamp())

(3) 路由和视图

app/routes.py 中定义博客的路由和视图函数。

from flask import render_template, request, redirect, url_for
from .models import Post
from .ai_service import call_ollama_with_cache
from .cache import redis@app.route('/')
def index():posts = Post.query.order_by(Post.created_at.desc()).all()return render_template('index.html', posts=posts)@app.route('/post/<int:post_id>')
def view_post(post_id):post = Post.query.get_or_404(post_id)return render_template('post.html', post=post)@app.route('/create', methods=['GET', 'POST'])
def create_post():if request.method == 'POST':title = request.form['title']content = request.form['content']new_post = Post(title=title, content=content)db.session.add(new_post)db.session.commit()return redirect(url_for('index'))return render_template('create_post.html')@app.route('/ai-inference', methods=['POST'])
async def ai_inference():prompt = request.form['prompt']result = await call_ollama_with_cache(prompt)return render_template('ai_response.html', result=result)

(4) AI 推理服务

app/ai_service.py 中实现 FastAPI 的高并发 AI 推理服务集成。

批量推理
import asyncio
from collections import defaultdict
import httpxBATCH_SIZE = 10
BATCH_TIMEOUT = 2
OLLAMA_API_URL = "http://ollama-service:8000/inference"
batch_queue = asyncio.Queue()
batch_results = defaultdict(asyncio.Future)async def batch_inference_worker():while True:batch_prompts = []try:while len(batch_prompts) < BATCH_SIZE:prompt, future = await asyncio.wait_for(batch_queue.get(), timeout=BATCH_TIMEOUT)batch_prompts.append(prompt)batch_results[prompt] = futureexcept asyncio.TimeoutError:passif batch_prompts:results = await call_ollama_batch(batch_prompts)for prompt, result in zip(batch_prompts, results):batch_results[prompt].set_result(result)async def call_ollama_batch(prompts: list):async with httpx.AsyncClient() as client:response = await client.post(OLLAMA_API_URL,json={"prompts": prompts},timeout=30.0)response.raise_for_status()return response.json()["results"]
Redis 动态缓存失效
async def call_ollama_with_cache(prompt: str):cache_key = f"inference:{prompt}"result = await redis.get(cache_key)if result:return result.decode("utf-8")result = await call_ollama(prompt)ttl = calculate_ttl(prompt)await redis.setex(cache_key, ttl, result)return resultdef calculate_ttl(prompt: str) -> int:if "urgent" in prompt.lower():return 60return 3600

(5) 监控与告警

app/metrics.py 中集成 Prometheus。

from prometheus_client import Counter, Histogram, start_http_serverREQUEST_COUNT = Counter("request_count", "Total number of requests", ["status"])
REQUEST_LATENCY = Histogram("request_latency_seconds", "Request latency in seconds")@app.before_request
def before_request():request.start_time = time.time()@app.after_request
def after_request(response):process_time = time.time() - request.start_timestatus = "success" if response.status_code < 400 else "failure"REQUEST_COUNT.labels(status=status).inc()REQUEST_LATENCY.observe(process_time)return responsestart_http_server(8001)

(6) 静态资源与模板

templates/static/ 中提供前端页面和静态资源。

示例模板 (base.html)
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>Personal Blog</title><link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body><header><h1>My Personal Blog</h1></header><main>{% block content %}{% endblock %}</main>
</body>
</html>

4. 启动与部署

(1) 启动脚本

run.py 中启动 Flask 应用。

from app import appif __name__ == '__main__':app.run(debug=True)
(2) 部署建议
  • 使用 Gunicorn 启动 Flask 应用:gunicorn -w 4 app:app
  • 配置 Nginx 作为反向代理和负载均衡器。
  • 部署 Prometheus 和 Grafana 进行性能监控。

5. 总结

通过上述设计,我们实现了一个完整的个人博客网站,集成了高并发 AI 推理服务(FastAPI + OLLAMA),并实现了批量推理、动态缓存失效、监控与告警等优化方案。这种架构不仅功能强大,还能满足高并发需求,适合中小型博客应用。

相关文章:

《Python Web部署应知应会》No2:如何基于FastAPI 和 OLLAMA 架构实现高并发 AI 推理服务

《Python Web部署应知应会》No2&#xff1a;如何基于FastAPI 和 OLLAMA 架构实现高并发 AI 推理服务&#xff08;上&#xff09; 摘要&#xff1a; 在 FastAPI 和 OLLAMA 架构中实现高并发 AI 推理服务&#xff0c;并优化性能指标采集和缓存策略&#xff0c;可以充分利用 asy…...

NUUO摄像头debugging_center_utils命令执行漏洞

免责声明&#xff1a;本号提供的网络安全信息仅供参考&#xff0c;不构成专业建议。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权&#xff0c;请及时与我联系&#xff0c;我将尽快处理并删除相关内容。 漏洞描述 NUUO NVR是中国台湾省NUUO公司旗…...

uv 命令用conda命令解释

uv&#xff1a;安装 | uv-zh-cn 功能 | uv-zh-cn #showkey -a 可看按键的"\eOP"转义序列是啥# 绑定快捷键 f1 到 source .venv/bin/activate函数 bind "\eOP": "source .venv/bin/activate " #conda activate# 绑定快捷键 f2 到uv add函数 …...

解决【vite-plugin-top-level-await】 插件导致的 Bindings Not Found 错误

解决【vite-plugin-top-level-await】 插件导致的 Bindings Not Found 错误 环境设置 操作系统: macOS硬件平台: M1 Pro前端框架: Vue 3Node.js 版本: 20 在使用 Vue 项目时&#xff0c;我们尝试集成 vite-plugin-top-level-await 插件以支持顶层 await 语法。然而&#xff…...

2.pycharm部署Ai - 编程好助手

一、pycharm安装continue插件 1.提前安装好pycharm&#xff0c;并双击打开 2.File – Setting 3.Plugins – 搜索Continue &#xff0c; 点击Install安装 4.点ok 二、获取硅基流动API 1.登入网站&#xff1a;https://siliconflow.cn/zh-cn/#/&#xff0c;并注册登入 2.获取AP…...

uniapp + Axios + 小程序封装网络请求

前言 小程序自带的网络请求使用起来比较麻烦&#xff0c;不便于管理&#xff0c;就需要封装网络请求&#xff0c;减少繁琐步骤&#xff0c;封装最终效果&#xff0c;根据类别将网络请求封装在文件中&#xff0c;使用得时候调用文件名名称加文件中得自定义名称&#xff0c;就可…...

初始数据库--MySQL

一&#xff0c;数据库&#xff08;DataBase&#xff09;概述 思考程序开发中数据存储问题&#xff1f; 变量 int c 10; 对象{名字&#xff0c;价格&#xff0c;颜色} 集合类-->对象 以上方式存储数据都是在内存中存储 程序运行结束后数据都释放了 …...

ENSP学习day12

链路聚合–LACP&#xff08;Link Aggregation Control Protocol&#xff09; 链路聚合是指将多条物理链路通过一种技术进行绑定&#xff0c;从而形成一个逻辑上更高带宽、更可靠的链路。这种技术能够提高网络传输速率和可靠性&#xff0c;同时实现负载均衡。链路聚合可以在不同…...

【LeetCode 题解】算法:29.两数相除

在算法的世界里&#xff0c;常常会出现一些打破常规、挑战思维的题目。LeetCode 第 29 题 “两数相除” 便是其中之一。这道题不仅要求我们在不能使用乘法、除法和取余运算的前提下实现两数相除&#xff0c;还需要处理 32 位有符号整数的溢出问题&#xff0c;对编程者的逻辑思维…...

打包python文件生成exe

下载PyInstaller 官网 pip install pyinstaller验证是否安装成功 pyinstaller --version打包 pyinstaller "C:\Documents and Settings\project\myscript.py"会生成.spec,build,dist三项&#xff0c;其中build,dist为文件夹&#xff0c;dist是最后的可执行文件&a…...

【NLP】13. NLP推理方法详解 --- 穷举和贪心搜索

NLP推理方法详解 — 穷举和贪心搜索 在自然语言处理&#xff08;NLP&#xff09;任务中&#xff0c;推理&#xff08;Inference&#xff09;是指在给定模型的情况下&#xff0c;找到最可能的输出序列。由于模型通常是神经网络&#xff0c;它会为每个可能的输出分配一个概率&am…...

Redis延时队列在订单超时未报到场景的应用分享

一、引言 在电商、医疗预约等众多业务场景中&#xff0c;经常会遇到需要处理超时任务的情况。比如医疗预约订单&#xff0c;如果患者在支付成功后&#xff0c;到了预约结束时间还未报到&#xff0c;系统需要自动取消订单。为了实现这样的功能&#xff0c;我们可以利用 Redis 延…...

精心整理-2024最新网络安全-信息安全全套资料(学习路线、教程笔记、工具软件、面试文档).zip

2024最新网络安全-信息安全全套资料&#xff08;学习路线、教程笔记、工具软件、面试文档&#xff09;&#xff0c;视频教程文档资料共55GB。 一、网络安全-信息安全学习路线 0、网络安全-信息安全思维导图.jpg 1、网络安全大师课 V2024.pdf 2、网络安全行业白皮书.pdf 3、网络…...

多人协同进行qt应用程序开发应该注意什么?

多人协同进行Qt应用程序开发的注意事项 多人协同开发Qt应用程序时&#xff0c;需要注意以下几个关键方面以确保开发效率和代码质量&#xff1a; 1. 代码版本控制 使用Git&#xff1a;建立合理的分支策略&#xff08;如Git Flow&#xff09;.gitignore配置&#xff1a;确保忽…...

8.4考研408简单选择排序与堆排序知识点深度解析

考研408「简单选择排序与堆排序」知识点全解析 一、简单选择排序 1.1 定义与核心思想 简单选择排序&#xff08;Selection Sort&#xff09;是一种选择排序算法&#xff0c;其核心思想是&#xff1a; 每趟选择&#xff1a;从待排序序列中选择最小&#xff08;或最大&#x…...

C++中ShellExecute函数使用方法说明,如果一开始参数为隐藏,后面还能再显示出来吗

文章目录 一、ShellExecute基础用法函数原型关键参数 nShowCmd示例代码&#xff1a;启动程序并隐藏窗口 二、隐藏后能否重新显示窗口直接答案 三、实现隐藏后显示窗口的步骤1. 获取目标窗口句柄2. 显示窗口 四、完整流程示例五、注意事项六、总结 在C中使用ShellExecute函数时&…...

MySQL数据库精研之旅第五期:CRUD的趣味探索(上)

专栏&#xff1a;MySQL数据库成长记 个人主页&#xff1a;手握风云 目录 一、CRUD简介 二、Create新增 2.1. 语法 2.2. 示例 三、Retrieve检索 3.1. 语法 3.2. 示例 一、CRUD简介 CURD是对数据库中的记录进行基本的增删改查操作&#xff1a;Create(创建)、Retrieve(检索…...

【文件上传】✈️大文件的上传服务器的简单实现

&#x1f4a5;&#x1f4a5;✈️✈️欢迎阅读本文章❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;本篇文章阅读大约耗时五分钟。 ⛳️motto&#xff1a;不积跬步、无以千里 &#x1f4cb;&#x1f4cb;&#x1f4cb;本文目录如下&#xff1a;&#x1f381;&#x1f381;&a…...

Windows DOS窗口12个命令

DOS 命令是指在 Windows 命令提示符&#xff08;CMD&#xff09;中使用的命令行工具&#xff0c;源于早期的 Disk Operating System。虽然现代 Windows 系统更多使用图形界面&#xff0c;但命令提示符仍然是测试人员的重要工具。测试人员通常需要执行文件操作、测试网络连接、监…...

AI加Python的文本数据情感分析流程效果展示与代码实现

本文所使用数据来自于梯田景区评价数据。 一、数据预处理 数据清洗 去除重复值、空值及无关字符(如表情符号、特殊符号等)。 提取中文文本,过滤非中文字符。 统一文本格式(如全角转半角、繁体转简体)。 中文分词与去停用词 使用 jieba 分词工具进行分词。 加载自定义词…...

Go语言手动内存对齐的四大场景与实践指南

Go语言手动内存对齐的四大场景与实践指南 引言&#xff1a;Go的内存对齐机制 Go语言通过编译器自动处理内存对齐问题&#xff0c;开发者通常无需关心底层细节。然而&#xff0c;在特定场景下&#xff0c;手动干预内存对齐是避免程序崩溃或数据错乱的必要操作。本文将深入探讨G…...

PDF多表格结构识别与跨表语义对齐:基于对抗迁移的鲁棒相似度度量模型

文章目录 一. 项目结构二.流程分析2.1 批处理器核心代码解析 三. 跨页表格相似度匹配原理3.1 表头内容相似度-特征向量归一化3.2 表头内容相似度-余弦相似度3.3 定时缓存清理 ocr扫描有其局限性。对于pdf文本类型这种pdfbox&#xff0c;aspose-pdf&#xff0c;spire直接提取文本…...

docker启动nacos+redis+seata

docker启动nacos 最新版本的nacos需要再启动的时候设置mysql的一些属性&#xff0c;【也可以先启动nacos&#xff0c;再到配置文件中找到application.yml设置mysql的一些属性】。 1.如果直接启动nacos设置的mysql我们需要确定两个容器的ip都是一样的。 查看mysql容器中的ip命令…...

从 select 到 epoll:拆解 I/O 多路复用的演进与实战

目录 一、引言&#xff1a;为什么需要 I/O 多路复用&#xff1f; 二、select 1.函数介绍 2.原理 3.样例代码 4.优缺点总结 三、poll 1.函数介绍 2.样例代码 3.优缺点总结 四、epoll 1.函数介绍 2.原理 3.LT和ET两种工作模式 4.优缺点总结 五、核心机制对比&…...

Go后端架构探索:从 MVC 到 DDD 的演进之路

Go语言 MVC 与 DDD 分层架构详细对比 MVC和DDD是后台开发两种流行的分层架构思想&#xff0c;MVC&#xff08;Model-View-Controller&#xff09;是一种设计模式&#xff0c;主要用于分离用户界面、业务逻辑和数据模型&#xff0c;便于分层解耦&#xff0c;而DDD&#xff08;领…...

【力扣hot100题】(017)矩阵置零

还是挺简单的&#xff0c;使用哈希表记录需要置换的行列即可&#xff0c;这样就可以避免重复节省时间。 class Solution { public:void setZeroes(vector<vector<int>>& matrix) {unordered_set<int> row;unordered_set<int> line;for(int i0;i&l…...

One Commander 3,文件管理新体验

One Commander 3 是一款集多功能于一体 Windows 10/11的文件管理工具&#xff0c;其设计目的在于为用户带来多元化的操作体验。这款工具通过支持多栏界面布局&#xff0c;让用户能够迅速且高效地组织和管理文件。此外&#xff0c;它还提供了多主题选项和多种图标集&#xff0c;…...

Ubuntu 下 nginx-1.24.0 源码分析

main 函数在 src\core\nginx.c int ngx_cdecl main(int argc, char *const *argv) {ngx_buf_t *b;ngx_log_t *log;ngx_uint_t i;ngx_cycle_t *cycle, init_cycle;ngx_conf_dump_t *cd;ngx_core_conf_t *ccf;ngx_debug_init();if (ngx_strerror_in…...

c# ftp上传下载 帮助类

工作中FTP的上传和下载还是很常用的。如下载打标数据,上传打标结果等。 这个类常用方法都有了:上传,下载,判断文件夹是否存在,创建文件夹,获取当前目录下文件列表(不包括文件夹) ,获取当前目录下文件列表(不包括文件夹) ,获取FTP文件列表(包括文件夹), 获取当前目…...

Java进阶——静态代理与动态代理

代理模式是一种常用的设计模式&#xff0c;为其他对象提供一种代理以控制对这个对象的访问。代理模式就像是一个中间人&#xff0c;客户端通过代理来间接访问目标对象&#xff0c;可以在不修改目标对象的基础上&#xff0c;对目标对象的功能进行增强或扩展。代理模式主要分为静…...