diff --git a/.github/workflows/build_nix.yml b/.github/workflows/build_nix.yml index 7a04861..67c43a0 100644 --- a/.github/workflows/build_nix.yml +++ b/.github/workflows/build_nix.yml @@ -27,7 +27,7 @@ jobs: - run: | curl -Lo mkn https://github.com/mkn/mkn/releases/download/latest/mkn_nix chmod +x mkn - ./mkn clean build -p scope_timer test -a "-fPIC -std=c++17" + ./mkn clean build test run -p scope_timer,threaded_scope_timer -Oa "-fPIC -std=c++20" -W 9 - uses: actions/setup-python@v4 with: diff --git a/inc/phlop/timing/scope_timer.hpp b/inc/phlop/timing/scope_timer.hpp index 3f16f6d..a66d6d1 100644 --- a/inc/phlop/timing/scope_timer.hpp +++ b/inc/phlop/timing/scope_timer.hpp @@ -90,8 +90,8 @@ struct ScopeTimerMan struct RunTimerReportSnapshot { - RunTimerReportSnapshot(RunTimerReport* s, RunTimerReport* p, std::uint64_t const& st, - std::uint64_t const& t) + RunTimerReportSnapshot(RunTimerReport* s, RunTimerReport* p, std::uint64_t const st, + std::uint64_t const t) : self{s} , parent{p} , start{st} @@ -167,17 +167,17 @@ struct scope_timer struct BinaryTimerFileNode { - BinaryTimerFileNode(std::uint16_t const& _fn_id, std::uint64_t const& _start, - std::uint64_t const& _time) + BinaryTimerFileNode(std::uint16_t const& _fn_id, std::uint64_t const _start, + std::uint64_t const _time) : fn_id{_fn_id} , start{_start} , time{_time} { } - std::uint16_t fn_id; - std::uint64_t start; - std::uint64_t time; + std::uint16_t const fn_id; + std::uint64_t const start; + std::uint64_t const time; std::vector kinder{}; }; diff --git a/inc/phlop/timing/threaded_scope_timer.hpp b/inc/phlop/timing/threaded_scope_timer.hpp index c37e6c4..b11fc78 100644 --- a/inc/phlop/timing/threaded_scope_timer.hpp +++ b/inc/phlop/timing/threaded_scope_timer.hpp @@ -141,9 +141,11 @@ struct ScopeTimerMan struct RunTimerReportSnapshot { - RunTimerReportSnapshot(RunTimerReport* s, RunTimerReport* p, std::uint64_t const& t) + RunTimerReportSnapshot(RunTimerReport* s, RunTimerReport* p, std::uint64_t const st, + std::uint64_t const t) : self{s} , parent{p} + , start{st} , time{t} { childs.reserve(2); @@ -152,6 +154,7 @@ struct RunTimerReportSnapshot RunTimerReport const* const self; RunTimerReport const* const parent; + std::uint64_t const start; std::uint64_t const time; std::vector childs; }; @@ -190,7 +193,7 @@ struct scope_timer std::uint64_t static now() { return std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) + std::chrono::steady_clock::now().time_since_epoch()) .count(); } @@ -215,14 +218,17 @@ struct scope_timer struct BinaryTimerFileNode { - BinaryTimerFileNode(std::uint16_t _fn_id, std::uint64_t _time) + BinaryTimerFileNode(std::uint16_t const& _fn_id, std::uint64_t const _start, + std::uint64_t const _time) : fn_id{_fn_id} + , start{_start} , time{_time} { } - std::uint16_t fn_id; - std::uint64_t time; + std::uint16_t const fn_id; + std::uint64_t const start; + std::uint64_t const time; std::vector kinder{}; }; @@ -240,8 +246,8 @@ struct BinaryTimerFile for (auto const& [reports, traces] : ScopeTimerMan::INSTANCE().thread_storage) for (auto const& trace : traces) recurse_traces_for_nodes( - trace, - roots.emplace_back(key_ids[std::string{trace->self->k}], trace->time)); + trace, roots.emplace_back(key_ids[std::string{trace->self->k}], + trace->start, trace->time)); } } @@ -252,7 +258,7 @@ struct BinaryTimerFile for (std::size_t i = 0; i < c->childs.size(); ++i) recurse_traces_for_nodes( c->childs[i], node.kinder.emplace_back(key_ids[std::string{c->childs[i]->self->k}], - c->childs[i]->time)); + c->childs[i]->start, c->childs[i]->time)); } template @@ -322,7 +328,7 @@ struct BinaryTimerFile { for (std::size_t ti = 0; ti < tabs; ++ti) file << " "; - file << node.fn_id << " " << node.time << std::endl; + file << node.fn_id << " " << node.start << ":" << node.time << std::endl; for (auto const& n : node.kinder) _write(file, n, tabs + 1); } diff --git a/mkn.yaml b/mkn.yaml index bd138a7..28cc1ba 100644 --- a/mkn.yaml +++ b/mkn.yaml @@ -1,3 +1,4 @@ +#! clean build test run -p scope_timer,threaded_scope_timer -Oa "-fPIC -std=c++20" -W 9 name: phlop parent: base @@ -8,6 +9,12 @@ profile: - name: scope_timer parent: base - src: src/phlop/timing/scope_timer.cpp # static instance lives here + src: src/phlop/timing/scope_timer.cpp mode: shared test: tests/timing/test_scope_timer.cpp + +- name: threaded_scope_timer + parent: base + src: src/phlop/timing/threaded_scope_timer.cpp + mode: shared + test: tests/timing/test_threaded_scope_timer.cpp diff --git a/phlop/os.py b/phlop/os.py index 504dea7..fa3313a 100644 --- a/phlop/os.py +++ b/phlop/os.py @@ -48,5 +48,21 @@ def write_to_file(file, contents, mode="w", skip_if_empty=True): raise RuntimeError(f"Failed to write to file {file}: {e}") +def read_file(file): + try: + with open(file, "r") as f: + return f.read() + except IOError as e: + raise RuntimeError(f"Failed to read file {file}: {e}") + + +def read_last_lines_of(file, n=10): + try: + with open(file, "r") as f: + return f.readlines()[:10] + except IOError as e: + raise RuntimeError(f"Failed to read file {file}: {e}") + + def env_sep(): return ";" if any(platform.win32_ver()) else ":" diff --git a/phlop/proc.py b/phlop/proc.py index aba63d1..bef61da 100644 --- a/phlop/proc.py +++ b/phlop/proc.py @@ -2,6 +2,7 @@ # # # + import subprocess import sys diff --git a/phlop/procs/runtimer.py b/phlop/procs/runtimer.py index f1a7b94..25d38c0 100644 --- a/phlop/procs/runtimer.py +++ b/phlop/procs/runtimer.py @@ -2,13 +2,11 @@ # # # -# import os import subprocess import time -from copy import deepcopy from phlop.os import pushd, write_to_file from phlop.string import decode_bytes @@ -26,19 +24,25 @@ def __init__( working_dir=None, log_file_path=None, logging=2, + popen=True, **kwargs, ): self.cmd = cmd - start = time.time() - - self.run_time = None self.stdout = "" self.stderr = "" + self.run_time = None + self.logging = logging + self.log_file_path = log_file_path + self.capture_output = capture_output benv = os.environ.copy() benv.update(env) - self.logging = logging - - ekwargs = {} + ekwargs = dict( + shell=shell, + env=benv, + close_fds=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) if not capture_output and log_file_path: ekwargs.update( dict( @@ -46,43 +50,73 @@ def __init__( stderr=open(f"{log_file_path}.stderr", "w"), ), ) + else: + ekwargs.update( + dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE), + ) - def run(): - logging = deepcopy(self.logging) - try: - self.run = subprocess.run( - self.cmd, - shell=shell, - check=check, - env=benv, - capture_output=capture_output, - close_fds=True, - **kwargs, - **ekwargs, + def go(): + if popen: + self._popen(**ekwargs, **kwargs) + else: + self._run( + check=check, capture_output=capture_output, **ekwargs, **kwargs ) - self.run_time = time.time() - start - self.exitcode = self.run.returncode - if logging == 2 and capture_output: - self.stdout = decode_bytes(self.run.stdout) - self.stderr = decode_bytes(self.run.stderr) - except ( - subprocess.CalledProcessError - ) as e: # only triggers on failure if check=True - self.exitcode = e.returncode - self.run_time = time.time() - start - if logging >= 1 and capture_output: - self.stdout = decode_bytes(e.stdout) - self.stderr = decode_bytes(e.stderr) - logging = 2 # force logging as exception occurred - if logging == 2 and capture_output and log_file_path: - write_to_file(f"{log_file_path}.stdout", self.stdout) - write_to_file(f"{log_file_path}.stderr", self.stderr) if working_dir: with pushd(working_dir): - run() + go() else: - run() + go() + + def _locals(self): + return self.capture_output, self.log_file_path, self.logging + + def _run(self, **kwargs): + capture_output, log_file_path, logging = self._locals() + try: + start = time.time() + self.run = subprocess.run(self.cmd, **kwargs) + self.run_time = time.time() - start + self.exitcode = self.run.returncode + if logging == 2 and capture_output: + self.stdout = decode_bytes(self.run.stdout) + self.stderr = decode_bytes(self.run.stderr) + except subprocess.CalledProcessError as e: + # only triggers on failure if check=True + self.run_time = time.time() - start + self.exitcode = e.returncode + if logging >= 1 and capture_output: + self.stdout = decode_bytes(e.stdout) + self.stderr = decode_bytes(e.stderr) + logging = 2 # force logging as exception occurred + if logging == 2 and capture_output and log_file_path: + write_to_file(f"{log_file_path}.stdout", self.stdout) + write_to_file(f"{log_file_path}.stderr", self.stderr) + + def _popen(self, **kwargs): + capture_output, log_file_path, logging = self._locals() + start = time.time() + p = subprocess.Popen(self.cmd, **kwargs) + self.stdout, self.stderr = p.communicate() + self.run_time = time.time() - start + self.exitcode = p.returncode + if not capture_output and log_file_path: + kwargs["stdout"].close() + kwargs["stderr"].close() + elif capture_output: + p.stdout.close() + p.stderr.close() + p = None + + if self.exitcode > 0 and capture_output: + logging = 2 # force logging as exception occurred + if logging == 2 and capture_output: + self.stdout = decode_bytes(self.stdout) + self.stderr = decode_bytes(self.stderr) + if logging == 2 and capture_output and log_file_path: + write_to_file(f"{log_file_path}.stdout", self.stdout) + write_to_file(f"{log_file_path}.stderr", self.stderr) def out(self, ignore_exit_code=False): if not ignore_exit_code and self.exitcode > 0: diff --git a/phlop/reflection.py b/phlop/reflection.py index fe25485..35ba4e0 100644 --- a/phlop/reflection.py +++ b/phlop/reflection.py @@ -2,7 +2,6 @@ # # # -# import importlib import inspect @@ -17,7 +16,7 @@ ).lower() in ("true", "1", "t") -def classes_in_file(file_path, subclasses_only=None, fail_on_import_error=False): +def classes_in_file(file_path, subclasses_only=None, fail_on_import_error=True): file = Path(file_path) module = str(file).replace(os.path.sep, ".")[:-3] assert module diff --git a/phlop/testing/parallel_processor.py b/phlop/testing/parallel_processor.py index 62cddd0..de51838 100644 --- a/phlop/testing/parallel_processor.py +++ b/phlop/testing/parallel_processor.py @@ -4,15 +4,20 @@ # # - -import time +import os from enum import Enum from multiprocessing import Process, Queue, cpu_count +from phlop.logger import getLogger +from phlop.os import read_file, read_last_lines_of from phlop.proc import run +timeout = 60 * 60 +logger = getLogger(__name__) + -class TestCaseFailure(Exception): ... +class TestCaseFailure(Exception): + ... class LoggingMode(Enum): @@ -25,14 +30,14 @@ class CallableTest: def __init__(self, batch_index, test_case, logging): self.batch_index = batch_index self.test_case = test_case - self.run = None self.logging = logging + self.run = None def __call__(self, **kwargs): self.run = run( self.test_case.cmd.split(), shell=False, - capture_output=True, + capture_output=False, check=True, print_cmd=False, env=self.test_case.env, @@ -44,6 +49,13 @@ def __call__(self, **kwargs): print(self.run.stderr) return self + def print_log_files(self): + print(self.run.stdout) + print(self.run.stderr) + if self.test_case.log_file_path: + print(read_file(f"{self.test_case.log_file_path}.stdout")) + print(read_file(f"{self.test_case.log_file_path}.stdout")) + class CoreCount: def __init__(self, cores_avail): @@ -62,6 +74,24 @@ def print_tests(batches): print(test.cmd) +def print_pending(cc, batches, logging): + print("pending jobs start") + for batch_index, batch in enumerate(batches): + for pid, proc in enumerate(cc.procs[batch_index]): + if not proc.is_alive(): + continue + test_case = batch.tests[pid] + print("cmd: ", test_case.cmd) + if test_case.log_file_path: + stdout = read_last_lines_of(f"{test_case.log_file_path}.stdout") + if stdout: + print(f"stdout:{os.linesep}", " ".join(stdout)) + stderr = read_last_lines_of(f"{test_case.log_file_path}.stderr") + if stderr: + print(f"stderr:{os.linesep}", " ".join(stderr)) + print("pending jobs end") + + def process( batches, n_cores=None, print_only=False, fail_fast=False, logging: LoggingMode = 1 ): @@ -85,41 +115,45 @@ def process( def launch_tests(): for batch_index, batch in enumerate(batches): offset = len(cc.procs[batch_index]) - for test_index, test in enumerate(batch.tests[offset:]): + for test_index, test_case in enumerate(batch.tests[offset:]): test_index += offset if batch.cores <= cc.cores_avail: - test = CallableTest( - batch_index, batches[batch_index].tests[test_index], logging - ) + test = CallableTest(batch_index, test_case, logging) cc.cores_avail -= batch.cores cc.procs[batch_index] += [ - Process( - target=runner, - args=(test, (pqueue)), - ) + Process(target=runner, args=(test, pqueue)) ] cc.procs[batch_index][-1].daemon = True cc.procs[batch_index][-1].start() def finished(): - b = True for batch_index, batch in enumerate(batches): - b &= cc.fin[batch_index] == len(batch.tests) - return b + if cc.fin[batch_index] != len(batch.tests): + return False + return True def waiter(queue): fail = 0 while True: - proc = queue.get() - time.sleep(0.01) # don't throttle! + proc = None + try: + proc = queue.get(timeout=timeout) + + except Exception: + logger.info("Queue timeout - polling") + print_pending(cc, batches, logging) + continue + if isinstance(proc, CallableTest): status = "finished" if proc.run.exitcode == 0 else "FAILED" - fail += proc.run.exitcode - if fail_fast and fail > 0: - raise TestCaseFailure("Some tests have failed") print( proc.test_case.cmd, f"{status} in {proc.run.run_time:.2f} seconds" ) + if proc.run.exitcode != 0: + proc.print_log_files() + if fail_fast: + raise TestCaseFailure("Some tests have failed") + fail += proc.run.exitcode cc.cores_avail += batches[proc.batch_index].cores cc.fin[proc.batch_index] += 1 if finished(): diff --git a/phlop/testing/test_cases.py b/phlop/testing/test_cases.py index ff3b4ea..782062e 100644 --- a/phlop/testing/test_cases.py +++ b/phlop/testing/test_cases.py @@ -18,7 +18,6 @@ from phlop.sys import extend_sys_path _LOG_DIR = Path(os.environ.get("PHLOP_LOG_DIR", os.getcwd())) - CMD_PREFIX = "" CMD_POSTFIX = "" @@ -143,8 +142,10 @@ def determine_cores_for_test_case(test_case): def binless(test_case): - if test_case.cmd.startswith("/usr/bin/"): - test_case.cmd = test_case.cmd[9:] + if test_case.cmd.startswith("/usr/"): + bits = test_case.cmd.split(" ") + test_case.cmd = " ".join([bits[0].split("/")[-1]] + bits[1:]) + # print("test_case.cmd ", test_case.cmd) return test_case diff --git a/phlop/timing/scope_timer.py b/phlop/timing/scope_timer.py index 453dfc7..8886f95 100644 --- a/phlop/timing/scope_timer.py +++ b/phlop/timing/scope_timer.py @@ -2,9 +2,10 @@ # parsing PHARE scope funtion timers # -import numpy as np -from pathlib import Path from dataclasses import dataclass, field +from pathlib import Path + +import numpy as np @dataclass diff --git a/pyproject.toml b/pyproject.toml index 98a8111..b5f248a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "phlop" -version = "0.0.27" +version = "0.0.28" dependencies = [ diff --git a/setup.py b/setup.py index 371a541..a292b94 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="phlop", - version="0.0.27", + version="0.0.28", cmdclass={}, classifiers=[], include_package_data=True, diff --git a/src/phlop/timing/threaded_scope_timer.cpp b/src/phlop/timing/threaded_scope_timer.cpp index 6699273..f00f298 100644 --- a/src/phlop/timing/threaded_scope_timer.cpp +++ b/src/phlop/timing/threaded_scope_timer.cpp @@ -38,7 +38,7 @@ scope_timer::~scope_timer() detail::_current_scope_timer = this->pscope; auto& s = *r.snapshots.emplace_back( // allocated in construtor - std::make_shared(&r, parent, now() - start)); + std::make_shared(&r, parent, start, now() - start)); if (this->pscope) pscope->childs.emplace_back(&s);