Skip to content

Commit

Permalink
Merge pull request #26 from SiemaApplications/issue-25
Browse files Browse the repository at this point in the history
New settings to cutomize how bytes data are displayed
  • Loading branch information
pfayolle authored Nov 13, 2023
2 parents dfff94e + 0243958 commit bc4d8d6
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 8 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export PYTHONPATH=${PYTHONPATH}:$(pwd)/tahu/python/core/
```
When launched, the `enki.py` script gives access to a shell with commands to manage topics subscriptions, list EoN and devices, craft metrics into payload. Type `help` to see the list of commands available.

Additional usage information can be found [here](usage.md).

# Demo
[![asciicast](https://asciinema.org/a/lKGTwxDlLOYwGtsF1kecBLfa0.svg)](https://asciinema.org/a/lKGTwxDlLOYwGtsF1kecBLfa0)

Expand Down
90 changes: 82 additions & 8 deletions shell.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Enki shell interface."""
import time
from datetime import datetime, timezone
from string import printable

import cmd2

Expand All @@ -14,6 +15,14 @@
from sp_network import SPNet


# Setting values for displaying of byte data
BYTE_DATA_DISPLAY_LIST = "list"
BYTE_DATA_DISPLAY_HEXDUMP = "hexdump"
BYTE_DATA_DISPLAY_MODE = BYTE_DATA_DISPLAY_LIST
BYTE_DATA_SHORT_LEN = 10
BYTE_DATA_MAX_LEN = BYTE_DATA_SHORT_LEN


def str_to_int(string):
"""Convert string to int.
Expand All @@ -38,19 +47,59 @@ def str_to_bool(string):
return False


def bytes_to_list_str(byte_data) -> str:
"""Converts an iterable containing bytes to a string representation, using a list format"""
size = len(byte_data)
if BYTE_DATA_MAX_LEN:
byte_data = byte_data[0:BYTE_DATA_MAX_LEN]
res = "[" + " ,".join([f"0x{val:02X}" for val in byte_data])
if BYTE_DATA_MAX_LEN and size > BYTE_DATA_MAX_LEN:
res += " ..."
res += "]"
return res

def bytes_to_hexdump_str(byte_data) -> str:
"""
Converts an iterable containing bytes to a string representation
using a hexadecimal dump format
"""
offset = 0
size = len(byte_data)
if BYTE_DATA_MAX_LEN != 0 and BYTE_DATA_MAX_LEN < size:
size = BYTE_DATA_MAX_LEN
processed = 0
res = ""
while processed < size:
block_size = 16 if size - processed >= 16 else size - processed
block = byte_data[offset:offset+block_size]
block_str = [f"{b:02X}" for b in block]
pairs = zip(block_str[0::2], block_str[1::2])
block_str = " ".join(p[0] + p[1] for p in pairs)
if block_size % 2 == 1:
if block_size > 1:
block_str += " "
block_str += f"{block[-1]:02X}"
block_ascii = "".join(chr(c) if chr(c) in printable else "." for c in block)
if res:
res += "\n"
res += f"{offset:08X}: {block_str:39} {block_ascii}"
offset += block_size
processed += block_size
return res


def get_bytearray_str(bytes_array):
"""String representation of a bytearray value."""
size = len(bytes_array)
res = f"Bytes array of length {size}"
if size:
excerpt = bytes_array[0:10]
excerpt = "[" + " ".join([f"0x{val:02X}" for val in excerpt])
if size > 10:
excerpt += " ..."
excerpt += "]"
res = res + " " + excerpt
if BYTE_DATA_DISPLAY_MODE == BYTE_DATA_DISPLAY_LIST:
res += ":\n" + bytes_to_list_str(bytes_array) + "\n"
else:
res += ":\n" + bytes_to_hexdump_str(bytes_array) + "\n"
return res


def get_dataset_row_str(dataset, row):
"""String representation of a dataset row."""
return tuple(get_typed_value_str(col_type, row.elements[idx])
Expand Down Expand Up @@ -110,6 +159,7 @@ def get_common_info_str(metric):
res = res + "\n\tproperties: None"
return res


def get_metric_str(metric) -> str:
"""Return a string describing metric if it is known to Device"""
value_str: str = get_typed_value_str(metric.datatype, metric)
Expand All @@ -119,13 +169,37 @@ def get_metric_str(metric) -> str:

@cmd2.with_default_category('Enki')
class SPShell(cmd2.Cmd):
# pylint: disable=too-many-instance-attributes, too-many-public-methods
# pylint: disable=too-many-instance-attributes, too-many-public-methods, global-statement
"""Enki Shell Interface"""

def __init__(self, mqtt_if: MQTTInterface):
cmd2.Cmd.__init__(self, allow_cli_args=False)
self.mqtt_if = mqtt_if
self.prompt = "enki> "
self.byte_data_display_mode = BYTE_DATA_DISPLAY_MODE
self.add_settable(
cmd2.Settable("byte_data_display_mode", str, "Display mode for byte data", self,
choices=[BYTE_DATA_DISPLAY_LIST, BYTE_DATA_DISPLAY_HEXDUMP],
onchange_cb=self._on_change_byte_data_display_mode)
)
self.byte_data_max_len = BYTE_DATA_MAX_LEN
self.add_settable(
cmd2.Settable("byte_data_max_len", int,
"Maximum number of bytes to display for byte data",
self, onchange_cb=self._on_change_byte_data_max_len)
)

def _on_change_byte_data_display_mode(self, _param_name, _old_value, new_value):
global BYTE_DATA_DISPLAY_MODE
BYTE_DATA_DISPLAY_MODE = new_value

def _on_change_byte_data_max_len(self, param_name, old_value, new_value):
global BYTE_DATA_MAX_LEN
if new_value < 0:
print(f"{param_name} should be greater or equal to 0")
setattr(self, param_name, old_value)
else:
BYTE_DATA_MAX_LEN = new_value

def do_exit(self, *_args):
"""Exits the app."""
Expand All @@ -148,7 +222,7 @@ def do_list(self, args):
print(f" * {dev.get_id().dev_id}")

def build_target_list(self, include_eon, include_dev):
"""Returns a list of potentatial handles for autocompletion purposes."""
"""Returns a list of potential handles for autocompletion purposes."""
res = []
sp_net = SPNet()
for eon in sp_net.eon_nodes:
Expand Down
28 changes: 28 additions & 0 deletions usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Usage

On startup Enki will connect to the specified broker and subscribe to spBv1.0/#.

Enki will then monitor the EoNs and devices that get born or die on the broker.

## Command line options

Command line options are self documented.
Their help can be seen by running enki with the option```-h``` or ```--help```.

## Basic commands
The list of available commands can be accessed with the command ```help```.

Additional help on each command can be seen with ```help <command>```.

## Advanced usage
### Sending null metrics
When prompted for a metric value, you can specify you want to send the metric with the null flag set to true,
by pressing ```Ctrl-d```.

### Application Settings
The following settings can be set with the command ```set <setting_name> <value>```.

- **byte_data_display_mode** _list_ or _hexdump_:
- _list_: The bytes will be displayed as a python list (e.g. [0x01, 0x02, 0x03])
- _hexdump_: The bytes will be displayed using a hexadecimal dump format, similar to the output of the xxd command
- **byte_data_max_len**: The maximum number of bytes to display for either display mode. Set this to 0 to remove the limit and display all bytes.

0 comments on commit bc4d8d6

Please sign in to comment.