当前位置: 首页 > news >正文

【Python】FANUC机器人OPC UA通信并记录数据

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):client = Client(url)client.connect()return client
  • 获取根节点和对象节点
def get_root_node(client: Client):return client.get_root_node()
def get_objects_node(client: Client):return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variables

5. 备份旧CSV文件

def backup_csv_file(filename):if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):"""创建客户端实例并连接到服务端"""client = Client(url)client.connect()return clientdef get_root_node(client: Client):"""获取服务器命名空间中的根节点"""return client.get_root_node()def get_objects_node(client: Client):"""获取服务器的对象节点"""return client.get_objects_node()def get_variables(node: Node, path=""):"""遍历所有子节点并返回变量节点的路径和数值"""variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variablesdef backup_csv_file(filename):"""如果CSV文件已存在则备份"""if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

相关文章:

【Python】FANUC机器人OPC UA通信并记录数据

目录 引言机器人仿真环境准备代码实现1. 导入库2. 设置参数3. 日志配置4. OPC UA通信5. 备份旧CSV文件6. 主函数 总结 引言 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化…...

Linux 中断处理

一、基本概念 1、中断及中断上下文 中断是一种由硬件设备产生的信号,不同设备产生的中断通过中断号来区分。CPU在接收到中断信号后,根据中断号执行对应的中断处理程序(Interrupt Service Routine) 内核对异常和中断的处理类似&a…...

人大金昌netcore适配,调用oracle模式下存储过程\包,返回参数游标

using KdbndpConnection conn new KdbndpConnection("Host192.168.133.221;Port54321;Databasedb1;Poolingtrue;User IDsystem;Password123");conn.Open();//存储过程调用也是类似using var cmd conn.CreateCommand();cmd.CommandText "模式.包名称.存储过程…...

pandas常用的一些操作

