Add comments to utils
Signed-off-by: Sid Murching <[email protected]>
smurching committed Sep 26, 2024
1 parent 2f1e495 commit 998d8a3
Showing 5 changed files with 16 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -5,3 +5,5 @@ env
.idea
__pycache__

# Exclude `databricks sync` CLI command snapshots
.databricks
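For context, a hedged sketch of the workflow that produces the `.databricks` snapshot state ignored above; the local and workspace paths are placeholders, not from this repo:

# Sync a local checkout of this repo into a Databricks workspace folder (newer Databricks CLI).
# Per the .gitignore comment above, the sync command keeps snapshot state under .databricks/,
# which should not be committed.
databricks sync . /Workspace/Users/someone@example.com/genai-cookbook --watch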
2 changes: 1 addition & 1 deletion agent_app_sample_code/00_global_config.py
@@ -39,7 +39,7 @@
# UC Catalog & Schema where outputs tables/indexes are saved
# By default, this will use the current user name to create a unique UC catalog/schema & vector search endpoint
# If this catalog/schema does not exist, you need permissions to create the catalog/schema.
UC_CATALOG = f"{user_name}_catalog"
UC_CATALOG = f"smurching"
UC_SCHEMA = f"cookbook"

## UC Model name where the Agent's model is logged
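For reference, a minimal sketch of how these config values are typically combined into fully qualified Unity Catalog names downstream; the table and index names here are hypothetical, not taken from this commit:

UC_CATALOG = "smurching"
UC_SCHEMA = "cookbook"

# Unity Catalog objects are addressed as <catalog>.<schema>.<name>.
docs_table_name = f"{UC_CATALOG}.{UC_SCHEMA}.parsed_docs"          # hypothetical table
vector_index_name = f"{UC_CATALOG}.{UC_SCHEMA}.parsed_docs_index"  # hypothetical index

print(docs_table_name)    # smurching.cookbook.parsed_docs
print(vector_index_name)  # smurching.cookbook.parsed_docs_index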
6 changes: 1 addition & 5 deletions agent_app_sample_code/02_data_pipeline.py
@@ -42,10 +42,6 @@

# COMMAND ----------

# MAGIC %run ./utils/install_aptget_package

# COMMAND ----------

# MAGIC %md
# MAGIC ## Import the global configuration

@@ -56,7 +52,7 @@
# COMMAND ----------

# MAGIC %md
# MAGIC ## Set the MLflow experiement name
# MAGIC ## Set the MLflow experiment name
# MAGIC
# MAGIC Used to track information about this Data Pipeline that is used in the later notebooks.

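A minimal sketch (not part of this commit) of how an MLflow experiment name is typically set so that later notebooks can attach their runs to it; the experiment path and logged parameter are placeholders:

import mlflow

# Hypothetical experiment path; the notebook derives its own name from the current user.
experiment_name = "/Users/someone@example.com/agent_data_pipeline"
mlflow.set_experiment(experiment_name)

with mlflow.start_run(run_name="data_pipeline"):
    # Example of recording pipeline settings on the run for later notebooks to look up.
    mlflow.log_param("source_path", "/Volumes/main/default/raw_docs")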
2 changes: 1 addition & 1 deletion agent_app_sample_code/utils/build_retriever_index.py
@@ -73,7 +73,7 @@ def find_index(endpoint_name, index_name):

if create_index:
print(
f"Computing document embeddings and Vector Search Index {get_table_url}. This can take 15 minutes or much longer if you have a larger number of documents."
f"Computing document embeddings and Vector Search Index. This can take 15 minutes or much longer if you have a larger number of documents."
)

vsc.create_delta_sync_index_and_wait(
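For reference, a hedged sketch of the kind of call made above, assuming the databricks-vectorsearch VectorSearchClient API; the endpoint, table, index, and column names are illustrative, not from this repo:

from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient()

# Illustrative delta-sync index over a parsed-docs table with managed embeddings.
vsc.create_delta_sync_index_and_wait(
    endpoint_name="cookbook_vs_endpoint",
    index_name="smurching.cookbook.parsed_docs_index",
    source_table_name="smurching.cookbook.parsed_docs",
    primary_key="chunk_id",
    pipeline_type="TRIGGERED",
    embedding_source_column="content_chunked",
    embedding_model_endpoint_name="databricks-gte-large-en",
)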
28 changes: 11 additions & 17 deletions agent_app_sample_code/utils/file_loading.py
@@ -1,19 +1,3 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ##### `load_files_to_df`
# MAGIC
# MAGIC `load_files_to_df` loads files from a specified source path into a Spark DataFrame after parsing and extracting metadata.
# MAGIC
# MAGIC Arguments:
# MAGIC - source_path: The path to the folder of files. This should be a valid directory path where the files are stored.
# MAGIC - dest_table_name: The name of the destination Delta Table.
# MAGIC - parse_file_udf: A user-defined function that takes the bytes of the file, parses it, and returns the parsed content and metadata.
# MAGIC For example: `def parse_file(raw_doc_contents_bytes, doc_path): return {'doc_content': content, 'metadata': metadata}`
# MAGIC - spark_dataframe_schema: The schema of the resulting Spark DataFrame after parsing and metadata extraction.

# COMMAND ----------

import json
import traceback
from datetime import datetime
from typing import Any, Callable, TypedDict, Dict
@@ -86,6 +70,11 @@ def _get_parser_udf(
def load_files_to_df(
spark: SparkSession,
source_path: str) -> DataFrame:
"""
Load files from a directory into a Spark DataFrame.
Each row in the DataFrame will contain the path, length, and content of the file; for more
details, see https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
"""

if not os.path.exists(source_path):
raise ValueError(
@@ -94,7 +83,7 @@ def load_files_to_df(

# Load the raw files
raw_files_df = (
spark.read.format("binaryFile").option("recursiveFileLookup", "true")
spark.read.format("binaryFile").option("recursiveFileLookup", "true")  # See https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
.load(source_path)
)

@@ -108,6 +97,11 @@


def apply_parsing_udf(raw_files_df: DataFrame, parse_file_udf: Callable[[[dict, Any]], str], parsed_df_schema: StructType) -> DataFrame:
"""
Apply a file-parsing UDF to a DataFrame whose rows correspond to file content/metadata loaded via
https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
Returns a DataFrame with the parsed content and metadata.
"""
print("Running parsing & metadata extraction UDF in spark...")

parser_udf = _get_parser_udf(parse_file_udf, parsed_df_schema)
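A hedged usage sketch of the two helpers documented above, based on the docstrings in this diff; the parser, schema, and source path are illustrative placeholders, and `spark` is assumed to be the notebook's SparkSession:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical output schema matching the keys returned by the parser below.
parsed_schema = StructType([
    StructField("doc_content", StringType(), nullable=True),
    StructField("metadata", StringType(), nullable=True),
])

def parse_file(raw_doc_contents_bytes, doc_path):
    # Trivial "parser": decode the raw bytes as UTF-8 and record the source path as metadata.
    return {
        "doc_content": raw_doc_contents_bytes.decode("utf-8", errors="ignore"),
        "metadata": doc_path,
    }

raw_files_df = load_files_to_df(spark, source_path="/Volumes/main/default/raw_docs")
parsed_df = apply_parsing_udf(raw_files_df, parse_file, parsed_df_schema=parsed_schema)
parsed_df.show(5)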
