Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PrawnBlaster binary programming #112

Merged
102 changes: 82 additions & 20 deletions labscript_devices/PrawnBlaster/blacs_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import time
import labscript_utils.h5_lock
import h5py
import numpy as np
from blacs.tab_base_classes import Worker
from labscript_utils.connections import _ensure_str
import labscript_utils.properties as properties
Expand All @@ -38,6 +39,7 @@ def init(self):
global time; import time
global re; import re
global numpy; import numpy
global struct; import struct
global zprocess; import zprocess
self.smart_cache = {}
self.cached_pll_params = {}
Expand Down Expand Up @@ -69,6 +71,27 @@ def init(self):
self.prawnblaster.write(b"setinpin %d %d\r\n" % (i, in_pin))
assert self.prawnblaster.readline().decode() == "ok\r\n"

# Check if fast serial is available
version, _ = self.get_version()
self.fast_serial = version >= (1, 1, 0)

def get_version(self):
self.prawnblaster.write(b"version\r\n")
version_str = self.prawnblaster.readline().decode()
assert version_str.startswith("version: ")
version = version_str[9:].strip()

version_overclock_list = version.split('-')
overclock = False
if len(version_overclock_list) == 2:
if version_overclock_list[1] == 'overclock':
overclock = True

version = tuple(int(v) for v in version_overclock_list[0].split('.'))
assert len(version) == 3

return version, overclock

def check_status(self):
"""Checks the operational status of the PrawnBlaster.

Expand Down Expand Up @@ -292,27 +315,66 @@ def transition_to_buffered(self, device_name, h5file, initial_values, fresh):

# Program instructions
for pseudoclock, pulse_program in enumerate(pulse_programs):
for i, instruction in enumerate(pulse_program):
if i == len(self.smart_cache[pseudoclock]):
# Pad the smart cache out to be as long as the program:
self.smart_cache[pseudoclock].append(None)

# Only program instructions that differ from what's in the smart cache:
if self.smart_cache[pseudoclock][i] != instruction:
self.prawnblaster.write(
b"set %d %d %d %d\r\n"
% (
pseudoclock,
i,
instruction["half_period"],
instruction["reps"],
total_inst = len(pulse_program)
# check if it is more efficient to fully refresh
if not fresh and self.smart_cache[pseudoclock] is not None and self.fast_serial:
# get more convenient handles to smart cache arrays
curr_inst = self.smart_cache[pseudoclock]

# if arrays aren't of same shape, only compare up to smaller array size
n_curr = len(curr_inst)
n_new = len(pulse_program)
if n_curr > n_new:
# technically don't need to reprogram current elements beyond end of new elements
new_inst = np.sum(curr_inst[:n_new] != pulse_program)
elif n_curr < n_new:
n_diff = n_new - n_curr
val_diffs = np.sum(curr_inst != pulse_program[:n_curr])
new_inst = val_diffs + n_diff
else:
new_inst = np.sum(curr_inst != pulse_program)

if new_inst / total_inst > 0.1:
fresh = True

if (fresh or self.smart_cache[pseudoclock] is None) and self.fast_serial:
print('binary programming')
self.prawnblaster.write(b"setb %d %d %d\r\n" % (pseudoclock, 0, len(pulse_program)))
response = self.prawnblaster.readline().decode()
assert (
response == "ready\r\n"
), f"PrawnBlaster said '{response}', expected 'ready'"
program_array = np.array([pulse_program['half_period'],
pulse_program['reps']], dtype='<u4').T
self.prawnblaster.write(program_array.tobytes())
response = self.prawnblaster.readline().decode()
assert (
response == "ok\r\n"
), f"PrawnBlaster said '{response}', expected 'ok'"
self.smart_cache[pseudoclock] = pulse_program
else:
print('incremental programming')
for i, instruction in enumerate(pulse_program):
if i == len(self.smart_cache[pseudoclock]):
# Pad the smart cache out to be as long as the program:
self.smart_cache[pseudoclock].append(None)

# Only program instructions that differ from what's in the smart cache:
if self.smart_cache[pseudoclock][i] != instruction:
self.prawnblaster.write(
b"set %d %d %d %d\r\n"
% (
pseudoclock,
i,
instruction["half_period"],
instruction["reps"],
)
)
)
response = self.prawnblaster.readline().decode()
assert (
response == "ok\r\n"
), f"PrawnBlaster said '{response}', expected 'ok'"
self.smart_cache[pseudoclock][i] = instruction
response = self.prawnblaster.readline().decode()
assert (
response == "ok\r\n"
), f"PrawnBlaster said '{response}', expected 'ok'"
self.smart_cache[pseudoclock][i] = instruction

if not self.is_master_pseudoclock:
# Start the Prawnblaster and have it wait for a hardware trigger
Expand Down