From 2fe3e07458e9078dcdb1099f6ce10ff0be0a26b6 Mon Sep 17 00:00:00 2001 From: Guilherme Lima Date: Thu, 22 Aug 2024 16:02:47 -0500 Subject: [PATCH 1/6] Manual testing for empty_action Usage: source /consistency/cms_consistency/site_cmp3/empty-test.sh T2_US_Nebraska It is currently set for 10 directories removed (-L 10), and those first 10 have already been removed during manual tests. So the -L argument must be increased if any side effect is to be observed. Default is None, and the original hardcoded value is 10000. For enabling the change for real: cd /consistency/cms_consistency/site_cmp3 cp action/remove_empty_dirs_GL.py action/remove_empty_dirs.py --- actions/config.py | 2 +- actions/remove_empty_dirs.py | 4 +- actions/remove_empty_dirs_GL.py | 427 ++++++++++++++++++++++++++++++++ site_cmp3/empty-test.sh | 42 ++++ 4 files changed, 473 insertions(+), 2 deletions(-) create mode 100644 actions/remove_empty_dirs_GL.py create mode 100644 site_cmp3/empty-test.sh diff --git a/actions/config.py b/actions/config.py index 1b09c936..819ddcbe 100644 --- a/actions/config.py +++ b/actions/config.py @@ -3,7 +3,7 @@ class ActionConfiguration(object): - def __init__(self, rse, source, action, **source_agrs): + def __init__(self, rse, source, action, **source_args): self.Action = action self.Config = CEConfiguration(source)[rse].get(action + "_action", {}) diff --git a/actions/remove_empty_dirs.py b/actions/remove_empty_dirs.py index 496ccbb3..eb7a0961 100644 --- a/actions/remove_empty_dirs.py +++ b/actions/remove_empty_dirs.py @@ -398,9 +398,11 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru is_redirector = config.ServerIsRedirector client = XRootDClient(server, is_redirector, server_root, timeout=timeout) if os.path.isfile(storage_path): - remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) + print("GL: Calling remove_from_file()...") + #remove_from_file(storage_path, rse, out, 
lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) run_stats = my_stats else: + print("GL: Calling empty_action()...") run_stats = empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) status = run_stats["status"] error = run_stats.get("error") diff --git a/actions/remove_empty_dirs_GL.py b/actions/remove_empty_dirs_GL.py new file mode 100644 index 00000000..975c2a50 --- /dev/null +++ b/actions/remove_empty_dirs_GL.py @@ -0,0 +1,427 @@ +import sys, os, getopt, time, os.path +from datetime import datetime, timedelta +from pythreader import TaskQueue, Task, Primitive, synchronized + +from run import CCRun, FileNotFoundException +from config import ActionConfiguration +from rucio_consistency import CEConfiguration, Stats +from rucio_consistency.xrootd import XRootDClient + + +Version = "1.0" + +Usage = """ +python remove_empty_dirs.py [options] (|) + -d - dry run + -o (-|) - write confirmed empty directory list to stdout (-) or to a file + -s - file to write stats to + -S - key to store stats under, default: "empty_action" + -c |rucio - load configuration from a YAML file or Rucio + -v - verbose output + + The following will override values read from the configuration: + -L - stop after removing so many directories + -w - max age for oldest run to use for confirmation, default = 35 days + -m - max age for the most recent run, default = 1 day + -M - min age for oldest run, default = 25 + -n - min number of runs to use to produce the confirmed empty directory list, default = 3 +""" + +class LFNConverter(object): + + def __init__(self, site_root, remove_prefix, add_prefix): + self.SiteRoot = site_root + self.RemovePrefix = remove_prefix + self.AddPrefix = add_prefix + + def canonic(self, path): + while path and "//" in path: + path = path.replace("//", "/") + return path + + def path_to_lfn(self, path): + assert path.startswith(self.SiteRoot) + path = self.canonic("/" + 
path[len(self.SiteRoot):]) + if self.RemovePrefix and path.startswith(self.RemovePrefix): + path = self.canonic("/" + path[len(self.RemovePrefix):]) + if self.AddPrefix: + path = self.canonic(self.AddPrefix + "/" + path) + return path + + def lfn_to_path(self, lfn): + if self.AddPrefix: + assert lfn.startswith(self.AddPrefix) + lfn = lfn[len(self.AddPrefix):] + path = lfn + if self.RemovePrefix: + path = self.RemovePrefix + path + return self.canonic(self.SiteRoot + "/" + path) + + def lfn_or_path_to_path(self, lfn_or_path): + if lfn_or_path.startswith(self.SiteRoot): + return lfn_or_path # already a path + return self.lfn_to_path(lfn_or_path) + +class RemoveDirectoryTask(Task): + + RETRIES = 3 + + def __init__(self, client, path): + Task.__init__(self) + self.Client = client + self.Path = path + self.Retries = self.RETRIES + + def run(self): + return self.Client.rmdir(self.Path) + + +class Remover(Primitive): + + def __init__(self, client, paths, dry_run, limit=None, max_workers=10, verbose=False): + Primitive.__init__(self) + self.Client = client + self.Paths = paths + self.Queue = TaskQueue(max_workers, capacity=max_workers, stagger=0.1, delegate=self) + self.Failed = [] # [(path, error), ...] 
+ self.ErrorCounts = {} + self.RemovedCount = 0 + self.SubmittedCount = 0 + self.Verbose = verbose + self.DryRun = dry_run + self.Limit = limit # max number of dirs to delete + + def shave(self, paths): + # split the list of paths (assumed to be reversely ordered) into leaves and inner nodes + leaves = [] + inner = [] + last_path = None + for path in paths: + if last_path is None or not last_path.startswith(path): + leaves.append(path) + else: + inner.append(path) + last_path = path + return leaves, inner + + def run(self): + paths = sorted(self.Paths, reverse=True) + while paths and (self.Limit is None or self.SubmittedCount < self.Limit): + leaves, inner = self.shave(paths) + for leaf in leaves: + depth = len([p for p in leaf.split('/') if p]) # do not remove root directories like "/store/mc" + if depth <= 2: + print(f"skipping root:", leaf) + continue + if self.Verbose: + print(f"submitting (dry_run={self.DryRun}):", leaf) + if not self.DryRun: + self.Queue.append(RemoveDirectoryTask(self.Client, leaf)) + self.SubmittedCount += 1 + if self.Limit is not None and self.SubmittedCount >= self.Limit: + print(f"Limit of {self.Limit} reached") + break + if self.Verbose: + print("waiting for the queue to be empty...") + self.Queue.waitUntilEmpty() + paths = inner + return self.Failed + + @synchronized + def taskEnded(self, queue, task, result): + status, error = result + error = (error or "").strip() + if self.Verbose: + print("taskEnded:", task.Path, status, error or "") + if status != "OK": + if error == "timeout" and task.Retries > 0: + task.Retries -= 1 + if self.Verbose: + print("resubmitting after timeout:", task.Path) + self.Queue.append(task) + else: + reduced_error = error + while task.Path in reduced_error: + reduced_error = reduced_error.replace(task.Path, "[path]") + self.Failed.append((task.Path, error)) + self.ErrorCounts[reduced_error] = self.ErrorCounts.get(reduced_error, 0) + 1 + else: + self.RemovedCount += 1 + + @synchronized + def taskFailed(self, 
queue, task, exc_type, exc_value, tb): + if self.Verbose: + print("taskFailed:", task.Path, exc_value) + self.Failed.append((task.Path, str(exc_value))) + + +def parents(path): + # produce list of all parents for the path + while path and path != '/' and '/' in path: + path = path.rsplit('/', 1)[0] + yield path + +def remove_from_file(file_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit): + paths = [l.strip() for l in open(file_path, "r")] + failed = Remover(client, paths, verbose=verbose, limit=limit).run() + for path, error in failed: + print("Failed:", path, error) + return my_stats + +def update_confirmed(confirmed, update): + new_confirmed = confirmed & update + unconfirmed = confirmed - update + for path in unconfirmed: + for parent in parents(path): + try: new_confirmed.remove(parent) + except KeyError: pass + return new_confirmed + +def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit): + + my_stats["start_time"] = t0 = time.time() + if stats is not None: + stats.update_section(stats_key, my_stats) + + runs = list(CCRun.runs_for_rse(storage_path, rse, complete_only=False)) + now = datetime.now() + + for r in runs: + print(r.Run, ": in window:", r.Timestamp >= now - timedelta(days=window), + " ED info collected:", r.empty_directories_collected(), + " count:", r.empty_directory_count(), + " list present:", r.empty_dir_list_exists() + ) + recent_runs = sorted( + [r for r in runs + if True + #and (print(r.Run, r.Timestamp >= now - timedelta(days=window), r.empty_directories_collected(), r.empty_directory_count()) or True) + and (r.Timestamp >= now - timedelta(days=window)) + and r.empty_directories_collected() + and r.empty_directory_count() is not None + and r.empty_dir_list_exists() + #and (print(r.Run, r.Timestamp >= now - timedelta(days=window), r.empty_directories_collected(), r.empty_directory_count()) or True) + ], + key=lambda r: r.Timestamp + ) + + 
print("Usable runs:") + for r in recent_runs: + print(" ", r.Run) + + status = "started" + aborted_reason = None + confirmed_empty_count = None + detected_empty_count = None + failed_count = 0 + removed_count = 0 + error_counts = {} + error = None + + if recent_runs: + my_stats["runs_compared"] = [r.Run for r in recent_runs] + + if not recent_runs or len(recent_runs) < min_runs: + status = "aborted" + aborted_reason = "not enough runs to produce confirmed empty directories list: %d, required: %d" % (len(recent_runs), min_runs) + else: + first_run = recent_runs[0] + latest_run = recent_runs[-1] + num_scanned = latest_run.scanner_num_files() + detected_empty_count = latest_run.empty_directory_count() + print("Runs in the confirmation history:", len(recent_runs)) + print("First run:", first_run.Run, file=sys.stderr) + print("Last run:", latest_run.Run, file=sys.stderr) + print("Empty directories in last run:", latest_run.empty_directory_count(), file=sys.stderr) + + if latest_run.Timestamp < now - timedelta(days=max_age_last): + status = "aborted" + aborted_reason = "latest run is too old: %s, required: < %d days old" % (latest_run.Timestamp, max_age_last) + + elif first_run.Timestamp > now - timedelta(days=min_age_first): + status = "aborted" + aborted_reason = "oldest run is not old enough: %s, required: > %d days old" % (first_run.Timestamp, min_age_first) + + else: + # compute confirmed list and make sure the list would contain only removable directories + confirmed = set(lfn_converter.lfn_or_path_to_path(path) for path in recent_runs[0].empty_directories()) + confirmed = update_confirmed(confirmed, set(lfn_converter.lfn_or_path_to_path(path) for path in recent_runs[-1].empty_directories())) + for run in recent_runs[1:-1]: + if not confirmed: + break + run_set = set(lfn_converter.lfn_or_path_to_path(path) for path in run.empty_directories()) + confirmed = update_confirmed(confirmed, run_set) + + confirmed_empty_count = len(confirmed) + print("Confirmed empty 
directories:", confirmed_empty_count, file=sys.stderr) + + my_stats.update( + detected_empty_directories = detected_empty_count, + confirmed_empty_directories = confirmed_empty_count + ) + if stats is not None: + stats.update_section(stats_key, my_stats) + + status = "done" + if confirmed: + if out is not None: + for f in sorted(confirmed): + print(f, file=out) + if out is not sys.stdout: + out.close() + try: + remover = Remover(client, confirmed, dry_run, verbose=verbose, limit=limit) + failed = remover.run() + failed_count = len(failed) + removed_count = remover.RemovedCount + error_counts = remover.ErrorCounts + except Exception as e: + error = f"remover error: {e}" + status = "failed" + + t1 = time.time() + my_stats.update( + elapsed = t1-t0, + end_time = t1, + status = status, + error = error, + detected_empty_directories = detected_empty_count, + confirmed_empty_directories = confirmed_empty_count, + failed_count = failed_count, + removed_count = removed_count, + aborted_reason = aborted_reason, + error_counts = error_counts + ) + + if stats is not None: + stats.update_section(stats_key, my_stats) + + return my_stats + +if not sys.argv[1:] or sys.argv[1] == "help": + print(Usage) + sys.exit(2) + +opts, args = getopt.getopt(sys.argv[1:], "h?o:M:m:w:n:f:s:S:c:va:dL:") +opts = dict(opts) + +if not args or "-h" in opts or "-?" 
in opts: + print(Usage) + sys.exit(2) + +out = None +out_path = opts.get("-o") +out_filename = out_path.rsplit('/', 1)[-1] if out_path else None +if out_path: + if out_path == "-": + out = sys.stdout + else: + out = open(out_path, "w") + + +storage_path, rse = args + +config = ActionConfiguration(rse, opts["-c"], "dark") +scanner_config = CEConfiguration(opts["-c"])[rse].get("scanner", {}) + +window = int(opts.get("-w", config.get("confirmation_window", 35))) +min_age_first = int(opts.get("-M", config.get("min_age_first_run", 25))) +max_age_last = int(opts.get("-m", config.get("max_age_last_run", 1))) +fraction = float(opts.get("-f", config.get("max_fraction", 0.01))) +min_runs = int(opts.get("-n", config.get("min_runs", 3))) +account = opts.get("-a") +dry_run = "-d" in opts +verbose = "-v" in opts +limit = opts.get("-L") +if limit: limit = int(limit) + + +if dry_run: + print("====== dry run mode ======") + +stats_file = opts.get("-s") +stats = stats_key = None +if stats_file is not None: + stats = Stats(stats_file) +stats_key = opts.get("-S", "empty_action") + +if "-v" in opts: + print("\nParameters:") + print(" dry run: ", dry_run) + print(" stats file: ", stats_file) + print(" stats key: ", stats_key) + print(" config: ", opts.get("-c")) + print(" confirmation window: ", window) + print(" min age for last run: ", min_age_first) + print(" max age for first run: ", max_age_last) + print(" min number of runs: ", min_runs) + print(" limit: ", "no limit" if limit is None else limit) + print() + print("Scanner:") + print(f"{scanner_config}") + print(" server: ", scanner_config["server"]) + print(" serverRoot: ", scanner_config["server_root"]) + print(" add prefix: ", scanner_config["add_prefix"]) + print(" remove prefix: ", scanner_config["remove_prefix"]) + print() + +my_stats = { + "version": Version, + "dry_run": dry_run, + "elapsed": None, + "start_time": None, + "end_time": None, + "status": "started", + "detected_empty_directories": None, + 
"confirmed_empty_directories": None, + "removed_count": 0, + "failed_count": 0, + "confirmed_list": out_filename, + "aborted_reason": None, + "error": None, + "runs_compared": None, + "limit": limit, + "configuration": { + "confirmation_window": window, + "min_age_first_run": min_age_first, + "max_age_last_run": max_age_last, + "min_runs": min_runs + } +} + +if stats is not None: + stats.update_section(stats_key, my_stats) + +server = scanner_config["server"] +server_root = scanner_config["server_root"] +add_prefix = scanner_config["add_prefix"] +remove_prefix = scanner_config["remove_prefix"] + +lfn_converter = LFNConverter(server_root, remove_prefix, add_prefix) + +timeout = scanner_config["timeout"] +is_redirector = False # config.ServerIsRedirector +client = XRootDClient(server, is_redirector, server_root, timeout=timeout) +if os.path.isfile(storage_path): + remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) + run_stats = my_stats +else: + run_stats = empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) +status = run_stats["status"] +error = run_stats.get("error") +aborted_reason = run_stats.get("aborted_reason") + +if verbose: + print("\nFinal stats:") + for k, v in sorted(run_stats.items()): + print("%s = %s" % (k, v)) + print() + +print("Final status:", status, file=sys.stderr) +if status == "aborted": + print(" Reason:", aborted_reason, file=sys.stderr) +elif status != "done": + print(" Error:", error, file=sys.stderr) + +if status != "done": + sys.exit(1) diff --git a/site_cmp3/empty-test.sh b/site_cmp3/empty-test.sh new file mode 100644 index 00000000..db091f2b --- /dev/null +++ b/site_cmp3/empty-test.sh @@ -0,0 +1,42 @@ +#!/bin/bash +x +# file: empty-test + +#.. 
check if sourcing or running from a subprocess +shelltag=`echo $0 | grep bash` +if [ $shelltag"x" == "x" ]; then + cmd=$0 + myexit=exit +else + cmd=./empty-test + myexit=return +fi + +if [[ $# -lt 1 ]]; then + echo "" + echo "Usage: ${cmd} " + $myexit +fi + +#.. input +RSE=$1 +dump=/var/cache/consistency-dump +temp=/var/cache/consistency-temp + +#.. output +out=/var/cache/test + +ls -t1 ${dump}/${RSE}*stats.json | sed 's#_stats.json##' | sed "s#${dump}/${RSE}_##" > ${out}/${RSE}-dates.out +#len=$(expr length "${RSE}_") + +last=`head -1 ${out}/${RSE}-dates.out` +now=`date -u +%Y_%m_%d_%H_%M` + +ED_errors=${out}/${RSE}_${now}_ED.errors +export PYTHONPATH=/consistency/cms_consistency/cmp3 + +/usr/bin/python3 actions/remove_empty_dirs_GL.py -d -v \ + -c ${dump}/${RSE}_${last}_config.yaml \ + -s ${out}/${RSE}_${now}_stats.json \ + ${dump} \ + ${RSE} \ + > ${ED_errors} From 52e088164b412db6569376752c95e7a34f595b22 Mon Sep 17 00:00:00 2001 From: Guilherme Lima Date: Thu, 22 Aug 2024 16:46:06 -0500 Subject: [PATCH 2/6] Remove spurious changes --- actions/remove_empty_dirs.py | 4 +--- cmp3/run.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/actions/remove_empty_dirs.py b/actions/remove_empty_dirs.py index eb7a0961..496ccbb3 100644 --- a/actions/remove_empty_dirs.py +++ b/actions/remove_empty_dirs.py @@ -398,11 +398,9 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru is_redirector = config.ServerIsRedirector client = XRootDClient(server, is_redirector, server_root, timeout=timeout) if os.path.isfile(storage_path): - print("GL: Calling remove_from_file()...") - #remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) + remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) run_stats = my_stats else: - print("GL: Calling empty_action()...") run_stats = empty_action(storage_path, rse, out, 
lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) status = run_stats["status"] error = run_stats.get("error") diff --git a/cmp3/run.py b/cmp3/run.py index ae45b784..91d61a50 100644 --- a/cmp3/run.py +++ b/cmp3/run.py @@ -11,7 +11,7 @@ def __init__(self, dir_path, rse, run): self.RSE = rse self.Timestamp = CCRun.parse_run(run) self.Stats = CCRun.get_stats(dir_path, rse, run) - + def is_complete(self): return self.Stats.get("cmp3", {}).get("status") == "done" From 404e9d612e63004bac88adf71e09443a6e8bddc4 Mon Sep 17 00:00:00 2001 From: Guilherme Lima Date: Fri, 16 Aug 2024 18:44:38 -0500 Subject: [PATCH 3/6] First working version of empty_action, for removal of empty directories - Update empty_action version from 1.0 to 1.1 - Replace use of unknown class EmptyActionConfiguration with ActionConfiguration - Using dark: configuration parameters for now - Save relevant scanner config parameters to output - Add file actions/remove_empty_dirs_GL.py for manually testing / developing empty_action - Add test scripts in site_cmp3 - Lots of white space cleanup - Ensure that every run produces *compressed* list of empty directories --- actions/remove_empty_dirs.py | 63 ++++++++++++++++++--------------- actions/remove_empty_dirs_GL.py | 2 +- site_cmp3/empty-test.sh | 3 +- site_cmp3/site-scan-test.sh | 45 +++++++++++++++++++++++ site_cmp3/site_cmp3.sh | 4 +-- 5 files changed, 84 insertions(+), 33 deletions(-) mode change 100644 => 100755 site_cmp3/empty-test.sh create mode 100755 site_cmp3/site-scan-test.sh diff --git a/actions/remove_empty_dirs.py b/actions/remove_empty_dirs.py index 496ccbb3..a5993843 100644 --- a/actions/remove_empty_dirs.py +++ b/actions/remove_empty_dirs.py @@ -7,11 +7,8 @@ from rucio_consistency import CEConfiguration, Stats from rucio_consistency.xrootd import XRootDClient -from run import CCRun -from config import ActionConfiguration - -Version = "1.0" +Version = "1.1" Usage = """ python remove_empty_dirs.py [options] (|) @@ -31,7 
+28,7 @@ """ class LFNConverter(object): - + def __init__(self, site_root, remove_prefix, add_prefix): self.SiteRoot = site_root self.RemovePrefix = remove_prefix @@ -59,16 +56,16 @@ def lfn_to_path(self, lfn): if self.RemovePrefix: path = self.RemovePrefix + path return self.canonic(self.SiteRoot + "/" + path) - + def lfn_or_path_to_path(self, lfn_or_path): if lfn_or_path.startswith(self.SiteRoot): return lfn_or_path # already a path return self.lfn_to_path(lfn_or_path) class RemoveDirectoryTask(Task): - + RETRIES = 3 - + def __init__(self, client, path): Task.__init__(self) self.Client = client @@ -80,7 +77,7 @@ def run(self): class Remover(Primitive): - + def __init__(self, client, paths, dry_run, limit=None, max_workers=10, verbose=False): Primitive.__init__(self) self.Client = client @@ -170,7 +167,7 @@ def remove_from_file(file_path, rse, out, lfn_converter, stats, stats_key, dry_r for path, error in failed: print("Failed:", path, error) return my_stats - + def update_confirmed(confirmed, update): new_confirmed = confirmed & update unconfirmed = confirmed - update @@ -181,21 +178,22 @@ def update_confirmed(confirmed, update): return new_confirmed def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit): - + my_stats["start_time"] = t0 = time.time() if stats is not None: stats.update_section(stats_key, my_stats) runs = list(CCRun.runs_for_rse(storage_path, rse, complete_only=False)) now = datetime.now() + for r in runs: - print(r.Run, ": in window:", r.Timestamp >= now - timedelta(days=window), - " ED info collected:", r.empty_directories_collected(), + print(r.Run, ": in window:", r.Timestamp >= now - timedelta(days=window), + " ED info collected:", r.empty_directories_collected(), " count:", r.empty_directory_count(), " list present:", r.empty_dir_list_exists() ) recent_runs = sorted( - [r for r in runs + [r for r in runs if True #and (print(r.Run, r.Timestamp >= now - timedelta(days=window), 
r.empty_directories_collected(), r.empty_directory_count()) or True) and (r.Timestamp >= now - timedelta(days=window)) @@ -203,7 +201,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru and r.empty_directory_count() is not None and r.empty_dir_list_exists() #and (print(r.Run, r.Timestamp >= now - timedelta(days=window), r.empty_directories_collected(), r.empty_directory_count()) or True) - ], + ], key=lambda r: r.Timestamp ) @@ -219,7 +217,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru removed_count = 0 error_counts = {} error = None - + if recent_runs: my_stats["runs_compared"] = [r.Run for r in recent_runs] @@ -270,8 +268,8 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru for f in sorted(confirmed): print(f, file=out) if out is not sys.stdout: - out.close() - try: + out.close() + try: remover = Remover(client, confirmed, dry_run, verbose=verbose, limit=limit) failed = remover.run() failed_count = len(failed) @@ -297,7 +295,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru if stats is not None: stats.update_section(stats_key, my_stats) - + return my_stats if not sys.argv[1:] or sys.argv[1] == "help": @@ -323,8 +321,8 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru storage_path, rse = args -config = EmptyActionConfiguration(rse, opts["-c"]) -scanner_config = ScannerConfiguration(rse, opts["-c"]) +config = ActionConfiguration(rse, opts["-c"], "dark") +scanner_config = CEConfiguration(opts["-c"])[rse].get("scanner", {}) window = int(opts.get("-w", config.get("confirmation_window", 35))) min_age_first = int(opts.get("-M", config.get("min_age_first_run", 25))) @@ -337,6 +335,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru limit = opts.get("-L") if limit: limit = int(limit) + if dry_run: print("====== dry run mode ======") @@ -358,6 +357,14 @@ def 
empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru print(" min number of runs: ", min_runs) print(" limit: ", "no limit" if limit is None else limit) print() + print("Scanner:") + print(f"{scanner_config}") + print(" server: ", scanner_config["server"]) + print(" serverRoot: ", scanner_config["server_root"]) + print(" add prefix: ", scanner_config["add_prefix"]) + print(" remove prefix: ", scanner_config["remove_prefix"]) + print(" timeout: ", scanner_config["timeout"]) + print() my_stats = { "version": Version, @@ -386,16 +393,15 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru if stats is not None: stats.update_section(stats_key, my_stats) -server = config.Server -server_root = config.ServerRoot - -add_prefix = scanner_config.AddPrefix -remove_prefix = scanner_config.RemovePrefix +server = scanner_config["server"] +server_root = scanner_config["server_root"] +add_prefix = scanner_config["add_prefix"] +remove_prefix = scanner_config["remove_prefix"] lfn_converter = LFNConverter(server_root, remove_prefix, add_prefix) -timeout = config.ScannerTimeout -is_redirector = config.ServerIsRedirector +timeout = scanner_config["timeout"] +is_redirector = False # config.ServerIsRedirector client = XRootDClient(server, is_redirector, server_root, timeout=timeout) if os.path.isfile(storage_path): remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) @@ -420,4 +426,3 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru if status != "done": sys.exit(1) - diff --git a/actions/remove_empty_dirs_GL.py b/actions/remove_empty_dirs_GL.py index 975c2a50..973f2295 100644 --- a/actions/remove_empty_dirs_GL.py +++ b/actions/remove_empty_dirs_GL.py @@ -358,11 +358,11 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru print(" limit: ", "no limit" if limit is None else limit) print() print("Scanner:") - 
print(f"{scanner_config}") print(" server: ", scanner_config["server"]) print(" serverRoot: ", scanner_config["server_root"]) print(" add prefix: ", scanner_config["add_prefix"]) print(" remove prefix: ", scanner_config["remove_prefix"]) + print(" timeout: ", scanner_config["timeout"]) print() my_stats = { diff --git a/site_cmp3/empty-test.sh b/site_cmp3/empty-test.sh old mode 100644 new mode 100755 index db091f2b..b5265f02 --- a/site_cmp3/empty-test.sh +++ b/site_cmp3/empty-test.sh @@ -34,9 +34,10 @@ now=`date -u +%Y_%m_%d_%H_%M` ED_errors=${out}/${RSE}_${now}_ED.errors export PYTHONPATH=/consistency/cms_consistency/cmp3 -/usr/bin/python3 actions/remove_empty_dirs_GL.py -d -v \ +/usr/bin/python3 actions/remove_empty_dirs_GL.py -v \ -c ${dump}/${RSE}_${last}_config.yaml \ -s ${out}/${RSE}_${now}_stats.json \ + -L 100 \ ${dump} \ ${RSE} \ > ${ED_errors} diff --git a/site_cmp3/site-scan-test.sh b/site_cmp3/site-scan-test.sh new file mode 100755 index 00000000..f10d08bb --- /dev/null +++ b/site_cmp3/site-scan-test.sh @@ -0,0 +1,45 @@ +#!/bin/bash +x +# file: site-scan-test.sh + +#.. check if sourcing or running from a subprocess +shelltag=`echo $0 | grep bash` +if [ $shelltag"x" == "x" ]; then + cmd=$0 + myexit=exit +else + cmd=./site-scan-test.sh + myexit=return +fi + +if [[ $# -lt 1 ]]; then + echo "" + echo "Usage: ${cmd} " + $myexit +fi + +#.. input +RSE=$1 +dump=/var/cache/consistency-dump +temp=/var/cache/consistency-temp + +#.. 
output +out=/var/cache/test + +ls -t1 ${dump}/${RSE}*stats.json | sed 's#_stats.json##' | sed "s#${dump}/${RSE}_##" > ${out}/${RSE}-dates.out +#len=$(expr length "${RSE}_") + +last=`head -1 ${out}/${RSE}-dates.out` +now=`date -u +%Y_%m_%d_%H_%M` + +r_prefix=${out}/${RSE}_R +root_file_counts=${dump}/${RSE}_${last}_root_file_counts.json +scanner_errors=${out}/${RSE}_${now}_errors.out +empty_dirs=${out}/${RSE}_${now}_ED.out + +rce_scan -z -t 10 \ + -c ${dump}/${RSE}_${last}_config.yaml \ + -s ${out}/${RSE}_${now}_stats.json \ + -o ${r_prefix} \ + -r ${root_file_counts} \ + -E 2 -e ${empty_dirs} \ + ${RSE} > ${scanner_errors} diff --git a/site_cmp3/site_cmp3.sh b/site_cmp3/site_cmp3.sh index c87c4834..47d14c52 100755 --- a/site_cmp3/site_cmp3.sh +++ b/site_cmp3/site_cmp3.sh @@ -154,13 +154,13 @@ echo r_prefix=${scratch}/${RSE}_R rm -f ${r_prefix}.* -empty_dirs_out=${out}/${RSE}_${now}_ED.list +empty_dirs_out=${out}/${RSE}_${now}_ED.list.gz echo "Site scan..." > ${scanner_errors} rce_scan -z -c ${merged_config_file} -s ${stats} \ -o ${r_prefix} \ -r $root_file_counts \ - -E 2 -e $empty_dirs_out \ + -E 1 -e $empty_dirs_out \ ${RSE} 2>> ${scanner_errors} scanner_status=$? 
if [ "$scanner_status" != "0" ]; then From 394f09b2f6e44d430e28deaeeaf89ab23068e5fa Mon Sep 17 00:00:00 2001 From: Guilherme Lima Date: Fri, 11 Oct 2024 14:50:17 -0500 Subject: [PATCH 4/6] Minor cleanup --- actions/declare_dark.py | 8 ++++---- actions/declare_missing.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/actions/declare_dark.py b/actions/declare_dark.py index 539c1419..17af8bc7 100644 --- a/actions/declare_dark.py +++ b/actions/declare_dark.py @@ -32,7 +32,7 @@ def chunked(lst, chunk_size=1000): yield lst[i:i+chunk_size] def dark_action(storage_dir, rse, out, stats, stats_key, account, dry_run, my_stats): - + my_stats["start_time"] = t0 = time.time() if stats is not None: stats.update_section(stats_key, my_stats) @@ -40,7 +40,7 @@ def dark_action(storage_dir, rse, out, stats, stats_key, account, dry_run, my_st runs = CCRun.runs_for_rse(storage_path, rse) now = datetime.now() recent_runs = sorted( - [r for r in runs if r.Timestamp >= now - timedelta(days=window)], + [r for r in runs if r.Timestamp >= now - timedelta(days=window)], key=lambda r: r.Timestamp ) @@ -50,7 +50,7 @@ def dark_action(storage_dir, rse, out, stats, stats_key, account, dry_run, my_st confirmed_dark_count = None detected_dark_count = None error = None - + if recent_runs: my_stats["runs_compared"] = [r.Run for r in recent_runs] @@ -90,7 +90,7 @@ def dark_action(storage_dir, rse, out, stats, stats_key, account, dry_run, my_st if confirmed_dark_count > 0 and num_scanned > 0: ratio = confirmed_dark_count/num_scanned print("Ratio: %.2f%%" % (ratio*100.0,), file=sys.stderr) - + if confirmed: if out is not None: for f in sorted(confirmed): diff --git a/actions/declare_missing.py b/actions/declare_missing.py index 1c51e5da..4e3c56df 100644 --- a/actions/declare_missing.py +++ b/actions/declare_missing.py @@ -117,7 +117,7 @@ def missing_action(storage_dir, rse, scope, max_age_last, out, stats, stats_key, # chunk the list to avoid "request too large" errors for chunk in 
chunked(missing_list): result = client.declare_bad_file_replicas(chunk, "detected missing by CE", force=True) - not_declared += result.pop(rse, []) # there shuld be no other RSE in there + not_declared += result.pop(rse, []) # there should be no other RSE in there assert not result, "Other RSEs in the not_declared dictionary: " + ",".join(result.keys()) except Exception as e: status = "failed" @@ -212,4 +212,4 @@ def missing_action(storage_dir, rse, scope, max_age_last, out, stats, stats_key, print() if final_stats["status"] != "done": - sys.exit(1) \ No newline at end of file + sys.exit(1) From 73aee22a330b89009c9376f1669c9d9dc760bb8c Mon Sep 17 00:00:00 2001 From: Guilherme Lima Date: Fri, 11 Oct 2024 15:24:20 -0500 Subject: [PATCH 5/6] Rename test scripts --- site_cmp3/{empty-test.sh => test-empty-action.sh} | 0 site_cmp3/{site-scan-test.sh => test-site-scan.sh} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename site_cmp3/{empty-test.sh => test-empty-action.sh} (100%) rename site_cmp3/{site-scan-test.sh => test-site-scan.sh} (100%) diff --git a/site_cmp3/empty-test.sh b/site_cmp3/test-empty-action.sh similarity index 100% rename from site_cmp3/empty-test.sh rename to site_cmp3/test-empty-action.sh diff --git a/site_cmp3/site-scan-test.sh b/site_cmp3/test-site-scan.sh similarity index 100% rename from site_cmp3/site-scan-test.sh rename to site_cmp3/test-site-scan.sh From 00fc4fe70f57e2c775681201de0448030d8a0469 Mon Sep 17 00:00:00 2001 From: Guilherme Lima Date: Fri, 11 Oct 2024 15:29:43 -0500 Subject: [PATCH 6/6] Consolidate empty_action scripts --- actions/remove_empty_dirs.py | 5 ++--- actions/remove_empty_dirs_GL.py | 7 ++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/actions/remove_empty_dirs.py b/actions/remove_empty_dirs.py index a5993843..893b1800 100644 --- a/actions/remove_empty_dirs.py +++ b/actions/remove_empty_dirs.py @@ -247,6 +247,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru 
confirmed = set(lfn_converter.lfn_or_path_to_path(path) for path in recent_runs[0].empty_directories()) confirmed = update_confirmed(confirmed, set(lfn_converter.lfn_or_path_to_path(path) for path in recent_runs[-1].empty_directories())) for run in recent_runs[1:-1]: + print(f"run: {run} - #confirmed: {len(confirmed)}") if not confirmed: break run_set = set(lfn_converter.lfn_or_path_to_path(path) for path in run.empty_directories()) @@ -358,7 +359,6 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru print(" limit: ", "no limit" if limit is None else limit) print() print("Scanner:") - print(f"{scanner_config}") print(" server: ", scanner_config["server"]) print(" serverRoot: ", scanner_config["server_root"]) print(" add prefix: ", scanner_config["add_prefix"]) @@ -404,8 +404,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru is_redirector = False # config.ServerIsRedirector client = XRootDClient(server, is_redirector, server_root, timeout=timeout) if os.path.isfile(storage_path): - remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) - run_stats = my_stats + run_stats = remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) else: run_stats = empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) status = run_stats["status"] diff --git a/actions/remove_empty_dirs_GL.py b/actions/remove_empty_dirs_GL.py index 973f2295..c9471b4d 100644 --- a/actions/remove_empty_dirs_GL.py +++ b/actions/remove_empty_dirs_GL.py @@ -8,7 +8,7 @@ from rucio_consistency.xrootd import XRootDClient -Version = "1.0" +Version = "1.1" Usage = """ python remove_empty_dirs.py [options] (|) @@ -247,6 +247,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru confirmed = set(lfn_converter.lfn_or_path_to_path(path) for 
path in recent_runs[0].empty_directories()) confirmed = update_confirmed(confirmed, set(lfn_converter.lfn_or_path_to_path(path) for path in recent_runs[-1].empty_directories())) for run in recent_runs[1:-1]: + print(f"run: {run} - #confirmed: {len(confirmed)}") if not confirmed: break run_set = set(lfn_converter.lfn_or_path_to_path(path) for path in run.empty_directories()) @@ -358,6 +359,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru print(" limit: ", "no limit" if limit is None else limit) print() print("Scanner:") + print(f"{scanner_config}") print(" server: ", scanner_config["server"]) print(" serverRoot: ", scanner_config["server_root"]) print(" add prefix: ", scanner_config["add_prefix"]) @@ -403,8 +405,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru is_redirector = False # config.ServerIsRedirector client = XRootDClient(server, is_redirector, server_root, timeout=timeout) if os.path.isfile(storage_path): - remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) - run_stats = my_stats + run_stats = remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) else: run_stats = empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit) status = run_stats["status"]