Skip to content

Commit

Permalink
feat(clp-package): Add support for deleting archives that are exclusi…
Browse files Browse the repository at this point in the history
…vely within a time range. (#594)

Co-authored-by: kirkrodrigues <[email protected]>
  • Loading branch information
haiqi96 and kirkrodrigues authored Nov 27, 2024
1 parent 7aea626 commit b8e22da
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 72 deletions.
13 changes: 13 additions & 0 deletions components/clp-package-utils/clp_package_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging

# Set up console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d %(levelname)s [%(module)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S"
)
logging_console_handler.setFormatter(logging_formatter)

# Set up root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(logging_console_handler)
2 changes: 1 addition & 1 deletion components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def get_clp_home():
return clp_home.resolve()


def generate_container_name(job_type: JobType) -> str:
def generate_container_name(job_type: str) -> str:
"""
:param job_type:
:return: A unique container name for the given job type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,7 @@
validate_and_load_db_credentials_file,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def main(argv):
Expand Down Expand Up @@ -66,7 +58,7 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

container_name = generate_container_name(JobType.COMPRESSION)
container_name = generate_container_name(str(JobType.COMPRESSION))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@
validate_path_could_be_dir,
)

# Setup logging
# Create logger
logger = logging.getLogger("clp")
logger.setLevel(logging.DEBUG)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)
logger = logging.getLogger(__file__)


def validate_and_load_config(
Expand Down Expand Up @@ -89,7 +81,7 @@ def handle_extract_file_cmd(
if clp_config is None:
return -1

container_name = generate_container_name(JobType.FILE_EXTRACTION)
container_name = generate_container_name(str(JobType.FILE_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
Expand Down Expand Up @@ -164,7 +156,7 @@ def handle_extract_stream_cmd(
if clp_config is None:
return -1

container_name = generate_container_name(JobType.IR_EXTRACTION)
container_name = generate_container_name(str(JobType.IR_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
Expand Down
103 changes: 103 additions & 0 deletions components/clp-package-utils/clp_package_utils/scripts/del_archives.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import argparse
import logging
import subprocess
import sys
from pathlib import Path

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
dump_container_config,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
get_clp_home,
load_config_file,
validate_and_load_db_credentials_file,
)

logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(
description="Deletes archives that fall within the specified time range."
)
args_parser.add_argument(
"--config",
"-c",
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--begin-ts",
type=int,
default=0,
help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-ts",
type=int,
required=True,
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.",
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
try:
config_file_path = Path(parsed_args.config)
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_logs_dir()

# Validate and load necessary credentials
validate_and_load_db_credentials_file(clp_config, clp_home, False)
except:
logger.exception("Failed to load config.")
return -1

# Validate the input timestamp
begin_ts = parsed_args.begin_ts
end_ts = parsed_args.end_ts
if begin_ts > end_ts:
logger.error("begin-ts must be <= end-ts")
return -1
if end_ts < 0 or begin_ts < 0:
logger.error("begin_ts and end_ts must be non-negative.")
return -1

container_name = generate_container_name("del-archives")

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
)

necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir]
container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
del_archive_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.del_archives",
"--config", str(generated_config_path_on_container),
str(begin_ts),
str(end_ts)

]
# fmt: on

cmd = container_start_cmd + del_archive_cmd
subprocess.run(cmd, check=True)

# Remove generated files
generated_config_path_on_host.unlink()

return 0


if "__main__" == __name__:
sys.exit(main(sys.argv))
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,7 @@
load_config_file,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def print_compression_job_status(job_row, current_time):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,7 @@
wait_for_query_job,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import argparse
import logging
import shutil
import sys
from contextlib import closing
from pathlib import Path
from typing import List

from clp_py_utils.clp_config import Database
from clp_py_utils.sql_adapter import SQL_Adapter

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
get_clp_home,
load_config_file,
)

logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(
description="Deletes archives that fall within the specified time range."
)
args_parser.add_argument(
"--config",
"-c",
required=True,
default=str(default_config_file_path),
help="CLP configuration file.",
)
args_parser.add_argument(
"begin_ts",
type=int,
help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"end_ts",
type=int,
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.",
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
config_file_path = Path(parsed_args.config)
try:
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_logs_dir()
except:
logger.exception("Failed to load config.")
return -1

database_config = clp_config.database
archives_dir = clp_config.archive_output.directory
if not archives_dir.exists():
logger.error("`archive_output.directory` doesn't exist.")
return -1

return _delete_archives(
archives_dir,
database_config,
parsed_args.begin_ts,
parsed_args.end_ts,
)


def _delete_archives(
archives_dir: Path,
database_config: Database,
begin_ts: int,
end_ts: int,
) -> int:
"""
Deletes all archives where `begin_ts <= archive.begin_timestamp` and
`archive.end_timestamp <= end_ts` from both the metadata database and disk.
:param archives_dir:
:param database_config:
:param begin_ts:
:param end_ts:
:return: 0 on success, -1 otherwise.
"""

archive_ids: List[str]
logger.info("Starting to delete archives from the database.")
try:
sql_adapter = SQL_Adapter(database_config)
clp_db_connection_params = database_config.get_clp_connection_params_and_type(True)
table_prefix = clp_db_connection_params["table_prefix"]
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
db_cursor.execute(
f"""
DELETE FROM `{table_prefix}archives`
WHERE begin_timestamp >= %s AND end_timestamp <= %s
RETURNING id
""",
(begin_ts, end_ts),
)
results = db_cursor.fetchall()

if 0 == len(results):
logger.info("No archives (exclusively) within the specified time range.")
return 0

archive_ids = [result["id"] for result in results]
db_cursor.execute(
f"""
DELETE FROM `{table_prefix}files`
WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))})
""",
archive_ids,
)
db_conn.commit()
except Exception:
logger.exception("Failed to delete archives from the database. Aborting deletion.")
return -1

logger.info(f"Finished deleting archives from the database.")

for archive_id in archive_ids:
archive_path = archives_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
continue

logger.info(f"Deleting archive {archive_id} from disk.")
shutil.rmtree(archive_path)

logger.info(f"Finished deleting archives from disk.")

return 0


if "__main__" == __name__:
sys.exit(main(sys.argv))
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,7 @@
wait_for_query_job,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def create_and_monitor_job_in_db(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,7 @@
validate_and_load_db_credentials_file,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def main(argv):
Expand Down Expand Up @@ -82,7 +74,7 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

container_name = generate_container_name(JobType.SEARCH)
container_name = generate_container_name(str(JobType.SEARCH))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand Down
Loading

0 comments on commit b8e22da

Please sign in to comment.