Skip to content

Commit

Permalink
Merge pull request #7 from mrguilima/empty_action
Browse files Browse the repository at this point in the history
First fix attempt at the empty_action, to remove empty directories
  • Loading branch information
dynamic-entropy authored Nov 6, 2024
2 parents 113f380 + 00fc4fe commit ef058e6
Show file tree
Hide file tree
Showing 9 changed files with 561 additions and 41 deletions.
2 changes: 1 addition & 1 deletion actions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class ActionConfiguration(object):

def __init__(self, rse, source, action, **source_agrs):
def __init__(self, rse, source, action, **source_args):
self.Action = action
self.Config = CEConfiguration(source)[rse].get(action + "_action", {})

Expand Down
8 changes: 4 additions & 4 deletions actions/declare_dark.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ def chunked(lst, chunk_size=1000):
yield lst[i:i+chunk_size]

def dark_action(storage_dir, rse, out, stats, stats_key, account, dry_run, my_stats):

my_stats["start_time"] = t0 = time.time()
if stats is not None:
stats.update_section(stats_key, my_stats)

runs = CCRun.runs_for_rse(storage_path, rse)
now = datetime.now()
recent_runs = sorted(
[r for r in runs if r.Timestamp >= now - timedelta(days=window)],
[r for r in runs if r.Timestamp >= now - timedelta(days=window)],
key=lambda r: r.Timestamp
)

