Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue where metadata would come with garbage #50

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
97 changes: 97 additions & 0 deletions examples/example_measure_mw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import time
import threading

from src.ppk2_api.ppk2_api import PPK2_API


class PowerProfiler:
conversion_factor = 1_000_000 # uA to mW

def __init__(self, voltage_mv=3700):
self.voltage_mv = voltage_mv
self.total_power_mW = 0
self.total_samples = 0

self.lock = threading.Lock()
self.ppk2 = None
self.sampling_thread = None
self.sampling_enabled = False


def _setup_ppk2(self):
ppk2s_connected = PPK2_API.list_devices()

# Check if we have at least one PPK2 device
if not ppk2s_connected:
raise ConnectionError("No PPK2 devices found!")

print(ppk2s_connected)
# Just select the first available PPK2 device
for ppk2_port_tuple in ppk2s_connected:
ppk2_port = ppk2_port_tuple[0] #Just get the port part of the tuple
print(f"Connecting to {ppk2_port}")

self.ppk2 = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True)

ret = self.ppk2.get_modifiers()
if ret is not None:
break

print(f"Failed to connect to {ppk2_port}")

self.ppk2.set_source_voltage(self.voltage_mv)
self.ppk2.use_source_meter()
self.ppk2.toggle_DUT_power("ON")

self.ppk2.start_measuring()
print("Initialized Power Profiler")


def _run_sampling(self):
try:
self._setup_ppk2()
while self.sampling_enabled:
time.sleep(0.01)
read_data = self.ppk2.get_data()

if read_data == b"":
continue

samples, raw_digital = self.ppk2.get_samples(read_data)
if not samples:
continue

average_current_uA = sum(samples) / len(samples)
average_power_mW = (average_current_uA * self.voltage_mv) / self.conversion_factor
formatted_power = round(average_power_mW, 2)

with self.lock:
self.total_power_mW += formatted_power
self.total_samples += 1
average_of_averages_mW = self.total_power_mW / self.total_samples

print(f"{formatted_power} mW, Avg: {average_of_averages_mW:.2f} mW")

except Exception as e:
self.sampling_enabled = False
print(f"An error occurred: {e}")

def start_sampling(self):
self.sampling_enabled = True
self.sampling_thread = threading.Thread(target=self._run_sampling, daemon=True)
self.sampling_thread.start()

def stop_sampling(self):
self.sampling_enabled = False
self.sampling_thread.join()


def main():
sampler = PowerProfiler(voltage_mv=3800)
sampler.start_sampling()
input("Press Enter to exit...\n")
sampler.stop_sampling()


if __name__ == "__main__":
main()
File renamed without changes.
55 changes: 45 additions & 10 deletions src/ppk2_api/ppk2_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,25 @@ def _read_metadata(self):
"""Read metadata"""
# try to get metadata from device
for _ in range(0, 5):
# it appears the second reading is the metadata
read = self.ser.read(self.ser.in_waiting)
time.sleep(0.1)

# TODO add a read_until serial read function with a timeout
if read != b'' and "END" in read.decode("utf-8"):
return read.decode("utf-8")
if not read:
continue # No data, try again

# Try decoding the data
try:
metadata = read.decode("utf-8")
except UnicodeDecodeError:
# If decoding fails, try again in next iteration
continue

# Check if the metadata is valid (i.e., contains "END")
if "END" in metadata:
return metadata

# If we exit the loop, it means we couldn't get valid metadata
raise ValueError("Could not retrieve valid metadata from the device.")

def _parse_metadata(self, metadata):
"""Parse metadata and store it to modifiers"""
Expand Down Expand Up @@ -229,17 +241,40 @@ def list_devices():
]
return devices


def get_data(self):
"""Return readings of one sampling period"""
sampling_data = self.ser.read(self.ser.in_waiting)
return sampling_data

def get_modifiers(self):
"""Gets and sets modifiers from device memory"""
self._write_serial((PPK2_Command.GET_META_DATA, ))
metadata = self._read_metadata()
ret = self._parse_metadata(metadata)
return ret
def get_modifiers(self, retries=2):
"""
Retrieve and parse modifiers from the device memory, with optional retries.

In cases where the PPK2 tool did not shut down gracefully, the device may still
hold residual data from the previous session. The first GET_META_DATA command
may return a mix of valid metadata and garbage. Rather than parsing
and filtering out this garbage on the first try, issuing the GET_META_DATA command
again often yields clean data. This function will retry up to the
specified number of times before giving up.
"""

for attempt in range(1, retries + 1):
# Send command to request metadata
self._write_serial((PPK2_Command.GET_META_DATA, ))
try:
metadata = self._read_metadata()
ret = self._parse_metadata(metadata)
print(f"Attempt {attempt}/{retries} - Got metadata from PPK2")

return ret
except ValueError as e:
print(f"Attempt {attempt}/{retries} - Failed to get valid PPK2 metadata: {e}")
# If this wasn't the last attempt, we try again by sending GET_META_DATA again.


print("Failed to get modifiers after multiple attempts.")
return None

def start_measuring(self):
"""Start continuous measurement"""
Expand Down