当前位置: 首页 > article >正文

华为云Flexus+DeepSeek征文 | 基于DeepSeek-V3构建企业知识库问答机器人实战

作者简介

我是摘星,一名专注于云计算和AI技术的开发者。本次通过华为云MaaS平台体验DeepSeek系列模型,将实际使用经验分享给大家,希望能帮助开发者快速掌握华为云AI服务的核心能力。

目录

作者简介

1. 引言

2. 技术选型与架构设计

2.1 技术栈选择

2.2 系统架构设计

2.3 核心工作流程

3. 环境准备与部署

3.1 华为云Flexus环境配置

3.2 基础环境安装

3.3 Python依赖安装

4. 核心功能实现

4.1 文档预处理模块

4.2 向量数据库模块

4.3 DeepSeek-V3集成模块

4.4 问答引擎核心模块

5. Web服务接口实现

5.1 FastAPI后端服务

5.2 Streamlit前端界面

6. 容器化部署

6.1 Docker配置

6.2 Docker Compose配置

7. 系统优化与监控

7.1 性能优化策略

7.2 监控与日志系统

8. 测试与验证

8.1 单元测试

8.2 性能测试

9. 部署指南

9.1 生产环境部署

9.2 Nginx配置

10. 总结与展望

10.1 项目总结

10.2 使用效果分析

10.3 未来改进方向

10.4 技术要点回顾

参考文献


 

1. 引言

随着人工智能技术的快速发展,企业对于智能化知识管理和问答系统的需求日益增长。传统的知识管理方式往往存在信息检索效率低、知识分散、更新不及时等问题。而基于大语言模型的知识库问答机器人能够有效解决这些痛点,为企业提供高效、准确的知识服务。

本文将详细介绍如何利用华为云Flexus云服务器和DeepSeek-V3大语言模型,构建一个功能完善的企业知识库问答机器人。通过本实战项目,您将学会:

  • 如何设计企业级知识库问答系统架构
  • 如何部署和配置DeepSeek-V3模型
  • 如何实现知识库的向量化存储和检索
  • 如何构建用户友好的问答界面
  • 如何优化系统性能和准确性

图1: 企业知识库问答机器人系统架构图

2. 技术选型与架构设计

2.1 技术栈选择

在构建企业知识库问答机器人时,我们选择以下技术栈:

  • 云平台: 华为云Flexus云服务器
  • 大语言模型: DeepSeek-V3
  • 向量数据库: ChromaDB
  • 文本嵌入: Sentence-Transformers
  • Web框架: FastAPI + Streamlit
  • 数据处理: Python + pandas
  • 容器化: Docker

2.2 系统架构设计

整个系统采用模块化设计,主要包含以下几个核心组件:

图1: 企业知识库问答机器人系统架构图

2.3 核心工作流程

系统的核心工作流程如下:

图2: 系统工作流程图

3. 环境准备与部署

3.1 华为云Flexus环境配置

首先,我们需要在华为云上创建Flexus云服务器实例:

# 推荐配置

# CPU: 8核

# 内存: 32GB

# 存储: 500GB SSD

# 操作系统: Ubuntu 22.04 LTS

3.2 基础环境安装

在服务器上安装必要的软件环境:

#!/bin/bash
# 更新系统包
sudo apt update && sudo apt upgrade -y# 安装Python 3.11
sudo apt install python3.11 python3.11-pip python3.11-venv -y# 安装Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh# 安装Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/v2.20.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose# 创建项目目录
mkdir -p /opt/enterprise-qa-bot
cd /opt/enterprise-qa-bot

3.3 Python依赖安装

创建虚拟环境并安装依赖:

# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
streamlit==1.28.1
chromadb==0.4.15
sentence-transformers==2.2.2
openai==1.3.6
pandas==2.1.3
numpy==1.24.3
pydantic==2.5.0
python-multipart==0.0.6
aiofiles==23.2.1
langchain==0.0.340
langchain-community==0.0.1
python-docx==0.8.11
PyPDF2==3.0.1
tiktoken==0.5.1
# 创建虚拟环境
python3.11 -m venv venv
source venv/bin/activate# 安装依赖
pip install -r requirements.txt

4. 核心功能实现

4.1 文档预处理模块

首先实现文档预处理模块,用于处理各种格式的企业文档:

# document_processor.py
import os
import pandas as pd
from typing import List, Dict
from pathlib import Path
import PyPDF2
from docx import Document
import tiktokenclass DocumentProcessor:"""文档预处理器类支持处理PDF、Word、TXT等格式的文档"""def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):"""初始化文档处理器Args:chunk_size: 文本块大小chunk_overlap: 文本块重叠大小"""self.chunk_size = chunk_sizeself.chunk_overlap = chunk_overlapself.encoding = tiktoken.get_encoding("cl100k_base")def extract_text_from_pdf(self, file_path: str) -> str:"""从PDF文件中提取文本Args:file_path: PDF文件路径Returns:提取的文本内容"""text = ""try:with open(file_path, 'rb') as file:pdf_reader = PyPDF2.PdfReader(file)for page in pdf_reader.pages:text += page.extract_text() + "\n"except Exception as e:print(f"处理PDF文件 {file_path} 时出错: {str(e)}")return textdef extract_text_from_docx(self, file_path: str) -> str:"""从Word文档中提取文本Args:file_path: Word文档路径Returns:提取的文本内容"""text = ""try:doc = Document(file_path)for paragraph in doc.paragraphs:text += paragraph.text + "\n"except Exception as e:print(f"处理Word文档 {file_path} 时出错: {str(e)}")return textdef extract_text_from_txt(self, file_path: str, encoding: str = 'utf-8') -> str:"""从文本文件中提取内容Args:file_path: 文本文件路径encoding: 文件编码Returns:文件内容"""try:with open(file_path, 'r', encoding=encoding) as file:return file.read()except UnicodeDecodeError:# 尝试其他编码for enc in ['gbk', 'gb2312', 'latin-1']:try:with open(file_path, 'r', encoding=enc) as file:return file.read()except UnicodeDecodeError:continueexcept Exception as e:print(f"处理文本文件 {file_path} 时出错: {str(e)}")return ""def split_text_into_chunks(self, text: str, metadata: Dict = None) -> List[Dict]:"""将文本分割成块Args:text: 输入文本metadata: 文档元数据Returns:文本块列表"""if not text.strip():return []# 使用tiktoken计算token数量tokens = self.encoding.encode(text)chunks = []start = 0chunk_id = 0while start < len(tokens):# 确定当前块的结束位置end = min(start + self.chunk_size, len(tokens))# 解码当前块chunk_tokens = tokens[start:end]chunk_text = self.encoding.decode(chunk_tokens)# 创建块数据chunk_data = {'id': f"{metadata.get('filename', 'unknown')}_{chunk_id}",'text': chunk_text.strip(),'metadata': {**(metadata or {}),'chunk_id': chunk_id,'start_token': start,'end_token': end}}chunks.append(chunk_data)# 移动到下一个块的开始位置(考虑重叠)start = end - self.chunk_overlapchunk_id += 1return chunksdef process_documents(self, document_dir: str) -> List[Dict]:"""批量处理文档目录Args:document_dir: 文档目录路径Returns:处理后的文档块列表"""all_chunks = []# 支持的文件格式supported_extensions = {'.pdf', '.docx', '.txt', '.md'}# 遍历文档目录for file_path in Path(document_dir).rglob('*'):if file_path.suffix.lower() in supported_extensions:print(f"正在处理文件: {file_path}")# 根据文件格式提取文本if file_path.suffix.lower() == '.pdf':text = self.extract_text_from_pdf(str(file_path))elif file_path.suffix.lower() == '.docx':text = self.extract_text_from_docx(str(file_path))else:  # .txt, .mdtext = self.extract_text_from_txt(str(file_path))# 准备元数据metadata = {'filename': file_path.name,'filepath': str(file_path),'file_type': file_path.suffix.lower(),'file_size': file_path.stat().st_size}# 分割文本并添加到结果中chunks = self.split_text_into_chunks(text, metadata)all_chunks.extend(chunks)print(f"文件 {file_path.name} 处理完成,生成 {len(chunks)} 个文本块")print(f"总共处理了 {len(all_chunks)} 个文本块")return all_chunks

