当前位置: 首页 > article >正文

ROS2 Humble下用Python写Action服务端与客户端:一个模拟机器人移动的完整示例

ROS2 Humble下Python Action开发实战从机器人状态机到多线程优化在机器人开发中异步任务处理是个永恒的话题。想象一下当你需要让机器人移动2米的同时还要实时监测环境变化或者在进行机械臂轨迹规划时允许用户随时取消当前操作——这正是ROS2 Action设计的初衷。与简单的服务调用不同Action提供了三大核心优势长时间任务执行、实时进度反馈和任务取消机制。本文将带你用Python从零构建一个完整的机器人移动模拟系统深入剖析rclpy.action的设计哲学并解决实际开发中那些官方文档没告诉你的坑。1. 环境准备与工程架构1.1 创建Python功能包在ROS2 Humble环境下我们首先需要建立一个标准的Python功能包。与C不同Python包的创建有一些特殊配置需要注意ros2 pkg create robot_action_py \ --build-type ament_python \ --dependencies rclpy robot_control_interfaces \ --node-name action_server action_client \ --maintainer-name your_name \ --maintainer-email your_emaildomain.com关键参数解析--build-type ament_python指定Python包类型--dependencies声明依赖的ROS2接口--node-name自动生成节点入口文件提示ROS2 Python包的目录结构遵循Python模块规范所有源代码应放在package_name/package_name目录下1.2 工程文件结构规划一个规范的Action项目通常包含以下文件robot_action_py/ ├── robot_action_py/ │ ├── __init__.py │ ├── robot.py # 机器人模拟类 │ ├── action_server.py # Action服务端 │ └── action_client.py # Action客户端 ├── setup.py └── package.xml在setup.py中需要确保正确配置入口点entry_points{ console_scripts: [ action_server robot_action_py.action_server:main, action_client robot_action_py.action_client:main, ], },2. 机器人状态机实现2.1 设计移动控制状态机在robot.py中我们实现一个具有完整状态管理的机器人类from robot_control_interfaces.action import MoveRobot import math class Robot: 模拟二维平面移动机器人 def __init__(self): self._current_pose 0.0 # 当前位置 self._target_pose 0.0 # 目标位置 self._status MoveRobot.Feedback.STATUS_READY self._move_step 0.1 # 单步移动距离系数 def set_goal(self, distance): 设置移动目标 if self._status ! MoveRobot.Feedback.STATUS_READY: return False self._target_pose self._current_pose distance self._status MoveRobot.Feedback.STATUS_MOVING return True def move_step(self): 执行单步移动 if self._status ! MoveRobot.Feedback.STATUS_MOVING: return False direction math.copysign(1, self._target_pose - self._current_pose) step_size direction * min( abs(self._target_pose - self._current_pose), self._move_step ) self._current_pose step_size if self._check_goal_reached(): self._status MoveRobot.Feedback.STATUS_COMPLETED return True def _check_goal_reached(self): 检查是否到达目标 return math.isclose( self._current_pose, self._target_pose, abs_tol0.01 ) def get_feedback(self): 生成反馈消息 feedback MoveRobot.Feedback() feedback.pose self._current_pose feedback.status self._status return feedback状态转换图示意[READY] - [MOVING] - [COMPLETED] ↑ | └----------┘ (通过set_goal重置)2.2 异常处理机制完善的机器人控制需要考虑各种异常情况class Robot: # ...原有代码... def emergency_stop(self): 紧急停止 self._status MoveRobot.Feedback.STATUS_EMERGENCY_STOP def handle_cancel(self): 处理取消请求 if self._status MoveRobot.Feedback.STATUS_MOVING: self._status MoveRobot.Feedback.STATUS_CANCELLED return True return False3. Action服务端深度解析3.1 服务端核心架构在action_server.py中构建完整的Action服务import rclpy from rclpy.action import ActionServer from rclpy.node import Node from robot_control_interfaces.action import MoveRobot from .robot import Robot class RobotActionServer(Node): def __init__(self): super().__init__(robot_action_server) self.robot Robot() self._action_server ActionServer( self, MoveRobot, move_robot, self._execute_callback, goal_callbackself._goal_callback, cancel_callbackself._cancel_callback ) self.get_logger().info(Action服务器已启动...) def _goal_callback(self, goal_request): 处理新目标请求 self.get_logger().info(f收到新目标: 移动 {goal_request.distance}米) if not self.robot.set_goal(goal_request.distance): return rclpy.action.server.GoalResponse.REJECT return rclpy.action.server.GoalResponse.ACCEPT def _cancel_callback(self, cancel_request): 处理取消请求 self.get_logger().info(收到取消请求) if self.robot.handle_cancel(): return rclpy.action.server.CancelResponse.ACCEPT return rclpy.action.server.CancelResponse.REJECT async def _execute_callback(self, goal_handle): 执行移动任务 self.get_logger().info(开始执行移动任务...) while rclpy.ok() and not goal_handle.is_cancel_requested: if not self.robot.move_step(): break feedback self.robot.get_feedback() goal_handle.publish_feedback(feedback) await asyncio.sleep(0.1) # 非阻塞式等待 result MoveRobot.Result() result.pose self.robot.get_feedback().pose if goal_handle.is_cancel_requested: goal_handle.canceled() self.get_logger().warn(任务被取消) elif self.robot.get_feedback().status MoveRobot.Feedback.STATUS_COMPLETED: goal_handle.succeed() self.get_logger().info(任务完成!) return result关键设计要点使用异步execute_callback避免阻塞主线程明确的状态反馈机制完善的取消处理流程3.2 多线程执行器优化默认的单线程执行器会导致rate.sleep()死锁问题解决方案from rclpy.executors import MultiThreadedExecutor def main(argsNone): rclpy.init(argsargs) server RobotActionServer() # 使用多线程执行器 executor MultiThreadedExecutor(num_threads4) executor.add_node(server) try: executor.spin() except KeyboardInterrupt: server.get_logger().info(服务器关闭中...) finally: server.destroy_node() rclpy.shutdown()线程分配策略1个线程处理Action通信1个线程执行任务处理2个备用线程处理其他回调4. 智能Action客户端实现4.1 客户端状态管理在action_client.py中实现带超时管理的客户端import rclpy from rclpy.action import ActionClient from rclpy.node import Node from robot_control_interfaces.action import MoveRobot class RobotActionClient(Node): def __init__(self): super().__init__(robot_action_client) self._client ActionClient(self, MoveRobot, move_robot) self._goal_handle None self._send_goal_timer self.create_timer(1.0, self.send_goal) def send_goal(self): 发送移动目标 self._send_goal_timer.cancel() goal_msg MoveRobot.Goal() goal_msg.distance 2.5 # 默认移动2.5米 self._client.wait_for_server(timeout_sec5.0) self._send_goal_future self._client.send_goal_async( goal_msg, feedback_callbackself._feedback_callback ) self._send_goal_future.add_done_callback(self._goal_response_callback) def _goal_response_callback(self, future): 处理目标响应 self._goal_handle future.result() if not self._goal_handle.accepted: self.get_logger().error(目标被拒绝) return self.get_logger().info(目标已接受执行中...) self._get_result_future self._goal_handle.get_result_async() self._get_result_future.add_done_callback(self._result_callback) def _feedback_callback(self, feedback_msg): 处理进度反馈 feedback feedback_msg.feedback self.get_logger().info( f当前进度: {feedback.pose:.2f}m | 状态: {feedback.status} ) def _result_callback(self, future): 处理最终结果 result future.result().result self.get_logger().info(f最终位置: {result.pose:.2f}m) rclpy.shutdown()4.2 高级功能扩展为客户端添加更多实用功能class RobotActionClient(Node): # ...原有代码... def cancel_goal(self): 取消当前目标 if self._goal_handle is not None: future self._goal_handle.cancel_goal_async() future.add_done_callback(self._cancel_done) def _cancel_done(self, future): 取消完成回调 cancel_response future.result() if len(cancel_response.goals_canceling) 0: self.get_logger().info(取消请求已接受) else: self.get_logger().warning(取消请求被拒绝)5. 调试技巧与性能优化5.1 常见问题排查指南问题现象可能原因解决方案Action调用无响应服务端未启动检查ros2 action list输出反馈信息延迟单线程阻塞使用MultiThreadedExecutor取消请求无效未正确处理回调实现cancel_callback目标频繁被拒状态机未重置检查机器人ready状态5.2 性能优化策略回调分组优化from rclpy.callback_groups import ReentrantCallbackGroup self._action_server ActionServer( self, MoveRobot, move_robot, execute_callbackself._execute_callback, callback_groupReentrantCallbackGroup() )QoS配置调整from rclpy.qos import QoSProfile action_qos QoSProfile( depth10, reliabilityrclpy.qos.ReliabilityPolicy.RELIABLE ) self._action_server ActionServer( # ...其他参数... feedback_pub_qos_profileaction_qos )执行器线程数建议简单场景2-4个线程复杂系统CPU核心数×1.5在实际机器人项目中Action通信的稳定性往往决定了整个系统的可靠性。记得在正式部署前进行压力测试模拟网络延迟和节点重启等情况。我曾在一个仓储机器人项目中发现当Action消息频率超过50Hz时需要特别优化反馈消息的数据量否则会导致整个系统延迟增加。

