176 lines
6.4 KiB
Python
176 lines
6.4 KiB
Python
import visa
|
||
from typing import Optional, Tuple, Union
|
||
import os
|
||
from datetime import datetime
|
||
import tempfile
|
||
|
||
class InstrumentInterface:
    """Thin SCPI wrapper around a VISA instrument session.

    Provides write/query helpers that poll ``SYST:ERR?`` after every
    command, plus helpers that save a screenshot or trace to a temporary
    file on the instrument and copy that file to the local machine.

    The object is a context manager; leaving the ``with`` block closes the
    instrument session and the resource manager.
    """

    def __init__(self, resource_string: str):
        """Open the instrument and pick a temp directory from its ``*IDN?``.

        Args:
            resource_string: VISA resource string, e.g.
                "TCPIP::192.168.1.43::inst0::INSTR".

        Raises:
            ConnectionError: if the ``*IDN?`` query does not succeed.
        """
        self.rm = visa.ResourceManager()
        try:
            self.instrument = self.rm.open_resource(resource_string)
            self.instrument.timeout = 30000  # 30 second timeout (milliseconds)

            # Query the identification string to decide which path style
            # the instrument's file system uses.
            success, idn = self.query("*IDN?")
            if not success:
                raise ConnectionError("Failed to identify instrument")
        except Exception:
            # Do not leak the VISA session / resource manager when
            # construction fails half-way.
            self.close()
            raise

        self.instrument_info = idn
        self.temp_path = self._temp_path_for(idn)

    @staticmethod
    def _temp_path_for(idn: str) -> str:
        """Return the instrument-side temp directory for an ``*IDN?`` string."""
        vendor = idn.lower()
        if "keysight" in vendor or "agilent" in vendor:
            return "C:/temp/"
        # "R&S" identifications and every unrecognised vendor use /tmp/.
        return "/tmp/"

    def __enter__(self):
        """Return the interface itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection when the ``with`` block ends."""
        self.close()

    def write(self, command: str) -> bool:
        """Send a SCPI command and report whether the instrument accepted it.

        Returns:
            True when the command was sent and the following ``SYST:ERR?``
            poll reported no error, False otherwise (the failure is printed).
        """
        try:
            self.instrument.write(command)
            return self._check_error()
        except Exception as e:
            print(f"Error writing command {command}: {str(e)}")
            return False

    def query(self, command: str) -> Tuple[bool, str]:
        """Send a SCPI query and return ``(success, stripped_response)``.

        Returns:
            ``(True/False, response)`` where the flag is the result of the
            ``SYST:ERR?`` poll, or ``(False, "")`` if the query itself raised.
        """
        try:
            response = self.instrument.query(command)
            success = self._check_error()
            return success, response.strip()
        except Exception as e:
            print(f"Error querying {command}: {str(e)}")
            return False, ""

    @staticmethod
    def _is_no_error(response: str) -> bool:
        """Tell whether a ``SYST:ERR?`` reply means the error queue is empty."""
        text = response.strip()
        if "no error" in text.lower():
            return True
        # Fall back to the numeric code in front of the first comma
        # (e.g. '+0,"..."'); zero means no error.
        code = text.split(",", 1)[0].strip()
        try:
            return int(code) == 0
        except ValueError:
            return False

    def _check_error(self) -> bool:
        """Poll ``SYST:ERR?`` once; return True when no error is reported."""
        try:
            errors = self.instrument.query("SYST:ERR?")
        except Exception as e:
            print(f"Error checking system errors: {str(e)}")
            return False

        if self._is_no_error(errors):
            return True
        print(f"Instrument error: {errors}")
        return False

    def _make_temp_path(self, prefix: str, format: str) -> str:
        """Build a timestamped file path inside the instrument temp directory."""
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        temp_filename = f"{prefix}_{stamp}.{format.lower()}"
        # The instrument expects forward slashes even when this code runs
        # on a host whose os.path.join produces backslashes.
        return os.path.join(self.temp_path, temp_filename).replace('\\', '/')

    def _transfer_file_to_local(self,
                                instrument_path: str,
                                local_path: str,
                                delete_source: bool = True) -> bool:
        """Copy a file from the instrument to the local machine.

        Args:
            instrument_path: path of the file on the instrument.
            local_path: destination path on the local machine; missing
                parent directories are created.
            delete_source: delete the instrument-side file after the copy.

        Returns:
            bool: True when the file was read and written locally.
        """
        try:
            # Make sure the local target directory exists.
            os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True)

            # MMEM:DATA? answers with a binary block. It must be read as
            # raw bytes: a text query would decode/strip the payload and
            # the result could not be written to a file opened in 'wb'.
            data = self.instrument.query_binary_values(
                f"MMEM:DATA? '{instrument_path}'",
                datatype="B",
                container=bytes,
            )
            if not self._check_error():
                return False

            # Save locally.
            with open(local_path, 'wb') as f:
                f.write(bytes(data))

            # Clean up the source file on the instrument (best effort).
            if delete_source:
                self.write(f"MMEM:DEL '{instrument_path}'")

            return True

        except Exception as e:
            print(f"Error transferring file: {str(e)}")
            return False

    def screenshot_to_file(self,
                           local_path: str,
                           format: str = "PNG") -> bool:
        """Capture the instrument screen and save it to a local file.

        Args:
            local_path: destination path on the local machine.
            format: hardcopy language sent to the instrument; also used
                (lower-cased) as the temp file extension.

        Returns:
            bool: True when the screenshot file was transferred.
        """
        try:
            temp_filepath = self._make_temp_path("screenshot", format)

            # Configure and trigger the hardcopy. Results are not checked
            # here; a missing file is detected by the transfer below.
            self.write(f":HCOP:DEV:LANG {format}")
            self.write(f":MMEM:NAME '{temp_filepath}'")
            self.write(":HCOP:IMM")  # take the screenshot

            # Copy the file to the local machine.
            return self._transfer_file_to_local(temp_filepath, local_path)

        except Exception as e:
            print(f"Error saving screenshot: {str(e)}")
            return False

    def data_to_file(self,
                     local_path: str,
                     trace_number: int = 1,
                     format: str = "CSV") -> bool:
        """Export trace data on the instrument and save it to a local file.

        Args:
            local_path: destination path on the local machine.
            trace_number: number used both as the ``STOR`` suffix and as
                the trace argument of the store command.
            format: export format sent to the instrument; also used
                (lower-cased) as the temp file extension.

        Returns:
            bool: True when the exported file was transferred.
        """
        try:
            temp_filepath = self._make_temp_path("trace", format)

            # Configure the data export format.
            self.write(":FORM:DEXP:DSEP POIN")
            self.write(f":FORM:DEXP:FORM {format}")
            self.write(":FORM:DEXP:HEAD OFF")
            self.write(":FORM:DEXP:TRAC ALL")

            # Store the trace into the temp file on the instrument.
            self.write(f":MMEM:STOR{trace_number}:TRAC {trace_number},'{temp_filepath}'")

            # Copy the file to the local machine.
            return self._transfer_file_to_local(temp_filepath, local_path)

        except Exception as e:
            print(f"Error saving trace data: {str(e)}")
            return False

    def file_to_local(self,
                      instrument_path: str,
                      local_path: str,
                      delete_source: bool = False) -> bool:
        """Copy an arbitrary file from the instrument to the local machine.

        Args:
            instrument_path: path of the file on the instrument.
            local_path: destination path on the local machine.
            delete_source: delete the instrument-side file after the copy.

        Returns:
            bool: True when the file was transferred.
        """
        return self._transfer_file_to_local(instrument_path, local_path, delete_source)

    def close(self):
        """Close the instrument session and the resource manager.

        Each handle is closed independently so that a failure on one (or a
        handle that was never created) does not prevent closing the other.
        Failures are printed, never raised.
        """
        for attr in ("instrument", "rm"):
            handle = getattr(self, attr, None)
            if handle is None:
                continue
            try:
                handle.close()
            except Exception as e:
                print(f"Error closing connection: {str(e)}")