4.2 向量数据库模块

实现向量数据库管理模块,用于存储和检索文档向量:

# vector_store.py
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Optional
import numpy as npclass VectorStore:"""向量数据库管理类基于ChromaDB实现文档向量存储和检索"""def __init__(self, persist_directory: str = "./chroma_db",collection_name: str = "enterprise_knowledge",embedding_model: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):"""初始化向量数据库Args:persist_directory: 数据库持久化目录collection_name: 集合名称embedding_model: 嵌入模型名称"""self.persist_directory = persist_directoryself.collection_name = collection_name# 初始化ChromaDB客户端self.client = chromadb.PersistentClient(path=persist_directory,settings=Settings(allow_reset=True))# 初始化嵌入模型print(f"正在加载嵌入模型: {embedding_model}")self.embedding_model = SentenceTransformer(embedding_model)print("嵌入模型加载完成")# 获取或创建集合self.collection = self.client.get_or_create_collection(name=collection_name,metadata={"description": "企业知识库向量存储"})def add_documents(self, documents: List[Dict]) -> None:"""添加文档到向量数据库Args:documents: 文档列表,每个文档包含id、text和metadata"""if not documents:print("没有文档需要添加")return# 提取文本内容texts = [doc['text'] for doc in documents]ids = [doc['id'] for doc in documents]metadatas = [doc.get('metadata', {}) for doc in documents]print(f"正在生成 {len(texts)} 个文档的向量表示...")# 生成嵌入向量embeddings = self.embedding_model.encode(texts, show_progress_bar=True,normalize_embeddings=True).tolist()print("向量生成完成,正在存储到数据库...")# 批量添加到ChromaDBbatch_size = 100for i in range(0, len(documents), batch_size):batch_ids = ids[i:i+batch_size]batch_embeddings = embeddings[i:i+batch_size]batch_texts = texts[i:i+batch_size]batch_metadatas = metadatas[i:i+batch_size]self.collection.add(ids=batch_ids,embeddings=batch_embeddings,documents=batch_texts,metadatas=batch_metadatas)print(f"已处理 {min(i+batch_size, len(documents))}/{len(documents)} 个文档")print("所有文档已成功添加到向量数据库")def search_similar_documents(self, query: str, top_k: int = 5,similarity_threshold: float = 0.7) -> List[Dict]:"""检索相似文档Args:query: 查询文本top_k: 返回的文档数量similarity_threshold: 相似度阈值Returns:相似文档列表"""# 生成查询向量query_embedding = self.embedding_model.encode([query], normalize_embeddings=True)[0].tolist()# 在ChromaDB中搜索results = self.collection.query(query_embeddings=[query_embedding],n_results=top_k,include=['documents', 'metadatas', 'distances'])# 格式化结果similar_docs = []for i in range(len(results['ids'][0])):distance = results['distances'][0][i]similarity = 1 - distance  # ChromaDB使用欧几里得距离# 过滤低相似度的文档if similarity >= similarity_threshold:doc = {'id': results['ids'][0][i],'text': results['documents'][0][i],'metadata': results['metadatas'][0][i],'similarity': similarity}similar_docs.append(doc)return similar_docsdef get_collection_stats(self) -> Dict:"""获取集合统计信息Returns:统计信息字典"""count = self.collection.count()return {'total_documents': count,'collection_name': self.collection_name,'persist_directory': self.persist_directory}def clear_collection(self) -> None:"""清空集合"""self.client.delete_collection(self.collection_name)self.collection = self.client.create_collection(name=self.collection_name,metadata={"description": "企业知识库向量存储"})print("集合已清空")

4.3 DeepSeek-V3集成模块

实现与DeepSeek-V3模型的集成:

