No api data handling (#75)
* Wrapped the measurement process in a try/finally block so that notes can still be saved when an exception or interrupt occurs

* WIP: expose project data when there are no stats or notes

* Working project data tab when no stats are present
djesic authored Dec 16, 2022
1 parent 4b65334 commit 2f65f7c
Showing 3 changed files with 111 additions and 84 deletions.
17 changes: 11 additions & 6 deletions api/api.py
@@ -169,9 +169,9 @@ async def get_stats_single(project_id: str, remove_idle: bool=False):
     params = params=(project_id,project_id)
     data = DB().fetch_all(query, params=params)
 
-    if(data is None or data == []):
-        return {'success': False, 'err': 'Data is empty'}
-    return {"success": True, "data": data, "project": get_project(project_id)}
+    if (data is None or data == []):
+        return {"success": False, "err": "Data is empty"}
+    return {"success": True, "data": data}
 
 @app.get('/v1/stats/multi')
 async def get_stats_multi(p: list[str] | None = Query(default=None)):
@@ -262,7 +262,9 @@ async def post_project_add(project: Project):
 
     return {"success": True}
 
-def get_project(project_id):
+
+@app.get('/v1/project/{project_id}')
+async def get_project(project_id: str):
     query = """
         SELECT
             id, name, uri, (SELECT STRING_AGG(t.name, ', ' ) FROM unnest(projects.categories) as elements LEFT JOIN categories as t on t.id = elements) as categories, start_measurement, end_measurement, measurement_config, machine_specs, usage_scenario, last_run, created_at
@@ -272,7 +274,10 @@ def get_project(project_id):
             id = %s
     """
     params = (project_id,)
-    return DB().fetch_one(query, params=params, cursor_factory = psycopg2.extras.RealDictCursor)
+    data = DB().fetch_one(query, params=params, cursor_factory=psycopg2.extras.RealDictCursor)
+    if (data is None or data == []):
+        return {'success': False, 'err': 'Data is empty'}
+    return {"success": True, "data": data}
 
 if __name__ == "__main__":
-    app.run()
+    app.run()
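The new /v1/project/{project_id} endpoint mirrors the response shape of the stats endpoints: {"success": True, "data": ...} on a hit, {"success": False, "err": ...} when nothing is found. A minimal client sketch in Python (the base URL, port, and project id are placeholder assumptions, not part of this commit):

import requests

API_URL = 'http://localhost:8000'        # assumed base URL; adjust to your deployment
project_id = 'replace-with-a-real-uuid'  # hypothetical id for illustration

# /v1/project/{project_id} now answers independently of the stats endpoints,
# so a missing project no longer breaks the stats response.
resp = requests.get(f'{API_URL}/v1/project/{project_id}')
body = resp.json()

if not body.get('success'):
    print('No project data:', body.get('err'))
else:
    project = body['data']
    print(project['name'], project['start_measurement'], project['end_measurement'])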
47 changes: 34 additions & 13 deletions frontend/js/stats.js
@@ -127,7 +127,7 @@ const convertValue = (metric_name, value, unit) => {
 
 }
 
-const getMetrics = (stats_data, style='apex') => {
+const getMetrics = (stats_data, start_measurement, end_measurement, style='apex') => {
     const metrics = {}
     let accumulate = 0;
     const t0 = performance.now();
@@ -159,7 +159,7 @@ const getMetrics = (stats_data, style='apex') => {
             accumulate = 0; // default
 
             // here we use the undivided time on purpose
-            if (el[1] > stats_data.project.start_measurement && el[1] < stats_data.project.end_measurement) {
+            if (el[1] > start_measurement && el[1] < end_measurement) {
                 accumulate = 1;
             }
 
@@ -394,7 +394,7 @@ const createGraph = (element, data, labels, title) => {
     });
 };
 
-const fillAvgContainers = (stats_data, metrics) => {
+const fillAvgContainers = (measurement_duration_in_s, metrics) => {
 
     // let total_energy_in_mWh = 0;
     let component_energy_in_mWh = 0;
@@ -407,11 +407,11 @@ const fillAvgContainers = (stats_data, metrics) => {
             case 'J':
                 if(display_in_watts) createAvgContainer(metric_name, (acc / 3600) * 1000, 'mWh');
                 else createAvgContainer(metric_name, acc, 'J');
-                createAvgContainer(metric_name, acc / stats_data.project.measurement_duration_in_s, 'W');
+                createAvgContainer(metric_name, acc / measurement_duration_in_s, 'W');
                 break;
             case 'W':
                 createAvgContainer(metric_name, acc / metrics[metric_name].sum.length, 'W');
-                createAvgContainer(metric_name, ((acc / metrics[metric_name].sum.length)*stats_data.project.measurement_duration_in_s)/3.6, ' mWh (approx!)');
+                createAvgContainer(metric_name, ((acc / metrics[metric_name].sum.length)*measurement_duration_in_s)/3.6, ' mWh (approx!)');
                 break;
             case '%':
                 createAvgContainer(metric_name, acc / metrics[metric_name].sum.length, '%');
@@ -449,7 +449,7 @@ const fillAvgContainers = (stats_data, metrics) => {
     } else {
         document.querySelector("#component-energy").innerHTML = `${(component_energy_in_mWh).toFixed(2)} J`
     }
-    document.querySelector("#component-power").innerHTML = `${(component_energy_in_mWh / stats_data.project.measurement_duration_in_s).toFixed(2)} W`
+    document.querySelector("#component-power").innerHTML = `${(component_energy_in_mWh / measurement_duration_in_s).toFixed(2)} W`
 
     // network via formula: https://www.green-coding.org/co2-formulas/
     const network_io_in_mWh = (network_io * 0.00006) * 1000000;
@@ -491,23 +491,44 @@ $(document).ready( (e) => {
     const url_params = (new URLSearchParams(query_string))
 
     try {
-        var notes_json = await makeAPICall('/v1/notes/' + url_params.get('id'))
+        var project_data = await makeAPICall('/v1/project/' + url_params.get('id'))
+    } catch (err) {
+        showNotification('Could not get project data from API', err);
+    }
+    try {
         var stats_data = await makeAPICall('/v1/stats/single/' + url_params.get('id'))
     } catch (err) {
-        showNotification('Could not get data from API', err);
-        return;
+        showNotification('Could not get stats data from API', err);
+    }
+    try {
+        var notes_json = await makeAPICall('/v1/notes/' + url_params.get('id'))
+    } catch (err) {
+        showNotification('Could not get notes data from API', err);
     }
 
     $('.ui.secondary.menu .item').tab();
 
-    const metrics = getMetrics(stats_data, 'echarts');
+    if (project_data == undefined || project_data.success == false) {
+        return;
+    }
+    fillProjectData(project_data.data)
+
+    if (stats_data == undefined || stats_data.success == false) {
+        return;
+    }
+
+    const metrics = getMetrics(stats_data, project_data.data.start_measurement, project_data.data.end_measurement, 'echarts');
 
     // create new custom field
     // timestamp is in microseconds, therefore divide by 10**6
-    stats_data.project['measurement_duration_in_s'] = (stats_data.project?.end_measurement - stats_data.project?.start_measurement) / 1000000
+    const measurement_duration_in_s = (project_data.data.end_measurement - project_data.data.start_measurement) / 1000000
+
+    fillAvgContainers(measurement_duration_in_s, metrics);
 
-    fillProjectData(stats_data.project)
+    if (notes_json == undefined || notes_json.success == false) {
+        return;
+    }
     displayGraphs(metrics, notes_json.data, 'echarts');
-    fillAvgContainers(stats_data, metrics);
     document.querySelector('#api-loader').remove();
 
     // after all instances have been placed the flexboxes might have rearranged. We need to trigger resize
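The frontend now degrades gracefully: each resource is fetched in its own try block, and rendering stops only at the first step whose input is actually missing, so the project data tab works even when stats are absent. A rough Python rendition of the same fetch-independently pattern (the fetch_json helper, base URL, and id are illustrative assumptions, not project code):

import requests

def fetch_json(url):
    """Return the parsed response, or None if the call fails."""
    try:
        resp = requests.get(url, timeout=10)
        return resp.json()
    except requests.RequestException as err:
        print(f'Could not fetch {url}: {err}')
        return None

base = 'http://localhost:8000'          # assumed base URL
pid = 'replace-with-a-real-project-id'  # placeholder id

project = fetch_json(f'{base}/v1/project/{pid}')
stats = fetch_json(f'{base}/v1/stats/single/{pid}')
notes = fetch_json(f'{base}/v1/notes/{pid}')

# Render whatever arrived: project data alone is enough for the project tab;
# stats and notes each unlock further views.
if project and project.get('success'):
    print('project tab:', project['data']['name'])
    if stats and stats.get('success'):
        print('stats rows:', len(stats['data']))
        if notes and notes.get('success'):
            print('notes:', len(notes['data']))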
131 changes: 66 additions & 65 deletions tools/runner.py
@@ -266,89 +266,90 @@ def run(self, uri, uri_type, project_id):
         start_measurement = int(time.time_ns() / 1_000)
         notes.append({"note" : "Start of measurement", 'detail_name' : '[SYSTEM]', "timestamp": start_measurement})
 
-        # run the flows
-        for el in obj['flow']:
-            print(TerminalColors.HEADER, "\nRunning flow: ", el['name'], TerminalColors.ENDC)
-            for inner_el in el['commands']:
-
-                if "note" in inner_el:
-                    notes.append({"note" : inner_el['note'], 'detail_name' : el['container'], "timestamp": int(time.time_ns() / 1_000)})
-
-                if inner_el['type'] == 'console':
-                    print(TerminalColors.HEADER, "\nConsole command", inner_el['command'], "on container", el['container'], TerminalColors.ENDC)
-
-                    docker_exec_command = ['docker', 'exec']
-
-                    docker_exec_command.append(el['container'])
-                    docker_exec_command.extend( inner_el['command'].split(' ') )
-
-                    # Note: In case of a detach wish in the usage_scenario.yml:
-                    # We are NOT using the -d flag from docker exec, as this prohibits getting the stdout.
-                    # Since Popen always make the process asynchronous we can leverage this to emulate a detached behaviour
-                    ps = subprocess.Popen(
-                        docker_exec_command,
-                        stderr=subprocess.PIPE,
-                        stdout=subprocess.PIPE,
-                        encoding="UTF-8"
-                    )
-
-                    self.ps_to_read.append({'cmd': docker_exec_command, 'ps': ps, 'read-notes-stdout': inner_el.get('read-notes-stdout', False), 'detail_name': el['container']})
-
-                    if inner_el.get('detach', None) == True :
-                        print("Process should be detached. Running asynchronously and detaching ...")
-                        self.ps_to_kill.append({"pid": ps.pid, "cmd": inner_el['command'], "ps_group": False})
-                    else:
-                        print(f"Process should be synchronous. Alloting {config['measurement']['flow-process-runtime']}s runtime ...")
-                        process_helpers.timeout(ps, inner_el['command'], config['measurement']['flow-process-runtime'])
-                else:
-                    raise RuntimeError("Unknown command type in flow: ", inner_el['type'])
-
-                if debug.active: debug.pause("Waiting to start next command in flow")
-
-        end_measurement = int(time.time_ns() / 1_000)
-        notes.append({"note" : "End of measurement", 'detail_name' : '[SYSTEM]', "timestamp": end_measurement})
-
-        print(TerminalColors.HEADER, f"\nIdling containers after run for {config['measurement']['idle-time-end']}s", TerminalColors.ENDC)
-        time.sleep(config['measurement']['idle-time-end'])
-
-        print(TerminalColors.HEADER, "Stopping metric providers and parsing stats", TerminalColors.ENDC)
-        for metric_provider in self.metric_providers:
-            stderr_read = metric_provider.get_stderr()
-            if stderr_read is not None:
-                raise RuntimeError(f"Stderr on {metric_provider.__class__.__name__} was NOT empty: {stderr_read}")
-
-            metric_provider.stop_profiling()
-
-            df = metric_provider.read_metrics(project_id, self.containers)
-            print(f"Imported",TerminalColors.HEADER, df.shape[0], TerminalColors.ENDC, "metrics from ", metric_provider.__class__.__name__)
-            if df is None or df.shape[0] == 0:
-                raise RuntimeError(f"No metrics were able to be imported from: {metric_provider.__class__.__name__}")
-
-            f = StringIO(df.to_csv(index=False, header=False))
-            DB().copy_from(file=f, table='stats', columns=df.columns, sep=",")
-
-        # now we have free capacity to parse the stdout / stderr of the processes
-        print(TerminalColors.HEADER, "\nGetting output from processes: ", TerminalColors.ENDC)
-        for ps in self.ps_to_read:
-            for line in process_helpers.parse_stream_generator(ps['ps'], ps['cmd']):
-                print("Output from process: ", line)
-                if(ps['read-notes-stdout']):
-                    timestamp, note = line.split(' ', 1) # Fixed format according to our specification. If unpacking fails this is wanted error
-                    notes.append({"note" : note, 'detail_name' : ps['detail_name'], "timestamp": timestamp})
-
-        process_helpers.kill_ps(self.ps_to_kill) # kill process only after reading. Otherwise the stream buffer might be gone
-
-        print(TerminalColors.HEADER, "\nSaving notes: ", TerminalColors.ENDC, notes) # we here only want the header to be colored, not the notes itself
-        save_notes(project_id, notes)
+        try:
+            # run the flows
+            for el in obj['flow']:
+                print(TerminalColors.HEADER, "\nRunning flow: ", el['name'], TerminalColors.ENDC)
+                for inner_el in el['commands']:
+
+                    if "note" in inner_el:
+                        notes.append({"note" : inner_el['note'], 'detail_name' : el['container'], "timestamp": int(time.time_ns() / 1_000)})
+
+                    if inner_el['type'] == 'console':
+                        print(TerminalColors.HEADER, "\nConsole command", inner_el['command'], "on container", el['container'], TerminalColors.ENDC)
+
+                        docker_exec_command = ['docker', 'exec']
+
+                        docker_exec_command.append(el['container'])
+                        docker_exec_command.extend( inner_el['command'].split(' ') )
+
+                        # Note: In case of a detach wish in the usage_scenario.yml:
+                        # We are NOT using the -d flag from docker exec, as this prohibits getting the stdout.
+                        # Since Popen always make the process asynchronous we can leverage this to emulate a detached behaviour
+                        ps = subprocess.Popen(
+                            docker_exec_command,
+                            stderr=subprocess.PIPE,
+                            stdout=subprocess.PIPE,
+                            encoding="UTF-8"
+                        )
+
+                        self.ps_to_read.append({'cmd': docker_exec_command, 'ps': ps, 'read-notes-stdout': inner_el.get('read-notes-stdout', False), 'detail_name': el['container']})
+
+                        if inner_el.get('detach', None) == True :
+                            print("Process should be detached. Running asynchronously and detaching ...")
+                            self.ps_to_kill.append({"pid": ps.pid, "cmd": inner_el['command'], "ps_group": False})
+                        else:
+                            print(f"Process should be synchronous. Alloting {config['measurement']['flow-process-runtime']}s runtime ...")
+                            process_helpers.timeout(ps, inner_el['command'], config['measurement']['flow-process-runtime'])
+                    else:
+                        raise RuntimeError("Unknown command type in flow: ", inner_el['type'])
+
+                    if debug.active: debug.pause("Waiting to start next command in flow")
+
+            end_measurement = int(time.time_ns() / 1_000)
+            notes.append({"note" : "End of measurement", 'detail_name' : '[SYSTEM]', "timestamp": end_measurement})
+
+            print(TerminalColors.HEADER, f"\nIdling containers after run for {config['measurement']['idle-time-end']}s", TerminalColors.ENDC)
+            time.sleep(config['measurement']['idle-time-end'])
+
+            print(TerminalColors.HEADER, "Stopping metric providers and parsing stats", TerminalColors.ENDC)
+            for metric_provider in self.metric_providers:
+                stderr_read = metric_provider.get_stderr()
+                if stderr_read is not None:
+                    raise RuntimeError(f"Stderr on {metric_provider.__class__.__name__} was NOT empty: {stderr_read}")
+
+                metric_provider.stop_profiling()
+
+                df = metric_provider.read_metrics(project_id, self.containers)
+                print(f"Imported",TerminalColors.HEADER, df.shape[0], TerminalColors.ENDC, "metrics from ", metric_provider.__class__.__name__)
+                if df is None or df.shape[0] == 0:
+                    raise RuntimeError(f"No metrics were able to be imported from: {metric_provider.__class__.__name__}")
+
+                f = StringIO(df.to_csv(index=False, header=False))
+                DB().copy_from(file=f, table='stats', columns=df.columns, sep=",")
+
+            # now we have free capacity to parse the stdout / stderr of the processes
+            print(TerminalColors.HEADER, "\nGetting output from processes: ", TerminalColors.ENDC)
+            for ps in self.ps_to_read:
+                for line in process_helpers.parse_stream_generator(ps['ps'], ps['cmd']):
+                    print("Output from process: ", line)
+                    if(ps['read-notes-stdout']):
+                        timestamp, note = line.split(' ', 1) # Fixed format according to our specification. If unpacking fails this is wanted error
+                        notes.append({"note" : note, 'detail_name' : ps['detail_name'], "timestamp": timestamp})
+
+            process_helpers.kill_ps(self.ps_to_kill) # kill process only after reading. Otherwise the stream buffer might be gone
+        finally:
+            print(TerminalColors.HEADER, "\nSaving notes: ", TerminalColors.ENDC, notes) # we here only want the header to be colored, not the notes itself
+            save_notes(project_id, notes)
 
         print(TerminalColors.HEADER, "\nUpdating start and end measurement times", TerminalColors.ENDC)
         DB().query("""UPDATE projects
             SET start_measurement=%s, end_measurement=%s
             WHERE id = %s
             """, params=(start_measurement, end_measurement, project_id))
 
-        self.cleanup() # always run cleanup automatically after each run
+        self.cleanup() # always run cleanup automatically after each run
 
         print(TerminalColors.OKGREEN, "\n\n>>>>>>> MEASUREMENT SUCCESSFULLY COMPLETED <<<<<<<\n\n",TerminalColors.ENDC)

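The runner change is the classic try/finally pattern: whatever happens inside the measurement loop, an exception, a Ctrl-C, or a clean finish, the notes collected so far are persisted before the error propagates. A condensed, self-contained sketch of the idea (simplified note structure and a stubbed save_notes; the real runner writes to the database):

import time

def save_notes(project_id, notes):
    # stand-in for the real DB write
    print(f'saving {len(notes)} notes for {project_id}')

def run_flow(project_id):
    notes = [{'note': 'Start of measurement', 'timestamp': int(time.time_ns() / 1_000)}]
    try:
        # the measurement loop; may raise or be interrupted at any point
        for step in ['boot', 'load', 'idle']:
            notes.append({'note': step, 'timestamp': int(time.time_ns() / 1_000)})
            # ... run the actual flow command here ...
        notes.append({'note': 'End of measurement', 'timestamp': int(time.time_ns() / 1_000)})
    finally:
        # runs on success, exception, and KeyboardInterrupt alike,
        # so partial notes are never lost
        save_notes(project_id, notes)

run_flow('demo-project')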
