Alternate spark runtime implementation #406

Merged
merged 32 commits on Sep 12, 2024
Changes from 31 commits

Commits (32)
e284e88
initial implementation
blublinsky Jul 12, 2024
bfd64c4
Added dockerfile
blublinsky Jul 15, 2024
f312146
refactoring of code
blublinsky Jul 31, 2024
7d6957f
removed unnecessary abstract transform class
blublinsky Jul 31, 2024
7fc18a6
removed unnecessary abstract transform class
blublinsky Jul 31, 2024
5f52e6a
removed unnecessary abstract transform class
blublinsky Jul 31, 2024
3ed2db9
fixed spark image help
blublinsky Jul 31, 2024
3fe378f
documentation update
blublinsky Aug 1, 2024
b5d75b0
Added filter
blublinsky Aug 1, 2024
eeadae9
preparing for docid
blublinsky Aug 2, 2024
3d7c84c
implementing docid
blublinsky Aug 3, 2024
44859cc
implementing docid
blublinsky Aug 3, 2024
bd2f588
Add comments to implementation
blublinsky Aug 4, 2024
a14aac0
Add comments to implementation
blublinsky Aug 4, 2024
3711b3e
Add support for explicit parallelization
blublinsky Aug 5, 2024
6fa77f2
Add support for explicit parallelization
blublinsky Aug 5, 2024
0cb0241
Add support for explicit parallelization
blublinsky Aug 5, 2024
2e22d23
Add support for explicit parallelization
blublinsky Aug 5, 2024
fbaa472
Addressed comments
blublinsky Aug 5, 2024
19fec86
Addressed comments
blublinsky Aug 5, 2024
6714ac7
run pre commit
blublinsky Aug 5, 2024
995ab0c
small fixes
blublinsky Sep 9, 2024
1c01f90
addressed comments
blublinsky Sep 9, 2024
26ad9fa
addressed comments - launcher refactoring
blublinsky Sep 10, 2024
e698f60
added support for runtime
blublinsky Sep 10, 2024
7117079
small cleanup
blublinsky Sep 11, 2024
be7d944
re factored doc id
blublinsky Sep 11, 2024
2a8222a
Use multi-stage build
cmadam Sep 11, 2024
433fcc3
changed Spark version
blublinsky Sep 12, 2024
c319fcc
changed Spark version
blublinsky Sep 12, 2024
4bfc07f
changed Spark version
blublinsky Sep 12, 2024
a876e7e
changed Spark version
blublinsky Sep 12, 2024
2 changes: 1 addition & 1 deletion .make.defaults
@@ -49,7 +49,7 @@ DOCKER_REGISTRY_KEY?=$(DPK_DOCKER_REGISTRY_KEY)
DOCKER_REGISTRY_ENDPOINT?=$(DOCKER_HOSTNAME)/$(DOCKER_NAMESPACE)
DOCKER_LOCAL_IMAGE=$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)
DOCKER_REMOTE_IMAGE=$(DOCKER_REGISTRY_ENDPOINT)/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)
DOCKER_SPARK_BASE_IMAGE_NAME=data-prep-kit-spark-3.5.1
DOCKER_SPARK_BASE_IMAGE_NAME=data-prep-kit-spark-3.5.2
DOCKER_SPARK_BASE_IMAGE=$(DOCKER_SPARK_BASE_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)
RAY_BASE_IMAGE?=docker.io/rayproject/ray:${RAY}-py310
# Deprecated in favor of DOCKER_REMOTE_IMAGE
11 changes: 6 additions & 5 deletions data-processing-lib/doc/overview.md
@@ -12,12 +12,13 @@ This might include operations such as de-duplication, merging, and splitting.
The framework uses a plug-in model for the primary functions. The core
transformation-specific classes/interfaces are as follows:

* [AbstractTransform](../python/src/data_processing/transform/abstract_transform.py) -
* [AbstractBinaryTransform](../python/src/data_processing/transform/binary_transform.py) -
a simple, easily-implemented interface allowing the definition of transforms
over arbitrary data types. Support is provided for both
[files](../python/src/data_processing/transform/binary_transform.py) of arbitrary data as a byte array and
[parquet/arrow](../python/src/data_processing/transform/table_transform.py) tables.
* [TransformConfiguration](../python/src/data_processing/transform/transform_configuration.py) - defines
of arbitrary data as a byte array. Additionally, a
[table](../python/src/data_processing/transform/table_transform.py) transform interface
is provided, allowing the definition of transforms operating on
[pyarrow](https://arrow.apache.org/docs/python/index.html) tables.
* [TransformConfiguration](../python/src/data_processing/runtime/transform_configuration.py) - defines
the transform short name, its implementation class, and command line configuration
parameters.

17 changes: 7 additions & 10 deletions data-processing-lib/doc/spark-launcher-options.md
@@ -1,17 +1,17 @@
# Spark Launcher Command Line Options


A number of command line options are available when launching a transform using Spark.

The following is a current --help output (a work in progress) for
the `NOOPTransform` (note the --noop_sleep_sec and --noop_pwd options):

```
usage: noop_transform.py [-h] [--noop_sleep_sec NOOP_SLEEP_SEC] [--noop_pwd NOOP_PWD] [--data_s3_cred DATA_S3_CRED] [--data_s3_config DATA_S3_CONFIG]
[--data_local_config DATA_LOCAL_CONFIG] [--data_max_files DATA_MAX_FILES] [--data_checkpointing DATA_CHECKPOINTING]
[--data_data_sets DATA_DATA_SETS] [--data_files_to_use DATA_FILES_TO_USE] [--data_num_samples DATA_NUM_SAMPLES]
[--runtime_pipeline_id RUNTIME_PIPELINE_ID] [--runtime_job_id RUNTIME_JOB_ID] [--runtime_code_location RUNTIME_CODE_LOCATION]
[--spark_local_config_filepath SPARK_LOCAL_CONFIG_FILEPATH] [--spark_kube_config_filepath SPARK_KUBE_CONFIG_FILEPATH]
usage: noop_python_runtime.py [-h] [--noop_sleep_sec NOOP_SLEEP_SEC] [--noop_pwd NOOP_PWD] [--data_s3_cred DATA_S3_CRED] [--data_s3_config DATA_S3_CONFIG] [--data_local_config DATA_LOCAL_CONFIG] [--data_max_files DATA_MAX_FILES]
[--data_checkpointing DATA_CHECKPOINTING] [--data_data_sets DATA_DATA_SETS] [--data_files_to_use DATA_FILES_TO_USE] [--data_num_samples DATA_NUM_SAMPLES] [--runtime_pipeline_id RUNTIME_PIPELINE_ID]
[--runtime_job_id RUNTIME_JOB_ID] [--runtime_code_location RUNTIME_CODE_LOCATION]

Driver for noop processing on Spark
Driver for noop processing

options:
-h, --help show this help message and exit
@@ -59,8 +59,5 @@ options:
path: Path within the repository
Example: { 'github': 'https://github.com/somerepo', 'commit_hash': '1324',
'path': 'transforms/universal/code' }
--spark_local_config_filepath SPARK_LOCAL_CONFIG_FILEPATH
Path to spark configuration for run
--spark_kube_config_filepath SPARK_KUBE_CONFIG_FILEPATH
Path to Kubernetes-based configuration.
```
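
For illustration, here is one (hypothetical) way these options might be supplied programmatically before calling a launcher's `launch()`, which parses `sys.argv`; the option values below are illustrative only, and the `--runtime_code_location` dictionary follows the AST-string format shown in the help text above.

```python
import sys

# Illustrative values only; a transform launcher's launch() would then parse this argv.
sys.argv = [
    "noop_python_runtime.py",
    "--noop_sleep_sec", "1",
    "--runtime_pipeline_id", "pipeline_id",
    "--runtime_job_id", "job_id",
    "--runtime_code_location",
    "{'github': 'https://github.com/somerepo', 'commit_hash': '1324', 'path': 'transforms/universal/code'}",
]
```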

63 changes: 52 additions & 11 deletions data-processing-lib/doc/spark-runtime.md
@@ -1,18 +1,59 @@
# Spark Framework
The Spark runtime extends the base framework with the following set of components:
The Spark runtime implementation is roughly based on the ideas from
[here](https://wrightturn.wordpress.com/2015/07/22/getting-spark-data-from-aws-s3-using-boto-and-pyspark/),
[here](https://medium.com/how-to-become-a-data-architect/get-best-performance-for-pyspark-jobs-using-parallelize-48c8fa03a21e)
and [here](https://medium.com/@shuklaprashant9264/alternate-of-for-loop-in-pyspark-25a00888ec35).
Spark itself is used primarily for execution parallelization; all data access goes through the
framework's [data access](data-access-factory.md) layer, thus preserving all of its implemented features. At
the start of execution, the list of files to process is obtained (using the data access framework)
and then split between Spark workers, each of which reads the actual data, transforms it, and writes it back.
The implementation is based on Spark RDDs (for a comparison of the three Apache Spark APIs,
RDDs, DataFrames, and Datasets, see this
[Databricks blog post](https://www.databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)).
As defined by Databricks:
```text
RDD was the primary user-facing API in Spark since its inception. At the core, an RDD is an
immutable distributed collection of elements of your data, partitioned across nodes in your
cluster that can be operated in parallel with a low-level API that offers transformations
and actions.
```
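For readers less familiar with RDDs, a minimal PySpark illustration (not part of this PR) of the low-level transformations and actions mentioned above:

```python
from pyspark import SparkContext

sc = SparkContext(appName="rdd-basics")

# an immutable, partitioned collection distributed across the cluster
rdd = sc.parallelize(range(10), numSlices=2)

# transformations (map) are lazy; actions (collect) trigger parallel execution
print(rdd.map(lambda x: x * x).collect())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(rdd.getNumPartitions())              # 2

sc.stop()
```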
The RDD API fits perfectly into what we are implementing. It allows us to fully leverage our
existing DataAccess APIs, thus preserving all of the investment in flexible, reliable data
access. Additionally, the RDD's flexible low-level control allows us to work at the partition level,
thus limiting the amount of initialization and setup.
Note that in our approach a transform's processing operates on either binary or parquet data,
not on Spark DataFrames or Datasets. We do not currently support these Spark APIs,
as they do not map well onto what we are implementing.

In our implementation we use
[pyspark.SparkContext.parallelize](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html)
to run multiple transforms in parallel. We allow two options for specifying the number of partitions,
i.e. how many partitions the RDD should be divided into (see also the sketch after the list below). See
[here](https://sparktpoint.com/how-to-create-rdd-using-parallelize/) for an explanation
of this parameter:
* If you specify a positive value of the parameter, Spark will attempt to evenly
distribute the data from seq into that many partitions. For example, if you have
a collection of 100 elements and you specify numSlices as 4, Spark will try
to create 4 partitions with approximately 25 elements in each partition.
* If you don’t specify this parameter, Spark will use a default value, which is
typically determined based on the cluster configuration or the available resources
(number of workers).
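
To make this concrete, below is a minimal sketch of the pattern described above (not the actual `orchestrate` implementation): the file list is parallelized into an explicit number of partitions, and each partition processes its files end to end. The `read_file`, `apply_transform`, and `write_file` helpers are hypothetical stand-ins for the framework's data access and transform APIs.

```python
from typing import Iterable, Iterator

from pyspark import SparkContext


# Hypothetical stand-ins for the framework's data access and transform calls.
def read_file(path: str) -> bytes:
    return b"file bytes"  # placeholder: real code reads via the data access layer


def apply_transform(data: bytes) -> bytes:
    return data  # placeholder: real code invokes the configured transform


def write_file(path: str, data: bytes) -> None:
    pass  # placeholder: real code writes via the data access layer


def process_partition(paths: Iterable[str]) -> Iterator[str]:
    # per-partition setup (data access, transform instantiation) would happen once here,
    # which is why partition-level control keeps initialization overhead low
    for path in paths:
        write_file(path.replace("input/", "output/"), apply_transform(read_file(path)))
        yield path


sc = SparkContext(appName="orchestration-sketch")
files = [f"input/file_{i}.parquet" for i in range(100)]

# explicit parallelization: 4 partitions of roughly 25 files each;
# omitting numSlices falls back to Spark's default partitioning
processed = sc.parallelize(files, numSlices=4).mapPartitions(process_partition).collect()
print(f"processed {len(processed)} files")
sc.stop()
```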

## Transforms

* [AbstractSparkTransform](../spark/src/data_processing_spark/runtime/spark/spark_transform.py) - this
is the base class for all Spark-based transforms over Spark DataFrames.
* [SparkTransformConfiguration](../spark/src/data_processing_spark/runtime/spark/spark_transform_config.py) - this
is a simple extension of the base TransformConfiguration class to hold the transformation class
(an extension of AbstractSparkTransform).
* [SparkTransformRuntimeConfiguration](../spark/src/data_processing_spark/transform/runtime_configuration.py) allows
configuring a transform to use PySpark.


## Runtime

* [SparkTransformLauncher](../spark/src/data_processing_spark/runtime/spark/spark_launcher.py) - this is a
class generally used to implement `main()` that makes use of a `SparkTransformConfiguration` to
start the Spark runtime and execute the transform over the specified set of input files.
* [SparkTransformRuntimeConfiguration](../spark/src/data_processing_spark/runtime/spark/runtime_config.py) - this
class is a simple extension of the transform's base TransformConfiguration class.
The Spark runtime extends the base framework with the following set of components:
* [SparkTransformExecutionConfiguration](../spark/src/data_processing_spark/runtime/spark/execution_configuration.py)
allows configuring Spark execution
* [SparkTransformFileProcessor](../spark/src/data_processing_spark/runtime/spark/transform_file_processor.py) extends
[AbstractTransformFileProcessor](../python/src/data_processing/runtime/transform_file_processor.py) to work with
PySpark
* [SparkTransformLauncher](../spark/src/data_processing_spark/runtime/spark/transform_launcher.py) allows
launching the PySpark runtime and executing a transform
* the [orchestrate](../spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py) function orchestrates Spark-based
execution
@@ -1,4 +1,4 @@
from data_processing.runtime.execution_configuration import TransformExecutionConfiguration
from data_processing.runtime.execution_configuration import TransformExecutionConfiguration, runtime_cli_prefix
from data_processing.runtime.runtime_configuration import TransformRuntimeConfiguration
from data_processing.runtime.transform_launcher import AbstractTransformLauncher, multi_launcher
from data_processing.runtime.transform_file_processor import AbstractTransformFileProcessor
@@ -19,7 +19,7 @@
logger = get_logger(__name__)


cli_prefix = "runtime_"
runtime_cli_prefix = "runtime_"


class TransformExecutionConfiguration(CLIArgumentProvider):
@@ -45,16 +45,16 @@ def add_input_params(self, parser: argparse.ArgumentParser) -> None:
:param parser: parser
:return:
"""
parser.add_argument(f"--{cli_prefix}pipeline_id", type=str, default="pipeline_id", help="pipeline id")
parser.add_argument(f"--{cli_prefix}job_id", type=str, default="job_id", help="job id")
parser.add_argument(f"--{runtime_cli_prefix}pipeline_id", type=str, default="pipeline_id", help="pipeline id")
parser.add_argument(f"--{runtime_cli_prefix}job_id", type=str, default="job_id", help="job id")

help_example_dict = {
"github": ["https://github.com/somerepo", "Github repository URL."],
"commit_hash": ["1324", "github commit hash"],
"path": ["transforms/universal/code", "Path within the repository"],
}
parser.add_argument(
f"--{cli_prefix}code_location",
f"--{runtime_cli_prefix}code_location",
type=ast.literal_eval,
default=None,
help="AST string containing code location\n" + ParamsUtils.get_ast_help_text(help_example_dict),
@@ -66,7 +66,7 @@ def apply_input_params(self, args: argparse.Namespace) -> bool:
:param args: user defined arguments
:return: True, if validate pass or False otherwise
"""
captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
captured = CLIArgumentProvider.capture_parameters(args, runtime_cli_prefix, False)
# store parameters locally
self.pipeline_id = captured["pipeline_id"]
self.job_details = {
@@ -10,7 +10,6 @@
# limitations under the License.
################################################################################

import argparse
import time

from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
@@ -44,29 +44,6 @@ def __init__(
super().__init__(runtime_config, data_access_factory)
self.execution_config = PythonTransformExecutionConfiguration(name=runtime_config.get_name())

def __get_parameters(self) -> bool:
"""
This method creates arg parser, fills it with the parameters
and does parameters validation
:return: True if validation passes or False, if not
"""
parser = argparse.ArgumentParser(
description=f"Driver for {self.name} processing",
# RawText is used to allow better formatting of ast-based arguments
# See uses of ParamsUtils.dict_to_str()
formatter_class=argparse.RawTextHelpFormatter,
)
# add additional arguments
self.runtime_config.add_input_params(parser=parser)
self.data_access_factory.add_input_params(parser=parser)
self.execution_config.add_input_params(parser=parser)
args = parser.parse_args()
return (
self.runtime_config.apply_input_params(args=args)
and self.execution_config.apply_input_params(args=args)
and self.data_access_factory.apply_input_params(args=args)
)

def _submit_for_execution(self) -> int:
"""
Submit for execution
@@ -87,12 +87,3 @@ def _submit_for_execution(self) -> int:
finally:
logger.info(f"Completed execution in {round((time.time() - start)/60., 3)} min, execution result {res}")
return res

def launch(self) -> int:
"""
Execute method orchestrates driver invocation
:return:
"""
if self.__get_parameters():
return self._submit_for_execution()
return 1
@@ -50,4 +50,4 @@ def compute_execution_stats(self, stats: TransformStatistics) -> None:
:param stats: output of statistics as aggregated across all calls to all transforms.
:return: job execution statistics. These are generally reported as metadata by the Ray Orchestrator.
"""
return stats
pass
@@ -12,6 +12,7 @@

import sys
from typing import Any
import argparse

from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.runtime import TransformRuntimeConfiguration
@@ -36,9 +37,59 @@ def __init__(
self.name = self.runtime_config.get_name()
self.data_access_factory = data_access_factory

def launch(self):
def _get_parser(self) -> argparse.ArgumentParser:
"""
This method creates a parser
:return: parser
"""
return argparse.ArgumentParser(
description=f"Driver for {self.name} processing",
# RawText is used to allow better formatting of ast-based arguments
# See uses of ParamsUtils.dict_to_str()
formatter_class=argparse.RawTextHelpFormatter,
)

def _get_arguments(self, parser: argparse.ArgumentParser) -> argparse.Namespace:
"""
Parse input parameters
:param parser: parser
:return: parsed arguments
"""
# add additional arguments
self.runtime_config.add_input_params(parser=parser)
self.data_access_factory.add_input_params(parser=parser)
self.execution_config.add_input_params(parser=parser)
return parser.parse_args()

def _get_parameters(self, args: argparse.Namespace) -> bool:
"""
This method applies the parsed parameters to the configuration
providers and validates them
:return: True if validation passes or False, if not
"""
return (
self.runtime_config.apply_input_params(args=args)
and self.execution_config.apply_input_params(args=args)
and self.data_access_factory.apply_input_params(args=args)
)

def _submit_for_execution(self) -> int:
"""
Submit for execution
:return:
"""
raise ValueError("must be implemented by subclass")

def launch(self):
"""
Execute method orchestrates driver invocation
:return:
"""
args = self._get_arguments(self._get_parser())
if self._get_parameters(args):
return self._submit_for_execution()
return 1

def get_transform_name(self) -> str:
return self.name
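
For illustration, a minimal sketch of how a runtime-specific launcher might plug into this refactored base class: only `__init__` and `_submit_for_execution` need to be provided, while argument parsing and validation stay in the inherited `launch()`. The `TransformExecutionConfiguration(name=...)` constructor call is an assumption, made by analogy with the Python launcher shown earlier in this diff.

```python
from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.runtime import (
    AbstractTransformLauncher,
    TransformExecutionConfiguration,
    TransformRuntimeConfiguration,
)
from data_processing.utils import get_logger

logger = get_logger(__name__)


class DryRunTransformLauncher(AbstractTransformLauncher):
    """Parses and validates all parameters, then stops instead of submitting real work."""

    def __init__(
        self,
        runtime_config: TransformRuntimeConfiguration,
        data_access_factory: DataAccessFactoryBase = DataAccessFactory(),
    ):
        super().__init__(runtime_config, data_access_factory)
        # launch() -> _get_arguments() expects execution_config to be present
        self.execution_config = TransformExecutionConfiguration(name=runtime_config.get_name())

    def _submit_for_execution(self) -> int:
        logger.info(f"dry run for transform {self.get_transform_name()}; nothing submitted")
        return 0
```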

@@ -1,4 +1,3 @@
from data_processing.transform.abstract_transform import AbstractTransform
from data_processing.transform.binary_transform import AbstractBinaryTransform
from data_processing.transform.table_transform import AbstractTableTransform
from data_processing.transform.transform_statistics import TransformStatistics

This file was deleted.

@@ -12,13 +12,8 @@

from typing import Any, TypeVar

from data_processing.transform.abstract_transform import AbstractTransform


DATA = TypeVar("DATA")


class AbstractBinaryTransform(AbstractTransform[DATA]):
class AbstractBinaryTransform:
"""
Converts input binary file to output file(s) (binary)
Sub-classes must provide the transform() method to provide the conversion of one binary files to 0 or
@@ -17,7 +17,7 @@
from data_processing.utils import TransformUtils


class AbstractTableTransform(AbstractBinaryTransform[pa.Table]):
class AbstractTableTransform(AbstractBinaryTransform):
"""
Extends AbstractBinaryTransform to expect the byte arrays from to contain a pyarrow Table.
Sub-classes are expected to implement transform() on the parsed Table instances.
@@ -13,7 +13,7 @@
from argparse import ArgumentParser
from typing import Any

from data_processing.transform import AbstractTransform
from data_processing.transform import AbstractBinaryTransform
from data_processing.utils import CLIArgumentProvider


@@ -22,7 +22,9 @@ class TransformConfiguration(CLIArgumentProvider):
This is a base transform configuration class defining transform's input/output parameter
"""

def __init__(self, name: str, transform_class: type[AbstractTransform], remove_from_metadata: list[str] = []):
def __init__(
self, name: str, transform_class: type[AbstractBinaryTransform], remove_from_metadata: list[str] = []
):
"""
Initialization
:param name: transformer name
@@ -34,12 +36,12 @@ def __init__(self, name: str, transform_class: type[AbstractTransform], remove_f
self.remove_from_metadata = remove_from_metadata
self.params = {}

def get_transform_class(self) -> type[AbstractTransform]:
def get_transform_class(self) -> type[AbstractBinaryTransform]:
"""
Get the class extending AbstractTransform which implements a specific transformation.
Get the class extending AbstractBinaryTransform which implements a specific transformation.
The class will generally be instantiated with a dictionary of configuration produced by
the associated TransformRuntime get_transform_config() method.
:return: class extending AbstractTransform
:return: class extending AbstractBinaryTransform
"""
return self.transform_class
