添加python仪表控制示例

This commit is contained in:
murmur 2024-12-03 14:19:55 +00:00
parent 3c737308d1
commit 05d4e03681
2 changed files with 230 additions and 0 deletions

54
FSW.py Normal file
View File

@ -0,0 +1,54 @@
# python script created by FSW: 24:11:2020 04:53:44
import visa
def write_command(instrument, command) :
instrument.write(command)
return process_system_error(instrument)
def write_query(instrument, command) :
buffer = instrument.query(command)
bSuccess = process_system_error(instrument)
return bSuccess, buffer
def process_system_error(instrument) :
bSuccess = True
EsrErrorMask = 0x3C
if ((get_esr(instrument) & EsrErrorMask) != 0) :
print(instrument.query(":SYST:ERR?"))
instrument.write("*CLS")
bSuccess = False
return bSuccess
def get_esr(instrument) :
esr = instrument.query("*ESR?")
return int(esr)
VisaResourceManager = visa.ResourceManager()
#set center freQ
fre = '200e6'
descr = '_harm_dis_meas'
#location on FSW
tar_loc = 'd:\\murmur\\'
# connect to analyzer
Analyzer = VisaResourceManager.open_resource("TCPIP::192.168.1.43::inst0::INSTR")
print(Analyzer.query("*IDN?"))
success = write_command( Analyzer, "*CLS" )
success = write_command( Analyzer, ":SYST:DISP:UPD ON" )
success = write_command( Analyzer, ":INIT:CONT OFF" )
#success = write_command( Analyzer, ":DISP:WIND:SUBW:TRAC:Y:SCAL:RLEV 10" )
success = write_command( Analyzer, ":SENS:FREQ:CENT " + fre )
success = write_command( Analyzer, ":CALC:MARK:FUNC:HARM:STAT ON" )
success = write_command( Analyzer, ":INIT:CONT ON" )
success = write_command( Analyzer, ":FORM:DEXP:DSEP POIN" )
success = write_command( Analyzer, ":FORM:DEXP:FORM CSV" )
success = write_command( Analyzer, ":FORM:DEXP:HEAD OFF" )
success = write_command( Analyzer, ":FORM:DEXP:TRAC ALL" )
#success = write_command( Analyzer, ":MMEM:STOR1:TRAC 1,'d:\\" + fre + descr +".CSV'" )
#
success = write_command( Analyzer, ":MMEM:STOR1:TRAC 1,'" + tar_loc + fre + descr +".CSV'" )
success, data = write_query(Analyzer, f"MMEM:DATA? '{tar_loc}screenshot.png'")
with open("local_screenshot.png", "wb") as f:
f.write(data)
success = write_command( Analyzer, ":SENS:FREQ:MODE SWE" )
# back to local mode
success = write_command(Analyzer, "@LOC")
if success:
print("Done.\r\nFile's location is {}{}{}.CSV.\r\n".format(tar_loc,fre,descr))
# cleanup
Analyzer.close()
VisaResourceManager.close()

176
instrument_utils.py Normal file
View File

