From 05d4e03681e964c2bccce6b0120d924c598f6df0 Mon Sep 17 00:00:00 2001 From: murmur Date: Tue, 3 Dec 2024 14:19:55 +0000 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0python=E4=BB=AA=E8=A1=A8?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FSW.py | 54 ++++++++++++++ instrument_utils.py | 176 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 FSW.py create mode 100644 instrument_utils.py diff --git a/FSW.py b/FSW.py new file mode 100644 index 0000000..666e560 --- /dev/null +++ b/FSW.py @@ -0,0 +1,54 @@ +# python script created by FSW: 24:11:2020 04:53:44 +import visa +def write_command(instrument, command) : + instrument.write(command) + return process_system_error(instrument) +def write_query(instrument, command) : + buffer = instrument.query(command) + bSuccess = process_system_error(instrument) + return bSuccess, buffer +def process_system_error(instrument) : + bSuccess = True + EsrErrorMask = 0x3C + if ((get_esr(instrument) & EsrErrorMask) != 0) : + print(instrument.query(":SYST:ERR?")) + instrument.write("*CLS") + bSuccess = False + return bSuccess +def get_esr(instrument) : + esr = instrument.query("*ESR?") + return int(esr) +VisaResourceManager = visa.ResourceManager() +#set center freQ +fre = '200e6' +descr = '_harm_dis_meas' +#location on FSW +tar_loc = 'd:\\murmur\\' +# connect to analyzer +Analyzer = VisaResourceManager.open_resource("TCPIP::192.168.1.43::inst0::INSTR") +print(Analyzer.query("*IDN?")) +success = write_command( Analyzer, "*CLS" ) +success = write_command( Analyzer, ":SYST:DISP:UPD ON" ) +success = write_command( Analyzer, ":INIT:CONT OFF" ) +#success = write_command( Analyzer, ":DISP:WIND:SUBW:TRAC:Y:SCAL:RLEV 10" ) +success = write_command( Analyzer, ":SENS:FREQ:CENT " + fre ) +success = write_command( Analyzer, ":CALC:MARK:FUNC:HARM:STAT ON" ) +success = write_command( Analyzer, ":INIT:CONT ON" ) +success = write_command( Analyzer, ":FORM:DEXP:DSEP POIN" ) +success = write_command( Analyzer, ":FORM:DEXP:FORM CSV" ) +success = write_command( Analyzer, ":FORM:DEXP:HEAD OFF" ) +success = write_command( Analyzer, ":FORM:DEXP:TRAC ALL" ) +#success = write_command( Analyzer, ":MMEM:STOR1:TRAC 1,'d:\\" + fre + descr +".CSV'" ) +# +success = write_command( Analyzer, ":MMEM:STOR1:TRAC 1,'" + tar_loc + fre + descr +".CSV'" ) +success, data = write_query(Analyzer, f"MMEM:DATA? '{tar_loc}screenshot.png'") +with open("local_screenshot.png", "wb") as f: + f.write(data) +success = write_command( Analyzer, ":SENS:FREQ:MODE SWE" ) +# back to local mode +success = write_command(Analyzer, "@LOC") +if success: + print("Done.\r\nFile's location is {}{}{}.CSV.\r\n".format(tar_loc,fre,descr)) +# cleanup +Analyzer.close() +VisaResourceManager.close() diff --git a/instrument_utils.py b/instrument_utils.py new file mode 100644 index 0000000..c655b0b --- /dev/null +++ b/instrument_utils.py @@ -0,0 +1,176 @@ +import visa +from typing import Optional, Tuple, Union +import os +from datetime import datetime +import tempfile + +class InstrumentInterface: + def __init__(self, resource_string: str): + """ + 初始化仪器接口 + Args: + resource_string: VISA资源字符串,例如 "TCPIP::192.168.1.43::inst0::INSTR" + """ + self.rm = visa.ResourceManager() + self.instrument = self.rm.open_resource(resource_string) + self.instrument.timeout = 30000 # 30秒超时 + + # 查询仪器信息以确定系统类型 + success, idn = self.query("*IDN?") + if success: + self.instrument_info = idn + # 根据IDN确定仪器类型和路径 + if "R&S" in idn: + self.temp_path = "/tmp/" # R&S通常基于Linux + elif "Keysight" in idn or "Agilent" in idn: + self.temp_path = "C:/temp/" # Keysight通常基于Windows + else: + # 默认使用/tmp/,因为大多数仪器基于Linux + self.temp_path = "/tmp/" + else: + raise ConnectionError("Failed to identify instrument") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def write(self, command: str) -> bool: + """发送SCPI命令""" + try: + self.instrument.write(command) + return self._check_error() + except Exception as e: + print(f"Error writing command {command}: {str(e)}") + return False + + def query(self, command: str) -> Tuple[bool, str]: + """查询SCPI命令""" + try: + response = self.instrument.query(command) + success = self._check_error() + return success, response.strip() + except Exception as e: + print(f"Error querying {command}: {str(e)}") + return False, "" + + def _check_error(self) -> bool: + """检查仪器错误""" + try: + errors = self.instrument.query("SYST:ERR?") + if "No error" in errors: + return True + print(f"Instrument error: {errors}") + return False + except Exception as e: + print(f"Error checking system errors: {str(e)}") + return False + + def _transfer_file_to_local(self, + instrument_path: str, + local_path: str, + delete_source: bool = True) -> bool: + """ + 从仪器传输文件到本地 + Args: + instrument_path: 仪器上的文件路径 + local_path: 本地保存路径 + delete_source: 传输后是否删除仪器上的源文件 + Returns: + bool: 操作是否成功 + """ + try: + # 确保本地目录存在 + os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True) + + # 读取仪器文件数据 + success, data = self.query(f"MMEM:DATA? '{instrument_path}'") + if not success: + return False + + # 保存到本地 + with open(local_path, 'wb') as f: + f.write(data) + + # 清理仪器上的源文件(如果需要) + if delete_source: + self.write(f"MMEM:DEL '{instrument_path}'") + + return True + + except Exception as e: + print(f"Error transferring file: {str(e)}") + return False + + def screenshot_to_file(self, + local_path: str, + format: str = "PNG") -> bool: + """ + 保存仪器截图到本地文件 + """ + try: + # 生成临时文件名 + temp_filename = f"screenshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format.lower()}" + temp_filepath = os.path.join(self.temp_path, temp_filename).replace('\\', '/') + + # 配置并执行截图 + self.write(f":HCOP:DEV:LANG {format}") + self.write(f":MMEM:NAME '{temp_filepath}'") + self.write(":HCOP:IMM") # 执行截图 + + # 传输文件到本地 + return self._transfer_file_to_local(temp_filepath, local_path) + + except Exception as e: + print(f"Error saving screenshot: {str(e)}") + return False + + def data_to_file(self, + local_path: str, + trace_number: int = 1, + format: str = "CSV") -> bool: + """ + 保存轨迹数据到本地文件 + """ + try: + # 生成临时文件名 + temp_filename = f"trace_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format.lower()}" + temp_filepath = os.path.join(self.temp_path, temp_filename).replace('\\', '/') + + # 配置数据导出格式 + self.write(":FORM:DEXP:DSEP POIN") + self.write(f":FORM:DEXP:FORM {format}") + self.write(":FORM:DEXP:HEAD OFF") + self.write(":FORM:DEXP:TRAC ALL") + + # 保存到仪器临时文件 + self.write(f":MMEM:STOR{trace_number}:TRAC {trace_number},'{temp_filepath}'") + + # 传输文件到本地 + return self._transfer_file_to_local(temp_filepath, local_path) + + except Exception as e: + print(f"Error saving trace data: {str(e)}") + return False + + def file_to_local(self, + instrument_path: str, + local_path: str, + delete_source: bool = False) -> bool: + """ + 从仪器复制任意文件到本地 + Args: + instrument_path: 仪器上的文件路径 + local_path: 本地保存路径 + delete_source: 是否删除仪器上的源文件 + """ + return self._transfer_file_to_local(instrument_path, local_path, delete_source) + + def close(self): + """关闭连接""" + try: + self.instrument.close() + self.rm.close() + except Exception as e: + print(f"Error closing connection: {str(e)}") \ No newline at end of file