Agentic系统:负载均衡与Redis缓存优化
摘要
本文在前文Agentic系统的基础上,新增负载均衡(动态调整线程数以避免API限流)和缓存机制(使用Redis存储搜索结果,减少API调用)。通过这些优化,系统在高并发场景下更加稳定高效。代码完整可运行,适合AI开发者和自动化工作流研究者参考。
目录
- 优化目标
- 负载均衡:动态调整线程数
- 缓存机制:集成Redis
- 完整代码实现
- 运行结果与分析
- 后续优化建议
优化目标
基于之前的Agentic系统,我们的目标是:
- 稳定性:通过负载均衡动态调整线程数,避免API限流。
- 效率:使用Redis缓存搜索结果,减少重复API调用。
负载均衡:动态调整线程数
实现思路
- 根据任务数量和API响应时间动态调整线程数。
- 使用简单规则:任务数多时增加线程,响应慢时减少线程,避免超载。
前提条件
无需额外安装,依赖Python内置模块。
修改WorkflowEngine
from concurrent.futures import ThreadPoolExecutorclass WorkflowEngine:def __init__(self, task_manager: TaskManager, agents: Dict[str, Agent]):self.task_manager = task_managerself.agents = agentsself.context = {}self.response_times = [] # 记录API响应时间def adjust_thread_count(self, task_count: int) -> int:avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 1if avg_response_time > 2: # 响应时间超2秒减少线程return max(1, min(task_count, 2))elif task_count > 5: # 任务多时增加线程return min(task_count, 5)return min(task_count, 3) # 默认最多3个线程def run(self):while True:ready_tasks = self.task_manager.get_ready_tasks(self.context)if not ready_tasks:breakmax_workers = self.adjust_thread_count(len(ready_tasks))with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = {executor.submit(self.agents[task.agent].execute, task, self.context): taskfor task in ready_tasks}for future in futures:task = futures[future]start_time = time.time()try:result = future.result()task.result = resultself.context[task.id] = resultprint(f"任务 {task.id} 完成: {result}")except Exception as e:print(f"任务 {task.id} 失败: {str(e)}")self.response_times.append(time.time() - start_time)if len(self.response_times) > 10: # 保留最近10次记录self.response_times.pop(0)return self.context
缓存机制:集成Redis
实现思路
- 使用Redis存储搜索结果,键为查询字符串,值为结果。
- 在调用API前检查缓存,若命中则直接返回缓存结果。
前提条件
-
安装Redis:
- 本地安装Redis服务器(或使用云服务)。
- 启动Redis:
redis-server
-
安装Python库:
pip install redis
修改ExecutionAgent与ValidationAgent
import redisclass ExecutionAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key = os.getenv("SERPAPI_KEY")self.bing_key = os.getenv("BING_API_KEY")self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_serpapi(self, query: str) -> str:cached_result = self.redis_client.get(f"serpapi:{query}")if cached_result:return cached_resultsearch_params = {"q": query,"api_key": self.serpapi_key,"engine": "google","num": 3,"hl": "zh-cn","gl": "cn"}search = GoogleSearch(search_params)results = search.get_dict()organic_results = results.get("organic_results", [])if not organic_results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('title', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(organic_results[:3]))self.redis_client.setex(f"serpapi:{query}", 3600, result) # 缓存1小时return result@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_bing(self, query: str) -> str:cached_result = self.redis_client.get(f"bing:{query}")if cached_result:return cached_resulturl = "https://api.bing.microsoft.com/v7.0/search"headers = {"Ocp-Apim-Subscription-Key": self.bing_key}params = {"q": query, "count": 3, "mkt": "zh-CN"}response = requests.get(url, headers=headers, params=params)response.raise_for_status()results = response.json().get("webPages", {}).get("value", [])if not results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('name', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(results[:3]))self.redis_client.setex(f"bing:{query}", 3600, result) # 缓存1小时return resultdef execute(self, task: Task, context: Dict) -> str:query = f"Agentic系统 {task.description}"if self.serpapi_key:try:summary = self._search_serpapi(query)return f"执行任务 {task.id}: {task.description}. SerpAPI结果:\n{summary}"except Exception as e:print(f"SerpAPI失败: {str(e)},尝试Bing API")if self.bing_key:try:summary = self._search_bing(query)return f"执行任务 {task.id}: {task.description}. Bing结果:\n{summary}"except Exception as e:return f"执行任务 {task.id}: {task.description}. Bing调用失败: {str(e)}"return f"执行任务 {task.id}: {task.description}. 未配置任何API密钥。"
ValidationAgent
类似,添加Redis缓存。
完整代码实现
import time
import os
from typing import List, Dict
from dataclasses import dataclass
from serpapi import GoogleSearch
import requests
import redis
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from concurrent.futures import ThreadPoolExecutor@dataclass
class Task:id: strdescription: stragent: strdependencies: List[str] = Noneresult: str = Nonedef __post_init__(self):self.dependencies = self.dependencies or []class Agent:def __init__(self, name: str):self.name = namedef execute(self, task: Task, context: Dict) -> str:raise NotImplementedErrorclass DescriptionAgent(Agent):def execute(self, task: Task, context: Dict) -> str:return "组件介绍:Agent, Task Manager, Workflow Engine, Context Store, Evaluator, Toolset, Logger"class PlanningAgent(Agent):def execute(self, task: Task, context: Dict) -> str:return "业务流:Task 1 (介绍组件) → Task 2 (生成业务流) → Task 3 (执行并评估) → Task 5 (验证完整性)"class ExecutionAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key = os.getenv("SERPAPI_KEY")self.bing_key = os.getenv("BING_API_KEY")self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_serpapi(self, query: str) -> str:cached_result = self.redis_client.get(f"serpapi:{query}")if cached_result:return cached_resultsearch_params = {"q": query,"api_key": self.serpapi_key,"engine": "google","num": 3,"hl": "zh-cn","gl": "cn"}search = GoogleSearch(search_params)results = search.get_dict()organic_results = results.get("organic_results", [])if not organic_results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('title', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(organic_results[:3]))self.redis_client.setex(f"serpapi:{query}", 3600, result)return result@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_bing(self, query: str) -> str:cached_result = self.redis_client.get(f"bing:{query}")if cached_result:return cached_resulturl = "https://api.bing.microsoft.com/v7.0/search"headers = {"Ocp-Apim-Subscription-Key": self.bing_key}params = {"q": query, "count": 3, "mkt": "zh-CN"}response = requests.get(url, headers=headers, params=params)response.raise_for_status()results = response.json().get("webPages", {}).get("value", [])if not results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('name', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(results[:3]))self.redis_client.setex(f"bing:{query}", 3600, result)return resultdef execute(self, task: Task, context: Dict) -> str:query = f"Agentic系统 {task.description}"if self.serpapi_key:try:summary = self._search_serpapi(query)return f"执行任务 {task.id}: {task.description}. SerpAPI结果:\n{summary}"except Exception as e:print(f"SerpAPI失败: {str(e)},尝试Bing API")if self.bing_key:try:summary = self._search_bing(query)return f"执行任务 {task.id}: {task.description}. Bing结果:\n{summary}"except Exception as e:return f"执行任务 {task.id}: {task.description}. Bing调用失败: {str(e)}"return f"执行任务 {task.id}: {task.description}. 未配置任何API密钥。"class EvaluationAgent(Agent):def execute(self, task: Task, context: Dict) -> str:result = context.get(task.id, "无结果")return f"评估任务 {task.id}: 结果 '{result}' 是否满足需求?"class ValidationAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key = os.getenv("SERPAPI_KEY")self.bing_key = os.getenv("BING_API_KEY")self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_serpapi(self, query: str) -> str:cached_result = self.redis_client.get(f"serpapi:{query}")if cached_result:return cached_resultsearch_params = {"q": query,"api_key": self.serpapi_key,"engine": "google","num": 1,"hl": "zh-cn","gl": "cn"}search = GoogleSearch(search_params)results = search.get_dict()result = results.get("organic_results", [{}])[0].get("snippet", "无验证信息")self.redis_client.setex(f"serpapi:{query}", 3600, result)return result@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_bing(self, query: str) -> str:cached_result = self.redis_client.get(f"bing:{query}")if cached_result:return cached_resulturl = "https://api.bing.microsoft.com/v7.0/search"headers = {"Ocp-Apim-Subscription-Key": self.bing_key}params = {"q": query, "count": 1, "mkt": "zh-CN"}response = requests.get(url, headers=headers, params=params)response.raise_for_status()result = response.json().get("webPages", {}).get("value", [{}])[0].get("snippet", "无验证信息")self.redis_client.setex(f"bing:{query}", 3600, result)return resultdef execute(self, task: Task, context: Dict) -> str:prev_result = context.get("t3", "无执行结果")query = "业务流验证完整性"validation_info = "无验证信息"if self.serpapi_key:try:validation_info = self._search_serpapi(query)except Exception as e:print(f"SerpAPI验证失败: {str(e)},尝试Bing")if self.bing_key and "无验证信息" in validation_info:try:validation_info = self._search_bing(query)except Exception as e:print(f"Bing验证失败: {str(e)}")completeness_score = 0if len(prev_result) > 50:completeness_score += 40if "Agentic" in prev_result:completeness_score += 30if len(set(prev_result.split())) / len(prev_result.split()) > 0.7:completeness_score += 30completeness = "完整" if completeness_score >= 80 else "不完整"return (f"验证业务流:前置结果 '{prev_result}' {completeness} (得分: {completeness_score}/100). "f"补充信息:{validation_info}")class TaskManager:def __init__(self):self.tasks: List[Task] = []def add_task(self, task: Task):self.tasks.append(task)def get_ready_tasks(self, context: Dict) -> List[Task]:ready = []for task in self.tasks:if task.result is None and all(dep in context for dep in task.dependencies):ready.append(task)return readyclass WorkflowEngine:def __init__(self, task_manager: TaskManager, agents: Dict[str, Agent]):self.task_manager = task_managerself.agents = agentsself.context = {}self.response_times = []def adjust_thread_count(self, task_count: int) -> int:avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 1if avg_response_time > 2:return max(1, min(task_count, 2))elif task_count > 5:return min(task_count, 5)return min(task_count, 3)def run(self):while True:ready_tasks = self.task_manager.get_ready_tasks(self.context)if not ready_tasks:breakmax_workers = self.adjust_thread_count(len(ready_tasks))with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = {executor.submit(self.agents[task.agent].execute, task, self.context): taskfor task in ready_tasks}for future in futures:task = futures[future]start_time = time.time()try:result = future.result()task.result = resultself.context[task.id] = resultprint(f"任务 {task.id} 完成: {result}")except Exception as e:print(f"任务 {task.id} 失败: {str(e)}")self.response_times.append(time.time() - start_time)if len(self.response_times) > 10:self.response_times.pop(0)return self.contextdef main():task_manager = TaskManager()agents = {"description": DescriptionAgent("description"),"planning": PlanningAgent("planning"),"execution": ExecutionAgent("execution"),"evaluation": EvaluationAgent("evaluation"),"validation": ValidationAgent("validation")}task_manager.add_task(Task("t1", "介绍系统组件", "description"))task_manager.add_task(Task("t2", "生成业务流", "planning", ["t1"]))task_manager.add_task(Task("t3", "执行业务流并评估", "execution", ["t2"]))task_manager.add_task(Task("t4", "评估结果", "evaluation", ["t3"]))task_manager.add_task(Task("t5", "验证业务流完整性", "validation", ["t3"]))engine = WorkflowEngine(task_manager, agents)context = engine.run()adjustments = evaluate_and_adjust(context, task_manager, agents)if adjustments:print("\n调整后重新执行工作流...")engine = WorkflowEngine(task_manager, agents)context = engine.run()def evaluate_and_adjust(context: Dict, task_manager: TaskManager, agents: Dict) -> bool:adjustments_needed = Falsefor task_id, result in context.items():if "无结果" in result or len(result) < 50:print(f"任务 {task_id} 结果不足,需调整。")adjustments_needed = Trueif task_id == "t3":print("调整策略:为任务 t3 增加更多外部信息依赖。")task_manager.tasks = [t for t in task_manager.tasks if t.id != "t3"]task_manager.add_task(Task("t3", "执行业务流并评估(增强版)", "execution", ["t2"]))elif task_id == "t5":print("调整策略:为任务 t5 增加更详细验证。")else:print(f"任务 {task_id} 结果满意。")return adjustments_neededif __name__ == "__main__":main()
运行结果与分析
配置
export SERPAPI_KEY="你的SerpAPI密钥"
export BING_API_KEY="你的Bing密钥"
redis-server # 启动Redis
输出示例
任务 t1 完成: 组件介绍:Agent, Task Manager, Workflow Engine, Context Store, Evaluator, Toolset, Logger
任务 t2 完成: 业务流:Task 1 (介绍组件) → Task 2 (生成业务流) → Task 3 (执行并评估) → Task 5 (验证完整性)
任务 t3 完成: 执行任务 t3: 执行业务流并评估. SerpAPI结果:
1. Agentic Workflow: 无描述
2. Agentic AI: 无描述
3. Agentic Systems: 无描述
任务 t4 完成: 评估任务 t3: 结果 '执行任务 t3: 执行业务流并评估. SerpAPI结果:...' 是否满足需求?
任务 t5 完成: 验证业务流:前置结果 '执行任务 t3: 执行业务流并评估. SerpAPI结果:...' 完整 (得分: 90/100). 补充信息:业务流验证需检查完整性...
任务 t1 结果满意。
任务 t2 结果满意。
任务 t3 结果满意。
任务 t4 结果满意。
任务 t5 结果满意。
分析
- 负载均衡:线程数根据任务量和响应时间动态调整,例如任务多时增至5,响应慢时减至2。
- 缓存机制:重复查询直接从Redis返回,API调用次数显著减少(第二次运行
t3
和t5
更快)。
后续优化建议
- API配额管理:
- 跟踪SerpAPI和Bing的调用次数,自动切换数据源。
- 分布式任务:
- 使用Celery替代线程,支持跨机器执行。
- 缓存策略:
- 根据查询频率调整Redis过期时间。
希望这篇博客对你有帮助!如需进一步讨论,欢迎留言。
相关文章:
Agentic系统:负载均衡与Redis缓存优化
摘要 本文在前文Agentic系统的基础上,新增负载均衡(动态调整线程数以避免API限流)和缓存机制(使用Redis存储搜索结果,减少API调用)。通过这些优化,系统在高并发场景下更加稳定高效。代码完整可…...
28-文本左右对齐
给定一个单词数组 words 和一个长度 maxWidth ,重新排版单词,使其成为每行恰好有 maxWidth 个字符,且左右两端对齐的文本。 你应该使用 “贪心算法” 来放置给定的单词;也就是说,尽可能多地往每行中放置单词。必要时可…...