EXCLE操作 读取Excel data1 pd.read_excel(excle_dir) 读Excel取跳过前几行: data1 pd.read_excel(excle_dir,skiprows1) 获取总行数 data1.shape[0] 获取总列数 data1.shape[1] 指定某列数据类型 data1 pd.read_excel("C:数据导入.xlsx",dtype…...

【鸿蒙开发】系统组件Row

Row组件 Row沿水平方向布局容器 接口: Row(value?:{space?: number | string }) 参数: 参数名 参数类型 必填 参数描述 space string | number 否 横向布局元素间距。 从API version 9开始,space为负数或者justifyContent设置为…...

Hadoop和zookeeper集群相关执行脚本(未完,持续更新中~)

1、Hadoop集群查看状态 搭建Hadoop数据集群时,按以下路径操作即可生成脚本 [test_1analysis01 bin]$ pwd /home/test_1/hadoop/bin [test_01analysis01 bin]$ vim jpsall #!/bin/bash for host in analysis01 analysis02 analysis03 do echo $host s…...

蓝桥杯算法题:栈(Stack)

这道题考的是递推动态规划,可能不是很难,不过这是自己第一次靠自己想出状态转移方程,所以纪念一下: 要做这些题目,首先要把题目中会出现什么状态给找出来,然后想想他们的状态可以通过什么操作转移&#xf…...

JavaWeb-监听器

文章目录 1.基本介绍2.ServletContextListener1.基本介绍2.创建maven项目,导入依赖3.代码演示1.实现ServletContextListener接口2.配置web.xml3.结果 3.ServletContextAttributeListener监听器1.基本介绍2.代码实例1.ServletContextAttributeListener.java2.配置web…...

系统架构设计基础知识

一. 系统架构概述系统架构的定义 系统架构(System Architecture)是系统的一种整体的高层次的结构表示,是系统的骨架和根基,支撑和链接各个部分,包括构件、连接件、约束规范以及指导这些内容设计与演化的原理&#xff0…...

Vue自定义指令介绍及使用方法

介绍​ 除了 Vue 内置的一系列指令 (比如 v-model 或 v-show) 之外,Vue 还允许你注册自定义的指令 (Custom Directives)。 之前已经介绍了两种在 Vue 中重用代码的方式:组件 和 组合式函数。组件是主要的构建模块,而组合式函数则侧重于有状态…...

React 组件生命周期函数的用法和示例代码

React 中的生命周期函数可以分为三个阶段:Mounting(挂载),Updating(更新)和 Unmounting(卸载)。每个阶段都有不同的函数,用于执行不同的操作。 Mounting(挂载…...

【nginx运维】[emerg]: bind() to 0.0.0.0:80 failed (98: Address already in use)

关于nginx端口被占用的问题: If you get following error, when you try to start nginx… [emerg]: bind() to 0.0.0.0:80 failed (98: Address already in use) Then it means nginx or some other process is already using port 80. You can kill it using: su…...

浏览器工作原理与实践--虚拟DOM:虚拟DOM和实际的DOM有何不同

虚拟DOM是最近非常火的技术,两大著名前端框架React和Vue都使用了虚拟DOM,所以我觉得非常有必要结合浏览器的工作机制对虚拟DOM进行一次分析。当然了,React和Vue框架本身所蕴含的知识点非常多,而且也不是我们专栏的重点&#xff0c…...

arm工作模式、arm9通用寄存器、异常向量表中irq的异常向量、cpsr中的哪几位是用来设置工作模式以及r13,r14,15别名是什么?有什么作用?

ARM 首先先介绍一下ARM公司。 ARM成立于1990年11月,前身为Acorn计算机公司 主要设计ARM系列RISC处理器内核 授权ARM内核给生产和销售半导体的合作伙伴ARM公司不生产芯片 提供基于ARM架构的开发设计技术软件工具评估版调试工具应用软件总线架构外围设备单元等等CPU中…...

电脑上音频太多,播放速度又不一致,如何批量调节音频播放速度?

批量调节音频速度是现代音频处理中的一个重要环节,尤其在音乐制作、电影剪辑、有声书制作等领域,它能够帮助制作者快速高效地调整音频的播放速度,从而满足特定的制作需求。本文将详细介绍批量调节音频速度的方法、技巧和注意事项,…...

pe格式从入门到图形化显示(十)-扩展最后一个节

文章目录 前言一、怎么扩展最后一个节?二、扩大节1.扩展节2.保存文件 前言 通过分析和解析Windows PE格式,并使用qt进行图形化显示 一、怎么扩展最后一个节? 在PE文件中,扩大最后一个节通常是通过修改PE文件头中的节表来实现的。…...

设计模式之创建型模式---建造者模式

文章目录 建造者模式概述经典的建造者模式建造者模式的变种总结 建造者模式概述 建造者模式是一种广泛使用的设计模式,在三方开源库和各种SDK中经常见到。建造者设计模式在四人帮的经典著作《设计模式:可复用面向对象软件基础》中被提及,它的…...

如何从零开始训练一个语言模型

如何从零开始训练一个语言模型 #mermaid-svg-gtUlIrFtNPw1oV5a {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-gtUlIrFtNPw1oV5a .error-icon{fill:#552222;}#mermaid-svg-gtUlIrFtNPw1oV5a .error-text{fill:#5522…...

Python 设计一个监督自己的软件1

基本要求:每做一件事,软件就会按照事情权重加相应的分数,总分数也会增加,要可视化页面 使用Python编写的一个简单的日常任务记录和评分系统,包括可视化页面。 首先,我们定义一个任务字典,其中包含各种日常任务及其对应的权重分数…...

商家转账到零钱权限开通操作攻略

商家转账到零钱是什么? 商家转账到零钱是微信商户号里的一个功能,很早以前叫企业付款到零钱。 从2022年5月18日,原“企业付款到零钱”升级为“商家转账到零钱”,已开通商户的功能使用暂不受影响,新开通商户可前往「产…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

Android Wi-Fi 连接失败日志分析

1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...

selenium学习实战【Python爬虫】

selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版,柱状图PPT模版,线状图PPT模版,折线图PPT模版,饼状图PPT模版,雷达图PPT模版,树状图PPT模版 图表类系列各种样式PPT模版分享:图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言: 最近在做行为检测相关的模型,用的是时空图卷积网络(STGCN),但原有kinetic-400数据集数据质量较低,需要进行细粒度的标注,同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...