demo3/instrument_utils.py

176 lines
6.4 KiB
Python
Raw Normal View History

2024-12-03 14:19:55 +00:00
import visa
from typing import Optional, Tuple, Union
import os
from datetime import datetime
import tempfile
class InstrumentInterface:
def __init__(self, resource_string: str):
"""
初始化仪器接口
Args:
resource_string: VISA资源字符串例如 "TCPIP::192.168.1.43::inst0::INSTR"
"""
self.rm = visa.ResourceManager()
self.instrument = self.rm.open_resource(resource_string)
self.instrument.timeout = 30000 # 30秒超时
# 查询仪器信息以确定系统类型
success, idn = self.query("*IDN?")
if success:
self.instrument_info = idn
# 根据IDN确定仪器类型和路径
if "R&S" in idn:
self.temp_path = "/tmp/" # R&S通常基于Linux
elif "Keysight" in idn or "Agilent" in idn:
self.temp_path = "C:/temp/" # Keysight通常基于Windows
else:
# 默认使用/tmp/因为大多数仪器基于Linux
self.temp_path = "/tmp/"
else:
raise ConnectionError("Failed to identify instrument")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def write(self, command: str) -> bool:
"""发送SCPI命令"""
try:
self.instrument.write(command)
return self._check_error()
except Exception as e:
print(f"Error writing command {command}: {str(e)}")
return False
def query(self, command: str) -> Tuple[bool, str]:
"""查询SCPI命令"""
try:
response = self.instrument.query(command)
success = self._check_error()
return success, response.strip()
except Exception as e:
print(f"Error querying {command}: {str(e)}")
return False, ""
def _check_error(self) -> bool:
"""检查仪器错误"""
try:
errors = self.instrument.query("SYST:ERR?")
if "No error" in errors:
return True
print(f"Instrument error: {errors}")
return False
except Exception as e:
print(f"Error checking system errors: {str(e)}")
return False
def _transfer_file_to_local(self,
instrument_path: str,
local_path: str,
delete_source: bool = True) -> bool:
"""
从仪器传输文件到本地
Args:
instrument_path: 仪器上的文件路径
local_path: 本地保存路径
delete_source: 传输后是否删除仪器上的源文件
Returns:
bool: 操作是否成功
"""
try:
# 确保本地目录存在
os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True)
# 读取仪器文件数据
success, data = self.query(f"MMEM:DATA? '{instrument_path}'")
if not success:
return False
# 保存到本地
with open(local_path, 'wb') as f:
f.write(data)
# 清理仪器上的源文件(如果需要)
if delete_source:
self.write(f"MMEM:DEL '{instrument_path}'")
return True
except Exception as e:
print(f"Error transferring file: {str(e)}")
return False
def screenshot_to_file(self,
local_path: str,
format: str = "PNG") -> bool:
"""
保存仪器截图到本地文件
"""
try:
# 生成临时文件名
temp_filename = f"screenshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format.lower()}"
temp_filepath = os.path.join(self.temp_path, temp_filename).replace('\\', '/')
# 配置并执行截图
self.write(f":HCOP:DEV:LANG {format}")
self.write(f":MMEM:NAME '{temp_filepath}'")
self.write(":HCOP:IMM") # 执行截图
# 传输文件到本地
return self._transfer_file_to_local(temp_filepath, local_path)
except Exception as e:
print(f"Error saving screenshot: {str(e)}")
return False
def data_to_file(self,
local_path: str,
trace_number: int = 1,
format: str = "CSV") -> bool:
"""
保存轨迹数据到本地文件
"""
try:
# 生成临时文件名
temp_filename = f"trace_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format.lower()}"
temp_filepath = os.path.join(self.temp_path, temp_filename).replace('\\', '/')
# 配置数据导出格式
self.write(":FORM:DEXP:DSEP POIN")
self.write(f":FORM:DEXP:FORM {format}")
self.write(":FORM:DEXP:HEAD OFF")
self.write(":FORM:DEXP:TRAC ALL")
# 保存到仪器临时文件
self.write(f":MMEM:STOR{trace_number}:TRAC {trace_number},'{temp_filepath}'")
# 传输文件到本地
return self._transfer_file_to_local(temp_filepath, local_path)
except Exception as e:
print(f"Error saving trace data: {str(e)}")
return False
def file_to_local(self,
instrument_path: str,
local_path: str,
delete_source: bool = False) -> bool:
"""
从仪器复制任意文件到本地
Args:
instrument_path: 仪器上的文件路径
local_path: 本地保存路径
delete_source: 是否删除仪器上的源文件
"""
return self._transfer_file_to_local(instrument_path, local_path, delete_source)
def close(self):
"""关闭连接"""
try:
self.instrument.close()
self.rm.close()
except Exception as e:
print(f"Error closing connection: {str(e)}")