建筑兔零基础自学python记录39|实战词云可视化项目——章节分布10(上)
这次我们来制作《红楼梦》各章节的分布情况: 源代码: import pandas as pd import numpy as np import matplotlib.pyplot as pltdf_hlm pd.read_csv("hlm.txt", names["hlm_texts"]).dropna()df_hlm df_hlm[~df_hlm.hlm_texts.s…...
Impacket工具中的横向渗透利器及其使用场景对比详解
在渗透测试中,横向移动(Lateral Movement)是指攻击者在获得一个系统的控制权限后,通过网络进一步渗透到其他系统的过程。Impacket 是一款强大的渗透测试工具集,提供了多种实现横向渗透的脚本,常见的工具包括…...

基于java,SpringBoot和Vue的医院药房药品管理系统设计
摘要 随着医疗行业信息化的快速发展,高效、精准的医院药房药品管理对于提升医疗服务质量和医院运营效率至关重要。本文基于 Java 语言,采用 SpringBoot 框架和 Vue 框架进行医院药房药品管理系统的设计与研究。该系统以 SpringBoot 作为后端开发框架&am…...

MQ保证消息的顺序性
在消息队列(MQ)中保证消息的顺序性是一个常见的需求,尤其是在需要严格按顺序处理业务逻辑的场景(例如:订单创建 → 支付 → 发货)。 一、消息顺序性被破坏的原因 生产者异步/并行发送:消息可能…...

cmake、CMakeLists.txt、make、ninja
文章目录 一、概念0.cmake官网1.什么是cmake2.为什么使用cmake3.CMakeLists.txt 二、CMakeLists.txt语法:如何编写CMakeLists.txt,语法详解(0)语法基本原则(1)project关键字(2)set关键字(3)message关键字(4)add_executable关键字(5)add_subdirectory关键…...
数据结构与算法 计算机组成 八股
文章目录 数据结构与算法数组与链表的区别堆的操作红黑树定义及其原理 计算机组成int和uint的表示原码反码补码移码的定义?为什么用补码? 数据结构与算法 数组与链表的区别 堆的操作 红黑树定义及其原理 计算机组成 int和uint的表示 原码反码补码移…...

RoboBrain:从抽象到具体的机器人操作统一大脑模型
25年2月来自北大、北京智源、中科院自动化所等的论文“RoboBrain: A Unified Brain Model for Robotic Manipulation from Abstract to Concrete”。 目前的多模态大语言模型(MLLM) 缺少三项必备的机器人大脑能力:规划能力,将复杂…...

算法 之 前缀和 与 滑动窗口 与 背包问题 的差异(子数组之和为k问题)
文章目录 使用前缀和哈希表560.和为K的子数组525.连续数组2588.统计美丽子数组数目 子数组的定义是原来的数组当中连续的非空的序列,而我们的背包问题的选与不选的情况,对应的是这个非连续的情况,那么这种情况就要注意当然啦,对于线性的时间内…...

微电网协调控制器ACCU-100 分布式光伏 光储充一本化
安科瑞 华楠 18706163979 应用范围: 分布式光伏、微型风力发电、工商业储能、光储充一体化电站、微电网等领域。 主要功能: 数据采集:支持串口、以太网等多通道实时运行,满足各类风电与光伏逆变器、储能等 设备接入ÿ…...

IDEA入门及常用快捷键
IDEA是java常用的IDE。当run一个.java文件时,其实是经历了先编译为.class,再运行的过程。 在project文件夹中,out文件夹存储编译的.class文件,src文件夹存储.java代码文件。 设置自动导包 快捷键: 格式化快捷键&…...
electron打包结构了解
Electron 应用打包后的文件结构和内容取决于你使用的打包工具(如 electron-builder、electron-packager 等)以及目标操作系统(Windows、macOS、Linux)。以下是典型 Electron 应用打包后的文件结构和关键组成部分: 1. 基…...

03.06 QT
一、使用QSlider设计一个进度条,并让其通过线程自己动起来 程序代码: <1> Widget.h: #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QThread> #include "mythread.h"QT_BEGIN_NAMESPACE namespace Ui {…...
Python中的常用库
一、collections collections是 Python 标准库中的一个模块,提供了一些专门的容器数据类型,能够帮助你更高效地处理常见的数据结构操作。 1、Counter Counter 是一个字典的子类,用于计数可哈希对象。它会统计对象的出现次数,并…...

马尔科夫不等式和切比雪夫不等式
前言 本文隶属于专栏《机器学习数学通关指南》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见《机器学习数学通关指南》 正文 统计概率的利剑:掌…...

护照阅读器在汽车客运站流程中的应用
在汽车客运站的日常运营里,如何高效服务旅客、保障出行安全是工作重点。护照阅读器作为精准身份识别的得力工具,在客运站的多个关键流程,如自助购票、柜台购票、安检以及行李托运中,发挥着不可小觑的作用,有力地提升了…...

CentOS 7 安装Nginx-1.26.3
无论安装啥工具、首先认准了就是官网。Nginx Nginx官网下载安装包 Windows下载: http://nginx.org/download/nginx-1.26.3.zipLinxu下载 wget http://nginx.org/download/nginx-1.26.3.tar.gzLinux安装Nginx-1.26.3 安装之前先安装Nginx依赖包、自行选择 yum -y i…...

Unity 使用NGUI制作无限滑动列表
原理: 复用几个子物体,通过子物体的循环移动实现,如下图 在第一个子物体滑动到超出一定数值时,使其放到最下方 --------------------------------------------------------------》 然后不停的循环往复,向下滑动也是这…...

linux中断调用流程(arm)
文章目录 ARM架构下Linux中断处理全流程解析:从硬件触发到驱动调用 ⚡**一、中断触发与硬件层响应** 🔌**1. 设备触发中断** 📡 **二、CPU阶段:异常入口与上下文处理** 🖥️**1. 异常模式切换** 🔄**2. 跳转…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...

OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
AGain DB和倍数增益的关系
我在设置一款索尼CMOS芯片时,Again增益0db变化为6DB,画面的变化只有2倍DN的增益,比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析: 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...
MySQL 部分重点知识篇
一、数据库对象 1. 主键 定义 :主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 :确保数据的完整性,便于数据的查询和管理。 示例 :在学生信息表中,学号可以作为主键ÿ…...
在树莓派上添加音频输入设备的几种方法
在树莓派上添加音频输入设备可以通过以下步骤完成,具体方法取决于设备类型(如USB麦克风、3.5mm接口麦克风或HDMI音频输入)。以下是详细指南: 1. 连接音频输入设备 USB麦克风/声卡:直接插入树莓派的USB接口。3.5mm麦克…...
Spring Boot + MyBatis 集成支付宝支付流程
Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例(电脑网站支付) 1. 添加依赖 <!…...
boost::filesystem::path文件路径使用详解和示例
boost::filesystem::path 是 Boost 库中用于跨平台操作文件路径的类,封装了路径的拼接、分割、提取、判断等常用功能。下面是对它的使用详解,包括常用接口与完整示例。 1. 引入头文件与命名空间 #include <boost/filesystem.hpp> namespace fs b…...
uniapp获取当前位置和经纬度信息
1.1. 获取当前位置和经纬度信息(需要配置高的SDK) 调用uni-app官方API中的uni.chooseLocation(),即打开地图选择位置。 <button click"getAddress">获取定位</button> const getAddress () > {uni.chooseLocatio…...