相关文章:

ROS2 Humble下用Python写Action服务端与客户端:一个模拟机器人移动的完整示例

ROS2 Humble下Python Action开发实战:从机器人状态机到多线程优化 在机器人开发中,异步任务处理是个永恒的话题。想象一下,当你需要让机器人移动2米的同时还要实时监测环境变化,或者在进行机械臂轨迹规划时允许用户随时取消当前操…...

非科班,我转大模型成功了吗

正式转码(开始刷算法题,学八股,做项目,找实习)到现在过去了13个月。由于之前完全没有大模型经验,根本找不到大模型对口实习我笑死,找的是cv,AI图像的实习,但歪打正着做了…...

别再让模型‘偏科’了!PyTorch实战:用BCEWithLogitsLoss的weight和pos_weight搞定二分类数据不平衡

破解二分类数据不平衡:PyTorch中BCEWithLogitsLoss的加权艺术 当你的二分类模型总是对少数类"视而不见",预测结果清一色偏向多数类时,这不是模型在偷懒,而是数据不平衡在作祟。医疗诊断中的罕见病例识别、金融领域的欺诈…...

国企领导:“现在都是 Agent自动开发了,你还在对话模式,太落后了!”我一点不慌:“这就去补,假期后见分晓!”领导露出满意的笑容。

