Skip to content

Commit

Permalink
Use concurrent.futures
Browse files Browse the repository at this point in the history
  • Loading branch information
ppigazzini committed Jan 24, 2025
1 parent ba760ad commit 99761f9
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions worker/games.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import threading
import time
from base64 import b64decode
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path
from queue import Empty, Queue
Expand Down Expand Up @@ -387,7 +388,7 @@ def establish_validated_net(remote, testing_dir, net, global_cache):
time.sleep(waitTime)


def run_single_bench(engine, queue):
def run_single_bench(engine):
bench_sig = None
bench_nps = None

Expand All @@ -407,9 +408,11 @@ def run_single_bench(engine, queue):
if "Nodes/second" in line:
bench_nps = float(line.split(": ")[1].strip())

queue.put((bench_sig, bench_nps))
except:
queue.put((None, None))
p.wait()
except (OSError, subprocess.SubprocessError) as e:
raise e

return bench_sig, bench_nps


def verify_signature(engine, signature, active_cores):
Expand All @@ -432,36 +435,26 @@ def verify_signature(engine, signature, active_cores):
)
)

queue = multiprocessing.Queue()

processes = [
multiprocessing.Process(
target=run_single_bench,
args=(engine, queue),
)
for _ in range(active_cores)
]

for p in processes:
p.start()
with ProcessPoolExecutor(max_workers=active_cores) as executor:
try:
results = list(executor.map(run_single_bench, [engine] * active_cores))
except Exception as e:
raise WorkerException("Failed to run engine bench: {}".format(str(e)), e=e)

results = [queue.get() for _ in range(active_cores)]
bench_nps = 0.0

for sig, nps in results:

if sig is None or bench_nps is None:
if sig is None or nps is None:
raise RunException(
"Unable to parse bench output of {}".format(os.path.basename(engine))
)

if int(sig) != int(signature):
message = "Wrong bench in {}, user expected: {} but worker got: {}".format(
os.path.basename(engine),
signature,
sig,
raise RunException(
"Wrong bench in {}, user expected: {} but worker got: {}".format(
os.path.basename(engine), signature, sig
)
)
raise RunException(message)

bench_nps += nps

Expand Down

0 comments on commit 99761f9

Please sign in to comment.