Skip to content

Commit

Permalink
added function check_in_scope_entries
Browse files Browse the repository at this point in the history
  • Loading branch information
majoma7 committed Jan 23, 2025
1 parent e6d2622 commit b0c820f
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 2 deletions.
4 changes: 3 additions & 1 deletion inventory_foundation_sdk/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
'inventory_foundation_sdk/custom_datasets.py'),
'inventory_foundation_sdk.custom_datasets.DynamicPathJSONDataset._save': ( 'custom_datasets.html#dynamicpathjsondataset._save',
'inventory_foundation_sdk/custom_datasets.py')},
'inventory_foundation_sdk.db_mgmt': { 'inventory_foundation_sdk.db_mgmt.get_db_credentials': ( 'db_mgmt.html#get_db_credentials',
'inventory_foundation_sdk.db_mgmt': { 'inventory_foundation_sdk.db_mgmt.check_in_scope_entries': ( 'db_mgmt.html#check_in_scope_entries',
'inventory_foundation_sdk/db_mgmt.py'),
'inventory_foundation_sdk.db_mgmt.get_db_credentials': ( 'db_mgmt.html#get_db_credentials',
'inventory_foundation_sdk/db_mgmt.py'),
'inventory_foundation_sdk.db_mgmt.insert_multi_rows': ( 'db_mgmt.html#insert_multi_rows',
'inventory_foundation_sdk/db_mgmt.py')},
Expand Down
87 changes: 86 additions & 1 deletion inventory_foundation_sdk/db_mgmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/10_db_mgmt.ipynb.

# %% auto 0
__all__ = ['logger', 'get_db_credentials', 'insert_multi_rows']
__all__ = ['logger', 'get_db_credentials', 'insert_multi_rows', 'check_in_scope_entries']

# %% ../nbs/10_db_mgmt.ipynb 3
from kedro.config import OmegaConfigLoader
Expand Down Expand Up @@ -145,3 +145,88 @@ def insert_multi_rows(
conn.commit() # Commit all changes after processing

return None

# %% ../nbs/10_db_mgmt.ipynb 7
def check_in_scope_entries(target_table, dataset_column, id_column, insert_arguments, credentials, dataset_id, filter=None):

    """
    Ensure every entry in the dataset scope has a corresponding row in the target table.

    IDs present in ``dataset_matching`` for *dataset_id* but absent from
    *target_table* are inserted with zero values for every column listed in
    *insert_arguments*.

    Args:
        target_table (str): Name of the target table to check and update.
            NOTE(review): interpolated directly into SQL (as are the column
            names) — must come from trusted code, never from user input.
        dataset_column (str): Column identifying the dataset (e.g., "datasetID").
        id_column (str): Column identifying the entries (e.g., "skuID").
        insert_arguments (list): Columns to be inserted with default zero values if missing.
        credentials: psycopg2 connection parameters (DSN string / connection kwargs).
        dataset_id: Value of the dataset identifier used in the WHERE clauses.
        filter (list, optional): List of dictionaries specifying filter conditions.
            Each dictionary contains:
                - "column" (str): Column name for the condition.
                - "value" (any): Value for the condition.
            Filter columns are also included in the INSERT column list and in
            the ON CONFLICT target (a matching unique constraint must exist).

    Returns:
        None

    Raises:
        Exception: Any error raised while querying or inserting is logged and re-raised.
    """
    if filter is None:
        logger.info(f"Checking in-scope entries for {target_table}...")
    else:
        logger.info(f"Checking in-scope entries for {target_table} with filter: {filter}")

    try:
        with psycopg2.connect(credentials) as conn:
            with conn.cursor() as cur:
                # All IDs in the dataset scope (filter intentionally NOT applied here:
                # the scope is defined by dataset membership alone).
                cur.execute(f"""
                    SELECT "{id_column}"
                    FROM dataset_matching
                    WHERE "{dataset_column}" = %s;
                """, (dataset_id,))
                all_ids = {row[0] for row in cur.fetchall()}

                # Build the extra WHERE fragment for the optional filter conditions.
                filter_clause = ""
                filter_values = []
                if filter:
                    filter_conditions = [f'"{f["column"]}" = %s' for f in filter]
                    filter_clause = " AND " + " AND ".join(filter_conditions)
                    filter_values = [f["value"] for f in filter]

                # IDs already present in the target table (filter applied here).
                cur.execute(f"""
                    SELECT DISTINCT "{id_column}"
                    FROM {target_table}
                    WHERE "{dataset_column}" = %s {filter_clause};
                """, (dataset_id, *filter_values))
                existing_ids = {row[0] for row in cur.fetchall()}

                # IDs with no entries in the target table.
                missing_ids = all_ids - existing_ids

                if missing_ids:
                    if filter is None:
                        logger.info(f"Adding missing IDs with zero values for {target_table}: {missing_ids}")
                    else:
                        logger.info(f"Adding missing IDs with zero values for {target_table} with filter {filter}: {missing_ids}")

                    # Column list: id + dataset + zero-filled columns (+ filter columns).
                    columns = [id_column, dataset_column] + list(insert_arguments)
                    conflict_cols = [id_column, dataset_column]
                    if filter:
                        filter_cols = [f["column"] for f in filter]
                        columns += filter_cols
                        conflict_cols += filter_cols
                    column_list = ", ".join(f'"{col}"' for col in columns)
                    conflict_list = ", ".join(f'"{col}"' for col in conflict_cols)

                    # Placeholders: %s for id and dataset, literal 0 for the
                    # zero-filled columns, %s for each filter value.
                    value_placeholders = ["%s", "%s"] + ["0"] * len(insert_arguments)
                    if filter:
                        value_placeholders += ["%s"] * len(filter)
                    value_placeholder_list = ", ".join(value_placeholders)

                    # Build the statement once and batch the inserts instead of
                    # re-formatting and executing per row.
                    insert_sql = f"""
                        INSERT INTO {target_table} ({column_list})
                        VALUES ({value_placeholder_list})
                        ON CONFLICT ({conflict_list})
                        DO NOTHING;
                    """
                    cur.executemany(
                        insert_sql,
                        [(entry_id, dataset_id, *filter_values) for entry_id in missing_ids],
                    )

                conn.commit()  # Commit all changes after processing
                logger.info(f"Missing IDs handled successfully for {target_table}.")

    except Exception as e:
        logger.error(f"Error checking in-scope entries for {target_table}: {e}")
        raise  # bare raise preserves the original traceback (raise e would truncate it)
93 changes: 93 additions & 0 deletions nbs/10_db_mgmt.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,99 @@
" return None"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"\n",
"def check_in_scope_entries(target_table, dataset_column, id_column, insert_arguments, credentials, dataset_id, filter=None):\n",
" \n",
" \"\"\"\n",
" Ensures all entries in the dataset scope have corresponding entries in the target table.\n",
" If an entry is missing, specified insert_arguments are set to zero.\n",
"\n",
" Args:\n",
" target_table (str): The name of the target table to check and update.\n",
" dataset_column (str): The name of the column identifying the dataset (e.g., \"datasetID\").\n",
" id_column (str): The name of the column identifying the entries (e.g., \"skuID\").\n",
" insert_arguments (list): List of columns to be inserted with default zero values if missing.\n",
" filter (list, optional): List of dictionaries specifying filter conditions. \n",
" Each dictionary contains:\n",
" - \"column\" (str): Column name for the condition.\n",
" - \"value\" (any): Value for the condition.\n",
" \"\"\"\n",
" if filter is None:\n",
" logger.info(f\"Checking in-scope entries for {target_table}...\")\n",
" else:\n",
" logger.info(f\"Checking in-scope entries for {target_table} with filter: {filter}\")\n",
"\n",
" try:\n",
" with psycopg2.connect(credentials) as conn:\n",
" with conn.cursor() as cur:\n",
" # Get all IDs in the dataset scope (no filter clause here)\n",
" cur.execute(f\"\"\"\n",
" SELECT \"{id_column}\"\n",
" FROM dataset_matching\n",
" WHERE \"{dataset_column}\" = %s;\n",
" \"\"\", (dataset_id,))\n",
" all_ids = {row[0] for row in cur.fetchall()}\n",
"\n",
" # Build filter SQL for WHERE clause if provided\n",
" filter_clause = \"\"\n",
" filter_values = []\n",
" if filter:\n",
" filter_conditions = [f'\"{f[\"column\"]}\" = %s' for f in filter]\n",
" filter_clause = \" AND \" + \" AND \".join(filter_conditions)\n",
" filter_values = [f[\"value\"] for f in filter]\n",
"\n",
" # Get IDs already present in the target table (apply filter here)\n",
" cur.execute(f\"\"\"\n",
" SELECT DISTINCT \"{id_column}\"\n",
" FROM {target_table}\n",
" WHERE \"{dataset_column}\" = %s {filter_clause};\n",
" \"\"\", (dataset_id, *filter_values))\n",
" existing_ids = {row[0] for row in cur.fetchall()}\n",
"\n",
" # Identify IDs with no entries in the target table\n",
" missing_ids = all_ids - existing_ids\n",
"\n",
" if missing_ids:\n",
" if filter is None:\n",
" logger.info(f\"Adding missing IDs with zero values for {target_table}: {missing_ids}\")\n",
" else:\n",
" logger.info(f\"Adding missing IDs with zero values for {target_table} with filter {filter}: {missing_ids}\")\n",
"\n",
" # Build column list and value placeholders\n",
" columns = [id_column, dataset_column] + insert_arguments\n",
" if filter:\n",
" columns += [f[\"column\"] for f in filter]\n",
" column_list = \", \".join(f'\"{col}\"' for col in columns)\n",
"\n",
" value_placeholders = [\"%s\", \"%s\"] + [\"0\"] * len(insert_arguments)\n",
" if filter:\n",
" value_placeholders += [\"%s\"] * len(filter)\n",
" value_placeholder_list = \", \".join(value_placeholders)\n",
"\n",
" # Insert zero values for missing IDs\n",
" for entry_id in missing_ids:\n",
" cur.execute(f\"\"\"\n",
" INSERT INTO {target_table} ({column_list})\n",
" VALUES ({value_placeholder_list})\n",
" ON CONFLICT ({\", \".join(f'\"{col}\"' for col in [id_column, dataset_column] + ([f[\"column\"] for f in filter] if filter else []))})\n",
" DO NOTHING;\n",
" \"\"\", (entry_id, dataset_id, *filter_values))\n",
" \n",
" conn.commit()\n",
" logger.info(f\"Missing IDs handled successfully for {target_table}.\")\n",
" \n",
" except Exception as e:\n",
" logger.error(f\"Error checking in-scope entries for {target_table}: {e}\")\n",
" raise e"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down

0 comments on commit b0c820f

Please sign in to comment.