马上假期了,我相信很多小伙伴肯定不会学习了,哦不,肯定不出去玩,要在家里学习 AI 对吧?(dog) 肯定的吧? 那在开始今天的内容之前,我也想问大家一下。 你平常更接近哪种…...

HPH内部构造大揭秘:三大系统配合节节通

今时,二零二六年四月三十日这一日,科技领域之内存在两件重大之事值得予以关注,其一乃是中国科学院所发布的“悟空”号暗物质卫星的最新成果,该成果揭示出了宇宙射线加速的关键机制;其二则是长三角区域的首台“华龙一号…...

让每一辆车快速拥抱AI!东软开启座舱AI Agent平权时代

2026年北京国际车展已释放出最明显的信号:座舱AI Agent正在加速落地。从用户体验侧来看,座舱交互系统最大的变化是从“会聊天”进化成“能干活”,座舱Agent变成了可精准了解用户需求,还能规划与执行的车内“私人助手”。这种进化&…...

VLC for Android:你的终极移动端万能媒体播放器解决方案

VLC for Android:你的终极移动端万能媒体播放器解决方案 【免费下载链接】vlc-android VLC for Android, Android TV and ChromeOS 项目地址: https://gitcode.com/gh_mirrors/vl/vlc-android 还在为手机无法播放某些视频格式而烦恼吗?或者经常遇…...

WWW 2026 利用知识图谱不但能够感知时间,还能“预判未来事件”?

01|研究背景:事件预测为什么需要“动态多模态”? 传统知识图谱通常关注结构化事实,例如: 主体 — 关系 — 客体 例如:Trump — LiveAt — White House 但现实世界中的事件并不是静止的。一个实体在不同时间…...

**大模型时代如何选对白酒?深度揭秘“晋善晋美”的技术创新与高性价比之道**

近年来,随着人工智能与大数据技术的飞速发展,白酒行业也悄然掀起了一场“数字化革命”。对于广大消费者而言,在信息爆炸的时代如何快速、精准地找到一家诚信白酒企业,并通过推荐白酒机构的权威背书,锁定一批高性价比白…...

CVE-2026-31431 Copy Fail:Linux 本地提权漏洞原理、影响面与排查修复建议

CVE-2026-31431 / Copy Fail 不是远程 RCE,攻击者需要先在目标机器上具备低权限代码执行能力。但这并不意味着它只是一个“小本地洞”。在容器节点、CI runner、共享开发机、跳板机、代码沙箱、Notebook、AI Agent 执行机这类环境里,“低权限代码执行”本…...