Expand All @@ -50,7 +50,7 @@ def dark_action(storage_dir, rse, out, stats, stats_key, account, dry_run, my_st
confirmed_dark_count = None
detected_dark_count = None
error = None

if recent_runs:
my_stats["runs_compared"] = [r.Run for r in recent_runs]

Expand Down Expand Up @@ -90,7 +90,7 @@ def dark_action(storage_dir, rse, out, stats, stats_key, account, dry_run, my_st
if confirmed_dark_count > 0 and num_scanned > 0:
ratio = confirmed_dark_count/num_scanned
print("Ratio: %.2f%%" % (ratio*100.0,), file=sys.stderr)

if confirmed:
if out is not None:
for f in sorted(confirmed):
Expand Down
4 changes: 2 additions & 2 deletions actions/declare_missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def missing_action(storage_dir, rse, scope, max_age_last, out, stats, stats_key,
# chunk the list to avoid "request too large" errors
for chunk in chunked(missing_list):
result = client.declare_bad_file_replicas(chunk, "detected missing by CE", force=True)
not_declared += result.pop(rse, []) # there shuld be no other RSE in there
not_declared += result.pop(rse, []) # there should be no other RSE in there
assert not result, "Other RSEs in the not_declared dictionary: " + ",".join(result.keys())
except Exception as e:
status = "failed"
Expand Down Expand Up @@ -212,4 +212,4 @@ def missing_action(storage_dir, rse, scope, max_age_last, out, stats, stats_key,
print()

if final_stats["status"] != "done":
sys.exit(1)
sys.exit(1)
66 changes: 35 additions & 31 deletions actions/remove_empty_dirs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@
from rucio_consistency import CEConfiguration, Stats
from rucio_consistency.xrootd import XRootDClient

from run import CCRun
from config import ActionConfiguration


Version = "1.0"
Version = "1.1"

Usage = """
python remove_empty_dirs.py [options] (<storage_path>|<file path>) <rse>
Expand All @@ -31,7 +28,7 @@
"""

class LFNConverter(object):

def __init__(self, site_root, remove_prefix, add_prefix):
self.SiteRoot = site_root
self.RemovePrefix = remove_prefix
Expand Down Expand Up @@ -59,16 +56,16 @@ def lfn_to_path(self, lfn):
if self.RemovePrefix:
path = self.RemovePrefix + path
return self.canonic(self.SiteRoot + "/" + path)

def lfn_or_path_to_path(self, lfn_or_path):
if lfn_or_path.startswith(self.SiteRoot):
return lfn_or_path # already a path
return self.lfn_to_path(lfn_or_path)

class RemoveDirectoryTask(Task):

RETRIES = 3

def __init__(self, client, path):
Task.__init__(self)
self.Client = client
Expand All @@ -80,7 +77,7 @@ def run(self):


class Remover(Primitive):

def __init__(self, client, paths, dry_run, limit=None, max_workers=10, verbose=False):
Primitive.__init__(self)
self.Client = client
Expand Down Expand Up @@ -170,7 +167,7 @@ def remove_from_file(file_path, rse, out, lfn_converter, stats, stats_key, dry_r
for path, error in failed:
print("Failed:", path, error)
return my_stats

def update_confirmed(confirmed, update):
new_confirmed = confirmed & update
unconfirmed = confirmed - update
Expand All @@ -181,29 +178,30 @@ def update_confirmed(confirmed, update):
return new_confirmed

def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit):

my_stats["start_time"] = t0 = time.time()
if stats is not None:
stats.update_section(stats_key, my_stats)

runs = list(CCRun.runs_for_rse(storage_path, rse, complete_only=False))
now = datetime.now()

for r in runs:
print(r.Run, ": in window:", r.Timestamp >= now - timedelta(days=window),
" ED info collected:", r.empty_directories_collected(),
print(r.Run, ": in window:", r.Timestamp >= now - timedelta(days=window),
" ED info collected:", r.empty_directories_collected(),
" count:", r.empty_directory_count(),
" list present:", r.empty_dir_list_exists()
)
recent_runs = sorted(
[r for r in runs
[r for r in runs
if True
#and (print(r.Run, r.Timestamp >= now - timedelta(days=window), r.empty_directories_collected(), r.empty_directory_count()) or True)
and (r.Timestamp >= now - timedelta(days=window))
and r.empty_directories_collected()
and r.empty_directory_count() is not None
and r.empty_dir_list_exists()
#and (print(r.Run, r.Timestamp >= now - timedelta(days=window), r.empty_directories_collected(), r.empty_directory_count()) or True)
],
],
key=lambda r: r.Timestamp
)

Expand All @@ -219,7 +217,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru
removed_count = 0
error_counts = {}
error = None

if recent_runs:
my_stats["runs_compared"] = [r.Run for r in recent_runs]

Expand Down Expand Up @@ -249,6 +247,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru
confirmed = set(lfn_converter.lfn_or_path_to_path(path) for path in recent_runs[0].empty_directories())
confirmed = update_confirmed(confirmed, set(lfn_converter.lfn_or_path_to_path(path) for path in recent_runs[-1].empty_directories()))
for run in recent_runs[1:-1]:
print(f"run: {run} - #confirmed: {len(confirmed)}")
if not confirmed:
break
run_set = set(lfn_converter.lfn_or_path_to_path(path) for path in run.empty_directories())
Expand All @@ -270,8 +269,8 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru
for f in sorted(confirmed):
print(f, file=out)
if out is not sys.stdout:
out.close()
try:
out.close()
try:
remover = Remover(client, confirmed, dry_run, verbose=verbose, limit=limit)
failed = remover.run()
failed_count = len(failed)
Expand All @@ -297,7 +296,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru

if stats is not None:
stats.update_section(stats_key, my_stats)

return my_stats

if not sys.argv[1:] or sys.argv[1] == "help":
Expand All @@ -323,8 +322,8 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru

storage_path, rse = args

config = EmptyActionConfiguration(rse, opts["-c"])
scanner_config = ScannerConfiguration(rse, opts["-c"])
config = ActionConfiguration(rse, opts["-c"], "dark")
scanner_config = CEConfiguration(opts["-c"])[rse].get("scanner", {})

window = int(opts.get("-w", config.get("confirmation_window", 35)))
min_age_first = int(opts.get("-M", config.get("min_age_first_run", 25)))
Expand All @@ -337,6 +336,7 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru
limit = opts.get("-L")
if limit: limit = int(limit)


if dry_run:
print("====== dry run mode ======")

Expand All @@ -358,6 +358,13 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru
print(" min number of runs: ", min_runs)
print(" limit: ", "no limit" if limit is None else limit)
print()
print("Scanner:")
print(" server: ", scanner_config["server"])
print(" serverRoot: ", scanner_config["server_root"])
print(" add prefix: ", scanner_config["add_prefix"])
print(" remove prefix: ", scanner_config["remove_prefix"])
print(" timeout: ", scanner_config["timeout"])
print()

my_stats = {
"version": Version,
Expand Down Expand Up @@ -386,20 +393,18 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru
if stats is not None:
stats.update_section(stats_key, my_stats)

server = config.Server
server_root = config.ServerRoot

add_prefix = scanner_config.AddPrefix
remove_prefix = scanner_config.RemovePrefix
server = scanner_config["server"]
server_root = scanner_config["server_root"]
add_prefix = scanner_config["add_prefix"]
remove_prefix = scanner_config["remove_prefix"]

lfn_converter = LFNConverter(server_root, remove_prefix, add_prefix)

timeout = config.ScannerTimeout
is_redirector = config.ServerIsRedirector
timeout = scanner_config["timeout"]
is_redirector = False # config.ServerIsRedirector
client = XRootDClient(server, is_redirector, server_root, timeout=timeout)
if os.path.isfile(storage_path):
remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit)
run_stats = my_stats
run_stats = remove_from_file(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit)
else:
run_stats = empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_run, client, my_stats, verbose, limit)
status = run_stats["status"]
Expand All @@ -420,4 +425,3 @@ def empty_action(storage_path, rse, out, lfn_converter, stats, stats_key, dry_ru

if status != "done":
sys.exit(1)

Loading

0 comments on commit ef058e6

Please sign in to comment.