
Commit

Added filter

blublinsky committed Aug 1, 2024
1 parent 14e0ce0 commit 3d8f301

Showing 14 changed files with 43 additions and 344 deletions.
@@ -105,3 +105,6 @@ def process_partition(iterator):
except Exception as e:
logger.error(f"Exception during execution {e}: {traceback.print_exc()}")
return 1
finally:
# stop spark context at the end. Required for running multiple tests
sc.stop()
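
The `finally` block added above ensures the Spark context is stopped even when the transform fails, so the next test in the same process can create a fresh context. A minimal standalone sketch of that pattern, assuming a local pyspark installation (the `run_count` helper, app name, and parquet path are illustrative, not part of this repository):

```python
from pyspark.sql import SparkSession


def run_count(input_path: str) -> int:
    # Hypothetical helper mirroring the try/except/finally shape added above.
    spark = SparkSession.builder.appName("filter-test").getOrCreate()
    sc = spark.sparkContext
    try:
        return spark.read.parquet(input_path).count()
    except Exception as e:
        print(f"Exception during execution {e}")
        return 1
    finally:
        # Stop the Spark context unconditionally; required when multiple
        # tests each start their own context in the same process.
        sc.stop()
```
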
2 changes: 1 addition & 1 deletion transforms/.make.transforms
@@ -168,7 +168,7 @@ extra-help:

.transforms.test-spark_image-help::
@# Help: Test an already-built image (use make image) to be sure the --help option works
$(DOCKER) run -t --rm $(DOCKER_LOCAL_IMAGE) python3 src/$(TRANSFORM_RUNTIME_SRC_FILE) --help
$(DOCKER) run -t --rm $(DOCKER_LOCAL_IMAGE) python3 $(TRANSFORM_RUNTIME_SRC_FILE) --help

.PHONY: test-locals
test-locals:: .transforms.test-locals
4 changes: 2 additions & 2 deletions transforms/universal/filter/python/src/filter_local_python.py
@@ -16,11 +16,11 @@
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.utils import ParamsUtils
from filter_transform import (
FilterTransformConfiguration,
filter_columns_to_drop_cli_param,
filter_criteria_cli_param,
filter_logical_operator_cli_param,
)
from filter_transform_python import FilterPythonTransformConfiguration


# create parameters
@@ -55,6 +55,6 @@
# Set the simulated command line args
sys.argv = ParamsUtils.dict_to_req(d=params | filter_params)
# create launcher
launcher = PythonTransformLauncher(FilterTransformConfiguration())
launcher = PythonTransformLauncher(FilterPythonTransformConfiguration())
# Launch the ray actor(s) to process the input
launcher.launch()
9 changes: 4 additions & 5 deletions transforms/universal/filter/spark/Dockerfile
@@ -1,4 +1,4 @@
ARG BASE_IMAGE=quay.io/dataprep1/data-prep-kit/data-prep-kit-spark-3.5.1:0.1.0
ARG BASE_IMAGE=quay.io/dataprep1/data-prep-kit/data-prep-kit-spark-3.5.1:0.2.0
FROM ${BASE_IMAGE}

USER root
@@ -13,6 +13,8 @@ COPY --chown=spark:root data-processing-lib-python/ data-processing-lib-python/
RUN cd data-processing-lib-python && pip install --no-cache-dir -e .
COPY --chown=spark:root data-processing-lib-spark/ data-processing-lib-spark/
RUN cd data-processing-lib-spark && pip install --no-cache-dir -e .
COPY --chown=spark:root python-transform/ python-transform/
RUN cd python-transform && pip install --no-cache-dir -e .

# Install project source
COPY --chown=spark:root src/ src/
@@ -23,15 +25,12 @@ RUN pip install --no-cache-dir -e .
COPY ./src/filter_transform_spark.py .

# copy some of the samples in
COPY ./src/filter_local.py local/
COPY src/filter_local_spark.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/

# copy config
COPY config/ config/

USER spark

# Set environment
File renamed without changes.
2 changes: 1 addition & 1 deletion transforms/universal/filter/spark/README.md
@@ -174,7 +174,7 @@ This filter operation applied on the table above will return the following resul


## Running
You can run the Spark filter transform [filter_local.py](src/filter_local.py) to filter the `test1.parquet` file in [test input data](test-data/input) to an `output` directory. The directory will contain one or several filtered parquet files and the `metadata.json` file.
You can run the Spark filter transform [filter_local.py](src/filter_local_spark.py) to filter the `test1.parquet` file in [test input data](test-data/input) to an `output` directory. The directory will contain one or several filtered parquet files and the `metadata.json` file.

#### Running as Spark-based application
```
```
14 changes: 0 additions & 14 deletions transforms/universal/filter/spark/config/spark_profile_kube.yml

This file was deleted.

This file was deleted.

5 changes: 3 additions & 2 deletions transforms/universal/filter/spark/pyproject.toml
@@ -2,13 +2,14 @@
name = "dpk_filter_transform_spark"
version = "0.2.1.dev0"
requires-python = ">=3.10"
description = "Doc ID Spark Transform"
description = "Filter Spark Transform"
license = {text = "Apache-2.0"}
readme = {file = "README.md", content-type = "text/markdown"}
authors = [
{ name = "Constantin Adam", email = "cmadam@us.ibm.com" },
{ name = "Boris Lublinsky", email = "blublinsk@ibm.com" },
]
dependencies = [
"dpk_filter_transform_python==0.2.1.dev0",
"data-prep-toolkit-spark==0.2.1.dev0",
]

@@ -13,14 +13,14 @@
import os
import sys

from data_processing_spark.runtime.spark import SparkTransformLauncher
from data_processing.utils import ParamsUtils
from data_processing_spark.runtime.spark.spark_launcher import SparkTransformLauncher
from filter_transform_spark import (
FilterSparkRuntimeConfiguration,
from filter_transform import (
filter_columns_to_drop_cli_param,
filter_criteria_cli_param,
filter_logical_operator_cli_param,
)
from filter_transform_spark import FilterSparkTransformConfiguration


# create parameters
@@ -30,12 +30,11 @@
"input_folder": input_folder,
"output_folder": output_folder,
}

code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
filter_criteria = [
"docq_total_words > 100 AND docq_total_words < 200",
"ibmkenlm_docq_perplex_score < 230",
]

filter_logical_operator = "AND"
filter_columns_to_drop = ["extra", "cluster"]

@@ -44,22 +43,18 @@
filter_columns_to_drop_cli_param: filter_columns_to_drop,
filter_logical_operator_cli_param: filter_logical_operator,
}

code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "config/spark_profile_local.yml"))
params = {
# Data access. Only required parameters are specified
"data_local_config": ParamsUtils.convert_to_ast(local_conf),
# execution info
"runtime_pipeline_id": "pipeline_id",
"runtime_job_id": "job_id",
"runtime_code_location": ParamsUtils.convert_to_ast(code_location),
"spark_local_config_filepath": config_path,
}
if __name__ == "__main__":
# Set the simulated command line args
sys.argv = ParamsUtils.dict_to_req(d=params | filter_params)
# create launcher
launcher = SparkTransformLauncher(runtime_config=FilterSparkRuntimeConfiguration())
launcher = SparkTransformLauncher(runtime_config=FilterSparkTransformConfiguration())
# Launch the ray actor(s) to process the input
launcher.launch()
201 changes: 12 additions & 189 deletions transforms/universal/filter/spark/src/filter_transform_spark.py
@@ -10,197 +10,18 @@
# limitations under the License.
################################################################################

import ast
import json
from argparse import ArgumentParser, Namespace
from typing import Any

from data_processing.transform import TransformConfiguration
from data_processing.utils import CLIArgumentProvider, get_logger
from data_processing_spark.runtime.spark.runtime_config import (
SparkTransformRuntimeConfiguration,
)
from data_processing_spark.runtime.spark.spark_launcher import SparkTransformLauncher
from data_processing_spark.runtime.spark.spark_transform import AbstractSparkTransform
from pyspark.sql import DataFrame

from filter_transform import FilterTransformConfiguration
from data_processing.utils import get_logger
from data_processing_spark.runtime.spark import SparkTransformLauncher
from data_processing_spark.transform import SparkTransformRuntimeConfiguration

logger = get_logger(__name__)


short_name = "filter"
cli_prefix = short_name + "_"

filter_criteria_key = "criteria_list"
""" AST Key holds the list of filter criteria (in SQL WHERE clause format)"""
filter_logical_operator_key = "logical_operator"
""" Key holds the logical operator that joins filter criteria (AND or OR)"""
filter_columns_to_drop_key = "columns_to_drop"
""" AST Key holds the list of columns to drop after filtering"""

filter_criteria_cli_param = f"{cli_prefix}{filter_criteria_key}"
""" AST Key holds the list of filter criteria (in SQL WHERE clause format)"""
filter_logical_operator_cli_param = f"{cli_prefix}{filter_logical_operator_key}"
""" Key holds the logical operator that joins filter criteria (AND or OR)"""
filter_columns_to_drop_cli_param = f"{cli_prefix}{filter_columns_to_drop_key}"
""" AST Key holds the list of columns to drop after filtering"""

captured_arg_keys = [filter_criteria_key, filter_columns_to_drop_key, filter_logical_operator_key]
""" The set of keys captured from the command line """

# defaults
filter_criteria_default = ast.literal_eval("[]")
""" The default list of filter criteria (in SQL WHERE clause format)"""
filter_logical_operator_default = "AND"
filter_columns_to_drop_default = ast.literal_eval("[]")
""" The default list of columns to drop"""


class FilterTransform(AbstractSparkTransform):
"""
Implements Spark filtering - select from a dataset a set of rows that
satisfy a set of filtering criteria
"""

def __init__(self, config: dict[str, Any]):
"""
Initialize based on the dictionary of configuration information.
This is generally called with configuration parsed from the CLI arguments defined
by the companion runtime, FilterTransformRuntime.
"""
# Make sure that the param name corresponds to the name used in apply_input_params method
# of FilterTransformConfiguration class
super().__init__(config)
self.filter_criteria = config.get(filter_criteria_key, filter_criteria_default)
self.logical_operator = config.get(filter_logical_operator_key, filter_logical_operator_default)
self.columns_to_drop = config.get(filter_columns_to_drop_key, filter_columns_to_drop_default)

def transform(self, data: DataFrame) -> tuple[list[DataFrame], dict[str, Any]]:
"""
This implementation filters the input Spark dataframe using a SQL
statement and returns the filtered table and execution stats
:param data: input Spark DataFrame
:return: list of output Spark DataFrames and custom statistics
"""
# initialize the metadata dictionary
total_docs = data.count()
total_columns = len(data.columns)
metadata = {
"total_docs_count": total_docs,
"total_columns_count": total_columns,
}

data.createOrReplaceTempView("spark_table")

if len(self.filter_criteria) > 0:
# populate metadata with filtering stats for each filter criterion
for filter_criterion in self.filter_criteria:
query = f"SELECT * FROM spark_table WHERE {filter_criterion}"
filter_df = data.sparkSession.sql(query)
# filter_df = data.where(filter_criterion)
docs_filtered = total_docs - filter_df.count()
metadata[f"docs_filtered_out_by '{filter_criterion}'"] = docs_filtered

# use filtering criteria to build the SQL query for filtering
filter_clauses = [f"({x})" for x in self.filter_criteria]
where_clause = f" {self.logical_operator} ".join(filter_clauses)

# filter using 'where' function
try:
query = f"SELECT * FROM spark_table WHERE {where_clause}"
filtered_df = data.sparkSession.sql(query)
# filtered_df = data.where(where_clause)
except Exception as ex:
logger.error(f"FilterTransform::transform failed: {ex}")
raise ex
else:
filtered_df = data

data.sparkSession.catalog.dropTempView("spark_table")

# drop any columns requested from the final result
if len(self.columns_to_drop) > 0:
columns_to_drop = tuple(self.columns_to_drop)
filtered_df_cols_dropped = filtered_df.drop(*columns_to_drop)
else:
filtered_df_cols_dropped = filtered_df

# add global filter stats to metadata
metadata["docs_after_filter"] = filtered_df_cols_dropped.count()
metadata["columns_after_filter"] = len(filtered_df_cols_dropped.columns)

return [filtered_df_cols_dropped], metadata


class FilterTransformConfiguration(TransformConfiguration):

"""
Provides support for configuring and using the associated Transform class include
configuration with CLI args.
"""

def __init__(self):
super().__init__(
name=short_name,
transform_class=FilterTransform,
)

def add_input_params(self, parser: ArgumentParser) -> None:
"""
Add Transform-specific arguments to the given parser.
This will be included in a dictionary used to initialize the FilterTransform.
By convention a common prefix should be used for all mutator-specific CLI args
(e.g, noop_, pii_, etc.)
"""

sample_sql = [
"docq_total_words > 100 AND docq_total_words < 200",
"docq_perplex_score < 230",
"date_acquired BETWEEN '2023-07-04' AND '2023-07-08'",
"title LIKE 'https://%%'",
"document_id IN ('doc-id-1', 'doc-id-2', 'doc-id-3')",
]
columns_to_drop_example = ["column1", "column2"]

parser.add_argument(
f"--{filter_criteria_cli_param}",
type=ast.literal_eval,
required=True,
default=ast.literal_eval("[]"),
help=f"list of filter criteria (in SQL WHERE clause format), for example: {json.dumps(sample_sql, indent=2, default=str)}",
)
parser.add_argument(
f"--{filter_columns_to_drop_cli_param}",
type=ast.literal_eval,
required=False,
default=ast.literal_eval("[]"),
help=f"list of columns to drop after filtering, for example: {json.dumps(columns_to_drop_example)}",
)
parser.add_argument(
f"--{filter_logical_operator_cli_param}",
type=str,
required=False,
default="AND",
choices=["AND", "OR"],
help="logical operator (AND or OR) that joins filter criteria",
)

def apply_input_params(self, args: Namespace) -> bool:
"""
Validate and apply the arguments that have been parsed
:param args: user defined arguments.
:return: True, if validate pass or False otherwise
"""
# Capture the args that are specific to this transform
captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
self.params = self.params | captured
return True


class FilterSparkRuntimeConfiguration(SparkTransformRuntimeConfiguration):
class FilterSparkTransformConfiguration(SparkTransformRuntimeConfiguration):
"""
Implements the SparkTransformConfiguration for Filter as required by the
SparkTransformLauncher.
Implements the SparkTransformConfiguration for NOOP as required by the PythonTransformLauncher.
NOOP does not use a RayRuntime class so the superclass only needs the base
python-only configuration.
"""

def __init__(self):
@@ -211,6 +32,8 @@ def __init__(self):


if __name__ == "__main__":
launcher = SparkTransformLauncher(FilterSparkRuntimeConfiguration())
logger.info("Launching filter transform")
# create launcher
launcher = SparkTransformLauncher(runtime_config=FilterSparkTransformConfiguration())
logger.info("Launching noop transform")
# Launch the ray actor(s) to process the input
launcher.launch()
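
For reference, the `FilterTransform.transform` implementation removed above assembled its SQL by parenthesizing each criterion and joining them with the configured logical operator. A small self-contained sketch of that assembly, using the same sample criteria as the local example (no Spark session is needed to see the resulting query):

```python
# Build the WHERE clause the way the removed FilterTransform did:
# wrap each criterion in parentheses, then join with the logical operator.
filter_criteria = [
    "docq_total_words > 100 AND docq_total_words < 200",
    "ibmkenlm_docq_perplex_score < 230",
]
logical_operator = "AND"

filter_clauses = [f"({x})" for x in filter_criteria]
where_clause = f" {logical_operator} ".join(filter_clauses)
query = f"SELECT * FROM spark_table WHERE {where_clause}"
print(query)
# SELECT * FROM spark_table WHERE (docq_total_words > 100 AND docq_total_words < 200) AND (ibmkenlm_docq_perplex_score < 230)
```
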
Binary file modified transforms/universal/filter/spark/test-data/input/test1.parquet
Binary file not shown.