Vivado HLS 提供了 C++ 模板类 hls::stream<>

Vivado HLS 提供了 C 模板类 hls::stream<>&#xff0c;用于对流传输数据结构进行建模。 数据流在软件中&#xff08;以及在测试激励文件中进行 RTL 协同仿真期间&#xff09;作为无限队列来建模。在 C 中对数据流进行仿真 无需满足任意深度。数据流可在函数内部使用&…...

交大复旦 Bench2Drive-Speed:速度可控的自动驾驶评测基准

点击下方卡片&#xff0c;关注“自动驾驶之心”公众号戳我-> 领取自动驾驶近30个方向学习路线作者 | Yuqian Shao 等编辑 | 自动驾驶之心本文只做学术分享&#xff0c;如有侵权&#xff0c;联系删文>>自动驾驶前沿信息获取→自动驾驶之心知识星球导语端到端自动驾驶&a…...

[具身智能-509]:全局混乱下的局部有序:不要用战术的勤奋掩盖战略的懒惰

“在一个全局混乱的系统中&#xff0c;局部的有序是奢望。”很多初创团队容易陷入一种“伪忙碌”的状态&#xff1a;产品每天都在迭代新功能&#xff0c;销售每天都在疯狂打陌生电话&#xff0c;代码写得飞快&#xff0c;办公室灯火通明。但这往往是“全局混乱”的体现——因为…...

基于stm32ARM库函数的IIR二阶巴特沃斯低通滤波器--附完整代码

在嵌入式系统中使用ARM CMSIS-DSP库实现高效IIR低通滤波器 &#x1f3af; 引言&#xff1a;嵌入式系统中的实时信号处理挑战 在嵌入式系统开发中&#xff0c;信号处理往往面临双重挑战&#xff1a;既要保证实时性&#xff0c;又要在资源受限的环境下运行。今天&#xff0c;我…...

DHT11温湿度传感器核心技术解析

DHT11是一款数字式温湿度复合传感器&#xff0c;通过单总线协议与微控制器通信。其核心工作原理基于电阻式湿敏元件和NTC热敏电阻&#xff0c;内部集成了8位微处理器&#xff0c;负责将模拟信号转换为数字信号并校准输出。 1. 传感器特性与技术参数对比 特性DHT11备注温度测量…...

【无标题】滴滴答答滴滴答答滴滴答答滴滴答答滴滴答答

委屈委屈委屈恶趣味企鹅21...

阿里云百炼微调完整实战:从数据到部署

阿里云百炼微调完整实战&#xff1a;从数据到部署 目录 什么是模型微调微调 vs RAG&#xff1a;如何选择环境准备训练数据准备创建微调任务超参数配置详解模型部署LangChain 调用微调模型模型评测常见问题总结 一、什么是模型微调 模型微调&#xff08;Supervised Fine-Tun…...

工业数据转发实战:用NModbus4在WinForm中构建一个带UI的Modbus Slave服务器

工业数据转发实战&#xff1a;用NModbus4在WinForm中构建带UI的Modbus从站服务器 在工业自动化领域&#xff0c;数据采集与转发是连接现场设备与上层信息系统的关键环节。想象一下这样的场景&#xff1a;车间里的PLC控制器实时生成生产数据&#xff0c;而办公室的管理系统需要这…...

为什么特定场景只重试幂等请求,不重试非幂等请求?(幂等性Idempotence)因为重复非幂等请求会对系统产生重复的副作用

重试&#xff1a;仅幂等请求&#xff08;GET&#xff09;重试&#xff0c;最多 2 次&#xff0c;退避间隔 100ms 文章目录什么是幂等性&#xff1f;为什么只重试幂等请求&#xff1f;1. **避免重复副作用**2. **HTTP方法的幂等性分类**3. **实际风险示例**4. **安全重试机制**仅…...

终极指南:3分钟实现Adobe Illustrator到Photoshop的无损图层转换

终极指南&#xff1a;3分钟实现Adobe Illustrator到Photoshop的无损图层转换 【免费下载链接】ai-to-psd A script for prepare export of vector objects from Adobe Illustrator to Photoshop 项目地址: https://gitcode.com/gh_mirrors/ai/ai-to-psd 还在为AI文件转P…...

别再让ChatGLM说车轱辘话了!手把手教你用Hugging Face的LogitsProcessor解决LLM重复生成

