From fad6ddbbdcdf31ce86674fa7375f9ab49f02099f Mon Sep 17 00:00:00 2001 From: MinetaS Date: Tue, 4 Jun 2024 23:36:40 +0900 Subject: [PATCH] Implement time control based estimation improvements --- server/fishtest/rundb.py | 23 ++-- server/fishtest/util.py | 192 ++++++++++++++++++++++------- server/fishtest/views.py | 22 +++- server/utils/delta_update_users.py | 13 +- worker/games.py | 43 +++++-- worker/sri.txt | 2 +- 6 files changed, 223 insertions(+), 72 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 3097575b3..bdec772d5 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -25,16 +25,17 @@ from fishtest.stats.stat_util import SPRT_elo from fishtest.userdb import UserDb from fishtest.util import ( + BASE_NPS, GeneratorAsFileReader, Scheduler, crash_or_time, - estimate_game_duration, + estimate_game_duration_from_run, format_bounds, format_results, get_bad_workers, get_chi2, get_hash, - get_tc_ratio, + get_tc_ratio_from_run, remaining_hours, update_residuals, worker_name, @@ -619,8 +620,8 @@ def aggregate_unfinished_runs(self, username=None): nps += concurrency * task["worker_info"]["nps"] if task["worker_info"]["nps"] != 0: games_per_minute += ( - (task["worker_info"]["nps"] / 691680) - * (60.0 / estimate_game_duration(run["args"]["tc"])) + (task["worker_info"]["nps"] / BASE_NPS) + * (60.0 / estimate_game_duration_from_run(run)) * ( int(task["worker_info"]["concurrency"]) // run["args"].get("threads", 1) @@ -719,7 +720,7 @@ def calc_itp(self, run, count): # The primary adjustment is derived from a power law of test TC relative to STC, so that long TCs compromise # between worse latency and chewing too many cores. - tc_ratio = get_tc_ratio(run["args"]["tc"], run["args"]["threads"]) + tc_ratio = get_tc_ratio_from_run(run) # Discount longer test itp-per-TC without boosting sub-STC tests if tc_ratio > 1: # LTC/STC tc_ratio = 6, target latency ratio = 3/2, @@ -761,7 +762,7 @@ def worker_cap(self, run, worker_info): # during the time interval determined by "self.task_duration". # Make sure the result is properly quantized and not zero. - game_time = estimate_game_duration(run["args"]["tc"]) + game_time = estimate_game_duration_from_run(run) concurrency = worker_info["concurrency"] // run["args"]["threads"] assert concurrency >= 1 # as we have more tasks done (>250), make them longer to avoid @@ -1012,14 +1013,14 @@ def priority(run): # lower is better # and windows workers only to LTC jobs if max_threads >= 29: if "windows" in worker_info["uname"].lower(): - tc_too_short = get_tc_ratio(run["args"]["tc"], base="55+0.5") < 1.0 - else: tc_too_short = ( - get_tc_ratio( - run["args"]["tc"], run["args"]["threads"], "35+0.3" - ) + get_tc_ratio_from_run(run, base="55+0.5") + / run["args"]["threads"] < 1.0 ) + else: + tc_too_short = get_tc_ratio_from_run(run, base="35+0.3") < 1.0 + if tc_too_short: continue diff --git a/server/fishtest/util.py b/server/fishtest/util.py index 6e5bbb817..dab4f3018 100644 --- a/server/fishtest/util.py +++ b/server/fishtest/util.py @@ -15,6 +15,146 @@ FISH_URL = "https://tests.stockfishchess.org/tests/view/" +# ============================================================================ +# Time control utility functions + +BASE_NPS = 691680 + + +class TC: + def __init__(self, base, inc=0, moves=0, nodestime=None): + self.base = base + self.inc = inc + self.moves = moves + self.nodestime = nodestime + + def __str__(self): + tc_str = "" + if self.moves > 0: + tc_str += "{:d}/".format(self.moves) + + tc_str += "{:.3f}".format(self.base) + + if self.inc > 0: + tc_str += "+{:.3f}".format(self.inc) + + return tc_str + + def scale(self, factor): + self.base *= factor + self.inc *= factor + + def total_time(self, moves): + t = self.base + if self.moves > 0: + t *= moves / self.moves + t += self.inc * (moves - 1) + if self.nodestime is not None: + t *= self.nodestime * 1000 / BASE_NPS + return t + + @staticmethod + def parse(tc): + chunks = tc.split("+") + increment = 0.0 + if len(chunks) == 2: + increment = float(chunks[1]) + + chunks = chunks[0].split("/") + num_moves = 0 + if len(chunks) == 2: + num_moves = int(chunks[0]) + + t = chunks[-1] + chunks = t.split(":") + if len(chunks) == 2: + base = float(chunks[0]) * 60 + float(chunks[1]) + else: + base = float(chunks[0]) + + return TC(base, increment, num_moves) + + +def _tc_estimate_engine_time(tc: TC): + # Estimated number of moves for each game. (fishtest LTC results) + game_moves = 68 + + return tc.total_time(game_moves) + + +def _tc_estimate_game_duration(test_tc: TC, base_tc: TC = None): + # Reduced to 92% because on average a game is stopped earlier (LTC fishtest result). + scale = 0.92 + + if base_tc is None: + return _tc_estimate_engine_time(test_tc) * 2 * scale + else: + return ( + _tc_estimate_engine_time(test_tc) + _tc_estimate_engine_time(base_tc) + ) * scale + + +def _tc_ratio(tc: TC, base_tc: TC = None, threads: int = 1): + """Get TC ratio relative to the `base`, which defaults to standard STC. + Example: standard LTC is 6x, SMP-STC is 4x.""" + + if base_tc is None: + base_tc = TC(10, 0.1) + + return threads * _tc_estimate_engine_time(tc) / _tc_estimate_engine_time(base_tc) + + +def estimate_game_duration( + tc: str, nodestime: int = None, base_tc: str = None, base_nodestime: int = None +): + _test = TC.parse(tc) + _test.nodestime = nodestime + + if base_tc is None: + _base = None + else: + _base = TC.parse(base_tc) + _base.nodestime = base_nodestime + + return _tc_estimate_game_duration(_test, _base) + + +def estimate_game_duration_from_run(run): + base_tc = run["args"]["tc"] + base_nodestime = get_nodestime(run["args"]["base_options"]) + test_tc = run["args"].get("new_tc", base_tc) + test_nodestime = get_nodestime(run["args"]["new_options"]) + return estimate_game_duration(test_tc, test_nodestime, base_tc, base_nodestime) + + +def get_tc_ratio(tc, nodestime=None, threads=1, base="10+0.1", base_nodestime=None): + """Get TC ratio relative to the `base`, which defaults to standard STC. + Example: standard LTC is 6x, SMP-STC is 4x.""" + _tc = TC.parse(tc) + _tc.nodestime = nodestime + _base = TC.parse(base) + _base.nodestime = base_nodestime + return _tc_ratio(_tc, _base, threads) + + +def get_tc_ratio_from_run(run, base: str = "10+0.1", base_nodestime: int = None): + threads = run["args"]["threads"] + _base = TC.parse(base) + _base.nodestime = base_nodestime + + _tc = TC.parse(run["args"]["tc"]) + _tc.nodestime = get_nodestime(run["args"]["base_options"]) + base_ratio = _tc_ratio(_tc, _base, threads) + + _tc = TC.parse(run["args"].get("new_tc", run["args"]["tc"])) + _tc.nodestime = get_nodestime(run["args"]["new_options"]) + test_ratio = _tc_ratio(_tc, _base, threads) + + return (base_ratio + test_ratio) / 2 + + +# ============================================================================ + class GeneratorAsFileReader: def __init__(self, generator): @@ -313,48 +453,9 @@ def format_results(run_results, run): return result -@cache # A single hash lookup should be much cheaper than parsing a string -def estimate_game_duration(tc): - # Total time for a game is assumed to be the double of tc for each player - # reduced for 92% because on average a game is stopped earlier (LTC fishtest result). - scale = 2 * 0.92 - # estimated number of moves per game (LTC fishtest result) - game_moves = 68 - - chunks = tc.split("+") - increment = 0.0 - if len(chunks) == 2: - increment = float(chunks[1]) - - chunks = chunks[0].split("/") - num_moves = 0 - if len(chunks) == 2: - num_moves = int(chunks[0]) - - time_tc = chunks[-1] - chunks = time_tc.split(":") - if len(chunks) == 2: - time_tc = float(chunks[0]) * 60 + float(chunks[1]) - else: - time_tc = float(chunks[0]) - - if num_moves > 0: - time_tc = time_tc * (game_moves / num_moves) - - return (time_tc + (increment * game_moves)) * scale - - -def get_tc_ratio(tc, threads=1, base="10+0.1"): - """Get TC ratio relative to the `base`, which defaults to standard STC. - Example: standard LTC is 6x, SMP-STC is 4x.""" - return threads * estimate_game_duration(tc) / estimate_game_duration(base) - - def is_active_sprt_ltc(run): return ( - not run["finished"] - and "sprt" in run["args"] - and get_tc_ratio(run["args"]["tc"], run["args"]["threads"]) > 4 + not run["finished"] and "sprt" in run["args"] and get_tc_ratio_from_run(run) > 4 ) # SMP-STC ratio is 4 @@ -429,7 +530,7 @@ def remaining_hours(run): return 0 # Assume all tests use default book (UHO_4060_v3). - book_positions = 242201 + book_positions = 2632036 t = scipy.stats.beta(1, 15).cdf(min(N / book_positions, 1.0)) expected_games_llr = int(2 * N * llr_bound / llr) expected_games = min( @@ -440,7 +541,7 @@ def remaining_hours(run): else: expected_games = run["args"]["num_games"] remaining_games = max(0, expected_games - r["wins"] - r["losses"] - r["draws"]) - game_secs = estimate_game_duration(run["args"]["tc"]) + game_secs = estimate_game_duration_from_run(run) return game_secs * remaining_games * int(run["args"].get("threads", 1)) / (60 * 60) @@ -542,6 +643,13 @@ def get_hash(s): return 0 +def get_nodestime(s): + h = re.search("nodestime=([0-9]+)", s) + if h: + return int(h.group(1)) + return None + + # Avoids exposing sensitive data about the workers to the client and skips some heavy data. def strip_run(run): # a deep copy, avoiding copies of a few large lists. diff --git a/server/fishtest/views.py b/server/fishtest/views.py index fb80e95d8..941e82a6a 100644 --- a/server/fishtest/views.py +++ b/server/fishtest/views.py @@ -19,6 +19,7 @@ format_results, get_chi2, get_hash, + get_nodestime, get_tc_ratio, github_repo_valid, password_strength, @@ -1049,7 +1050,26 @@ def strip_message(m): # This means a batch with be completed in roughly 2 minutes on a 8 core worker. # This expression adjusts the batch size for threads and TC, to keep timings somewhat similar. sprt_batch_size_games = 2 * max( - 1, int(0.5 + 16 / get_tc_ratio(data["tc"], data["threads"])) + 1, + int( + 0.5 + + 16 + / ( + ( + get_tc_ratio( + data["tc"], + get_nodestime(data["base_options"]), + data["threads"], + ) + + get_tc_ratio( + data.get("new_tc", data["tc"]), + get_nodestime(data["new_options"]), + data["threads"], + ) + ) + / 2 + ) + ), ) assert sprt_batch_size_games % 2 == 0 elo_model = request.POST["elo_model"] diff --git a/server/utils/delta_update_users.py b/server/utils/delta_update_users.py index b47cfc9af..4ae9beafe 100644 --- a/server/utils/delta_update_users.py +++ b/server/utils/delta_update_users.py @@ -4,7 +4,12 @@ from datetime import datetime, timedelta, timezone from fishtest.rundb import RunDb -from fishtest.util import delta_date, diff_date, estimate_game_duration +from fishtest.util import ( + BASE_NPS, + delta_date, + diff_date, + estimate_game_duration_from_run, +) from pymongo import DESCENDING @@ -48,8 +53,8 @@ def compute_games_rates(rundb, info_tuple): # use the reference core nps, also set in rundb.py and games.py for machine in rundb.get_machines(): games_per_hour = ( - (machine["nps"] / 691680) - * (3600.0 / estimate_game_duration(machine["run"]["args"]["tc"])) + (machine["nps"] / BASE_NPS) + * (3600.0 / estimate_game_duration_from_run(machine["run"])) * (int(machine["concurrency"]) // machine["run"]["args"].get("threads", 1)) ) for info in info_tuple: @@ -66,7 +71,7 @@ def process_run(run, info): return # Update the information for the workers contributed by the users - tc = estimate_game_duration(run["args"]["tc"]) + tc = estimate_game_duration_from_run(run) for task in run["tasks"]: if "worker_info" not in task: continue diff --git a/worker/games.py b/worker/games.py index 1caa1ed1b..edb0b7a18 100644 --- a/worker/games.py +++ b/worker/games.py @@ -810,7 +810,6 @@ def adjust_tc(tc, factor): scaled_tc = "{}/{}".format(num_moves, scaled_tc) tc_limit *= 100.0 / num_moves - print("CPU factor : {} - tc adjusted to {}".format(factor, scaled_tc)) return scaled_tc, tc_limit @@ -1417,14 +1416,39 @@ def parse_options(s): # 0.7 Mnps as threshold for the slow worker. # also set in rundb.py and delta_update_users.py factor = 691680 / base_nps + print("CPU factor: {:.6f}".format(factor)) # Adjust CPU scaling. _, tc_limit_ltc = adjust_tc("60+0.6", factor) - scaled_tc, tc_limit = adjust_tc(run["args"]["tc"], factor) - scaled_new_tc = scaled_tc - if "new_tc" in run["args"]: - scaled_new_tc, new_tc_limit = adjust_tc(run["args"]["new_tc"], factor) - tc_limit = (tc_limit + new_tc_limit) / 2 + tc_limit, new_tc_limit = None, None + scaled_tc = run["args"]["tc"] + scaled_new_tc = ( + run["args"]["new_tc"] if "new_tc" in run["args"] else run["args"]["tc"] + ) + + for option in base_options: + if "nodestime" in option: + nodestime = int(option.split("=")[1]) + real_tc, tc_limit = adjust_tc(scaled_tc, nodestime * 1000 / 691680) + print("Base TC using {} in real time".format(real_tc)) + break + + if tc_limit is None: + scaled_tc, tc_limit = adjust_tc(scaled_tc, factor) + print("Base TC adjusted to {}".format(scaled_tc)) + + for option in new_options: + if "nodestime" in option: + nodestime = int(option.split("=")[1]) + real_tc, new_tc_limit = adjust_tc(scaled_new_tc, nodestime * 1000 / 691680) + print("New TC using {} in real time".format(real_tc)) + break + + if new_tc_limit is None: + scaled_new_tc, new_tc_limit = adjust_tc(scaled_new_tc, factor) + print("New TC adjusted to {}".format(scaled_new_tc)) + + tc_limit = (tc_limit + new_tc_limit) / 2 result["worker_info"]["nps"] = float(base_nps) result["worker_info"]["ARCH"] = cpu_features @@ -1433,12 +1457,6 @@ def parse_options(s): if not any("Threads" in s for s in new_options + base_options): threads_cmd = ["option.Threads={}".format(threads)] - # If nodestime is being used, give engines extra grace time to - # make time losses virtually impossible. - nodestime_cmd = [] - if any("nodestime" in s for s in new_options + base_options): - nodestime_cmd = ["timemargin=10000"] - def make_player(arg): return run["args"][arg].split(" ")[0] @@ -1546,7 +1564,6 @@ def make_player(arg): + base_options + ["_spsa_"] + ["-each", "proto=uci"] - + nodestime_cmd + threads_cmd + book_cmd ) diff --git a/worker/sri.txt b/worker/sri.txt index b16ced5c6..f768845ea 100644 --- a/worker/sri.txt +++ b/worker/sri.txt @@ -1 +1 @@ -{"__version": 239, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "+ubGHk3rIV0ILVhg1dxpsOkRF8GUaO5otPVnAyw8kKKq9Rqzksv02xj6wjYpSTmA", "games.py": "6vKH51UtL56oNvA539hLXRzgE1ADXy3QZNJohoK94RntM72+iMancSJZHaNjEb5+"} +{"__version": 239, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "+ubGHk3rIV0ILVhg1dxpsOkRF8GUaO5otPVnAyw8kKKq9Rqzksv02xj6wjYpSTmA", "games.py": "+5XP/cODNSyEPER2gt+dacRpwiXU26tiiI0detS4Ct4HE4NVQBUNdkmeWVws7J/Z"}