@ -0,0 +1,176 @@
import visa
from typing import Optional, Tuple, Union
import os
from datetime import datetime
import tempfile
class InstrumentInterface:
def __init__(self, resource_string: str):
"""
初始化仪器接口
Args:
resource_string: VISA资源字符串例如 "TCPIP::192.168.1.43::inst0::INSTR"
"""
self.rm = visa.ResourceManager()
self.instrument = self.rm.open_resource(resource_string)
self.instrument.timeout = 30000 # 30秒超时
# 查询仪器信息以确定系统类型
success, idn = self.query("*IDN?")
if success:
self.instrument_info = idn
# 根据IDN确定仪器类型和路径
if "R&S" in idn:
self.temp_path = "/tmp/" # R&S通常基于Linux
elif "Keysight" in idn or "Agilent" in idn:
self.temp_path = "C:/temp/" # Keysight通常基于Windows
else:
# 默认使用/tmp/因为大多数仪器基于Linux
self.temp_path = "/tmp/"
else:
raise ConnectionError("Failed to identify instrument")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def write(self, command: str) -> bool:
"""发送SCPI命令"""
try:
self.instrument.write(command)
return self._check_error()
except Exception as e:
print(f"Error writing command {command}: {str(e)}")
return False
def query(self, command: str) -> Tuple[bool, str]:
"""查询SCPI命令"""
try:
response = self.instrument.query(command)
success = self._check_error()
return success, response.strip()
except Exception as e:
print(f"Error querying {command}: {str(e)}")
return False, ""
def _check_error(self) -> bool:
"""检查仪器错误"""
try:
errors = self.instrument.query("SYST:ERR?")
if "No error" in errors:
return True
print(f"Instrument error: {errors}")
return False
except Exception as e:
print(f"Error checking system errors: {str(e)}")
return False
def _transfer_file_to_local(self,
instrument_path: str,
local_path: str,
delete_source: bool = True) -> bool:
"""
从仪器传输文件到本地
Args:
instrument_path: 仪器上的文件路径
local_path: 本地保存路径
delete_source: 传输后是否删除仪器上的源文件
Returns:
bool: 操作是否成功
"""
try:
# 确保本地目录存在
os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True)
# 读取仪器文件数据
success, data = self.query(f"MMEM:DATA? '{instrument_path}'")
if not success:
return False
# 保存到本地
with open(local_path, 'wb') as f:
f.write(data)
# 清理仪器上的源文件(如果需要)
if delete_source:
self.write(f"MMEM:DEL '{instrument_path}'")
return True
except Exception as e:
print(f"Error transferring file: {str(e)}")
return False
def screenshot_to_file(self,
local_path: str,
format: str = "PNG") -> bool:
"""
保存仪器截图到本地文件
"""
try:
# 生成临时文件名
temp_filename = f"screenshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format.lower()}"
temp_filepath = os.path.join(self.temp_path, temp_filename).replace('\\', '/')
# 配置并执行截图
self.write(f":HCOP:DEV:LANG {format}")
self.write(f":MMEM:NAME '{temp_filepath}'")
self.write(":HCOP:IMM") # 执行截图
# 传输文件到本地
return self._transfer_file_to_local(temp_filepath, local_path)
except Exception as e:
print(f"Error saving screenshot: {str(e)}")
return False
def data_to_file(self,
local_path: str,
trace_number: int = 1,
format: str = "CSV") -> bool:
"""
保存轨迹数据到本地文件
"""
try:
# 生成临时文件名
temp_filename = f"trace_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format.lower()}"
temp_filepath = os.path.join(self.temp_path, temp_filename).replace('\\', '/')
# 配置数据导出格式
self.write(":FORM:DEXP:DSEP POIN")
self.write(f":FORM:DEXP:FORM {format}")
self.write(":FORM:DEXP:HEAD OFF")
self.write(":FORM:DEXP:TRAC ALL")
# 保存到仪器临时文件
self.write(f":MMEM:STOR{trace_number}:TRAC {trace_number},'{temp_filepath}'")
# 传输文件到本地
return self._transfer_file_to_local(temp_filepath, local_path)
except Exception as e:
print(f"Error saving trace data: {str(e)}")
return False
def file_to_local(self,
instrument_path: str,
local_path: str,
delete_source: bool = False) -> bool:
"""
从仪器复制任意文件到本地
Args:
instrument_path: 仪器上的文件路径
local_path: 本地保存路径
delete_source: 是否删除仪器上的源文件
"""
return self._transfer_file_to_local(instrument_path, local_path, delete_source)
def close(self):
"""关闭连接"""
try:
self.instrument.close()
self.rm.close()
except Exception as e:
print(f"Error closing connection: {str(e)}")