彻底根治大模型复读机&#xff1a;Hugging Face LogitsProcessor实战指南 看着屏幕上不断重复的"这个问题很重要这个问题很重要这个问题很重要"&#xff0c;我第17次按下了终止键。作为某金融科技公司的AI产品经理&#xff0c;我们上线ChatGLM-6B后的用户投诉中&…...

对比使用Taotoken前后在模型选型与切换上的效率提升

使用 Taotoken 简化模型选型与切换的技术实践 1. 传统模型接入的痛点 在 Taotoken 平台出现之前&#xff0c;开发者接入不同大模型厂商的 API 需要面对一系列繁琐流程。每个厂商都有独立的注册流程、API Key 申请方式和文档体系。以常见的三个模型为例&#xff0c;开发者需要…...

Windows Server 2019上为Tesla T4配置CUDA 11.0和CUDNN 8.0.5的完整避坑指南

Windows Server 2019深度学习环境配置全攻略&#xff1a;Tesla T4CUDA 11.0实战指南 在企业级AI应用部署中&#xff0c;服务器环境配置往往是工程师面临的第一个挑战。不同于个人电脑的即插即用&#xff0c;Windows Server 2019特有的安全策略与系统架构&#xff0c;使得从驱动…...

Spark NLP:工业级分布式自然语言处理框架实战指南

1. 项目概述&#xff1a;当Spark遇上NLP&#xff0c;一个工业级文本处理框架的诞生如果你在数据科学或机器学习领域工作过一段时间&#xff0c;尤其是处理过海量文本数据&#xff0c;那你一定对两个词深有体会&#xff1a;一个是“慢”&#xff0c;另一个是“复杂”。传统的自然…...

springboot+vue3的旅游民宿预定管理系统的设计与实现

目录同行可拿货,招校园代理 ,本人源头供货商功能模块分析技术实现要点扩展功能建议项目技术支持源码获取详细视频演示 &#xff1a;文章底部获取博主联系方式&#xff01;同行可合作同行可拿货,招校园代理 ,本人源头供货商 功能模块分析 用户端功能 用户注册与登录&#xff…...

ScienceDecrypting:终极CAJ文档解密指南,3步实现科学文库文档永久保存

ScienceDecrypting&#xff1a;终极CAJ文档解密指南&#xff0c;3步实现科学文库文档永久保存 【免费下载链接】ScienceDecrypting 破解CAJViewer带有效期的文档&#xff0c;支持破解科学文库、标准全文数据库下载的文档。无损破解&#xff0c;保留文字和目录&#xff0c;解除有…...

内存带宽吃紧?GC风暴频发?R 4.5并行计算效率断崖式下降的5个反直觉元凶,今夜必须修复

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;R 4.5并行计算性能断崖的系统性归因 R 4.5版本在引入future与parallel包深度集成的同时&#xff0c;意外暴露了底层线程调度与内存管理的结构性矛盾。性能断崖并非单一缺陷所致&#xff0c;而是运行时环…...

springboot+vue3的婚礼场景规划系统设计与实现

目录同行可拿货,招校园代理 ,本人源头供货商功能模块分析技术实现要点扩展功能设计安全与兼容性项目技术支持源码获取详细视频演示 &#xff1a;文章底部获取博主联系方式&#xff01;同行可合作同行可拿货,招校园代理 ,本人源头供货商 功能模块分析 用户管理模块 注册与登录…...

3大核心方案:彻底解决DouyinLiveRecorder中PandaTV录制失败的终极指南

3大核心方案&#xff1a;彻底解决DouyinLiveRecorder中PandaTV录制失败的终极指南 【免费下载链接】DouyinLiveRecorder 可循环值守和多人录制的直播录制软件&#xff0c;支持抖音、TikTok、Youtube、快手、虎牙、斗鱼、B站、小红书、pandatv、sooplive、flextv、popkontv、twi…...

别再手动指定模型了!用Hugging Face的AutoModel和AutoProcessor,一行代码搞定BERT/GPT加载

一行代码解放生产力&#xff1a;Hugging Face AutoClass全解析 第一次接触Hugging Face Transformers库时&#xff0c;面对琳琅满目的模型类名——BertForSequenceClassification、RobertaTokenizer、GPT2LMHeadModel...你是否感到头晕目眩&#xff1f;每个项目开始前都要翻阅…...