From 322c906b1c90d3ce64357053566f193d3d29b319 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Fri, 16 Feb 2024 08:39:19 +0100 Subject: [PATCH 01/22] Refactor script sending and implement linear voltage sweep --- py_instrument_control_lib/devices/KEI2600.py | 90 +++++++++++-------- .../playground/test_kei2600_buffering.py | 21 ++--- .../playground/test_smu_sweep.py | 11 +++ specifications/KEI2600.json | 30 ++++++- 4 files changed, 104 insertions(+), 48 deletions(-) create mode 100644 py_instrument_control_lib/playground/test_smu_sweep.py diff --git a/py_instrument_control_lib/devices/KEI2600.py b/py_instrument_control_lib/devices/KEI2600.py index 53dd786..83f199a 100644 --- a/py_instrument_control_lib/devices/KEI2600.py +++ b/py_instrument_control_lib/devices/KEI2600.py @@ -210,61 +210,64 @@ def toggle_buffering(self, enable: bool, check_errors: bool = False) \ if not hasattr(self, '_buffered_script'): self._buffered_script = [] - def execute_buffered_script(self, blocking: bool = True, check_errors: bool = False) \ + def execute_buffered_script(self, blocking: bool = True, script_name: str = 'BufferedScript', check_errors: bool = False) \ -> None: - self.toggle_buffering(False) - - buffer_entries_a = len([line for line in self._buffered_script if 'A_M_BUFFER' in line]) - buffer_entries_b = len([line for line in self._buffered_script if 'B_M_BUFFER' in line]) - default_script = ["loadscript pyBuff", - f'A_M_BUFFER = smua.makebuffer({buffer_entries_a})', - f'B_M_BUFFER = smub.makebuffer({buffer_entries_b})', - 'A_M_BUFFER.appendmode = 1', - 'B_M_BUFFER.appendmode = 1'] - self._buffered_script = default_script + self._buffered_script - self._buffered_script.append('endscript') + for buffer in ('A_M_BUFFER', 'B_M_BUFFER'): + buffer_entries = len([line for line in self._buffered_script if buffer in line]) + self._buffered_script.insert(0, f'{buffer} = smu{buffer[0].lower()}.makebuffer({buffer_entries})') + self._buffered_script.insert(1, f'{buffer}.appendmode = 1') + self.send_execute_script(self._buffered_script, script_name, blocking) + + def send_script(self, script: list[str] | str, script_name: str, check_errors: bool = False) \ + -> None: + if isinstance(script, str): + script = script.split('\n') + script = [f'loadscript {script_name}', *script, 'endscript'] + chunk_size = 32 + chunks = [script[i:i + chunk_size] for i in range(0, len(script), chunk_size)] exit_payload: dict = {'command': 'keyInput', 'value': 'K'} - payloads: list[dict] = [] - - n = 32 - chunks = [self._buffered_script[i:i + n] for i in range(0, len(self._buffered_script), n)] - - payloads += [self.__make_payload('\n'.join(chunk)) for chunk in chunks] - payloads += [self.__make_payload('pyBuff.save()'), exit_payload] + payloads: list[dict] = [exit_payload, + *[self.__make_payload('\n'.join(chunk)) for chunk in chunks], + self.__make_payload(f'{script_name}.save()'), + exit_payload] for payload in payloads: response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload) if response.status_code != 200: raise DeviceException(msg='Failed to send and execute buffered script') time.sleep(0.5) + + def execute_script(self, script_name: str, blocking: bool = True, check_errors: bool = False) \ + -> None: + buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled + self.toggle_buffering(False) + self.execute(f'{script_name}()') + self.toggle_buffering(buffering_enabled) - self.execute('pyBuff()') - - print('Waiting for script to complete') if blocking: - status = requests.post('http://' + self._config.ip + '/HttpCommand', json={ - "command": "shellOutput", - "timeout": 3, - "acceptsMultiple": True - }) + poll_payload = {"command": "shellOutput", "timeout": 3, "acceptsMultiple": True} + print('Waiting for script to complete') + status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload) while status.json()['status']['value'] == 'timeout': - status = requests.post('http://' + self._config.ip + '/HttpCommand', json={ - "command": "shellOutput", - "timeout": 3, - "acceptsMultiple": True - }) + status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload) print('Script finished') + def send_execute_script(self, script: list[str] | str, script_name: str, blocking: bool = True, check_errors: bool = False) \ + -> None: + self.send_script(script, script_name) + self.execute_script(script_name, blocking) + def __make_payload(self, value: str, check_errors: bool = False) \ -> dict: return {'command': 'shellInput', 'value': value} def read_buffer(self, check_errors: bool = False) \ -> list[list[str]]: - buffer_a = self.__read_channel_buffer(ChannelIndex(1)) - buffer_b = self.__read_channel_buffer(ChannelIndex(2)) - self._buffer = [buffer_a, buffer_b] + buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled + self.toggle_buffering(False) + self._buffer = [self.__read_channel_buffer(ChannelIndex(1)), self.__read_channel_buffer(ChannelIndex(2))] + self.toggle_buffering(buffering_enabled) return copy.deepcopy(self._buffer) def __read_channel_buffer(self, channel_idx: ChannelIndex, check_errors: bool = False) \ @@ -312,3 +315,20 @@ def measure_channel(self, unit: ChannelUnit, channel_idx: ChannelIndex, check_er def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ -> SourceMeasureChannel: return SourceMeasureChannel(self, channel_idx, [ChannelUnit.VOLTAGE, ChannelUnit.CURRENT], [ChannelUnit.VOLTAGE, ChannelUnit.CURRENT, ChannelUnit.POWER, ChannelUnit.RESISTANCE], True) + + def perform_linear_voltage_sweep(self, channel_idx: ChannelIndex, start_voltage: float, stop_voltage: float, increase_rate: float, current: float, blocking: bool = True, check_errors: bool = False) \ + -> None: + precision = 1000 + factor = 1 if start_voltage <= stop_voltage else -1 + script = f'''channel = {self.__to_channel(channel_idx)} + channel.source.func = channel.OUTPUT_DCVOLTS + channel.source.output = channel.OUTPUT_ON + channel.source.limitv = {factor * stop_voltage} + 0.1 + channel.source.limiti = {current} + 0.0001 + channel.source.leveli = {current} + for current_voltage = {factor * start_voltage * precision}, {factor * stop_voltage * precision} do + channel.source.levelv = {factor} * current_voltage / {precision} + delay(1 / {increase_rate * precision}) + end + channel.source.output = channel.OUTPUT_OFF''' + self.send_execute_script(script, "LinearVoltageSweep", blocking) diff --git a/py_instrument_control_lib/playground/test_kei2600_buffering.py b/py_instrument_control_lib/playground/test_kei2600_buffering.py index a21b915..3e9970d 100644 --- a/py_instrument_control_lib/playground/test_kei2600_buffering.py +++ b/py_instrument_control_lib/playground/test_kei2600_buffering.py @@ -9,6 +9,8 @@ smu.connect() smu.toggle_buffering(True) +iterations = 10 + smu.set_limit(ChannelUnit.CURRENT, ChannelIndex(1), 1) smu.set_limit(ChannelUnit.CURRENT, ChannelIndex(2), 1) smu.display_measure_function(ChannelIndex(1), SMUDisplay.MEASURE_DC_AMPS) @@ -26,24 +28,23 @@ smu.toggle_channel(ChannelIndex(2), True) smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(1), 1) smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(2), 1) -for i in range(0, 10): - smu.measure(ChannelUnit.CURRENT, ChannelIndex(1)) +for i in range(iterations): + smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) - smu.measure(ChannelUnit.CURRENT, ChannelIndex(1)) + smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) smu.toggle_channel(ChannelIndex(1), False) smu.toggle_channel(ChannelIndex(2), False) smu.execute_buffered_script(blocking=True) smu.read_buffer() -buff = smu.get_buffer() -for i in range(0, 10): - print(f'smu.measure(ChannelUnit.CURRENT, ChannelIndex(1)) = {smu.next_buffer_element(ChannelIndex(1))}\n') - print(f'smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) = {smu.next_buffer_element(ChannelIndex(1))}\n') - print(f'smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) = {smu.next_buffer_element(ChannelIndex(2))}\n') - print(f'smu.measure(ChannelUnit.CURRENT, ChannelIndex(1)) = {smu.next_buffer_element(ChannelIndex(1))}\n') - print(f'smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) = {smu.next_buffer_element(ChannelIndex(2))}\n') +for i in range(iterations): + print(f'smu.measure(ChannelUnit.CURRENT, ChannelIndex(1)) = {smu.next_buffer_element(ChannelIndex(1))}') + print(f'smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) = {smu.next_buffer_element(ChannelIndex(1))}') + print(f'smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) = {smu.next_buffer_element(ChannelIndex(2))}') + print(f'smu.measure(ChannelUnit.CURRENT, ChannelIndex(1)) = {smu.next_buffer_element(ChannelIndex(1))}') + print(f'smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) = {smu.next_buffer_element(ChannelIndex(2))}') pass diff --git a/py_instrument_control_lib/playground/test_smu_sweep.py b/py_instrument_control_lib/playground/test_smu_sweep.py new file mode 100644 index 0000000..ce17086 --- /dev/null +++ b/py_instrument_control_lib/playground/test_smu_sweep.py @@ -0,0 +1,11 @@ +from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex +from py_instrument_control_lib.device_base.DeviceConfigs import TCPDeviceConfig +from py_instrument_control_lib.devices.KEI2600 import KEI2600 + +smu_config = TCPDeviceConfig(ip='132.231.14.169', port=5025, timeout=5) +smu = KEI2600(smu_config) +smu.connect() + +smu.perform_linear_voltage_sweep(ChannelIndex(1), 0, 2, 0.1, 0.3, True) + +smu.disconnect() diff --git a/specifications/KEI2600.json b/specifications/KEI2600.json index da2d8ea..b2b36cb 100644 --- a/specifications/KEI2600.json +++ b/specifications/KEI2600.json @@ -156,8 +156,26 @@ { "name": "execute_buffered_script", "type": "code", - "signature": "blocking: bool = True", - "code": "self.toggle_buffering(False)\n\nbuffer_entries_a = len([line for line in self._buffered_script if 'A_M_BUFFER' in line])\nbuffer_entries_b = len([line for line in self._buffered_script if 'B_M_BUFFER' in line])\ndefault_script = [\"loadscript pyBuff\",\n f'A_M_BUFFER = smua.makebuffer({buffer_entries_a})',\n f'B_M_BUFFER = smub.makebuffer({buffer_entries_b})',\n 'A_M_BUFFER.appendmode = 1',\n 'B_M_BUFFER.appendmode = 1']\nself._buffered_script = default_script + self._buffered_script\nself._buffered_script.append('endscript')\n\nexit_payload: dict = {'command': 'keyInput', 'value': 'K'}\npayloads: list[dict] = []\n\nn = 32\nchunks = [self._buffered_script[i:i + n] for i in range(0, len(self._buffered_script), n)]\n\npayloads += [self.__make_payload('\\n'.join(chunk)) for chunk in chunks]\npayloads += [self.__make_payload('pyBuff.save()'), exit_payload]\n\nfor payload in payloads:\n response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload)\n if response.status_code != 200:\n raise DeviceException(msg='Failed to send and execute buffered script')\n time.sleep(0.5)\n\nself.execute('pyBuff()')\n\nprint('Waiting for script to complete')\nif blocking:\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json={\n \"command\": \"shellOutput\",\n \"timeout\": 3,\n \"acceptsMultiple\": True\n })\n while status.json()['status']['value'] == 'timeout':\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json={\n \"command\": \"shellOutput\",\n \"timeout\": 3,\n \"acceptsMultiple\": True\n })\n print('Script finished')" + "signature": "blocking: bool = True, script_name: str = 'BufferedScript'", + "code": "for buffer in ('A_M_BUFFER', 'B_M_BUFFER'):\n buffer_entries = len([line for line in self._buffered_script if buffer in line])\n self._buffered_script.insert(0, f'{buffer} = smu{buffer[0].lower()}.makebuffer({buffer_entries})')\n self._buffered_script.insert(1, f'{buffer}.appendmode = 1')\nself.send_execute_script(self._buffered_script, script_name, blocking)" + }, + { + "name": "send_script", + "type": "code", + "signature": "script: list[str] | str, script_name: str", + "code": "if isinstance(script, str):\n script = script.split('\\n')\nscript = [f'loadscript {script_name}', *script, 'endscript']\n\nchunk_size = 32\nchunks = [script[i:i + chunk_size] for i in range(0, len(script), chunk_size)]\nexit_payload: dict = {'command': 'keyInput', 'value': 'K'}\npayloads: list[dict] = [exit_payload,\n *[self.__make_payload('\\n'.join(chunk)) for chunk in chunks],\n self.__make_payload(f'{script_name}.save()'),\n exit_payload]\n\nfor payload in payloads:\n response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload)\n if response.status_code != 200:\n raise DeviceException(msg='Failed to send and execute buffered script')\n time.sleep(0.5)" + }, + { + "name": "execute_script", + "type": "code", + "signature": "script_name: str, blocking: bool = True", + "code": "buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled\nself.toggle_buffering(False)\nself.execute(f'{script_name}()')\nself.toggle_buffering(buffering_enabled)\n\nif blocking:\n poll_payload = {\"command\": \"shellOutput\", \"timeout\": 3, \"acceptsMultiple\": True}\n print('Waiting for script to complete')\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload)\n while status.json()['status']['value'] == 'timeout':\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload)\n print('Script finished')" + }, + { + "name": "send_execute_script", + "type": "code", + "signature": "script: list[str] | str, script_name: str, blocking: bool = True", + "code": "self.send_script(script, script_name)\nself.execute_script(script_name, blocking)" }, { "name": "__make_payload", @@ -171,7 +189,7 @@ "type": "code", "signature": "", "return": "list[list[str]]", - "code": "buffer_a = self.__read_channel_buffer(ChannelIndex(1))\nbuffer_b = self.__read_channel_buffer(ChannelIndex(2))\nself._buffer = [buffer_a, buffer_b]\nreturn copy.deepcopy(self._buffer)" + "code": "buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled\nself.toggle_buffering(False)\nself._buffer = [self.__read_channel_buffer(ChannelIndex(1)), self.__read_channel_buffer(ChannelIndex(2))]\nself.toggle_buffering(buffering_enabled)\nreturn copy.deepcopy(self._buffer)" }, { "name": "__read_channel_buffer", @@ -214,6 +232,12 @@ "signature": "channel_idx: ChannelIndex", "return": "SourceMeasureChannel", "code": "return SourceMeasureChannel(self, channel_idx, [ChannelUnit.VOLTAGE, ChannelUnit.CURRENT], [ChannelUnit.VOLTAGE, ChannelUnit.CURRENT, ChannelUnit.POWER, ChannelUnit.RESISTANCE], True)" + }, + { + "name": "perform_linear_voltage_sweep", + "type": "code", + "signature": "channel_idx: ChannelIndex, start_voltage: float, stop_voltage: float, increase_rate: float, current: float, blocking: bool = True", + "code": "precision = 1000\nfactor = 1 if start_voltage <= stop_voltage else -1\nscript = f'''channel = {self.__to_channel(channel_idx)}\nchannel.source.func = channel.OUTPUT_DCVOLTS\nchannel.source.output = channel.OUTPUT_ON\nchannel.source.limitv = {factor * stop_voltage} + 0.1\nchannel.source.limiti = {current} + 0.0001\nchannel.source.leveli = {current}\nfor current_voltage = {factor * start_voltage * precision}, {factor * stop_voltage * precision} do\n channel.source.levelv = {factor} * current_voltage / {precision}\n delay(1 / {increase_rate * precision})\n end\nchannel.source.output = channel.OUTPUT_OFF'''\nself.send_execute_script(script, \"LinearVoltageSweep\", blocking)" } ] } From ce5cfbe145945e2f9fe38017c3a9af8b652a9041 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Fri, 16 Feb 2024 12:29:31 +0100 Subject: [PATCH 02/22] Implement get real data for oscilloscope --- py_instrument_control_lib/devices/KST3000.py | 36 ++++++++++++++++---- specifications/KST3000.json | 18 ++++++---- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/py_instrument_control_lib/devices/KST3000.py b/py_instrument_control_lib/devices/KST3000.py index 1cfeff9..31c7eca 100644 --- a/py_instrument_control_lib/devices/KST3000.py +++ b/py_instrument_control_lib/devices/KST3000.py @@ -134,17 +134,35 @@ def save_waveform_data(self, file_path: str, check_errors: bool = False) \ def get_waveform_data(self, check_errors: bool = False) \ -> str: - val = self.query('WAVeform:DATA?') + self.execute('WAVeform:DATA?') + response = self._socket.recv(1024) if check_errors: self.check_error_buffer() - return val + return response def get_real_data(self, check_errors: bool = False) \ - -> str: - val = self.query('') - if check_errors: - self.check_error_buffer() - return val + -> tuple[list[float], list[float]]: + preamble = self.get_waveform_preamble().split(',') + x_increment = float(preamble[4]) + x_origin = float(preamble[5]) + x_reference = float(preamble[6]) + y_increment = float(preamble[7]) + y_origin = float(preamble[8]) + y_reference = float(preamble[9]) + + data = self.get_waveform_data()[10:] + result = ([], []) + for i in range(self.get_waveform_points()): + time = ((i - x_reference) * x_increment) + x_origin + voltage_data = int(data[i]) + if voltage_data != 0: # Not a hole. Holes are locations where data has not yet been acquired. + voltage = ((voltage_data - y_reference) * y_increment) + y_origin + result[0].append(time) + result[1].append(voltage) + + if check_errors: + self.check_error_buffer() + return result def digitize(self, channel: OscChannel, check_errors: bool = False) \ -> None: @@ -169,3 +187,7 @@ def set_channel_display(self, channel: OscChannel, enable: bool, check_errors: b self.execute(f'CHANnel:DISPLAY{channel.value} {int(enable)}') if check_errors: self.check_error_buffer() + + def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ + -> None: + raise NotImplementedError('Channels are not supported yet.') diff --git a/specifications/KST3000.json b/specifications/KST3000.json index da5de49..3101b79 100644 --- a/specifications/KST3000.json +++ b/specifications/KST3000.json @@ -120,19 +120,17 @@ }, { "name": "get_waveform_data", - "type": "generated", + "type": "code", "signature": "", "return": "str", - "command": "'WAVeform:DATA?'", - "TODO": "Maybe make substring [10:-1]" + "code": "self.execute('WAVeform:DATA?')\nresponse = self._socket.recv(1024)\nif check_errors:\n self.check_error_buffer()\nreturn response" }, { "name": "get_real_data", - "type": "generated", + "type": "code", "signature": "", - "return": "str", - "command": "''", - "TODO": "IMPLEMENT!" + "return": "tuple[list[float], list[float]]", + "code": "preamble = self.get_waveform_preamble().split(',')\nx_increment = float(preamble[4])\nx_origin = float(preamble[5])\nx_reference = float(preamble[6])\ny_increment = float(preamble[7])\ny_origin = float(preamble[8])\ny_reference = float(preamble[9])\n\ndata = self.get_waveform_data()[10:]\nresult = ([], [])\nfor i in range(self.get_waveform_points()):\n time = ((i - x_reference) * x_increment) + x_origin\n voltage_data = int(data[i])\n if voltage_data != 0: # Not a hole. Holes are locations where data has not yet been acquired.\n voltage = ((voltage_data - y_reference) * y_increment) + y_origin\n result[0].append(time)\n result[1].append(voltage)\n\nif check_errors:\n self.check_error_buffer()\nreturn result" }, { "name": "digitize", @@ -157,6 +155,12 @@ "type": "generated", "signature": "channel: OscChannel, enable: bool", "command": "f'CHANnel:DISPLAY{channel.value} {int(enable)}'" + }, + { + "name": "get_channel", + "type": "code", + "signature": "channel_idx: ChannelIndex", + "code": "raise NotImplementedError('Channels are not supported yet.')" } ] } From 05cae3ca562e0a61c29e012c9f995a2613206ef9 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Fri, 16 Feb 2024 12:35:01 +0100 Subject: [PATCH 03/22] Use new channel indices for oscilloscope --- py_instrument_control_lib/devices/KST3000.py | 37 ++++++++++------ specifications/KST3000.json | 45 +++++++++++++------- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/py_instrument_control_lib/devices/KST3000.py b/py_instrument_control_lib/devices/KST3000.py index 31c7eca..436af6a 100644 --- a/py_instrument_control_lib/devices/KST3000.py +++ b/py_instrument_control_lib/devices/KST3000.py @@ -52,21 +52,21 @@ def set_time_range(self, range_: float, check_errors: bool = False) \ if check_errors: self.check_error_buffer() - def set_channel_offset(self, channel: OscChannel, offset: float, check_errors: bool = False) \ + def set_channel_offset(self, channel_idx: ChannelIndex, offset: float, check_errors: bool = False) \ -> None: - self.execute(f'CHANnel{channel.value}:offset {offset}') + self.execute(f'CHANnel{self.__to_channel(channel_idx)}:offset {offset}') if check_errors: self.check_error_buffer() - def set_channel_scale(self, channel: OscChannel, scale: float, check_errors: bool = False) \ + def set_channel_scale(self, channel_idx: ChannelIndex, scale: float, check_errors: bool = False) \ -> None: - self.execute(f'CHANnel{channel.value}:SCALe {scale}') + self.execute(f'CHANnel{self.__to_channel(channel_idx)}:SCALe {scale}') if check_errors: self.check_error_buffer() - def set_channel_range(self, channel: OscChannel, value: float, voltage_unit: VoltageUnit, check_errors: bool = False) \ + def set_channel_range(self, channel_idx: ChannelIndex, value: float, voltage_unit: VoltageUnit, check_errors: bool = False) \ -> None: - self.execute(f'CHANnel{channel.value}:RANGe range {"mV" if voltage_unit == VoltageUnit.MILLI_VOLT else ""}') + self.execute(f'CHANnel{self.__to_channel(channel_idx)}:RANGe range {"mV" if voltage_unit == VoltageUnit.MILLI_VOLT else ""}') if check_errors: self.check_error_buffer() @@ -76,9 +76,9 @@ def set_trigger_edge(self, edge: TriggerEdge, check_errors: bool = False) \ if check_errors: self.check_error_buffer() - def set_trigger_source(self, channel: OscChannel, check_errors: bool = False) \ + def set_trigger_source(self, channel_idx: ChannelIndex, check_errors: bool = False) \ -> None: - self.execute(f'TRIGger:SOURceCHAN{channel.value}') + self.execute(f'TRIGger:SOURceCHAN{self.__to_channel(channel_idx)}') if check_errors: self.check_error_buffer() @@ -88,9 +88,9 @@ def set_time_delay(self, delay: float, check_errors: bool = False) \ if check_errors: self.check_error_buffer() - def set_waveform_source(self, channel: OscChannel, check_errors: bool = False) \ + def set_waveform_source(self, channel_idx: ChannelIndex, check_errors: bool = False) \ -> None: - self.execute(f'WAVeform:SOURce CHANnel{channel.value}') + self.execute(f'WAVeform:SOURce CHANnel{self.__to_channel(channel_idx)}') if check_errors: self.check_error_buffer() @@ -164,9 +164,9 @@ def get_real_data(self, check_errors: bool = False) \ self.check_error_buffer() return result - def digitize(self, channel: OscChannel, check_errors: bool = False) \ + def digitize(self, channel_idx: ChannelIndex, check_errors: bool = False) \ -> None: - self.execute(f'DIGitize CHAnnel{channel.value}') + self.execute(f'DIGitize CHAnnel{self.__to_channel(channel_idx)}') if check_errors: self.check_error_buffer() @@ -182,12 +182,21 @@ def set_display_mode(self, display_mode: DisplayModes, check_errors: bool = Fals if check_errors: self.check_error_buffer() - def set_channel_display(self, channel: OscChannel, enable: bool, check_errors: bool = False) \ + def set_channel_display(self, channel_idx: ChannelIndex, enable: bool, check_errors: bool = False) \ -> None: - self.execute(f'CHANnel:DISPLAY{channel.value} {int(enable)}') + self.execute(f'CHANnel:DISPLAY{self.__to_channel(channel_idx)} {int(enable)}') if check_errors: self.check_error_buffer() def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ -> None: raise NotImplementedError('Channels are not supported yet.') + + def __to_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ + -> str: + channel_idx.check(4) + return str(channel_idx) + + def check_error_buffer(self, check_errors: bool = False) \ + -> None: + pass diff --git a/specifications/KST3000.json b/specifications/KST3000.json index 3101b79..0fa3b53 100644 --- a/specifications/KST3000.json +++ b/specifications/KST3000.json @@ -36,22 +36,22 @@ { "name": "set_channel_offset", "type": "generated", - "signature": "channel: OscChannel, offset: float", - "command": "f'CHANnel{channel.value}:offset {offset}'", - "TODO": "Check if channel.value is right" + "signature": "channel_idx: ChannelIndex, offset: float", + "command": "f'CHANnel{self.__to_channel(channel_idx)}:offset {offset}'", + "TODO": "Check if self.__to_channel(channel_idx) is right" }, { "name": "set_channel_scale", "type": "generated", - "signature": "channel: OscChannel, scale: float", - "command": "f'CHANnel{channel.value}:SCALe {scale}'", - "TODO": "Check if channel.value is right" + "signature": "channel_idx: ChannelIndex, scale: float", + "command": "f'CHANnel{self.__to_channel(channel_idx)}:SCALe {scale}'", + "TODO": "Check if self.__to_channel(channel_idx) is right" }, { "name": "set_channel_range", "type": "generated", - "signature": "channel: OscChannel, value: float, voltage_unit: VoltageUnit", - "command": "f'CHANnel{channel.value}:RANGe range {\"mV\" if voltage_unit == VoltageUnit.MILLI_VOLT else \"\"}'" + "signature": "channel_idx: ChannelIndex, value: float, voltage_unit: VoltageUnit", + "command": "f'CHANnel{self.__to_channel(channel_idx)}:RANGe range {\"mV\" if voltage_unit == VoltageUnit.MILLI_VOLT else \"\"}'" }, { "name": "set_trigger_edge", @@ -62,8 +62,8 @@ { "name": "set_trigger_source", "type": "generated", - "signature": "channel: OscChannel", - "command": "f'TRIGger:SOURceCHAN{channel.value}'", + "signature": "channel_idx: ChannelIndex", + "command": "f'TRIGger:SOURceCHAN{self.__to_channel(channel_idx)}'", "TODO": "Check" }, { @@ -75,8 +75,8 @@ { "name": "set_waveform_source", "type": "generated", - "signature": "channel: OscChannel", - "command": "f'WAVeform:SOURce CHANnel{channel.value}'" + "signature": "channel_idx: ChannelIndex", + "command": "f'WAVeform:SOURce CHANnel{self.__to_channel(channel_idx)}'" }, { "name": "get_waveform_preamble", @@ -135,8 +135,8 @@ { "name": "digitize", "type": "generated", - "signature": "channel: OscChannel", - "command": "f'DIGitize CHAnnel{channel.value}'" + "signature": "channel_idx: ChannelIndex", + "command": "f'DIGitize CHAnnel{self.__to_channel(channel_idx)}'" }, { "name": "get_system_setup", @@ -153,14 +153,27 @@ { "name": "set_channel_display", "type": "generated", - "signature": "channel: OscChannel, enable: bool", - "command": "f'CHANnel:DISPLAY{channel.value} {int(enable)}'" + "signature": "channel_idx: ChannelIndex, enable: bool", + "command": "f'CHANnel:DISPLAY{self.__to_channel(channel_idx)} {int(enable)}'" }, { "name": "get_channel", "type": "code", "signature": "channel_idx: ChannelIndex", "code": "raise NotImplementedError('Channels are not supported yet.')" + }, + { + "name": "__to_channel", + "type": "code", + "signature": "channel_idx: ChannelIndex", + "return": "str", + "code": "channel_idx.check(4)\nreturn str(channel_idx)" + }, + { + "name": "check_error_buffer", + "type": "code", + "signature": "", + "code": "pass" } ] } From 8ef84618e5e1a6a98db5997c17371efaee1832ac Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Fri, 16 Feb 2024 12:38:35 +0100 Subject: [PATCH 04/22] Optimize imports --- .../device_generation/DEFAULT_IMPORTS.py | 3 --- py_instrument_control_lib/devices/KEI2600.py | 23 ++++++------------- py_instrument_control_lib/devices/KST3000.py | 10 -------- py_instrument_control_lib/devices/KST33500.py | 15 ++++-------- py_instrument_control_lib/devices/SPD1305X.py | 15 ++++-------- .../devices/SwitchMatrix.py | 12 ---------- specifications/KST33500.json | 6 +++++ specifications/SPD1305X.json | 6 +++++ 8 files changed, 28 insertions(+), 62 deletions(-) diff --git a/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py b/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py index b3887a0..8aa5c88 100644 --- a/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py +++ b/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py @@ -9,6 +9,3 @@ from py_instrument_control_lib.manufacturers.FloDevice import FloDevice from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice - -_, _, _, _, _, _, _, _, _ = (SMU, FunctionGenerator, PowerSupply, Oscilloscope, TCPDevice, - KeysightDevice, KeithleyDevice, AbstractSwitchMatrix, FloDevice) \ No newline at end of file diff --git a/py_instrument_control_lib/devices/KEI2600.py b/py_instrument_control_lib/devices/KEI2600.py index 83f199a..646eed2 100644 --- a/py_instrument_control_lib/devices/KEI2600.py +++ b/py_instrument_control_lib/devices/KEI2600.py @@ -4,25 +4,16 @@ Only make changes in the source file. """ -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex, ChannelUnit -from py_instrument_control_lib.channels.Channel import SourceChannel, MeasureChannel, SourceMeasureChannel -from py_instrument_control_lib.device_base.TCPDevice import TCPDevice -from py_instrument_control_lib.device_types.AbstractSwitchMatrix import AbstractSwitchMatrix -from py_instrument_control_lib.device_types.FunctionGenerator import * -from py_instrument_control_lib.device_types.Oscilloscope import * -from py_instrument_control_lib.device_types.PowerSupply import * -from py_instrument_control_lib.device_types.SMU import * -from py_instrument_control_lib.manufacturers.FloDevice import FloDevice -from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice -from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice +import copy +import time +from typing import Optional -_, _, _, _, _, _, _, _, _ = (SMU, FunctionGenerator, PowerSupply, Oscilloscope, TCPDevice, - KeysightDevice, KeithleyDevice, AbstractSwitchMatrix, FloDevice) import requests + +from py_instrument_control_lib.channels.Channel import SourceMeasureChannel from py_instrument_control_lib.device_base.DeviceException import DeviceException -import time -import copy -from typing import Optional +from py_instrument_control_lib.device_types.SMU import * +from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice class KEI2600(SMU, KeithleyDevice): diff --git a/py_instrument_control_lib/devices/KST3000.py b/py_instrument_control_lib/devices/KST3000.py index 436af6a..28c43a0 100644 --- a/py_instrument_control_lib/devices/KST3000.py +++ b/py_instrument_control_lib/devices/KST3000.py @@ -4,20 +4,10 @@ Only make changes in the source file. """ -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex, ChannelUnit -from py_instrument_control_lib.channels.Channel import SourceChannel, MeasureChannel, SourceMeasureChannel -from py_instrument_control_lib.device_base.TCPDevice import TCPDevice -from py_instrument_control_lib.device_types.AbstractSwitchMatrix import AbstractSwitchMatrix -from py_instrument_control_lib.device_types.FunctionGenerator import * from py_instrument_control_lib.device_types.Oscilloscope import * -from py_instrument_control_lib.device_types.PowerSupply import * from py_instrument_control_lib.device_types.SMU import * -from py_instrument_control_lib.manufacturers.FloDevice import FloDevice -from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice -_, _, _, _, _, _, _, _, _ = (SMU, FunctionGenerator, PowerSupply, Oscilloscope, TCPDevice, - KeysightDevice, KeithleyDevice, AbstractSwitchMatrix, FloDevice) class KST3000(Oscilloscope, KeysightDevice): diff --git a/py_instrument_control_lib/devices/KST33500.py b/py_instrument_control_lib/devices/KST33500.py index ad40f76..4fae593 100644 --- a/py_instrument_control_lib/devices/KST33500.py +++ b/py_instrument_control_lib/devices/KST33500.py @@ -4,20 +4,11 @@ Only make changes in the source file. """ -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex, ChannelUnit -from py_instrument_control_lib.channels.Channel import SourceChannel, MeasureChannel, SourceMeasureChannel -from py_instrument_control_lib.device_base.TCPDevice import TCPDevice -from py_instrument_control_lib.device_types.AbstractSwitchMatrix import AbstractSwitchMatrix +from py_instrument_control_lib.channels.Channel import SourceChannel from py_instrument_control_lib.device_types.FunctionGenerator import * -from py_instrument_control_lib.device_types.Oscilloscope import * -from py_instrument_control_lib.device_types.PowerSupply import * from py_instrument_control_lib.device_types.SMU import * -from py_instrument_control_lib.manufacturers.FloDevice import FloDevice -from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice -_, _, _, _, _, _, _, _, _ = (SMU, FunctionGenerator, PowerSupply, Oscilloscope, TCPDevice, - KeysightDevice, KeithleyDevice, AbstractSwitchMatrix, FloDevice) class KST33500(FunctionGenerator, KeysightDevice): @@ -93,3 +84,7 @@ def set_channel_level(self, unit: ChannelUnit, channel_idx: ChannelIndex, level: -> None: channel_idx.check(2) self.execute(f'VOLTage:OFFSet {level / 2}') # TODO: Find proper way to set voltage level + + def check_error_buffer(self, check_errors: bool = False) \ + -> None: + pass diff --git a/py_instrument_control_lib/devices/SPD1305X.py b/py_instrument_control_lib/devices/SPD1305X.py index b831857..320771e 100644 --- a/py_instrument_control_lib/devices/SPD1305X.py +++ b/py_instrument_control_lib/devices/SPD1305X.py @@ -4,20 +4,9 @@ Only make changes in the source file. """ -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex, ChannelUnit -from py_instrument_control_lib.channels.Channel import SourceChannel, MeasureChannel, SourceMeasureChannel -from py_instrument_control_lib.device_base.TCPDevice import TCPDevice -from py_instrument_control_lib.device_types.AbstractSwitchMatrix import AbstractSwitchMatrix -from py_instrument_control_lib.device_types.FunctionGenerator import * -from py_instrument_control_lib.device_types.Oscilloscope import * from py_instrument_control_lib.device_types.PowerSupply import * -from py_instrument_control_lib.device_types.SMU import * -from py_instrument_control_lib.manufacturers.FloDevice import FloDevice -from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice -_, _, _, _, _, _, _, _, _ = (SMU, FunctionGenerator, PowerSupply, Oscilloscope, TCPDevice, - KeysightDevice, KeithleyDevice, AbstractSwitchMatrix, FloDevice) class SPD1305X(PowerSupply, KeysightDevice): @@ -75,3 +64,7 @@ def unlock_input(self, check_errors: bool = False) \ self.execute('*UNLOCK') if check_errors: self.check_error_buffer() + + def check_error_buffer(self, check_errors: bool = False) \ + -> None: + pass diff --git a/py_instrument_control_lib/devices/SwitchMatrix.py b/py_instrument_control_lib/devices/SwitchMatrix.py index 48624b1..bebc5db 100644 --- a/py_instrument_control_lib/devices/SwitchMatrix.py +++ b/py_instrument_control_lib/devices/SwitchMatrix.py @@ -4,20 +4,8 @@ Only make changes in the source file. """ -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex, ChannelUnit -from py_instrument_control_lib.channels.Channel import SourceChannel, MeasureChannel, SourceMeasureChannel -from py_instrument_control_lib.device_base.TCPDevice import TCPDevice from py_instrument_control_lib.device_types.AbstractSwitchMatrix import AbstractSwitchMatrix -from py_instrument_control_lib.device_types.FunctionGenerator import * -from py_instrument_control_lib.device_types.Oscilloscope import * -from py_instrument_control_lib.device_types.PowerSupply import * -from py_instrument_control_lib.device_types.SMU import * from py_instrument_control_lib.manufacturers.FloDevice import FloDevice -from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice -from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice - -_, _, _, _, _, _, _, _, _ = (SMU, FunctionGenerator, PowerSupply, Oscilloscope, TCPDevice, - KeysightDevice, KeithleyDevice, AbstractSwitchMatrix, FloDevice) class SwitchMatrix(AbstractSwitchMatrix, FloDevice): diff --git a/specifications/KST33500.json b/specifications/KST33500.json index e8fdb28..14e4e1d 100644 --- a/specifications/KST33500.json +++ b/specifications/KST33500.json @@ -77,6 +77,12 @@ "type": "code", "signature": "unit: ChannelUnit, channel_idx: ChannelIndex, level: float", "code": "channel_idx.check(2)\nself.execute(f'VOLTage:OFFSet {level / 2}') # TODO: Find proper way to set voltage level" + }, + { + "name": "check_error_buffer", + "type": "code", + "signature": "", + "code": "pass" } ] } diff --git a/specifications/SPD1305X.json b/specifications/SPD1305X.json index de28ddf..266fc61 100644 --- a/specifications/SPD1305X.json +++ b/specifications/SPD1305X.json @@ -56,6 +56,12 @@ "type": "generated", "signature": "", "command": "'*UNLOCK'" + }, + { + "name": "check_error_buffer", + "type": "code", + "signature": "", + "code": "pass" } ] } From c9561236f64d87729296b971ce12a4c15cf89d99 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Fri, 16 Feb 2024 12:42:19 +0100 Subject: [PATCH 05/22] Use channel indices --- .../device_types/Oscilloscope.py | 16 +++++----------- .../playground/test_devices.py | 14 +++++++------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/py_instrument_control_lib/device_types/Oscilloscope.py b/py_instrument_control_lib/device_types/Oscilloscope.py index 2730099..6db3b33 100644 --- a/py_instrument_control_lib/device_types/Oscilloscope.py +++ b/py_instrument_control_lib/device_types/Oscilloscope.py @@ -1,16 +1,10 @@ from abc import ABC from enum import Enum +from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex from py_instrument_control_lib.device_base.Device import Device -class OscChannel(Enum): - CHANNEL_1 = "1" - CHANNEL_2 = "2" - CHANNEL_3 = "3" - CHANNEL_4 = "4" - - class VoltageUnit(Enum): VOLT = "" MILLI_VOLT = "" @@ -53,17 +47,17 @@ def auto_scale(self) -> None: def set_time_range(self, value: float) -> None: pass - def set_channel_offset(self, channel: OscChannel, offset: float) -> None: + def set_channel_offset(self, channel_idx: ChannelIndex, offset: float) -> None: pass - def set_channel_scale(self, channel: OscChannel, value: float) -> None: + def set_channel_scale(self, channel_idx: ChannelIndex, value: float) -> None: pass - def set_channel_range(self, channel: OscChannel, value: float, voltage_unit: VoltageUnit) -> None: + def set_channel_range(self, channel_idx: ChannelIndex, value: float, voltage_unit: VoltageUnit) -> None: pass def set_trigger_edge(self, edge: TriggerEdge) -> None: pass - def set_trigger_source(self, channel: OscChannel) -> None: + def set_trigger_source(self, channel_idx: ChannelIndex) -> None: pass diff --git a/py_instrument_control_lib/playground/test_devices.py b/py_instrument_control_lib/playground/test_devices.py index f4a566b..c75520a 100644 --- a/py_instrument_control_lib/playground/test_devices.py +++ b/py_instrument_control_lib/playground/test_devices.py @@ -112,19 +112,19 @@ def test_osc() -> None: osc.set_time_range(1) time.sleep(0.3) - osc.set_channel_offset(OscChannel.CHANNEL_1, 1) + osc.set_channel_offset(ChannelIndex(1), 1) time.sleep(0.3) - osc.set_channel_scale(OscChannel.CHANNEL_1, 1) + osc.set_channel_scale(ChannelIndex(1), 1) time.sleep(0.3) - osc.set_channel_range(OscChannel.CHANNEL_1, 1, VoltageUnit.VOLT) + osc.set_channel_range(ChannelIndex(1), 1, VoltageUnit.VOLT) time.sleep(0.3) osc.set_trigger_edge(TriggerEdge.POS_EDGE) time.sleep(0.3) - osc.set_trigger_source(OscChannel.CHANNEL_1) + osc.set_trigger_source(ChannelIndex(1)) time.sleep(0.3) osc.set_time_delay(1) time.sleep(0.3) - osc.set_waveform_source(OscChannel.CHANNEL_1) + osc.set_waveform_source(ChannelIndex(1)) time.sleep(0.3) print(osc.set_waveform_points(100)) time.sleep(0.3) @@ -136,13 +136,13 @@ def test_osc() -> None: time.sleep(0.3) osc.get_real_data() time.sleep(0.3) - osc.digitize(OscChannel.CHANNEL_1) + osc.digitize(ChannelIndex(1)) time.sleep(0.3) print(osc.get_system_setup()) time.sleep(0.3) osc.set_display_mode(DisplayModes.XY) time.sleep(0.3) - osc.set_channel_display(OscChannel.CHANNEL_1, True) + osc.set_channel_display(ChannelIndex(1), True) osc.disconnect() From 05b417ca3f6aec85fcbc311e95c4b1b5fbbd4a5c Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Fri, 16 Feb 2024 12:45:14 +0100 Subject: [PATCH 06/22] Fix channel index to string method --- specifications/KST3000.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/specifications/KST3000.json b/specifications/KST3000.json index 0fa3b53..7f57e13 100644 --- a/specifications/KST3000.json +++ b/specifications/KST3000.json @@ -167,7 +167,7 @@ "type": "code", "signature": "channel_idx: ChannelIndex", "return": "str", - "code": "channel_idx.check(4)\nreturn str(channel_idx)" + "code": "channel_idx.check(4)\nreturn str(channel_idx.get())" }, { "name": "check_error_buffer", From 227dfb1966c1c6438330dcf9868f3b11bd6d4e78 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 12:29:54 +0200 Subject: [PATCH 07/22] Implement equals and hash methods of ChannelIndex --- py_instrument_control_lib/channels/ChannelEnums.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/py_instrument_control_lib/channels/ChannelEnums.py b/py_instrument_control_lib/channels/ChannelEnums.py index 0b4ac3e..d8494b1 100644 --- a/py_instrument_control_lib/channels/ChannelEnums.py +++ b/py_instrument_control_lib/channels/ChannelEnums.py @@ -21,3 +21,11 @@ def check(self, number_channels: int) -> None: def get(self) -> int: return self._index + + def __eq__(self, other): + if isinstance(other, ChannelIndex): + return other.get() == self.get() + return False + + def __hash__(self): + return self.get() From 134b14a61a22a4ec33232130638d3c4d580c313c Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:06:13 +0200 Subject: [PATCH 08/22] Use autoformatter after code generation --- .../device_generation/generate_module.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/py_instrument_control_lib/device_generation/generate_module.py b/py_instrument_control_lib/device_generation/generate_module.py index c14e4da..2da0284 100644 --- a/py_instrument_control_lib/device_generation/generate_module.py +++ b/py_instrument_control_lib/device_generation/generate_module.py @@ -100,8 +100,10 @@ def generate_code(spec: dict) -> str: def generate_module(filepath: str, target_dir: str = '.') -> None: spec = load_spec(abspath(filepath)) module = DISCLAIMER.format(filepath) + '\n\n' + generate_code(spec) - with open(os.path.join(target_dir, spec['name'] + '.py'), 'w') as f: + file_name = os.path.join(target_dir, spec['name'] + '.py') + with open(file_name, 'w') as f: f.write(module) + os.system(f'python3 -m autopep8 -i --max-line-length 120 {abspath(file_name)}') if __name__ == '__main__': From b9891d23055e004039e32da4818232fa7a42c7e7 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:07:03 +0200 Subject: [PATCH 09/22] Regenerate devices after changes to generator --- py_instrument_control_lib/devices/KEI2600.py | 176 +++++++++--------- py_instrument_control_lib/devices/KST3000.py | 116 ++++++------ py_instrument_control_lib/devices/KST33500.py | 53 +++--- py_instrument_control_lib/devices/SPD1305X.py | 39 ++-- .../devices/SwitchMatrix.py | 16 +- 5 files changed, 200 insertions(+), 200 deletions(-) diff --git a/py_instrument_control_lib/devices/KEI2600.py b/py_instrument_control_lib/devices/KEI2600.py index 646eed2..e397f0f 100644 --- a/py_instrument_control_lib/devices/KEI2600.py +++ b/py_instrument_control_lib/devices/KEI2600.py @@ -19,14 +19,14 @@ class KEI2600(SMU, KeithleyDevice): def execute(self, command: str, check_errors: bool = False) \ - -> None: + -> None: if hasattr(self, '_buffering_enabled') and self._buffering_enabled: self._buffered_script.append(command) else: super().execute(command) - + def query(self, query: str, check_errors: bool = False) \ - -> Optional[str]: + -> Optional[str]: if hasattr(self, '_buffering_enabled') and self._buffering_enabled: buffer_name = 'A_M_BUFFER' if query.startswith('smua') else 'B_M_BUFFER' query = query[:-1] + buffer_name + ')' @@ -42,179 +42,181 @@ def query(self, query: str, check_errors: bool = False) \ raise DeviceException(msg='Do not use the web interface before or during use of this control lib! ' 'You have to restart your device in order to continue.') return response_decoded - + def measure(self, unit: ChannelUnit, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> float: + -> float: """ This function measures a certain unit on a specific channel. Can be used to measure voltage, current, power or resistance. :param unit: The thing to measure. :param channel_idx: The channel to measure on. :param check_errors: Whether to check the error buffer after the execution. :return: None. - """ + """ val = self.query(f'{self.__to_channel(channel_idx)}.measure.{unit.value}()') if check_errors: self.check_error_buffer() return float(val) - + def __to_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> str: + -> str: channel_idx.check(2) return 'smua' if channel_idx.get() == 1 else 'smub' - + def toggle_channel(self, channel_idx: ChannelIndex, enable: bool, check_errors: bool = False) \ - -> None: - self.execute(f'{self.__to_channel(channel_idx)}.source.output = {self.__to_channel(channel_idx)}.OUTPUT_{("ON" if enable else "OFF")}') + -> None: + self.execute( + f'{self.__to_channel(channel_idx)}.source.output = {self.__to_channel(channel_idx)}.OUTPUT_{("ON" if enable else "OFF")}') if check_errors: self.check_error_buffer() - + def set_level(self, unit: ChannelUnit, channel_idx: ChannelIndex, level: float, check_errors: bool = False) \ - -> None: + -> None: if not (unit in (ChannelUnit.VOLTAGE, ChannelUnit.CURRENT)): raise ValueError('Requirements not satisfied') - + self.execute(f'{self.__to_channel(channel_idx)}.source.level{unit.value} = {level}') if check_errors: self.check_error_buffer() - + def set_limit(self, unit: ChannelUnit, channel_idx: ChannelIndex, limit: float, check_errors: bool = False) \ - -> None: + -> None: if not (unit in (ChannelUnit.VOLTAGE, ChannelUnit.CURRENT, ChannelUnit.POWER)): raise ValueError('Requirements not satisfied') - + self.execute(f'{self.__to_channel(channel_idx)}.source.limit{unit.value} = {str(limit)}') if check_errors: self.check_error_buffer() - + def toggle_autorange(self, unit: ChannelUnit, channel_idx: ChannelIndex, mode: SMUMode, enable: bool, check_errors: bool = False) \ - -> None: + -> None: if not (unit in (ChannelUnit.VOLTAGE, ChannelUnit.CURRENT)): raise ValueError('Requirements not satisfied') - + self.execute(f'{self.__to_channel(channel_idx)}.{mode.value}.autorange{unit.value} = {int(enable) if mode == SMUMode.MEASURE else self.__to_channel(channel_idx) + ".AUTORANGE_" + ("ON" if enable else "OFF")}') if check_errors: self.check_error_buffer() - + def toggle_measure_analog_filter(self, channel_idx: ChannelIndex, enable: bool, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{self.__to_channel(channel_idx)}.measure.analogfilter = {int(enable)}') if check_errors: self.check_error_buffer() - + def set_range(self, unit: ChannelUnit, channel_idx: ChannelIndex, mode: SMUMode, range_: float, check_errors: bool = False) \ - -> None: + -> None: if not (unit in (ChannelUnit.VOLTAGE, ChannelUnit.CURRENT)): raise ValueError('Requirements not satisfied') - + self.execute(f'{self.__to_channel(channel_idx)}.{mode.value}.range{unit.value} = {range_}') if check_errors: self.check_error_buffer() - + def set_sense_mode(self, channel_idx: ChannelIndex, sense_arg: SMUSense, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{self.__to_channel(channel_idx)}.sense = {self.__to_channel(channel_idx)}.{sense_arg.value}') if check_errors: self.check_error_buffer() - + def set_measure_plc(self, channel_idx: ChannelIndex, value: float, check_errors: bool = False) \ - -> None: + -> None: if not (0.001 < value < 25): raise ValueError('Requirements not satisfied') - + self.execute(f'{self.__to_channel(channel_idx)}.measure.nplc = {value}') if check_errors: self.check_error_buffer() - + def set_measure_low_range(self, unit: ChannelUnit, channel_idx: ChannelIndex, value: float, check_errors: bool = False) \ - -> None: + -> None: if not (unit in (ChannelUnit.VOLTAGE, ChannelUnit.CURRENT)): raise ValueError('Requirements not satisfied') - + self.execute(f'{self.__to_channel(channel_idx)}.measure.lowrange{unit.value} = {value}') if check_errors: self.check_error_buffer() - + def set_measure_auto_zero(self, channel_idx: ChannelIndex, auto_zero: Autozero, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{self.__to_channel(channel_idx)}.measure.autozero = {self.__to_channel(channel_idx)}.{auto_zero.value}') if check_errors: self.check_error_buffer() - + def set_measure_count(self, channel_idx: ChannelIndex, nr_of_measurements: int, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{self.__to_channel(channel_idx)}.measure.count = {nr_of_measurements}') if check_errors: self.check_error_buffer() - + def set_source_function(self, channel_idx: ChannelIndex, src_func: SourceFunction, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{self.__to_channel(channel_idx)}.source.func = {self.__to_channel(channel_idx)}.{src_func.value}') if check_errors: self.check_error_buffer() - + def set_source_off_mode(self, channel_idx: ChannelIndex, src_off_mode: SourceOffMode, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{self.__to_channel(channel_idx)}.source.offmode = {self.__to_channel(channel_idx)}.{src_off_mode.value}') if check_errors: self.check_error_buffer() - + def set_source_settling(self, channel_idx: ChannelIndex, src_settling: SourceSettling, check_errors: bool = False) \ - -> None: - self.execute(f'{self.__to_channel(channel_idx)}.source.settling = {self.__to_channel(channel_idx)}.{src_settling.value}') + -> None: + self.execute( + f'{self.__to_channel(channel_idx)}.source.settling = {self.__to_channel(channel_idx)}.{src_settling.value}') if check_errors: self.check_error_buffer() - + def toggle_source_sink(self, channel_idx: ChannelIndex, enable: bool, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{self.__to_channel(channel_idx)}.source.sink = {str(int(enable))}') if check_errors: self.check_error_buffer() - + def display_measure_function(self, channel_idx: ChannelIndex, diplay_measure_func: SMUDisplay, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'display.{self.__to_channel(channel_idx)}.measure.func = display.{diplay_measure_func.value}') if check_errors: self.check_error_buffer() - + def toggle_beep(self, enable: bool, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'beeper.enable = beeper.{"ON" if enable else "OFF"}') if check_errors: self.check_error_buffer() - + def beep(self, duration: float, frequency: int, check_errors: bool = False) \ - -> None: + -> None: """ Send a beeping sound with a specific duration and frequency to the SMU. :param duration: Time in seconds to play the beeping sound. :param frequency: Frequency in HZ of the sound to play. :param check_errors: Whether to check the error buffer after the execution. :return: None. - """ + """ self.execute(f'beeper.beep({duration}, {frequency})') if check_errors: self.check_error_buffer() - + def toggle_buffering(self, enable: bool, check_errors: bool = False) \ - -> None: + -> None: self._buffering_enabled = enable if not hasattr(self, '_buffered_script'): self._buffered_script = [] - + def execute_buffered_script(self, blocking: bool = True, script_name: str = 'BufferedScript', check_errors: bool = False) \ - -> None: + -> None: for buffer in ('A_M_BUFFER', 'B_M_BUFFER'): buffer_entries = len([line for line in self._buffered_script if buffer in line]) self._buffered_script.insert(0, f'{buffer} = smu{buffer[0].lower()}.makebuffer({buffer_entries})') self._buffered_script.insert(1, f'{buffer}.appendmode = 1') self.send_execute_script(self._buffered_script, script_name, blocking) - + def send_script(self, script: list[str] | str, script_name: str, check_errors: bool = False) \ - -> None: + -> None: if isinstance(script, str): script = script.split('\n') script = [f'loadscript {script_name}', *script, 'endscript'] - + chunk_size = 32 chunks = [script[i:i + chunk_size] for i in range(0, len(script), chunk_size)] exit_payload: dict = {'command': 'keyInput', 'value': 'K'} @@ -222,20 +224,20 @@ def send_script(self, script: list[str] | str, script_name: str, check_errors: b *[self.__make_payload('\n'.join(chunk)) for chunk in chunks], self.__make_payload(f'{script_name}.save()'), exit_payload] - + for payload in payloads: response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload) if response.status_code != 200: raise DeviceException(msg='Failed to send and execute buffered script') time.sleep(0.5) - + def execute_script(self, script_name: str, blocking: bool = True, check_errors: bool = False) \ - -> None: + -> None: buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled self.toggle_buffering(False) self.execute(f'{script_name}()') self.toggle_buffering(buffering_enabled) - + if blocking: poll_payload = {"command": "shellOutput", "timeout": 3, "acceptsMultiple": True} print('Waiting for script to complete') @@ -243,72 +245,72 @@ def execute_script(self, script_name: str, blocking: bool = True, check_errors: while status.json()['status']['value'] == 'timeout': status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload) print('Script finished') - + def send_execute_script(self, script: list[str] | str, script_name: str, blocking: bool = True, check_errors: bool = False) \ - -> None: + -> None: self.send_script(script, script_name) self.execute_script(script_name, blocking) - + def __make_payload(self, value: str, check_errors: bool = False) \ - -> dict: + -> dict: return {'command': 'shellInput', 'value': value} - + def read_buffer(self, check_errors: bool = False) \ - -> list[list[str]]: + -> list[list[str]]: buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled self.toggle_buffering(False) self._buffer = [self.__read_channel_buffer(ChannelIndex(1)), self.__read_channel_buffer(ChannelIndex(2))] self.toggle_buffering(buffering_enabled) return copy.deepcopy(self._buffer) - + def __read_channel_buffer(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> list[str]: + -> list[str]: channel_idx.check(2) buffer_name = ('A' if channel_idx.get() == 1 else 'B') + '_M_BUFFER' buffer_size = len([line for line in self._buffered_script if buffer_name in line]) - 2 batch_size = 1024 // 15 - + buffer = [] offset = 1 while offset + batch_size <= buffer_size: buffer += self.__get_buffer_content(offset, batch_size, buffer_name) offset += batch_size - + if (remaining := buffer_size % batch_size) > 0: buffer += self.__get_buffer_content(offset, remaining, buffer_name) - + return buffer - + def __get_buffer_content(self, offset: int, batch_size: int, buffer_name: str, check_errors: bool = False) \ - -> list[str]: + -> list[str]: print_query = f'printbuffer({offset}, {offset + batch_size - 1}, {buffer_name})' self.execute(print_query) return self._socket.recv(1024).decode().replace('\n', '').split(', ') - + def next_buffer_element(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> float: + -> float: channel_idx.check(2) if len(self._buffer) == 0: raise Exception('Buffer is empty!') buffer_idx = channel_idx.get() - 1 return float(self._buffer[buffer_idx].pop(0)) - + def set_channel_level(self, unit: ChannelUnit, channel_idx: ChannelIndex, level: float, check_errors: bool = False) \ - -> None: + -> None: channel_idx.check(2) self.set_level(unit, channel_idx, level) - + def measure_channel(self, unit: ChannelUnit, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> float: + -> float: channel_idx.check(2) return self.measure(unit, channel_idx) - + def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> SourceMeasureChannel: + -> SourceMeasureChannel: return SourceMeasureChannel(self, channel_idx, [ChannelUnit.VOLTAGE, ChannelUnit.CURRENT], [ChannelUnit.VOLTAGE, ChannelUnit.CURRENT, ChannelUnit.POWER, ChannelUnit.RESISTANCE], True) - + def perform_linear_voltage_sweep(self, channel_idx: ChannelIndex, start_voltage: float, stop_voltage: float, increase_rate: float, current: float, blocking: bool = True, check_errors: bool = False) \ - -> None: + -> None: precision = 1000 factor = 1 if start_voltage <= stop_voltage else -1 script = f'''channel = {self.__to_channel(channel_idx)} diff --git a/py_instrument_control_lib/devices/KST3000.py b/py_instrument_control_lib/devices/KST3000.py index 28c43a0..4266fd5 100644 --- a/py_instrument_control_lib/devices/KST3000.py +++ b/py_instrument_control_lib/devices/KST3000.py @@ -9,129 +9,129 @@ from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice - class KST3000(Oscilloscope, KeysightDevice): def run(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('RUN') if check_errors: self.check_error_buffer() - + def stop(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('STOP') if check_errors: self.check_error_buffer() - + def single(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('SINGLE') if check_errors: self.check_error_buffer() - + def auto_scale(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('AUToscale') if check_errors: self.check_error_buffer() - + def set_time_range(self, range_: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'TIMebase:RANGe {range_}') if check_errors: self.check_error_buffer() - + def set_channel_offset(self, channel_idx: ChannelIndex, offset: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'CHANnel{self.__to_channel(channel_idx)}:offset {offset}') if check_errors: self.check_error_buffer() - + def set_channel_scale(self, channel_idx: ChannelIndex, scale: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'CHANnel{self.__to_channel(channel_idx)}:SCALe {scale}') if check_errors: self.check_error_buffer() - + def set_channel_range(self, channel_idx: ChannelIndex, value: float, voltage_unit: VoltageUnit, check_errors: bool = False) \ - -> None: - self.execute(f'CHANnel{self.__to_channel(channel_idx)}:RANGe range {"mV" if voltage_unit == VoltageUnit.MILLI_VOLT else ""}') + -> None: + self.execute( + f'CHANnel{self.__to_channel(channel_idx)}:RANGe range {"mV" if voltage_unit == VoltageUnit.MILLI_VOLT else ""}') if check_errors: self.check_error_buffer() - + def set_trigger_edge(self, edge: TriggerEdge, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'TRIGger:SLOPe {edge.value}') if check_errors: self.check_error_buffer() - + def set_trigger_source(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'TRIGger:SOURceCHAN{self.__to_channel(channel_idx)}') if check_errors: self.check_error_buffer() - + def set_time_delay(self, delay: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'TIMebase:DELay {delay}') if check_errors: self.check_error_buffer() - + def set_waveform_source(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'WAVeform:SOURce CHANnel{self.__to_channel(channel_idx)}') if check_errors: self.check_error_buffer() - + def get_waveform_preamble(self, check_errors: bool = False) \ - -> str: + -> str: val = self.query(f'WAVeform:PREamble?') if check_errors: self.check_error_buffer() return val - + def get_waveform_points(self, check_errors: bool = False) \ - -> int: + -> int: val = self.query('WAVeform:POINts?') if check_errors: self.check_error_buffer() return int(val) - + def set_waveform_points(self, num_points: int, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'WAVeform:POINts {num_points}') if check_errors: self.check_error_buffer() - + def set_waveform_points_mode(self, mode: str, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'WAVeform:POINts:MODE {mode}') if check_errors: self.check_error_buffer() - + def set_waveform_format(self, format_: FileFormat, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'WAVeform:FORMat {format_.value}') if check_errors: self.check_error_buffer() - + def save_waveform_data(self, file_path: str, check_errors: bool = False) \ - -> None: + -> None: self.execute('') if check_errors: self.check_error_buffer() - + def get_waveform_data(self, check_errors: bool = False) \ - -> str: + -> str: self.execute('WAVeform:DATA?') response = self._socket.recv(1024) if check_errors: self.check_error_buffer() return response - + def get_real_data(self, check_errors: bool = False) \ - -> tuple[list[float], list[float]]: + -> tuple[list[float], list[float]]: preamble = self.get_waveform_preamble().split(',') x_increment = float(preamble[4]) x_origin = float(preamble[5]) @@ -139,7 +139,7 @@ def get_real_data(self, check_errors: bool = False) \ y_increment = float(preamble[7]) y_origin = float(preamble[8]) y_reference = float(preamble[9]) - + data = self.get_waveform_data()[10:] result = ([], []) for i in range(self.get_waveform_points()): @@ -149,44 +149,44 @@ def get_real_data(self, check_errors: bool = False) \ voltage = ((voltage_data - y_reference) * y_increment) + y_origin result[0].append(time) result[1].append(voltage) - + if check_errors: self.check_error_buffer() return result - + def digitize(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'DIGitize CHAnnel{self.__to_channel(channel_idx)}') if check_errors: self.check_error_buffer() - + def get_system_setup(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('SYSTem:SETup?') if check_errors: self.check_error_buffer() - + def set_display_mode(self, display_mode: DisplayModes, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'TIMebase:MODE {display_mode.value}') if check_errors: self.check_error_buffer() - + def set_channel_display(self, channel_idx: ChannelIndex, enable: bool, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'CHANnel:DISPLAY{self.__to_channel(channel_idx)} {int(enable)}') if check_errors: self.check_error_buffer() - + def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> None: + -> None: raise NotImplementedError('Channels are not supported yet.') - + def __to_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> str: + -> str: channel_idx.check(4) - return str(channel_idx) - + return str(channel_idx.get()) + def check_error_buffer(self, check_errors: bool = False) \ - -> None: + -> None: pass diff --git a/py_instrument_control_lib/devices/KST33500.py b/py_instrument_control_lib/devices/KST33500.py index 4fae593..69c9f14 100644 --- a/py_instrument_control_lib/devices/KST33500.py +++ b/py_instrument_control_lib/devices/KST33500.py @@ -10,81 +10,80 @@ from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice - class KST33500(FunctionGenerator, KeysightDevice): def toggle(self, enable: bool, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'OUTPut {"ON" if enable else "OFF"}') if check_errors: self.check_error_buffer() - + def set_frequency(self, frequency: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'FREQuency {frequency}') if check_errors: self.check_error_buffer() - + def set_amplitude(self, frequency: float, constrain: str, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'VOLTage{":" + constrain if constrain else ""} {frequency}') if check_errors: self.check_error_buffer() - + def set_offset(self, offset: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'VOLTage:OFFSet {offset}') if check_errors: self.check_error_buffer() - + def set_phase(self, phase: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'PHASe {phase}') if check_errors: self.check_error_buffer() - + def set_function(self, function: FunctionType, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'FUNCtion {function.value}') if check_errors: self.check_error_buffer() - + def display(self, text: str, check_errors: bool = False) \ - -> None: + -> None: self.execute(f"DISP:TEXT '{text}'") if check_errors: self.check_error_buffer() - + def display_clear(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('DISPlay:TEXT:CLEar') if check_errors: self.check_error_buffer() - + def set_pulsewidth(self, pulsewidth: float, check_errors: bool = False) \ - -> None: + -> None: """ Pulswidth unit: ms - """ + """ self.execute(f'FUNCtion:PULSe:WIDTh {pulsewidth} ms') if check_errors: self.check_error_buffer() - + def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ - -> SourceChannel: + -> SourceChannel: channel_idx.check(2) return SourceChannel(self, channel_idx, [ChannelUnit.VOLTAGE]) - + def toggle_channel(self, channel_idx: ChannelIndex, enable: bool, check_errors: bool = False) \ - -> None: + -> None: channel_idx.check(2) self.toggle(enable) - + def set_channel_level(self, unit: ChannelUnit, channel_idx: ChannelIndex, level: float, check_errors: bool = False) \ - -> None: + -> None: channel_idx.check(2) self.execute(f'VOLTage:OFFSet {level / 2}') # TODO: Find proper way to set voltage level - + def check_error_buffer(self, check_errors: bool = False) \ - -> None: + -> None: pass diff --git a/py_instrument_control_lib/devices/SPD1305X.py b/py_instrument_control_lib/devices/SPD1305X.py index 320771e..ed5e322 100644 --- a/py_instrument_control_lib/devices/SPD1305X.py +++ b/py_instrument_control_lib/devices/SPD1305X.py @@ -8,63 +8,62 @@ from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice - class SPD1305X(PowerSupply, KeysightDevice): def measure(self, unit: PSUnit, channel: PSChannel, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'MEASure:{unit.value} {channel.value}') if check_errors: self.check_error_buffer() - + def toggle(self, channel: PSChannel, enable: bool, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'OUTPut {channel.value}, {"ON" if enable else "OFF"}') if check_errors: self.check_error_buffer() - + def set_current(self, channel: PSChannel, current: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{channel.value}:CURRent {current}') if check_errors: self.check_error_buffer() - + def get_current(self, channel: PSChannel, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{channel.value}:CURRent?') if check_errors: self.check_error_buffer() - + def set_voltage(self, channel: PSChannel, voltage: float, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{channel.value}:VOLTage {voltage}') if check_errors: self.check_error_buffer() - + def get_voltage(self, channel: PSChannel, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'{channel.value}:VOLTage?') if check_errors: self.check_error_buffer() - + def set_mode(self, mode: PSMode, check_errors: bool = False) \ - -> None: + -> None: self.execute(f'MODE:SET {mode.value}') if check_errors: self.check_error_buffer() - + def lock_input(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('LOCK') if check_errors: self.check_error_buffer() - + def unlock_input(self, check_errors: bool = False) \ - -> None: + -> None: self.execute('*UNLOCK') if check_errors: self.check_error_buffer() - + def check_error_buffer(self, check_errors: bool = False) \ - -> None: + -> None: pass diff --git a/py_instrument_control_lib/devices/SwitchMatrix.py b/py_instrument_control_lib/devices/SwitchMatrix.py index bebc5db..1ca3aae 100644 --- a/py_instrument_control_lib/devices/SwitchMatrix.py +++ b/py_instrument_control_lib/devices/SwitchMatrix.py @@ -11,28 +11,28 @@ class SwitchMatrix(AbstractSwitchMatrix, FloDevice): def set_row(self, row: int, check_errors: bool = False) \ - -> None: + -> None: if not (0 <= row <= 11): raise ValueError('Requirements not satisfied') - + self.execute(f'{{"cmd": "SET_ROW", "row": "{row}"}}') if check_errors: self.check_error_buffer() - + def set_col(self, col: int, check_errors: bool = False) \ - -> None: + -> None: if not (0 <= col <= 11): raise ValueError('Requirements not satisfied') - + self.execute(f'{{"cmd": "SET_COLUMN", "column": "{col}"}}') if check_errors: self.check_error_buffer() - + def set_row_col(self, row: int, col: int, check_errors: bool = False) \ - -> None: + -> None: if not (0 <= row <= 11 and 0 <= col <= 11): raise ValueError('Requirements not satisfied') - + self.execute(f'{{"cmd": "SET_ROW_COLUMN", "row": "{row}", "column": "{col}"}}') if check_errors: self.check_error_buffer() From bd2bde4d819f89e5cd8fa6aa96af79f8aa6bebb6 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:07:19 +0200 Subject: [PATCH 10/22] Refactor device emulator to send unique values --- py_instrument_control_lib/playground/Emulator.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/py_instrument_control_lib/playground/Emulator.py b/py_instrument_control_lib/playground/Emulator.py index 0ad9c98..3c13082 100644 --- a/py_instrument_control_lib/playground/Emulator.py +++ b/py_instrument_control_lib/playground/Emulator.py @@ -4,6 +4,8 @@ host = sys.argv[1] port = int(sys.argv[2]) +c = 0 + if __name__ == '__main__': s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((host, port)) @@ -24,8 +26,9 @@ output += data if 'print' in data: - conn.sendall('1'.encode()) - output += 'Sent 1\n' + conn.sendall(str(c).encode()) + output += f'Sent {c}\n' + c += 1 except socket.error as e: print("Error Occured.", e) From a96891fece17d03e50b46fcae72f2bbf01f1f970 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:09:09 +0200 Subject: [PATCH 11/22] Implement abstract FutureValue to store placeholder values when buffering --- .../channels/FutureValue.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 py_instrument_control_lib/channels/FutureValue.py diff --git a/py_instrument_control_lib/channels/FutureValue.py b/py_instrument_control_lib/channels/FutureValue.py new file mode 100644 index 0000000..a7c07ce --- /dev/null +++ b/py_instrument_control_lib/channels/FutureValue.py @@ -0,0 +1,19 @@ +from typing import Any + +from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex + + +class FutureValue: + + def __init__(self, channel_idx: ChannelIndex, buffer_index: int): + self.channel_index = channel_idx + self.buffer_index = buffer_index + + def get(self, buffer: list) -> Any: + channel_buffer = buffer[self.channel_index.get() - 1] + if not channel_buffer or len(channel_buffer) <= self.buffer_index: + raise ValueError('Buffer is not filled appropriately.') + return channel_buffer[self.buffer_index] + + def __repr__(self) -> str: + return f'FutureValue @ {self.buffer_index} @ C{self.channel_index.get()}' From 49493e245ce0f35b2caadb72922d08f900051b88 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:10:07 +0200 Subject: [PATCH 12/22] Refactor Channels --- py_instrument_control_lib/channels/Channel.py | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/py_instrument_control_lib/channels/Channel.py b/py_instrument_control_lib/channels/Channel.py index 3f840f6..cc9620a 100644 --- a/py_instrument_control_lib/channels/Channel.py +++ b/py_instrument_control_lib/channels/Channel.py @@ -9,10 +9,13 @@ class ParentDevice(Protocol): def toggle_buffering(self, enable: bool) -> None: pass - def execute_buffered_script(self, blocking: bool, check_errors: bool) -> None: + def execute_buffered_script(self, blocking: bool, check_errors: bool = False) -> None: + pass + + def read_buffer(self, check_errors: bool = False) -> list: pass - def read_buffer(self, check_errors: bool) -> list: + def get_buffer(self, check_errors: bool = False) -> list: pass def next_buffer_element(self, channel_idx: ChannelIndex) -> Any: @@ -36,21 +39,20 @@ def measure_channel(self, unit: ChannelUnit, channel_number: ChannelIndex, check class Channel(ABC): - def __init__(self, device: ParentDevice, channel_idx: ChannelIndex, supported_source_units: list[ChannelUnit], - supported_measure_units: list[ChannelUnit], buffering_available: bool = False): + def __init__(self, device: ParentDevice, channel_idx: ChannelIndex, buffering_available: bool = False): self._device = device self._channel_idx = channel_idx - self._supported_source_units = supported_source_units - self._supported_measure_units = supported_measure_units self._buffering_available = buffering_available + self._buffering_enabled = False def toggle_buffering(self, enable: bool) -> None: self.__check_buffering_available() + self._buffering_enabled = enable self._device.toggle_buffering(enable) def execute_buffered_script(self, blocking: bool, check_errors: bool = False) -> None: self.__check_buffering_available() - self._device.execute_buffered_script(blocking, check_errors) + self._device.execute_buffered_script(blocking=blocking, check_errors=check_errors) def read_buffer(self, check_errors: bool = False) -> list: self.__check_buffering_available() @@ -70,11 +72,12 @@ class SourceChannel(Channel): def __init__(self, device: ParentDevice, channel_idx: ChannelIndex, supported_source_units: list[ChannelUnit], buffering_available: bool = False): - super().__init__(device, channel_idx, supported_source_units, [], buffering_available) + Channel.__init__(self, device, channel_idx, buffering_available) + self._supported_source_units = supported_source_units def set_level(self, unit: ChannelUnit, value: float, check_errors: bool = False) -> None: if unit not in self._supported_source_units: - raise ValueError(f"Unit {unit} is not supported by this channel.") + raise ValueError(f"Setting unit {unit} is not supported by this channel.") self._device.set_channel_level(unit, self._channel_idx, value, check_errors) def toggle(self, enabled: bool, check_errors: bool = False) -> None: @@ -86,7 +89,9 @@ class MeasureChannel(Channel): def __init__(self, device: ParentDevice, channel_idx: ChannelIndex, supported_measure_units: list[ChannelUnit], buffering_available: bool = False): - super().__init__(device, channel_idx, [], supported_measure_units, buffering_available) + Channel.__init__(self, device, channel_idx, buffering_available) + self._supported_measure_units = supported_measure_units + self._buffer_index: dict = {} def measure(self, unit: ChannelUnit, check_errors: bool = False) -> float: if unit not in self._supported_source_units: @@ -98,8 +103,5 @@ class SourceMeasureChannel(SourceChannel, MeasureChannel): def __init__(self, device: ParentDevice, channel_idx: ChannelIndex, supported_source_units: list[ChannelUnit], supported_measure_units: list[ChannelUnit], buffering_available: bool = False): - self._device = device - self._channel_idx = channel_idx - self._supported_source_units = supported_source_units - self._supported_measure_units = supported_measure_units - self._buffering_available = buffering_available + SourceChannel.__init__(self, device, channel_idx, supported_source_units, buffering_available) + MeasureChannel.__init__(self, device, channel_idx, supported_measure_units, buffering_available) From 57856a7196873a30b484391dec2d3bffd9a9647e Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:12:54 +0200 Subject: [PATCH 13/22] Implement future value mechanism in Channel --- py_instrument_control_lib/channels/Channel.py | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/py_instrument_control_lib/channels/Channel.py b/py_instrument_control_lib/channels/Channel.py index cc9620a..f38ae3f 100644 --- a/py_instrument_control_lib/channels/Channel.py +++ b/py_instrument_control_lib/channels/Channel.py @@ -2,6 +2,7 @@ from typing import Protocol, Any from py_instrument_control_lib.channels.ChannelEnums import ChannelUnit, ChannelIndex +from py_instrument_control_lib.channels.FutureValue import FutureValue class ParentDevice(Protocol): @@ -56,7 +57,17 @@ def execute_buffered_script(self, blocking: bool, check_errors: bool = False) -> def read_buffer(self, check_errors: bool = False) -> list: self.__check_buffering_available() - return self._device.read_buffer(check_errors) + return self._device.read_buffer(check_errors=check_errors) + + def read_values(self, blocking: bool, *value_lists: list[Any]) -> None: + if self._buffering_enabled: + if self._device.get_buffer() is None: + self.execute_buffered_script(blocking) + self.read_buffer() + for value_list in value_lists: + for index, value in enumerate(value_list): + if isinstance(value, FutureValue): + value_list[index] = value.get(self._device.get_buffer()) def next_buffer_element(self) -> Any: self.__check_buffering_available() @@ -94,9 +105,18 @@ def __init__(self, device: ParentDevice, channel_idx: ChannelIndex, supported_me self._buffer_index: dict = {} def measure(self, unit: ChannelUnit, check_errors: bool = False) -> float: - if unit not in self._supported_source_units: - raise ValueError(f"Unit {unit} is not supported by this channel.") - return self._device.measure_channel(unit, self._channel_idx, check_errors) + if unit not in self._supported_measure_units: + raise ValueError(f"Measuring unit {unit} is not supported by this channel.") + + value = self._device.measure_channel(unit, self._channel_idx, check_errors) + + if self._buffering_enabled: + if self._channel_idx not in self._buffer_index: + self._buffer_index[self._channel_idx] = 0 + value = FutureValue(self._channel_idx, self._buffer_index[self._channel_idx]) + self._buffer_index[self._channel_idx] += 1 + + return value class SourceMeasureChannel(SourceChannel, MeasureChannel): From 0f2e7c1bb9ab4b91dbf6ffc58aa13f9c47d1684f Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:13:35 +0200 Subject: [PATCH 14/22] Add new test scripts --- .../playground/kei2600_read_test.py | 35 +++++++++++ .../playground/test_kei2600_buffering.py | 20 ++++--- .../playground/test_kei2600_futures.py | 60 +++++++++++++++++++ 3 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 py_instrument_control_lib/playground/kei2600_read_test.py create mode 100644 py_instrument_control_lib/playground/test_kei2600_futures.py diff --git a/py_instrument_control_lib/playground/kei2600_read_test.py b/py_instrument_control_lib/playground/kei2600_read_test.py new file mode 100644 index 0000000..e752cf2 --- /dev/null +++ b/py_instrument_control_lib/playground/kei2600_read_test.py @@ -0,0 +1,35 @@ +from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex +from py_instrument_control_lib.device_base.DeviceConfigs import TCPDeviceConfig +from py_instrument_control_lib.devices.KEI2600 import KEI2600 + +smu_config = TCPDeviceConfig(ip='132.231.14.169', port=5025, timeout=5) +smu = KEI2600(smu_config) +smu.connect() +smu.toggle_channel(ChannelIndex(1), False) +smu.toggle_channel(ChannelIndex(2), False) + +voltage = 1.3 +current = 0.1 +samples = 500 +opposite_voltage_time = 0.1 + +script = f'''A_M_BUFFER = smua.makebuffer({samples}) +A_M_BUFFER.appendmode = 1 +smua.source.levelv = {voltage} +smua.source.leveli = {current} +smua.source.output = smua.OUTPUT_ON +delay({opposite_voltage_time}) +smua.source.levelv = {voltage} +smua.source.leveli = {current} +smua.source.output = smua.OUTPUT_ON +for i = 1, {samples} do + smua.measure.r(A_M_BUFFER) +end +smua.source.output = smua.OUTPUT_OFF'''.split('\n') + +smu.send_execute_script(script, 'NewMemristorReadTest', blocking=True) +resistances = smu.read_channel_buffer_2('A_M_BUFFER', samples) + +print(resistances) + +smu.disconnect() diff --git a/py_instrument_control_lib/playground/test_kei2600_buffering.py b/py_instrument_control_lib/playground/test_kei2600_buffering.py index 3e9970d..8dfb721 100644 --- a/py_instrument_control_lib/playground/test_kei2600_buffering.py +++ b/py_instrument_control_lib/playground/test_kei2600_buffering.py @@ -9,7 +9,7 @@ smu.connect() smu.toggle_buffering(True) -iterations = 10 +iterations = 1 smu.set_limit(ChannelUnit.CURRENT, ChannelIndex(1), 1) smu.set_limit(ChannelUnit.CURRENT, ChannelIndex(2), 1) @@ -26,14 +26,20 @@ smu.toggle_source_sink(channel, True) smu.toggle_channel(ChannelIndex(1), True) smu.toggle_channel(ChannelIndex(2), True) -smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(1), 1) +smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(1), 1.2) smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(2), 1) + +voltages_1 = [] +voltages_2 = [] +currents_1 = [] +currents_2 = [] + for i in range(iterations): - smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) - smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) - smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) - smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1)) - smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2)) + voltages_1.append(smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1))) + currents_1.append(smu.measure(ChannelUnit.CURRENT, ChannelIndex(1))) + voltages_2.append(smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(2))) + voltages_1.append(smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1))) + currents_2.append(smu.measure(ChannelUnit.CURRENT, ChannelIndex(2))) smu.toggle_channel(ChannelIndex(1), False) smu.toggle_channel(ChannelIndex(2), False) diff --git a/py_instrument_control_lib/playground/test_kei2600_futures.py b/py_instrument_control_lib/playground/test_kei2600_futures.py new file mode 100644 index 0000000..7d7b507 --- /dev/null +++ b/py_instrument_control_lib/playground/test_kei2600_futures.py @@ -0,0 +1,60 @@ +import time + +from py_instrument_control_lib.channels.Channel import SourceMeasureChannel +from py_instrument_control_lib.channels.ChannelEnums import ChannelUnit, ChannelIndex +from py_instrument_control_lib.device_base.DeviceConfigs import TCPDeviceConfig +from py_instrument_control_lib.device_types.SMU import SMUDisplay, SMUMode, SourceFunction, \ + SourceOffMode, SourceSettling, Autozero +from py_instrument_control_lib.devices.KEI2600 import KEI2600 + +smu_config = TCPDeviceConfig(ip='132.231.14.169', port=5025, timeout=5) +smu = KEI2600(smu_config) +smu.connect() + +iterations = 2 + +smu.set_limit(ChannelUnit.CURRENT, ChannelIndex(1), 1) +smu.set_limit(ChannelUnit.CURRENT, ChannelIndex(2), 1) +smu.display_measure_function(ChannelIndex(1), SMUDisplay.MEASURE_DC_AMPS) +smu.display_measure_function(ChannelIndex(2), SMUDisplay.MEASURE_DC_VOLTS) +for channel in (ChannelIndex(1), ChannelIndex(2)): + smu.toggle_measure_analog_filter(channel, True) + smu.toggle_autorange(ChannelUnit.VOLTAGE, channel, SMUMode.MEASURE, True) + smu.set_measure_plc(channel, 2) + smu.set_measure_auto_zero(channel, Autozero.ONCE) + smu.set_source_function(channel, SourceFunction.DC_VOLTS) + smu.set_source_off_mode(channel, SourceOffMode.OUTPUT_ZERO) + smu.set_source_settling(channel, SourceSettling.SMOOTH) + smu.toggle_source_sink(channel, True) + + +voltages_1 = [] +voltages_2 = [] +currents_1 = [] +currents_2 = [] + +channel_1: SourceMeasureChannel = smu.get_channel(ChannelIndex(1)) +channel_2: SourceMeasureChannel = smu.get_channel(ChannelIndex(2)) + +channel_1.toggle_buffering(True) +channel_2.toggle_buffering(True) + +smu.toggle_channel(ChannelIndex(1), True) +smu.toggle_channel(ChannelIndex(2), True) +smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(1), 1.42) +smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(2), 1) + +for i in range(iterations): + voltages_1.append(channel_1.measure(ChannelUnit.VOLTAGE)) + currents_1.append(channel_1.measure(ChannelUnit.CURRENT)) + voltages_2.append(channel_2.measure(ChannelUnit.VOLTAGE)) + voltages_1.append(channel_1.measure(ChannelUnit.VOLTAGE)) + currents_2.append(channel_2.measure(ChannelUnit.CURRENT)) + +channel_1.toggle(False) +channel_2.toggle(False) + +channel_1.read_values(True, voltages_1, currents_1) +channel_2.read_values(True, voltages_2, currents_2) + +pass From 0c1e1980a185534866fdbeeb8192da3a707f5e1f Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 13:36:01 +0200 Subject: [PATCH 15/22] Adapt buffer handling of KEI2600 --- py_instrument_control_lib/devices/KEI2600.py | 24 +++++++++++---- .../playground/kei2600_read_test.py | 2 +- .../playground/test_kei2600_buffering.py | 2 +- specifications/KEI2600.json | 30 ++++++++++++++----- 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/py_instrument_control_lib/devices/KEI2600.py b/py_instrument_control_lib/devices/KEI2600.py index e397f0f..ca975f5 100644 --- a/py_instrument_control_lib/devices/KEI2600.py +++ b/py_instrument_control_lib/devices/KEI2600.py @@ -210,6 +210,7 @@ def execute_buffered_script(self, blocking: bool = True, script_name: str = 'Buf self._buffered_script.insert(0, f'{buffer} = smu{buffer[0].lower()}.makebuffer({buffer_entries})') self._buffered_script.insert(1, f'{buffer}.appendmode = 1') self.send_execute_script(self._buffered_script, script_name, blocking) + self._buffer = None def send_script(self, script: list[str] | str, script_name: str, check_errors: bool = False) \ -> None: @@ -239,6 +240,7 @@ def execute_script(self, script_name: str, blocking: bool = True, check_errors: self.toggle_buffering(buffering_enabled) if blocking: + time.sleep(2) poll_payload = {"command": "shellOutput", "timeout": 3, "acceptsMultiple": True} print('Waiting for script to complete') status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload) @@ -255,32 +257,42 @@ def __make_payload(self, value: str, check_errors: bool = False) \ -> dict: return {'command': 'shellInput', 'value': value} - def read_buffer(self, check_errors: bool = False) \ + def read_channel_buffers(self, script: list[str] = None, check_errors: bool = False) \ -> list[list[str]]: + if script is None: + script = self._buffered_script buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled self.toggle_buffering(False) - self._buffer = [self.__read_channel_buffer(ChannelIndex(1)), self.__read_channel_buffer(ChannelIndex(2))] + self._buffer = [self.read_channel_buffer(script, ChannelIndex( + 1)), self.read_channel_buffer(script, ChannelIndex(2))] self.toggle_buffering(buffering_enabled) return copy.deepcopy(self._buffer) - def __read_channel_buffer(self, channel_idx: ChannelIndex, check_errors: bool = False) \ + def read_channel_buffer(self, script: list[str], channel_idx: ChannelIndex, check_errors: bool = False) \ -> list[str]: channel_idx.check(2) buffer_name = ('A' if channel_idx.get() == 1 else 'B') + '_M_BUFFER' - buffer_size = len([line for line in self._buffered_script if buffer_name in line]) - 2 - batch_size = 1024 // 15 + buffer_size = len([line for line in script if buffer_name in line]) - 2 + return self.read_buffer(buffer_name, buffer_size, check_errors) + def read_buffer(self, buffer_name: str, buffer_size: int, check_errors: bool = False) \ + -> list[str]: + batch_size = 1024 // 15 buffer = [] offset = 1 while offset + batch_size <= buffer_size: buffer += self.__get_buffer_content(offset, batch_size, buffer_name) offset += batch_size - if (remaining := buffer_size % batch_size) > 0: buffer += self.__get_buffer_content(offset, remaining, buffer_name) + self.execute(f'{buffer_name}.clear()') return buffer + def get_buffer(self, check_errors: bool = False) \ + -> list[list[str]]: + return copy.deepcopy(self._buffer) if hasattr(self, '_buffer') else None + def __get_buffer_content(self, offset: int, batch_size: int, buffer_name: str, check_errors: bool = False) \ -> list[str]: print_query = f'printbuffer({offset}, {offset + batch_size - 1}, {buffer_name})' diff --git a/py_instrument_control_lib/playground/kei2600_read_test.py b/py_instrument_control_lib/playground/kei2600_read_test.py index e752cf2..32c9792 100644 --- a/py_instrument_control_lib/playground/kei2600_read_test.py +++ b/py_instrument_control_lib/playground/kei2600_read_test.py @@ -28,7 +28,7 @@ smua.source.output = smua.OUTPUT_OFF'''.split('\n') smu.send_execute_script(script, 'NewMemristorReadTest', blocking=True) -resistances = smu.read_channel_buffer_2('A_M_BUFFER', samples) +resistances = smu.read_buffer('A_M_BUFFER', samples) print(resistances) diff --git a/py_instrument_control_lib/playground/test_kei2600_buffering.py b/py_instrument_control_lib/playground/test_kei2600_buffering.py index 8dfb721..cc9ad18 100644 --- a/py_instrument_control_lib/playground/test_kei2600_buffering.py +++ b/py_instrument_control_lib/playground/test_kei2600_buffering.py @@ -44,7 +44,7 @@ smu.toggle_channel(ChannelIndex(2), False) smu.execute_buffered_script(blocking=True) -smu.read_buffer() +smu.read_channel_buffers() for i in range(iterations): print(f'smu.measure(ChannelUnit.CURRENT, ChannelIndex(1)) = {smu.next_buffer_element(ChannelIndex(1))}') diff --git a/specifications/KEI2600.json b/specifications/KEI2600.json index b2b36cb..4ab521e 100644 --- a/specifications/KEI2600.json +++ b/specifications/KEI2600.json @@ -157,7 +157,7 @@ "name": "execute_buffered_script", "type": "code", "signature": "blocking: bool = True, script_name: str = 'BufferedScript'", - "code": "for buffer in ('A_M_BUFFER', 'B_M_BUFFER'):\n buffer_entries = len([line for line in self._buffered_script if buffer in line])\n self._buffered_script.insert(0, f'{buffer} = smu{buffer[0].lower()}.makebuffer({buffer_entries})')\n self._buffered_script.insert(1, f'{buffer}.appendmode = 1')\nself.send_execute_script(self._buffered_script, script_name, blocking)" + "code": "for buffer in ('A_M_BUFFER', 'B_M_BUFFER'):\n buffer_entries = len([line for line in self._buffered_script if buffer in line])\n self._buffered_script.insert(0, f'{buffer} = smu{buffer[0].lower()}.makebuffer({buffer_entries})')\n self._buffered_script.insert(1, f'{buffer}.appendmode = 1')\nself.send_execute_script(self._buffered_script, script_name, blocking)\nself._buffer = None" }, { "name": "send_script", @@ -169,7 +169,7 @@ "name": "execute_script", "type": "code", "signature": "script_name: str, blocking: bool = True", - "code": "buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled\nself.toggle_buffering(False)\nself.execute(f'{script_name}()')\nself.toggle_buffering(buffering_enabled)\n\nif blocking:\n poll_payload = {\"command\": \"shellOutput\", \"timeout\": 3, \"acceptsMultiple\": True}\n print('Waiting for script to complete')\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload)\n while status.json()['status']['value'] == 'timeout':\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload)\n print('Script finished')" + "code": "buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled\nself.toggle_buffering(False)\nself.execute(f'{script_name}()')\nself.toggle_buffering(buffering_enabled)\n\nif blocking:\n time.sleep(2)\n poll_payload = {\"command\": \"shellOutput\", \"timeout\": 3, \"acceptsMultiple\": True}\n print('Waiting for script to complete')\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload)\n while status.json()['status']['value'] == 'timeout':\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json=poll_payload)\n print('Script finished')" }, { "name": "send_execute_script", @@ -185,18 +185,32 @@ "code": "return {'command': 'shellInput', 'value': value}" }, { - "name": "read_buffer", + "name": "read_channel_buffers", "type": "code", - "signature": "", + "signature": "script: list[str] = None", "return": "list[list[str]]", - "code": "buffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled\nself.toggle_buffering(False)\nself._buffer = [self.__read_channel_buffer(ChannelIndex(1)), self.__read_channel_buffer(ChannelIndex(2))]\nself.toggle_buffering(buffering_enabled)\nreturn copy.deepcopy(self._buffer)" + "code": "if script is None:\n script = self._buffered_script\nbuffering_enabled = hasattr(self, '_buffering_enabled') and self._buffering_enabled\nself.toggle_buffering(False)\nself._buffer = [self.read_channel_buffer(script, ChannelIndex(1)), self.read_channel_buffer(script, ChannelIndex(2))]\nself.toggle_buffering(buffering_enabled)\nreturn copy.deepcopy(self._buffer)" }, { - "name": "__read_channel_buffer", + "name": "read_channel_buffer", "type": "code", - "signature": "channel_idx: ChannelIndex", + "signature": "script: list[str], channel_idx: ChannelIndex", "return": "list[str]", - "code": "channel_idx.check(2)\nbuffer_name = ('A' if channel_idx.get() == 1 else 'B') + '_M_BUFFER'\nbuffer_size = len([line for line in self._buffered_script if buffer_name in line]) - 2\nbatch_size = 1024 // 15\n\nbuffer = []\noffset = 1\nwhile offset + batch_size <= buffer_size:\n buffer += self.__get_buffer_content(offset, batch_size, buffer_name)\n offset += batch_size\n\nif (remaining := buffer_size % batch_size) > 0:\n buffer += self.__get_buffer_content(offset, remaining, buffer_name)\n\nreturn buffer" + "code": "channel_idx.check(2)\nbuffer_name = ('A' if channel_idx.get() == 1 else 'B') + '_M_BUFFER'\nbuffer_size = len([line for line in script if buffer_name in line]) - 2\nreturn self.read_buffer(buffer_name, buffer_size, check_errors)" + }, + { + "name": "read_buffer", + "type": "code", + "signature": "buffer_name: str, buffer_size: int", + "return": "list[str]", + "code": "batch_size = 1024 // 15\nbuffer = []\noffset = 1\nwhile offset + batch_size <= buffer_size:\n buffer += self.__get_buffer_content(offset, batch_size, buffer_name)\n offset += batch_size\nif (remaining := buffer_size % batch_size) > 0:\n buffer += self.__get_buffer_content(offset, remaining, buffer_name)\n\nself.execute(f'{buffer_name}.clear()')\nreturn buffer" + }, + { + "name": "get_buffer", + "type": "code", + "signature": "", + "return": "list[list[str]]", + "code": "return copy.deepcopy(self._buffer) if hasattr(self, '_buffer') else None" }, { "name": "__get_buffer_content", From 409a7aa2a343068d2faa173df60529a1728afeda Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 15:11:46 +0200 Subject: [PATCH 16/22] Fix sending script --- py_instrument_control_lib/devices/KEI2600.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py_instrument_control_lib/devices/KEI2600.py b/py_instrument_control_lib/devices/KEI2600.py index ca975f5..96660a2 100644 --- a/py_instrument_control_lib/devices/KEI2600.py +++ b/py_instrument_control_lib/devices/KEI2600.py @@ -227,10 +227,10 @@ def send_script(self, script: list[str] | str, script_name: str, check_errors: b exit_payload] for payload in payloads: + time.sleep(0.2) response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload) if response.status_code != 200: raise DeviceException(msg='Failed to send and execute buffered script') - time.sleep(0.5) def execute_script(self, script_name: str, blocking: bool = True, check_errors: bool = False) \ -> None: From 41092e4bff823cae8667885f052119ce75a2f21f Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 15:39:53 +0200 Subject: [PATCH 17/22] Fix sending script properly --- py_instrument_control_lib/devices/KEI2600.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/py_instrument_control_lib/devices/KEI2600.py b/py_instrument_control_lib/devices/KEI2600.py index 96660a2..9418745 100644 --- a/py_instrument_control_lib/devices/KEI2600.py +++ b/py_instrument_control_lib/devices/KEI2600.py @@ -221,11 +221,16 @@ def send_script(self, script: list[str] | str, script_name: str, check_errors: b chunk_size = 32 chunks = [script[i:i + chunk_size] for i in range(0, len(script), chunk_size)] exit_payload: dict = {'command': 'keyInput', 'value': 'K'} - payloads: list[dict] = [exit_payload, - *[self.__make_payload('\n'.join(chunk)) for chunk in chunks], + payloads: list[dict] = list([exit_payload, + *list([self.__make_payload('\n'.join(chunk)) for chunk in chunks]), self.__make_payload(f'{script_name}.save()'), - exit_payload] + exit_payload]) + class NullOutput: + def write(self, _): + pass + + print(payloads, file=NullOutput()) for payload in payloads: time.sleep(0.2) response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload) From 745269be5c84ebcd5f62f29a35604bd395ef4979 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Mon, 8 Apr 2024 15:44:38 +0200 Subject: [PATCH 18/22] Include fix for sending scripts in specification --- py_instrument_control_lib/devices/KEI2600.py | 6 +++--- specifications/KEI2600.json | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/py_instrument_control_lib/devices/KEI2600.py b/py_instrument_control_lib/devices/KEI2600.py index 9418745..ec48823 100644 --- a/py_instrument_control_lib/devices/KEI2600.py +++ b/py_instrument_control_lib/devices/KEI2600.py @@ -222,9 +222,9 @@ def send_script(self, script: list[str] | str, script_name: str, check_errors: b chunks = [script[i:i + chunk_size] for i in range(0, len(script), chunk_size)] exit_payload: dict = {'command': 'keyInput', 'value': 'K'} payloads: list[dict] = list([exit_payload, - *list([self.__make_payload('\n'.join(chunk)) for chunk in chunks]), - self.__make_payload(f'{script_name}.save()'), - exit_payload]) + *list([self.__make_payload('\n'.join(chunk)) for chunk in chunks]), + self.__make_payload(f'{script_name}.save()'), + exit_payload]) class NullOutput: def write(self, _): diff --git a/specifications/KEI2600.json b/specifications/KEI2600.json index 4ab521e..0554320 100644 --- a/specifications/KEI2600.json +++ b/specifications/KEI2600.json @@ -163,7 +163,7 @@ "name": "send_script", "type": "code", "signature": "script: list[str] | str, script_name: str", - "code": "if isinstance(script, str):\n script = script.split('\\n')\nscript = [f'loadscript {script_name}', *script, 'endscript']\n\nchunk_size = 32\nchunks = [script[i:i + chunk_size] for i in range(0, len(script), chunk_size)]\nexit_payload: dict = {'command': 'keyInput', 'value': 'K'}\npayloads: list[dict] = [exit_payload,\n *[self.__make_payload('\\n'.join(chunk)) for chunk in chunks],\n self.__make_payload(f'{script_name}.save()'),\n exit_payload]\n\nfor payload in payloads:\n response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload)\n if response.status_code != 200:\n raise DeviceException(msg='Failed to send and execute buffered script')\n time.sleep(0.5)" + "code": "if isinstance(script, str):\n script = script.split('\\n')\nscript = [f'loadscript {script_name}', *script, 'endscript']\n\nchunk_size = 32\nchunks = [script[i:i + chunk_size] for i in range(0, len(script), chunk_size)]\nexit_payload: dict = {'command': 'keyInput', 'value': 'K'}\npayloads: list[dict] = list([exit_payload,\n*list([self.__make_payload('\\n'.join(chunk)) for chunk in chunks]),\nself.__make_payload(f'{script_name}.save()'),\nexit_payload])\n\nclass NullOutput:\n def write(self, _):\n pass\n\nprint(payloads, file=NullOutput())\nfor payload in payloads:\n time.sleep(0.2)\n response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload)\n if response.status_code != 200:\n raise DeviceException(msg='Failed to send and execute buffered script')" }, { "name": "execute_script", From 4475f574ddd5c5781ad99b27728f6b3c7bd87fb4 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Tue, 9 Apr 2024 09:14:32 +0200 Subject: [PATCH 19/22] Implement failsafe mechanism for switch matrix --- .../devices/SwitchMatrix.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/py_instrument_control_lib/devices/SwitchMatrix.py b/py_instrument_control_lib/devices/SwitchMatrix.py index 1ca3aae..c1f5a2d 100644 --- a/py_instrument_control_lib/devices/SwitchMatrix.py +++ b/py_instrument_control_lib/devices/SwitchMatrix.py @@ -3,13 +3,44 @@ Any changes made to this file are overwritten if you regenerate this module. Only make changes in the source file. """ +import socket +import time +from py_instrument_control_lib.channels.Channel import Channel +from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex +from py_instrument_control_lib.device_base.DeviceException import DeviceException from py_instrument_control_lib.device_types.AbstractSwitchMatrix import AbstractSwitchMatrix from py_instrument_control_lib.manufacturers.FloDevice import FloDevice class SwitchMatrix(AbstractSwitchMatrix, FloDevice): + def query(self, query: str) -> str: + query += '\n' if not query.endswith('\n') else '' + print(query, end='') + self._socket.sendall(query.encode()) + response = self._socket.recv(1024) + + return response.decode() + + def execute(self, command: str, ttl: int = 10, num_tries: int = 0) -> None: + if num_tries >= ttl: + raise DeviceException(msg=f'Fatal error, cannot establish connection after {ttl} attempts. Aborting.') + + if num_tries > 0: + time.sleep(2 ** num_tries) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + self._socket.settimeout(self._config.timeout) + self._socket.connect((self._config.ip, self._config.port)) + except TimeoutError | OSError: + print(f'Error when trying to execute {command}. Already tried {num_tries} times.') + + try: + self.query(command) + except TimeoutError | OSError: + self.execute(command, ttl, num_tries + 1) + def set_row(self, row: int, check_errors: bool = False) \ -> None: if not (0 <= row <= 11): @@ -36,3 +67,6 @@ def set_row_col(self, row: int, col: int, check_errors: bool = False) \ self.execute(f'{{"cmd": "SET_ROW_COLUMN", "row": "{row}", "column": "{col}"}}') if check_errors: self.check_error_buffer() + + def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) -> Channel: + raise NotImplementedError('Channels are not supported when using a switch matrix.') From a12176ba1451fe04c3334f9e382ccb767dbb17ab Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Wed, 17 Apr 2024 13:59:10 +0200 Subject: [PATCH 20/22] Add new failsafe methods of switch matrix to specification --- .../devices/SwitchMatrix.py | 14 ++++++++------ specifications/SwitchMatrix.json | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/py_instrument_control_lib/devices/SwitchMatrix.py b/py_instrument_control_lib/devices/SwitchMatrix.py index c1f5a2d..8354369 100644 --- a/py_instrument_control_lib/devices/SwitchMatrix.py +++ b/py_instrument_control_lib/devices/SwitchMatrix.py @@ -3,27 +3,28 @@ Any changes made to this file are overwritten if you regenerate this module. Only make changes in the source file. """ + import socket import time -from py_instrument_control_lib.channels.Channel import Channel -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex from py_instrument_control_lib.device_base.DeviceException import DeviceException from py_instrument_control_lib.device_types.AbstractSwitchMatrix import AbstractSwitchMatrix +from py_instrument_control_lib.device_types.SMU import * from py_instrument_control_lib.manufacturers.FloDevice import FloDevice class SwitchMatrix(AbstractSwitchMatrix, FloDevice): - def query(self, query: str) -> str: + def query(self, query: str, check_errors: bool = False) \ + -> None: query += '\n' if not query.endswith('\n') else '' print(query, end='') self._socket.sendall(query.encode()) response = self._socket.recv(1024) - return response.decode() - def execute(self, command: str, ttl: int = 10, num_tries: int = 0) -> None: + def execute(self, command: str, ttl: int = 10, num_tries: int = 0, check_errors: bool = False) \ + -> None: if num_tries >= ttl: raise DeviceException(msg=f'Fatal error, cannot establish connection after {ttl} attempts. Aborting.') @@ -68,5 +69,6 @@ def set_row_col(self, row: int, col: int, check_errors: bool = False) \ if check_errors: self.check_error_buffer() - def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) -> Channel: + def get_channel(self, channel_idx: ChannelIndex, check_errors: bool = False) \ + -> None: raise NotImplementedError('Channels are not supported when using a switch matrix.') diff --git a/specifications/SwitchMatrix.json b/specifications/SwitchMatrix.json index 578e0db..4761a91 100644 --- a/specifications/SwitchMatrix.json +++ b/specifications/SwitchMatrix.json @@ -2,7 +2,20 @@ "name": "SwitchMatrix", "devicetype": "AbstractSwitchMatrix", "superclass": "FloDevice", + "imports": "import socket\nimport time", "commands": [ + { + "name": "query", + "type": "code", + "signature": "query: str", + "code": "query += '\\n' if not query.endswith('\\n') else ''\nprint(query, end='')\nself._socket.sendall(query.encode())\nresponse = self._socket.recv(1024)\nreturn response.decode()" + }, + { + "name": "execute", + "type": "code", + "signature": "command: str, ttl: int = 10, num_tries: int = 0", + "code": "if num_tries >= ttl:\n raise DeviceException(msg=f'Fatal error, cannot establish connection after {ttl} attempts. Aborting.')\n\nif num_tries > 0:\n time.sleep(2 ** num_tries)\n self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\n try:\n self._socket.settimeout(self._config.timeout)\n self._socket.connect((self._config.ip, self._config.port))\n except TimeoutError | OSError:\n print(f'Error when trying to execute {command}. Already tried {num_tries} times.')\n\ntry:\n self.query(command)\nexcept TimeoutError | OSError:\n self.execute(command, ttl, num_tries + 1)" + }, { "name": "set_row", "type": "generated", @@ -23,6 +36,12 @@ "signature": "row: int, col: int", "requirements": "0 <= row <= 11 and 0 <= col <= 11", "command": "f'{{\"cmd\": \"SET_ROW_COLUMN\", \"row\": \"{row}\", \"column\": \"{col}\"}}'" + }, + { + "name": "get_channel", + "type": "code", + "signature": "channel_idx: ChannelIndex", + "code": "raise NotImplementedError('Channels are not supported when using a switch matrix.')" } ] } From 3507fccb97c20326eee6599c18d8fe7b058266b0 Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Wed, 17 Apr 2024 14:25:01 +0200 Subject: [PATCH 21/22] Refactor device emulator --- .../device_generation/DEFAULT_IMPORTS.py | 1 + .../playground/Emulator.py | 44 ++++++++++++------- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py b/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py index 8aa5c88..2fc6112 100644 --- a/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py +++ b/py_instrument_control_lib/device_generation/DEFAULT_IMPORTS.py @@ -9,3 +9,4 @@ from py_instrument_control_lib.manufacturers.FloDevice import FloDevice from py_instrument_control_lib.manufacturers.KeithleyDevice import KeithleyDevice from py_instrument_control_lib.manufacturers.KeysightDevice import KeysightDevice +from py_instrument_control_lib.device_base.DeviceException import DeviceException diff --git a/py_instrument_control_lib/playground/Emulator.py b/py_instrument_control_lib/playground/Emulator.py index 3c13082..b080f23 100644 --- a/py_instrument_control_lib/playground/Emulator.py +++ b/py_instrument_control_lib/playground/Emulator.py @@ -1,39 +1,49 @@ import socket import sys +from random import randrange -host = sys.argv[1] -port = int(sys.argv[2]) +response_mode = 0 # Reponse to print messages. 0: Static number, 1: Incrementing counter, 2: Random number in range +static_number = 1 # If static (0) +counter = 0 # If counter (1) +lower_bound, upper_bound = int(1e9), int(1e12) # If random number (2) + + +def response_data() -> int: + match response_mode: + case 0: + return static_number + case 1: + global counter + counter += 1 + return counter + case 2: + return randrange(int(1e9), int(1e12)) -c = 0 if __name__ == '__main__': + if len(sys.argv) != 2: + print('Invalid number of arguments. Expected IP and port of host.') + host = sys.argv[1] + port = int(sys.argv[2]) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((host, port)) - s.listen(1) conn, addr = s.accept() - - output = '' - while True: try: data = conn.recv(1024) - if not data: break - data = data.decode('UTF-8') - - output += data - if 'print' in data: - conn.sendall(str(c).encode()) - output += f'Sent {c}\n' - c += 1 + print('Recv:', data) + if 'print' in data or 'IDN?' in data: + response = response_data() + conn.sendall(str(response).encode()) + print('Sent:', response) except socket.error as e: print("Error Occured.", e) break - print(output) - conn.close() From c11a2638a8327c0b530e2ebb9c08bf09c5517eeb Mon Sep 17 00:00:00 2001 From: Alexander Braml Date: Wed, 17 Apr 2024 14:51:37 +0200 Subject: [PATCH 22/22] Refactor playground --- .../playground/Emulator.py | 2 - ...2600_buffering.py => KEI2600_Buffering.py} | 0 ..._kei2600_futures.py => KEI2600_Futures.py} | 0 .../{emulator.sh => LocalEmulator.sh} | 1 - .../playground/RawSocketUsage.py | 19 --- ..._channels.py => Test_Channel_Buffering.py} | 0 .../playground/kei2600_read_test.py | 35 ---- .../playground/list_all_commands.py | 7 - .../playground/test_devices.py | 156 ------------------ .../playground/test_smu_sweep.py | 11 -- 10 files changed, 231 deletions(-) rename py_instrument_control_lib/playground/{test_kei2600_buffering.py => KEI2600_Buffering.py} (100%) rename py_instrument_control_lib/playground/{test_kei2600_futures.py => KEI2600_Futures.py} (100%) rename py_instrument_control_lib/playground/{emulator.sh => LocalEmulator.sh} (98%) delete mode 100644 py_instrument_control_lib/playground/RawSocketUsage.py rename py_instrument_control_lib/playground/{test_channels.py => Test_Channel_Buffering.py} (100%) delete mode 100644 py_instrument_control_lib/playground/kei2600_read_test.py delete mode 100644 py_instrument_control_lib/playground/list_all_commands.py delete mode 100644 py_instrument_control_lib/playground/test_devices.py delete mode 100644 py_instrument_control_lib/playground/test_smu_sweep.py diff --git a/py_instrument_control_lib/playground/Emulator.py b/py_instrument_control_lib/playground/Emulator.py index b080f23..ffb4019 100644 --- a/py_instrument_control_lib/playground/Emulator.py +++ b/py_instrument_control_lib/playground/Emulator.py @@ -41,9 +41,7 @@ def response_data() -> int: response = response_data() conn.sendall(str(response).encode()) print('Sent:', response) - except socket.error as e: print("Error Occured.", e) break - conn.close() diff --git a/py_instrument_control_lib/playground/test_kei2600_buffering.py b/py_instrument_control_lib/playground/KEI2600_Buffering.py similarity index 100% rename from py_instrument_control_lib/playground/test_kei2600_buffering.py rename to py_instrument_control_lib/playground/KEI2600_Buffering.py diff --git a/py_instrument_control_lib/playground/test_kei2600_futures.py b/py_instrument_control_lib/playground/KEI2600_Futures.py similarity index 100% rename from py_instrument_control_lib/playground/test_kei2600_futures.py rename to py_instrument_control_lib/playground/KEI2600_Futures.py diff --git a/py_instrument_control_lib/playground/emulator.sh b/py_instrument_control_lib/playground/LocalEmulator.sh similarity index 98% rename from py_instrument_control_lib/playground/emulator.sh rename to py_instrument_control_lib/playground/LocalEmulator.sh index 9e768e1..306f183 100755 --- a/py_instrument_control_lib/playground/emulator.sh +++ b/py_instrument_control_lib/playground/LocalEmulator.sh @@ -4,4 +4,3 @@ while : do python3 Emulator.py 127.0.0.1 $1 done - diff --git a/py_instrument_control_lib/playground/RawSocketUsage.py b/py_instrument_control_lib/playground/RawSocketUsage.py deleted file mode 100644 index d743139..0000000 --- a/py_instrument_control_lib/playground/RawSocketUsage.py +++ /dev/null @@ -1,19 +0,0 @@ -import socket - -server_ip = '132.231.14.104' # Ändern Sie dies auf die IP-Adresse Ihres Servers -server_port = 5025 # Ändern Sie dies auf den Port Ihres Servers - -cmd = '*IDN?\n' - -client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -client_socket.connect((server_ip, server_port)) - -client_socket.sendall(cmd.encode()) - -response = client_socket.recv(1024) - -response = response.decode() - -print("Response:", response) - -client_socket.close() diff --git a/py_instrument_control_lib/playground/test_channels.py b/py_instrument_control_lib/playground/Test_Channel_Buffering.py similarity index 100% rename from py_instrument_control_lib/playground/test_channels.py rename to py_instrument_control_lib/playground/Test_Channel_Buffering.py diff --git a/py_instrument_control_lib/playground/kei2600_read_test.py b/py_instrument_control_lib/playground/kei2600_read_test.py deleted file mode 100644 index 32c9792..0000000 --- a/py_instrument_control_lib/playground/kei2600_read_test.py +++ /dev/null @@ -1,35 +0,0 @@ -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex -from py_instrument_control_lib.device_base.DeviceConfigs import TCPDeviceConfig -from py_instrument_control_lib.devices.KEI2600 import KEI2600 - -smu_config = TCPDeviceConfig(ip='132.231.14.169', port=5025, timeout=5) -smu = KEI2600(smu_config) -smu.connect() -smu.toggle_channel(ChannelIndex(1), False) -smu.toggle_channel(ChannelIndex(2), False) - -voltage = 1.3 -current = 0.1 -samples = 500 -opposite_voltage_time = 0.1 - -script = f'''A_M_BUFFER = smua.makebuffer({samples}) -A_M_BUFFER.appendmode = 1 -smua.source.levelv = {voltage} -smua.source.leveli = {current} -smua.source.output = smua.OUTPUT_ON -delay({opposite_voltage_time}) -smua.source.levelv = {voltage} -smua.source.leveli = {current} -smua.source.output = smua.OUTPUT_ON -for i = 1, {samples} do - smua.measure.r(A_M_BUFFER) -end -smua.source.output = smua.OUTPUT_OFF'''.split('\n') - -smu.send_execute_script(script, 'NewMemristorReadTest', blocking=True) -resistances = smu.read_buffer('A_M_BUFFER', samples) - -print(resistances) - -smu.disconnect() diff --git a/py_instrument_control_lib/playground/list_all_commands.py b/py_instrument_control_lib/playground/list_all_commands.py deleted file mode 100644 index 8c27185..0000000 --- a/py_instrument_control_lib/playground/list_all_commands.py +++ /dev/null @@ -1,7 +0,0 @@ -import json - -with open('../../specifications/KST3000.json', 'r') as f: - d = json.load(f) - -for m in d['commands']: - print(f'osc.{m["name"]}()') diff --git a/py_instrument_control_lib/playground/test_devices.py b/py_instrument_control_lib/playground/test_devices.py deleted file mode 100644 index c75520a..0000000 --- a/py_instrument_control_lib/playground/test_devices.py +++ /dev/null @@ -1,156 +0,0 @@ -import time - -from py_instrument_control_lib.device_base.DeviceConfigs import TCPDeviceConfig, SerialDeviceConfig -from py_instrument_control_lib.device_types.FunctionGenerator import * -from py_instrument_control_lib.device_types.Oscilloscope import * -from py_instrument_control_lib.device_types.PowerSupply import * -from py_instrument_control_lib.device_types.SMU import * -from py_instrument_control_lib.devices.KEI2600 import KEI2600 -from py_instrument_control_lib.devices.KST3000 import KST3000 -from py_instrument_control_lib.devices.KST33500 import KST33500 -from py_instrument_control_lib.devices.SPD1305X import SPD1305X -from py_instrument_control_lib.devices.SwitchMatrix import SwitchMatrix -from py_instrument_control_lib.devices.UARTSwitchMatrix import UARTSwitchMatrix - - -def test_smu() -> None: - config = TCPDeviceConfig(ip='132.231.14.105', port=5025) - smu: SMU = KEI2600(config) - smu.connect() - - print(smu.measure(ChannelUnit.VOLTAGE, ChannelIndex(1))) - print(smu.measure(ChannelUnit.CURRENT, ChannelIndex(1))) - print(smu.measure(ChannelUnit.POWER, ChannelIndex(1))) - print(smu.measure(ChannelUnit.RESISTANCE, ChannelIndex(1))) - - smu.toggle_channel(ChannelIndex(1), True) - smu.set_level(ChannelUnit.VOLTAGE, ChannelIndex(1), 0.42) - smu.set_limit(ChannelUnit.VOLTAGE, ChannelIndex(1), 0.69) - smu.toggle_autorange(ChannelUnit.VOLTAGE, ChannelIndex(1), SMUMode.MEASURE, False) - smu.toggle_measure_analog_filter(ChannelIndex(1), False) - smu.set_range(ChannelUnit.VOLTAGE, ChannelIndex(1), SMUMode.MEASURE, 1.0) - smu.set_sense_mode(ChannelIndex(1), SMUSense.LOCAL) - smu.set_measure_plc(ChannelIndex(1), 1.0) - smu.set_measure_low_range(ChannelUnit.VOLTAGE, ChannelIndex(1), 1.0) - smu.set_measure_auto_zero(ChannelIndex(1), Autozero.AUTO) - smu.set_measure_count(ChannelIndex(1), 10) - smu.set_source_function(ChannelIndex(1), SourceFunction.DC_VOLTS) - smu.set_source_off_mode(ChannelIndex(1), SourceOffMode.OUTPUT_ZERO) - smu.set_source_settling(ChannelIndex(1), SourceSettling.SMOOTH) - smu.toggle_source_sink(ChannelIndex(1), True) - smu.display_measure_function(ChannelIndex(1), SMUDisplay.MEASURE_DC_VOLTS) - smu.toggle_beep(True) - smu.beep(0.3, 1000) - - smu.disconnect() - - -def test_fg() -> None: - config = TCPDeviceConfig(ip='132.231.14.107', port=5025) - fg: FunctionGenerator = KST33500(config) - fg.connect() - - fg.toggle(True) - fg.set_frequency(1000) - fg.set_amplitude(3, '') - fg.set_offset(4.2) - fg.set_phase(1.0) - fg.set_function(FunctionType.DC_VOLTAGE) - fg.display('Funktioniert!') - fg.set_pulsewidth(1000) - - fg.disconnect() - - -def test_uart_sm() -> None: - config = SerialDeviceConfig(interface='/dev/ttyUSB0') - matrix = UARTSwitchMatrix(config) - matrix.connect() - - time.sleep(1) - matrix.set_row_col(1, 1) - - matrix.disconnect() - - -def test_eth_sm() -> None: - config = TCPDeviceConfig(ip='192.168.0.2', port=2000, timeout=2) - matrix = SwitchMatrix(config) - matrix.connect() - - time.sleep(1) - matrix.set_row(4, True) - matrix.set_col(10, True) - matrix.set_row_col(1, 2, True) - - matrix.disconnect() - - -def test_ps() -> None: - config = TCPDeviceConfig(ip='132.231.14.104', port=5025) - ps: PowerSupply = SPD1305X(config) - ps.connect() - ps.set_voltage(PSChannel.CHANNEL_1, 12) - ps.set_current(PSChannel.CHANNEL_1, 0.1) - - -def test_osc() -> None: - config = TCPDeviceConfig(ip='132.231.14.115', port=5025) - osc: KST3000 = KST3000(config) - osc.connect() - - # print(osc.get_waveform_points()) - # time.sleep(0.3) - # print(osc.get_waveform_preamble()) - # time.sleep(0.3) - - osc.run() - time.sleep(1) - osc.stop() - osc.single() - osc.auto_scale() - - osc.set_time_range(1) - time.sleep(0.3) - osc.set_channel_offset(ChannelIndex(1), 1) - time.sleep(0.3) - osc.set_channel_scale(ChannelIndex(1), 1) - time.sleep(0.3) - osc.set_channel_range(ChannelIndex(1), 1, VoltageUnit.VOLT) - time.sleep(0.3) - osc.set_trigger_edge(TriggerEdge.POS_EDGE) - time.sleep(0.3) - osc.set_trigger_source(ChannelIndex(1)) - time.sleep(0.3) - osc.set_time_delay(1) - time.sleep(0.3) - osc.set_waveform_source(ChannelIndex(1)) - time.sleep(0.3) - print(osc.set_waveform_points(100)) - time.sleep(0.3) - # osc.set_waveform_points_mode() - osc.set_waveform_format(FileFormat.BYTE) - time.sleep(0.3) - # osc.save_waveform_data() - print(osc.get_waveform_data()) - time.sleep(0.3) - osc.get_real_data() - time.sleep(0.3) - osc.digitize(ChannelIndex(1)) - time.sleep(0.3) - print(osc.get_system_setup()) - time.sleep(0.3) - osc.set_display_mode(DisplayModes.XY) - time.sleep(0.3) - osc.set_channel_display(ChannelIndex(1), True) - - osc.disconnect() - - -if __name__ == '__main__': - # test_smu() - # test_fg(config) - # test_uart_sm() - test_eth_sm() - # test_osc() - # test_ps() diff --git a/py_instrument_control_lib/playground/test_smu_sweep.py b/py_instrument_control_lib/playground/test_smu_sweep.py deleted file mode 100644 index ce17086..0000000 --- a/py_instrument_control_lib/playground/test_smu_sweep.py +++ /dev/null @@ -1,11 +0,0 @@ -from py_instrument_control_lib.channels.ChannelEnums import ChannelIndex -from py_instrument_control_lib.device_base.DeviceConfigs import TCPDeviceConfig -from py_instrument_control_lib.devices.KEI2600 import KEI2600 - -smu_config = TCPDeviceConfig(ip='132.231.14.169', port=5025, timeout=5) -smu = KEI2600(smu_config) -smu.connect() - -smu.perform_linear_voltage_sweep(ChannelIndex(1), 0, 2, 0.1, 0.3, True) - -smu.disconnect()