# deepseek_client.py
import openai
from typing import List, Dict, Optional
import json
import timeclass DeepSeekClient:"""DeepSeek-V3模型客户端提供与DeepSeek模型的交互接口"""def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):"""初始化DeepSeek客户端Args:api_key: DeepSeek API密钥base_url: API基础URL"""self.client = openai.OpenAI(api_key=api_key,base_url=base_url)# 模型配置self.model_name = "deepseek-chat"self.max_tokens = 2000self.temperature = 0.1def generate_answer(self, question: str, context_documents: List[Dict],system_prompt: Optional[str] = None) -> Dict:"""基于上下文文档生成答案Args:question: 用户问题context_documents: 相关上下文文档system_prompt: 系统提示词Returns:生成的答案和元信息"""# 构建上下文context = self._build_context(context_documents)# 默认系统提示词if system_prompt is None:system_prompt = """你是一个专业的企业知识库问答助手。请基于提供的上下文信息回答用户的问题。回答要求:
1. 仅基于提供的上下文信息回答,不要添加无关信息
2. 如果上下文中没有相关信息,请明确说明无法找到相关信息
3. 回答要准确、简洁、有条理
4. 如果可能,请引用具体的文档来源
5. 使用中文回答上下文信息:
{context}
"""# 构建消息messages = [{"role": "system","content": system_prompt.format(context=context)},{"role": "user","content": f"问题:{question}"}]try:# 调用DeepSeek APIstart_time = time.time()response = self.client.chat.completions.create(model=self.model_name,messages=messages,max_tokens=self.max_tokens,temperature=self.temperature,stream=False)response_time = time.time() - start_time# 提取回答answer = response.choices[0].message.content# 构建返回结果result = {'answer': answer,'question': question,'context_documents': context_documents,'response_time': response_time,'model_info': {'model': self.model_name,'temperature': self.temperature,'max_tokens': self.max_tokens},'usage': {'prompt_tokens': response.usage.prompt_tokens,'completion_tokens': response.usage.completion_tokens,'total_tokens': response.usage.total_tokens}}return resultexcept Exception as e:print(f"调用DeepSeek API时出错: {str(e)}")return {'answer': "抱歉,生成答案时出现错误,请稍后再试。",'error': str(e),'question': question}def _build_context(self, documents: List[Dict]) -> str:"""构建上下文字符串Args:documents: 文档列表Returns:格式化的上下文字符串"""if not documents:return "暂无相关上下文信息。"context_parts = []for i, doc in enumerate(documents, 1):# 获取文档信息text = doc.get('text', '')metadata = doc.get('metadata', {})similarity = doc.get('similarity', 0)# 格式化文档doc_context = f"""
文档{i}(相似度: {similarity:.3f}):
来源:{metadata.get('filename', '未知')}
内容:{text}
---
"""context_parts.append(doc_context)return "\n".join(context_parts)def stream_generate_answer(self, question: str, context_documents: List[Dict],system_prompt: Optional[str] = None):"""流式生成答案(用于实时显示)Args:question: 用户问题context_documents: 相关上下文文档system_prompt: 系统提示词Yields:生成的文本片段"""# 构建上下文和消息(与generate_answer相同)context = self._build_context(context_documents)if system_prompt is None:system_prompt = """你是一个专业的企业知识库问答助手。请基于提供的上下文信息回答用户的问题。回答要求:
1. 仅基于提供的上下文信息回答,不要添加无关信息
2. 如果上下文中没有相关信息,请明确说明无法找到相关信息
3. 回答要准确、简洁、有条理
4. 如果可能,请引用具体的文档来源
5. 使用中文回答上下文信息:
{context}
"""messages = [{"role": "system","content": system_prompt.format(context=context)},{"role": "user","content": f"问题:{question}"}]try:# 流式调用APIresponse = self.client.chat.completions.create(model=self.model_name,messages=messages,max_tokens=self.max_tokens,temperature=self.temperature,stream=True)for chunk in response:if chunk.choices[0].delta.content is not None:yield chunk.choices[0].delta.contentexcept Exception as e:yield f"错误:{str(e)}"

4.4 问答引擎核心模块

实现整合各个组件的问答引擎:

# qa_engine.py
from typing import List, Dict, Optional
from vector_store import VectorStore
from deepseek_client import DeepSeekClient
from document_processor import DocumentProcessor
import timeclass QAEngine:"""问答引擎核心类整合文档处理、向量检索和答案生成功能"""def __init__(self, deepseek_api_key: str,vector_store_config: Dict = None,retrieval_config: Dict = None):"""初始化问答引擎Args:deepseek_api_key: DeepSeek API密钥vector_store_config: 向量数据库配置retrieval_config: 检索配置"""# 初始化向量数据库vs_config = vector_store_config or {}self.vector_store = VectorStore(**vs_config)# 初始化DeepSeek客户端self.deepseek_client = DeepSeekClient(api_key=deepseek_api_key)# 初始化文档处理器self.doc_processor = DocumentProcessor()# 检索配置self.retrieval_config = retrieval_config or {'top_k': 5,'similarity_threshold': 0.7}# 记录问答历史self.qa_history = []def add_knowledge_base(self, documents_dir: str) -> Dict:"""添加知识库文档Args:documents_dir: 文档目录路径Returns:处理结果统计"""print(f"开始处理知识库文档目录: {documents_dir}")start_time = time.time()# 处理文档document_chunks = self.doc_processor.process_documents(documents_dir)if not document_chunks:return {'success': False,'message': '没有找到可处理的文档','processed_docs': 0,'processing_time': 0}# 添加到向量数据库self.vector_store.add_documents(document_chunks)processing_time = time.time() - start_timereturn {'success': True,'message': '知识库添加成功','processed_docs': len(document_chunks),'processing_time': processing_time}def ask_question(self, question: str, include_sources: bool = True) -> Dict:"""回答问题Args:question: 用户问题include_sources: 是否包含来源信息Returns:问答结果"""start_time = time.time()# 检索相关文档print(f"检索问题相关文档: {question}")relevant_docs = self.vector_store.search_similar_documents(query=question,**self.retrieval_config)if not relevant_docs:result = {'question': question,'answer': '抱歉,我在知识库中没有找到与您问题相关的信息。请尝试换个问法或确认知识库是否包含相关内容。','sources': [],'retrieval_time': time.time() - start_time,'relevant_docs_count': 0}else:# 生成答案print(f"找到 {len(relevant_docs)} 个相关文档,正在生成答案...")generation_result = self.deepseek_client.generate_answer(question=question,context_documents=relevant_docs)# 准备来源信息sources = []if include_sources:for doc in relevant_docs:source_info = {'filename': doc['metadata'].get('filename', '未知'),'similarity': doc['similarity'],'text_preview': doc['text'][:200] + '...' if len(doc['text']) > 200 else doc['text']}sources.append(source_info)result = {'question': question,'answer': generation_result.get('answer', '生成答案时出现错误'),'sources': sources,'retrieval_time': time.time() - start_time,'response_time': generation_result.get('response_time', 0),'relevant_docs_count': len(relevant_docs),'model_usage': generation_result.get('usage', {}),'error': generation_result.get('error')}# 记录问答历史self.qa_history.append({'timestamp': time.time(),'question': question,'answer': result['answer'],'docs_count': result['relevant_docs_count']})return resultdef stream_ask_question(self, question: str):"""流式问答(用于实时显示)Args:question: 用户问题Yields:答案片段"""# 检索相关文档relevant_docs = self.vector_store.search_similar_documents(query=question,**self.retrieval_config)if not relevant_docs:yield "抱歉,我在知识库中没有找到与您问题相关的信息。"return# 流式生成答案for chunk in self.deepseek_client.stream_generate_answer(question=question,context_documents=relevant_docs):yield chunkdef get_statistics(self) -> Dict:"""获取系统统计信息Returns:统计信息字典"""vector_stats = self.vector_store.get_collection_stats()return {'knowledge_base': vector_stats,'qa_sessions': len(self.qa_history),'recent_questions': [qa['question'] for qa in self.qa_history[-10:]]}def clear_knowledge_base(self) -> None:"""清空知识库"""self.vector_store.clear_collection()print("知识库已清空")def export_qa_history(self, filename: str = None) -> str:"""导出问答历史Args:filename: 导出文件名Returns:导出文件路径"""if filename is None:filename = f"qa_history_{int(time.time())}.json"import jsonwith open(filename, 'w', encoding='utf-8') as f:json.dump(self.qa_history, f, ensure_ascii=False, indent=2)return filename

5. Web服务接口实现

5.1 FastAPI后端服务

# main.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import os
import tempfile
import shutil
from qa_engine import QAEngineapp = FastAPI(title="企业知识库问答API", version="1.0.0")# 配置CORS
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 初始化问答引擎
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not DEEPSEEK_API_KEY:raise ValueError("请设置DEEPSEEK_API_KEY环境变量")qa_engine = QAEngine(deepseek_api_key=DEEPSEEK_API_KEY)# 请求模型
class QuestionRequest(BaseModel):question: strinclude_sources: bool = Trueclass KnowledgeBaseRequest(BaseModel):documents_dir: str# 响应模型
class QuestionResponse(BaseModel):question: stranswer: strsources: List[dict] = []retrieval_time: floatresponse_time: float = 0relevant_docs_count: interror: Optional[str] = None@app.get("/")
async def root():"""根路径"""return {"message": "企业知识库问答API服务正在运行"}@app.post("/ask", response_model=QuestionResponse)
async def ask_question(request: QuestionRequest):"""问答接口"""try:result = qa_engine.ask_question(question=request.question,include_sources=request.include_sources)return QuestionResponse(**result)except Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.post("/knowledge-base/add")
async def add_knowledge_base(request: KnowledgeBaseRequest):"""添加知识库"""try:if not os.path.exists(request.documents_dir):raise HTTPException(status_code=404, detail="文档目录不存在")result = qa_engine.add_knowledge_base(request.documents_dir)return resultexcept Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.post("/knowledge-base/upload")
async def upload_documents(files: List[UploadFile] = File(...)):"""上传文档到知识库"""try:# 创建临时目录temp_dir = tempfile.mkdtemp()# 保存上传的文件for file in files:file_path = os.path.join(temp_dir, file.filename)with open(file_path, "wb") as buffer:shutil.copyfileobj(file.file, buffer)# 处理文档result = qa_engine.add_knowledge_base(temp_dir)# 清理临时目录shutil.rmtree(temp_dir)return resultexcept Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.get("/statistics")
async def get_statistics():"""获取系统统计信息"""try:return qa_engine.get_statistics()except Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.delete("/knowledge-base")
async def clear_knowledge_base():"""清空知识库"""try:qa_engine.clear_knowledge_base()return {"message": "知识库已清空"}except Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.get("/health")
async def health_check():"""健康检查"""return {"status": "healthy", "service": "enterprise-qa-bot"}if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)

5.2 Streamlit前端界面

# streamlit_app.py
import streamlit as st
import requests
import json
import time
from typing import List, Dict# 配置页面
st.set_page_config(page_title="企业知识库问答机器人",page_icon="🤖",layout="wide",initial_sidebar_state="expanded"
)# API配置
API_BASE_URL = "http://localhost:8000"def init_session_state():"""初始化会话状态"""if 'chat_history' not in st.session_state:st.session_state.chat_history = []if 'knowledge_base_stats' not in st.session_state:st.session_state.knowledge_base_stats = {}def call_api(endpoint: str, method: str = "GET", data: dict = None, files: dict = None):"""调用API"""url = f"{API_BASE_URL}{endpoint}"try:if method == "GET":response = requests.get(url)elif method == "POST":if files:response = requests.post(url, files=files)else:response = requests.post(url, json=data)elif method == "DELETE":response = requests.delete(url)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:st.error(f"API调用失败: {str(e)}")return Nonedef display_chat_message(role: str, content: str, sources: List[Dict] = None):"""显示聊天消息"""with st.chat_message(role):st.write(content)# 显示来源信息if sources and role == "assistant":with st.expander("📚 参考来源", expanded=False):for i, source in enumerate(sources, 1):st.write(f"**来源 {i}**: {source['filename']}")st.write(f"**相似度**: {source['similarity']:.3f}")st.write(f"**内容预览**: {source['text_preview']}")st.write("---")def main():"""主函数"""init_session_state()# 标题st.title("🤖 企业知识库问答机器人")st.caption("基于DeepSeek-V3构建的智能问答系统")# 侧边栏 - 知识库管理with st.sidebar:st.header("📁 知识库管理")# 上传文档st.subheader("上传文档")uploaded_files = st.file_uploader("选择文档文件",type=['pdf', 'docx', 'txt', 'md'],accept_multiple_files=True,help="支持PDF、Word、文本和Markdown格式")if st.button("上传并处理文档", disabled=not uploaded_files):with st.spinner("正在处理文档..."):files = {"files": [("files", file) for file in uploaded_files]}result = call_api("/knowledge-base/upload", "POST", files=files)if result and result.get('success'):st.success(f"成功处理 {result['processed_docs']} 个文档块")st.info(f"处理时间: {result['processing_time']:.2f} 秒")else:st.error("文档处理失败")# 知识库统计st.subheader("知识库统计")if st.button("刷新统计信息"):stats = call_api("/statistics")if stats:st.session_state.knowledge_base_stats = statsif st.session_state.knowledge_base_stats:stats = st.session_state.knowledge_base_statskb_stats = stats.get('knowledge_base', {})st.metric("文档总数", kb_stats.get('total_documents', 0))st.metric("问答会话", stats.get('qa_sessions', 0))# 最近问题recent_questions = stats.get('recent_questions', [])if recent_questions:st.write("**最近问题:**")for q in recent_questions[-5:]:st.write(f"• {q}")# 清空知识库st.subheader("危险操作")if st.button("🗑️ 清空知识库", type="secondary"):if st.checkbox("确认清空知识库"):result = call_api("/knowledge-base", "DELETE")if result:st.success("知识库已清空")st.session_state.chat_history = []# 主界面 - 聊天st.header("💬 智能问答")# 显示聊天历史for message in st.session_state.chat_history:display_chat_message(message["role"], message["content"], message.get("sources"))# 问题输入if question := st.chat_input("请输入您的问题..."):# 显示用户问题display_chat_message("user", question)st.session_state.chat_history.append({"role": "user", "content": question})# 生成回答with st.chat_message("assistant"):with st.spinner("正在思考..."):# 调用问答APIresponse = call_api("/ask", "POST", {"question": question,"include_sources": True})if response:answer = response.get('answer', '生成答案时出现错误')sources = response.get('sources', [])# 显示答案st.write(answer)# 显示性能信息col1, col2, col3 = st.columns(3)with col1:st.metric("检索时间", f"{response.get('retrieval_time', 0):.2f}s")with col2:st.metric("生成时间", f"{response.get('response_time', 0):.2f}s")with col3:st.metric("相关文档", response.get('relevant_docs_count', 0))# 显示来源if sources:with st.expander("📚 参考来源", expanded=False):for i, source in enumerate(sources, 1):st.write(f"**来源 {i}**: {source['filename']}")st.write(f"**相似度**: {source['similarity']:.3f}")st.write(f"**内容预览**: {source['text_preview']}")st.write("---")# 保存到聊天历史st.session_state.chat_history.append({"role": "assistant","content": answer,"sources": sources})else:error_msg = "抱歉,服务暂时不可用,请稍后再试。"st.error(error_msg)st.session_state.chat_history.append({"role": "assistant","content": error_msg})# 底部信息st.markdown("---")st.markdown("💡 **使用提示**: ""1. 先上传相关文档到知识库 ""2. 然后就可以基于文档内容进行问答 ""3. 系统会自动检索最相关的内容来回答问题")if __name__ == "__main__":main()

6. 容器化部署

6.1 Docker配置

创建Dockerfile用于容器化部署:

# Dockerfile
FROM python:3.11-slim# 设置工作目录
WORKDIR /app# 安装系统依赖
RUN apt-get update && apt-get install -y \gcc \g++ \curl \&& rm -rf /var/lib/apt/lists/*# 复制依赖文件
COPY requirements.txt .# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt# 复制应用代码
COPY . .# 创建必要的目录
RUN mkdir -p /app/chroma_db /app/uploads /app/logs# 暴露端口
EXPOSE 8000 8501# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \CMD curl -f http://localhost:8000/health || exit 1# 启动脚本
COPY start.sh /start.sh
RUN chmod +x /start.shCMD ["/start.sh"]

启动脚本:

#!/bin/bash
# start.shecho "启动企业知识库问答机器人..."# 启动FastAPI后端服务
echo "启动API服务..."
uvicorn main:app --host 0.0.0.0 --port 8000 &# 等待API服务启动
sleep 10# 启动Streamlit前端
echo "启动前端界面..."
streamlit run streamlit_app.py --server.port 8501 --server.address 0.0.0.0 &# 保持容器运行
wait

6.2 Docker Compose配置

# docker-compose.yml
version: '3.8'services:qa-bot:build: .container_name: enterprise-qa-botports:- "8000:8000"  # API服务- "8501:8501"  # Streamlit界面environment:- DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}volumes:- ./data:/app/data- ./chroma_db:/app/chroma_db- ./logs:/app/logsrestart: unless-stoppedhealthcheck:test: ["CMD", "curl", "-f", "http://localhost:8000/health"]interval: 30stimeout: 10sretries: 3nginx:image: nginx:alpinecontainer_name: qa-bot-nginxports:- "80:80"- "443:443"volumes:- ./nginx.conf:/etc/nginx/nginx.conf- ./ssl:/etc/nginx/ssldepends_on:- qa-botrestart: unless-stoppedvolumes:chroma_db:logs:

7. 系统优化与监控

7.1 性能优化策略

为了提高系统性能,我们可以实施以下优化策略:

# performance_optimizer.py
import functools
import time
from typing import Dict, List
import threading
from concurrent.futures import ThreadPoolExecutorclass PerformanceOptimizer:"""性能优化器提供缓存、批处理、异步处理等优化功能"""def __init__(self):self.cache = {}self.cache_lock = threading.Lock()self.max_cache_size = 1000self.cache_ttl = 3600  # 1小时def cache_result(self, ttl: int = None):"""结果缓存装饰器Args:ttl: 缓存存活时间(秒)"""def decorator(func):@functools.wraps(func)def wrapper(*args, **kwargs):# 生成缓存键cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"with self.cache_lock:# 检查缓存if cache_key in self.cache:cached_result, timestamp = self.cache[cache_key]cache_age = time.time() - timestampif cache_age < (ttl or self.cache_ttl):print(f"缓存命中: {func.__name__}")return cached_resultelse:# 缓存过期,删除del self.cache[cache_key]# 执行函数result = func(*args, **kwargs)with self.cache_lock:# 清理过期缓存if len(self.cache) >= self.max_cache_size:self._cleanup_cache()# 存储结果self.cache[cache_key] = (result, time.time())return resultreturn wrapperreturn decoratordef _cleanup_cache(self):"""清理过期缓存"""current_time = time.time()expired_keys = []for key, (_, timestamp) in self.cache.items():if current_time - timestamp > self.cache_ttl:expired_keys.append(key)for key in expired_keys:del self.cache[key]print(f"清理了 {len(expired_keys)} 个过期缓存项")# 应用性能优化
optimizer = PerformanceOptimizer()# 在QAEngine中应用缓存
class OptimizedQAEngine(QAEngine):"""优化版问答引擎"""@optimizer.cache_result(ttl=1800)  # 30分钟缓存def ask_question(self, question: str, include_sources: bool = True) -> Dict:"""带缓存的问答方法"""return super().ask_question(question, include_sources)def batch_ask_questions(self, questions: List[str]) -> List[Dict]:"""批量问答"""with ThreadPoolExecutor(max_workers=4) as executor:futures = [executor.submit(self.ask_question, q) for q in questions]results = [future.result() for future in futures]return results

7.2 监控与日志系统

# monitoring.py
import logging
import time
import psutil
import threading
from typing import Dict
from datetime import datetime
import jsonclass SystemMonitor:"""系统监控类监控API性能、资源使用情况等"""def __init__(self, log_file: str = "system_monitor.log"):self.log_file = log_fileself.setup_logging()self.metrics = {'api_calls': 0,'successful_queries': 0,'failed_queries': 0,'total_response_time': 0,'avg_response_time': 0,'cache_hits': 0,'cache_misses': 0}self.metrics_lock = threading.Lock()# 启动监控线程self.monitor_thread = threading.Thread(target=self._monitor_system, daemon=True)self.monitor_thread.start()def setup_logging(self):"""设置日志记录"""logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler(self.log_file, encoding='utf-8'),logging.StreamHandler()])self.logger = logging.getLogger('SystemMonitor')def log_api_call(self, endpoint: str, response_time: float, success: bool):"""记录API调用"""with self.metrics_lock:self.metrics['api_calls'] += 1self.metrics['total_response_time'] += response_timeself.metrics['avg_response_time'] = (self.metrics['total_response_time'] / self.metrics['api_calls'])if success:self.metrics['successful_queries'] += 1else:self.metrics['failed_queries'] += 1self.logger.info(f"API调用 - 端点: {endpoint}, "f"响应时间: {response_time:.3f}s, "f"成功: {success}")def log_cache_event(self, event_type: str):"""记录缓存事件"""with self.metrics_lock:if event_type == 'hit':self.metrics['cache_hits'] += 1elif event_type == 'miss':self.metrics['cache_misses'] += 1def get_system_metrics(self) -> Dict:"""获取系统指标"""cpu_percent = psutil.cpu_percent(interval=1)memory = psutil.virtual_memory()disk = psutil.disk_usage('/')with self.metrics_lock:metrics = self.metrics.copy()return {'timestamp': datetime.now().isoformat(),'system': {'cpu_percent': cpu_percent,'memory_percent': memory.percent,'memory_used_gb': memory.used / (1024**3),'disk_percent': disk.percent,'disk_free_gb': disk.free / (1024**3)},'application': metrics}def _monitor_system(self):"""系统监控后台线程"""while True:try:metrics = self.get_system_metrics()# 记录关键指标if metrics['system']['cpu_percent'] > 80:self.logger.warning(f"CPU使用率过高: {metrics['system']['cpu_percent']:.1f}%")if metrics['system']['memory_percent'] > 85:self.logger.warning(f"内存使用率过高: {metrics['system']['memory_percent']:.1f}%")# 每5分钟记录一次详细指标self.logger.info(f"系统指标: {json.dumps(metrics, ensure_ascii=False, indent=2)}")time.sleep(300)  # 5分钟except Exception as e:self.logger.error(f"监控系统时出错: {str(e)}")time.sleep(60)# 全局监控实例
monitor = SystemMonitor()

8. 测试与验证

8.1 单元测试

# test_qa_engine.py
import unittest
import tempfile
import os
from qa_engine import QAEngine
from document_processor import DocumentProcessor
from vector_store import VectorStoreclass TestQAEngine(unittest.TestCase):"""问答引擎测试类"""def setUp(self):"""测试前设置"""self.test_api_key = "test_api_key"self.temp_dir = tempfile.mkdtemp()# 创建测试文档test_doc_path = os.path.join(self.temp_dir, "test_doc.txt")with open(test_doc_path, 'w', encoding='utf-8') as f:f.write("""人工智能(AI)是计算机科学的一个分支,它试图理解智能的本质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、语言识别、图像识别、自然语言处理和专家系统等。""")def tearDown(self):"""测试后清理"""import shutilshutil.rmtree(self.temp_dir)def test_document_processing(self):"""测试文档处理功能"""processor = DocumentProcessor()chunks = processor.process_documents(self.temp_dir)self.assertGreater(len(chunks), 0)self.assertIn('text', chunks[0])self.assertIn('metadata', chunks[0])def test_vector_store(self):"""测试向量数据库功能"""vector_store = VectorStore(persist_directory=tempfile.mkdtemp())# 测试文档test_docs = [{'id': 'doc1','text': '人工智能是计算机科学的分支','metadata': {'source': 'test'}}]# 添加文档vector_store.add_documents(test_docs)# 搜索文档results = vector_store.search_similar_documents("人工智能", top_k=1)self.assertGreater(len(results), 0)self.assertIn('人工智能', results[0]['text'])if __name__ == '__main__':unittest.main()

8.2 性能测试

# performance_test.py
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
import requestsclass PerformanceTest:"""性能测试类"""def __init__(self, api_base_url: str = "http://localhost:8000"):self.api_base_url = api_base_urlself.test_questions = ["什么是人工智能?","机器学习的主要方法有哪些?","深度学习和机器学习的区别是什么?","自然语言处理包括哪些技术?","计算机视觉的应用领域有哪些?"]def test_single_request(self, question: str) -> float:"""测试单个请求的响应时间"""start_time = time.time()try:response = requests.post(f"{self.api_base_url}/ask",json={"question": question, "include_sources": True},timeout=30)response.raise_for_status()end_time = time.time()return end_time - start_timeexcept Exception as e:print(f"请求失败: {str(e)}")return -1def test_concurrent_requests(self, num_threads: int = 5, num_requests: int = 20):"""测试并发请求性能"""print(f"开始并发测试 - {num_threads} 线程, {num_requests} 请求")questions = (self.test_questions * (num_requests // len(self.test_questions) + 1))[:num_requests]response_times = []with ThreadPoolExecutor(max_workers=num_threads) as executor:futures = [executor.submit(self.test_single_request, q) for q in questions]for future in futures:response_time = future.result()if response_time > 0:response_times.append(response_time)if response_times:avg_time = statistics.mean(response_times)median_time = statistics.median(response_times)p95_time = sorted(response_times)[int(len(response_times) * 0.95)]print(f"性能测试结果:")print(f"- 成功请求: {len(response_times)}/{num_requests}")print(f"- 平均响应时间: {avg_time:.3f}s")print(f"- 中位数响应时间: {median_time:.3f}s")print(f"- 95%分位数响应时间: {p95_time:.3f}s")print(f"- 最快响应时间: {min(response_times):.3f}s")print(f"- 最慢响应时间: {max(response_times):.3f}s")else:print("所有请求都失败了")if __name__ == "__main__":tester = PerformanceTest()tester.test_concurrent_requests(num_threads=3, num_requests=15)

9. 部署指南

9.1 生产环境部署

#!/bin/bash
# deploy.sh - 生产环境部署脚本echo "开始部署企业知识库问答机器人..."# 1. 创建项目目录
PROJECT_DIR="/opt/enterprise-qa-bot"
sudo mkdir -p $PROJECT_DIR
cd $PROJECT_DIR# 2. 克隆代码或复制文件
# git clone https://github.com/your-repo/enterprise-qa-bot.git .# 3. 设置环境变量
cat > .env << EOF
DEEPSEEK_API_KEY=your_deepseek_api_key_here
ENVIRONMENT=production
LOG_LEVEL=INFO
EOF# 4. 构建Docker镜像
echo "构建Docker镜像..."
docker-compose build# 5. 启动服务
echo "启动服务..."
docker-compose up -d# 6. 等待服务启动
echo "等待服务启动..."
sleep 30# 7. 健康检查
echo "执行健康检查..."
if curl -f http://localhost:8000/health; thenecho "✅ API服务启动成功"
elseecho "❌ API服务启动失败"exit 1
fiif curl -f http://localhost:8501; thenecho "✅ 前端服务启动成功"
elseecho "❌ 前端服务启动失败"exit 1
fiecho "🎉 部署完成!"
echo "API服务地址: http://your-server:8000"
echo "前端界面地址: http://your-server:8501"

9.2 Nginx配置

# nginx.conf
events {worker_connections 1024;
}http {upstream api_backend {server qa-bot:8000;}upstream frontend_backend {server qa-bot:8501;}server {listen 80;server_name your-domain.com;# API代理location /api/ {proxy_pass http://api_backend/;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;# 超时设置proxy_connect_timeout 60s;proxy_send_timeout 60s;proxy_read_timeout 60s;}# 前端代理location / {proxy_pass http://frontend_backend;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;# WebSocket支持(Streamlit需要)proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}# 静态文件缓存location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg)$ {expires 1y;add_header Cache-Control "public, immutable";}}
}

10. 总结与展望

10.1 项目总结

通过本项目的实战开发,我们成功构建了一个基于DeepSeek-V3的企业知识库问答机器人。该系统具有以下特点:

技术优势:

  • 先进的语言模型: 采用DeepSeek-V3,具备强大的中文理解和生成能力
  • 高效的向量检索: 基于Sentence-Transformers和ChromaDB实现精准的语义搜索
  • 模块化架构: 清晰的代码结构,便于维护和扩展
  • 容器化部署: 支持Docker部署,便于环境管理和扩容

功能特性:

  • 多格式文档支持: 支持PDF、Word、文本等多种文档格式
  • 智能文档分割: 基于token数量的智能文本分块
  • 实时问答: 支持流式输出,提供良好的用户体验
  • 来源追溯: 提供答案来源信息,增强可信度

性能优化:

  • 结果缓存: 减少重复计算,提高响应速度
  • 并发处理: 支持多线程处理,提高系统吞吐量
  • 监控告警: 完善的监控体系,保障系统稳定运行

10.2 使用效果分析

基于实际测试,系统在以下方面表现优秀:

图3: 系统性能指标统计图

10.3 未来改进方向

  1. 多模态支持: 增加图片、表格等多模态内容的理解能力
  2. 知识图谱集成: 结合知识图谱技术,提供更加结构化的知识服务
  3. 个性化推荐: 基于用户历史行为,提供个性化的问题推荐
  4. 实时学习: 支持在线学习新知识,持续优化回答质量
  5. 多语言支持: 扩展对多种语言的支持能力

10.4 技术要点回顾

在本项目中,我们重点运用了以下技术:

  • 华为云Flexus: 提供稳定可靠的云计算基础设施
  • DeepSeek-V3: 提供强大的自然语言理解和生成能力
  • 向量数据库: 实现高效的语义检索
  • RAG架构: 结合检索和生成,提供准确的知识问答
  • 微服务架构: 实现系统的模块化和可扩展性

通过本项目的实践,我们深入理解了企业级AI应用的开发流程,掌握了从需求分析、架构设计、代码实现到部署运维的完整技术栈。这为后续开发更复杂的AI应用奠定了坚实的基础。

参考文献

  1. DeepSeek官方文档
  2. 华为云Flexus产品介绍
  3. ChromaDB官方文档
  4. Sentence-Transformers文档
  5. FastAPI官方文档
  6. Streamlit官方文档
  7. RAG技术原理与实践
  8. 企业级AI应用开发最佳实践

相关文章:

华为云Flexus+DeepSeek征文 | 基于DeepSeek-V3构建企业知识库问答机器人实战

作者简介 我是摘星&#xff0c;一名专注于云计算和AI技术的开发者。本次通过华为云MaaS平台体验DeepSeek系列模型&#xff0c;将实际使用经验分享给大家&#xff0c;希望能帮助开发者快速掌握华为云AI服务的核心能力。 目录 作者简介 1. 引言 2. 技术选型与架构设计 2.1 技…...

【Docker 01】Docker 简介

&#x1f308; 一、虚拟化、容器化 ⭐ 1. 什么是虚拟化、容器化 物理机&#xff1a;真实存在的服务器 / 计算机&#xff0c;对于虚拟机来说&#xff0c;物理机为虚拟机提供了硬件环境。虚拟化&#xff1a;通过虚拟化技术将一台计算机虚拟为 1 ~ n 台逻辑计算机。在一台计算机…...

信息最大化(Information Maximization)

信息最大化在目标域无标签的域自适应任务中&#xff0c;它迫使模型在没有真实标签的情况下&#xff0c;对未标记数据产生高置信度且类别均衡的预测。此外&#xff0c;这些预测也可以作为伪标签用于自训练。 例如&#xff0c;在目标域没有标签时&#xff0c;信息最大化损失可以…...

整数的字典序怎么算

在Python中&#xff0c;字典序&#xff08;lexicographical order&#xff09;通常指的是按照字符串的字典顺序进行比较或排序。对于整数来说&#xff0c;字典序可以理解为将整数转换为字符串后进行比较的顺序。 计算整数的字典序 要计算整数的字典序&#xff0c;可以按照以下…...

知识拓展卡————————关于Access、Trunk、Hybrid端口

目录 什么是Trunk List、VLAN ID、PVID&#xff1a; VLAN ID&#xff08;Virtual Local Area Network Identifier&#xff09;&#xff1a; Trunk List&#xff08;Trunk列表&#xff09;&#xff1a; PVID&#xff08;Prot VLAN ID&#xff09;: 关于Native VLAN &#x…...

AUTOSAR实战教程--DoIP_02_诊断链路建立流程

第一步&#xff1a;DoIP实体车辆声明/诊断仪车辆识别请求 打开激活线以后&#xff0c;DoIP实体发的三帧车辆声明报文。其中包含了DoIP实体的诊断逻辑地址&#xff08;可以类比DoCAN的物理请求/响应地址&#xff09;&#xff0c;对应车辆的VIN码&#xff08;若已配置&#xff0…...

音频剪辑软件少之又少好用

我们平时见到的图片以及视频编辑工具非常多&#xff0c;但是音频剪辑软件却是少之又少&#xff0c;更不用说有没有好用的&#xff0c;今天&#xff0c;给大家带来一款非常专业的音频剪辑软件&#xff0c;而且是会员喔。 软件简介 一款手机号登录即可以享受会员的超专业音频剪…...

客户端和服务器已成功建立 TCP 连接【输出解析】

文章目录 图片**1. 连接状态解析****第一条记录&#xff08;服务器监听&#xff09;****第二条记录&#xff08;客户端 → 服务器&#xff09;****第三条记录&#xff08;服务器 → 客户端&#xff09;** **2. 关键概念澄清****(1) 0.0.0.0 的含义****(2) 端口号的分配规则** *…...

多标签多分类 用什么函数激活

在多标签多分类任务中&#xff0c;激活函数的选择需要根据任务特性和输出层的设计来决定。以下是常见的激活函数及其适用场景&#xff1a; 一、多标签分类任务的特点 每个样本可以属于多个类别&#xff08;标签之间非互斥&#xff0c;例如一篇文章可能同时属于 “科技” 和 “…...

day26-计算机网络-4

1. tcp的11种状态 ss -ant -a 表示看所有状态 -n 表示不将ip解析为主机名 -t 表示tcp 1.1. closed状态&#xff08;客户端、服务端&#xff09; 客户端发起建立连接前的状态服务端启动服务前的状态 1.2. listen状态&#xff08;服务端&#xff09; 服务端软件运行的时候状…...

ngx_stream_geo_module在传输层实现高性能 IP Region 路由

一、模块定位与核心价值 层次&#xff1a;工作在 Stream (TCP/UDP) 层&#xff0c;和 ngx_http_geo_module 的 L7 语义互补。作用&#xff1a;基于客户端 IP 前缀 / 范围生成一个 Nginx 变量&#xff0c;可在后续 proxy_pass、map、limit_conn、access 等指令中使用&#xff0…...

国防科技大学计算机基础慕课课堂学习笔记

1.信息论 香农作为信息论的这个创始人&#xff0c;给出来了这个信息熵的计算方法&#xff0c;为我们现在的这个生活的很多领域奠定了基础&#xff0c;我第一次听说这个信息熵是在这个数学建模里面的理论学习中有关于这个&#xff1a;决策树的模型&#xff0c;在那个问题里面&a…...

【第七篇】 SpringBoot项目的热部署

简介 本文介绍了热部署&#xff08;Hot Deployment&#xff09;的概念、使用场景及在IDEA中的配置方法。热部署可在不重启应用的情况下动态更新代码&#xff0c;提升开发效率&#xff0c;适用于调试、微服务架构和自动化测试等场景。文章详细说明了热部署的实现步骤&#xff08…...

解决pycharm同一个文件夹下from *** import***仍显示No module named

1、&#xff0c;from ***import *&#xff0c;同文件夹中已有.py文件但是仍然报错No module named 原因是因为pycharm没有把文件夹设置为根目录&#xff0c;只需要在文件夹的上一级设置为根目录即可&#xff0c;测试过如果仅仅将当前的文件夹设置仍然报错&#xff0c;如果把最上…...

GO 基础语法和数据类型面试题及参考答案(上)

目录 Go 中变量定义方式有哪些&#xff1f;各有什么适用场景&#xff1f; 使用 : 定义变量的限制是什么&#xff1f; 全局变量可以使用 : 声明吗&#xff1f;为什么&#xff1f; Go 中如何声明一个多变量赋值&#xff1f;有哪些注意事项&#xff1f; 常量能否通过表达式赋值…...

使用 Redisson 实现分布式锁—解决方案详解

Redisson 是 Redis 官方推荐的 Java 客户端&#xff0c;提供了一系列分布式服务实现&#xff0c;其中分布式锁是其核心功能之一。本文将深入解析 Redisson 分布式锁的实现原理、高级特性和最佳实践。 一、Redisson 分布式锁的优势 与传统实现的对比 特性手动实现Redisson 实现…...

结合三维基因建模与智能体技术打造工业软件无码平台

通过深度整合 Protocol Buffers (Protobuf)、gRPC 及 Microsoft AI 技术&#xff0c;构建面向智能制造的高性能、智能化 PLM 平台。 一、Protocol Buffers 深度集成 1. 基因模型标准化定义 三维基因容器 Protobuf 规范&#xff1a; protobuf syntax "proto3"; pa…...

Python Day46

Task&#xff1a; 1.不同CNN层的特征图&#xff1a;不同通道的特征图 2.什么是注意力&#xff1a;注意力家族&#xff0c;类似于动物园&#xff0c;都是不同的模块&#xff0c;好不好试了才知道。 3.通道注意力&#xff1a;模型的定义和插入的位置 4.通道注意力后的特征图和热力…...

基于PostGIS的各地级市路网长度统计及Echarts图表可视化实践-以湖南省为例

目录 前言 一、路网长度计算 1、地级市列表查询 2、地级市路网长度查询 二、Echarts可视化实现 1、Echarts后端生成 2、引入Colormap配色 3、前端微调 三、总结 前言 在当今快速发展的社会中&#xff0c;交通路网的建设与布局对于一个地区的经济发展、居民生活以及城市…...

mac版excel如何制作时长版环形图

设置辅助列 创建簇状柱形图 将辅助列绘制在次坐标轴 工作时长在主坐标轴&#xff0c;右键分别更改图表类型为圆环。 辅助列圆环全部为灰色&#xff0c;边框为白色 辅助列设置透明度100% 设置辅助列和工作时长列同样的圆环大小 可得 核心&#xff1a;只要辅助列边框不透明…...

PCB设计教程【大师篇】——STM32开发板原理图设计(电源部分)

前言 本教程基于B站Expert电子实验室的PCB设计教学的整理&#xff0c;为个人学习记录&#xff0c;旨在帮助PCB设计新手入门。所有内容仅作学习交流使用&#xff0c;无任何商业目的。若涉及侵权&#xff0c;请随时联系&#xff0c;将会立即处理 目录 前言 1. 工程创建与前期…...

k8s4部署

configMap configmap概述&#xff1a;数据会存储在etcd数据库&#xff0c;其应用场景主要在应用程序的配置 configmap支持的类型&#xff08;1&#xff09;键值对&#xff08;2&#xff09;多行数据 pod使用configmap资源有两种常见的方式&#xff08;1&#xff09;变量注入&a…...

贝叶斯医学分析中“先验”的如何进行选择(文献解读)

贝叶斯医学分析中“先验”的如何进行选择&#xff08;文献解读&#xff09; 作者&#xff1a;Callum Taylor, Kathryn Puxty, Tara Quasim, Martin Shaw 文章标题&#xff1a;Understanding Bayesian analysis of clinical trials: an overview for clinicians 期刊名称&#x…...

【汇编逆向系列】七、函数调用包含多个参数之浮点型- XMM0-3寄存器

目录 1. 汇编代码 1.1 debug编译 1.2 release编译 2. 汇编分析 2.1 浮点参数传递规则 2.2 栈帧rsp的变化时序 2.3 参数的访问逻辑 2.4 返回值XMM0寄存器 3. 汇编转化 3.1 Debug编译 3.2 Release 编译 3.3 C语言转化 1. 汇编代码 上一节介绍了整型的函数传参&#x…...

【MySQL系列】MySQL 执行 SQL 文件

博客目录 一、MySQL 执行 SQL 文件的常见场景二、MySQL 执行 SQL 文件的主要方法1. 使用 MySQL 命令行客户端2. 在 MySQL 交互界面中使用 source 命令3. 使用 MySQL Workbench 等图形化工具4. 使用编程语言接口 三、执行 SQL 文件时的注意事项1. 字符集问题2. 事务处理3. 错误处…...

论文MR-SVD

每个像素 7 个 FLOPs意思&#xff1a; FLOPs&#xff08;浮点运算次数&#xff09;&#xff1a;衡量算法计算复杂度的指标&#xff0c;数值越小表示运算越高效。含义&#xff1a;对图像中每个像素进行处理时&#xff0c;仅需执行7 次浮点运算&#xff08;如加减乘除等&#xf…...

Java 日期时间类全面解析

Java 日期时间类全面解析&#xff1a;从传统到现代的演进 一、发展历程概览 二、传统日期类&#xff08;Java 8前&#xff09; 1. java.util.Date - 日期表示类 Date now new Date(); // 当前日期时间 System.out.println(now); // Wed May 15 09:30:45 CST 2023// 特定时间…...

【工具-Wireshark 抓包工具】

工具-Wireshark 抓包工具 ■ Wireshark 抓包工具■ 通过IP指定查看■■ ■ Wireshark 抓包工具 抓包工具】win 10 / win 11&#xff1a;WireShark 下载、安装、使用 Wireshark下载 阿里云镜像 ■ 通过IP指定查看 ■ ■...

Linux安全机制:从SELinux到Intel SGX的堡垒

Linux安全机制&#xff1a;从SELinux到Intel SGX的堡垒 数字世界的钢铁长城 引言&#xff1a;操作系统的"防御工事" 当服务器每天承受数百万次攻击尝试时&#xff0c;Linux内核的安全机制如同精密的防御系统&#xff0c;在纳秒级时间内做出响应。现代Linux安全架构已…...

设备驱动与文件系统:06 目录与文件

磁盘使用的最后一层抽象&#xff1a;文件系统 今天我们讲第31讲&#xff0c;这一讲将完成磁盘对磁盘使用的最后一层抽象。对此板使用最后一层抽象&#xff0c;抽象出来的是什么呢&#xff1f; 实际上我们使用过磁盘&#xff0c;大家应该有这样的认识&#xff0c;最后不管这个磁…...