Skip to content

Commit

Permalink
pre commit
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Aug 14, 2024
1 parent dbc2965 commit 75325f7
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from data_processing.runtime.pure_python.transform_runtime import DefaultPythonTransformRuntime
from data_processing.runtime.pure_python.runtime_configuration import PythonTransformRuntimeConfiguration
from data_processing.runtime.pure_python.execution_configuration import PythonTransformExecutionConfiguration
from data_processing.runtime.pure_python.transform_file_processor import (PythonTransformFileProcessor,
PythonPoolTransformFileProcessor)
from data_processing.runtime.pure_python.transform_file_processor import (
PythonTransformFileProcessor,
PythonPoolTransformFileProcessor,
)
from data_processing.runtime.pure_python.transform_orchestrator import orchestrate
from data_processing.runtime.pure_python.transform_launcher import PythonTransformLauncher
from data_processing.runtime.pure_python.transform_invoker import invoke_transform, execute_python_transform
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,4 @@ def get_input_params(self) -> dict[str, Any]:
get input parameters for job_input_params in metadata
:return: dictionary of parameters
"""
return {
"num_processors": self.num_processors
}
return {"num_processors": self.num_processors}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
################################################################################

from data_processing.runtime import TransformRuntimeConfiguration
from data_processing.transform import TransformConfiguration
from data_processing.runtime.pure_python import DefaultPythonTransformRuntime
from data_processing.transform import TransformConfiguration


class PythonTransformRuntimeConfiguration(TransformRuntimeConfiguration):
def __init__(self,
transform_config: TransformConfiguration,
runtime_class: type[DefaultPythonTransformRuntime] = DefaultPythonTransformRuntime,
):
def __init__(
self,
transform_config: TransformConfiguration,
runtime_class: type[DefaultPythonTransformRuntime] = DefaultPythonTransformRuntime,
):
"""
Initialization
:param transform_config - base configuration class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ class PythonPoolTransformFileProcessor(AbstractTransformFileProcessor):
"""

def __init__(
self,
data_access_factory: DataAccessFactoryBase,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
self,
data_access_factory: DataAccessFactoryBase,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
):
"""
Init method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.runtime.pure_python import (
PythonTransformRuntimeConfiguration,
PythonTransformExecutionConfiguration,
PythonTransformRuntimeConfiguration,
orchestrate,
)
from data_processing.runtime.transform_launcher import AbstractTransformLauncher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
################################################################################

import time
from typing import Any
from multiprocessing import Pool
import traceback
from datetime import datetime
from multiprocessing import Pool
from typing import Any

from data_processing.data_access import DataAccessFactoryBase
from data_processing.runtime.pure_python import (
PythonPoolTransformFileProcessor,
PythonTransformExecutionConfiguration,
PythonTransformRuntimeConfiguration,
PythonTransformFileProcessor,
PythonPoolTransformFileProcessor,
PythonTransformRuntimeConfiguration,
)
from data_processing.transform import TransformStatistics, AbstractBinaryTransform
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.utils import get_logger


Expand Down Expand Up @@ -69,21 +69,28 @@ def orchestrate(
logger.debug(f"{runtime_config.get_name()} Begin processing files")
if execution_config.num_processors > 0:
# using multiprocessor pool for execution
statistics = _process_transforms_multiprocessor(files=files, size=execution_config.num_processors,
data_access_factory=data_access_factory,
print_interval=print_interval,
transform_params=runtime.get_transform_config(
data_access_factory=data_access_factory,
statistics=statistics, files=files),
transform_class=runtime_config.get_transform_class())
statistics = _process_transforms_multiprocessor(
files=files,
size=execution_config.num_processors,
data_access_factory=data_access_factory,
print_interval=print_interval,
transform_params=runtime.get_transform_config(
data_access_factory=data_access_factory, statistics=statistics, files=files
),
transform_class=runtime_config.get_transform_class(),
)
else:
# using sequential execution
_process_transforms(files=files, data_access_factory=data_access_factory,
print_interval=print_interval, statistics=statistics,
transform_params=runtime.get_transform_config(
data_access_factory=data_access_factory,
statistics=statistics, files=files),
transform_class=runtime_config.get_transform_class())
_process_transforms(
files=files,
data_access_factory=data_access_factory,
print_interval=print_interval,
statistics=statistics,
transform_params=runtime.get_transform_config(
data_access_factory=data_access_factory, statistics=statistics, files=files
),
transform_class=runtime_config.get_transform_class(),
)
status = "success"
return_code = 0
except Exception as e:
Expand All @@ -108,8 +115,9 @@ def orchestrate(
"status": status,
},
"code": execution_config.code_location,
"job_input_params":
input_params | data_access_factory.get_input_params() | execution_config.get_input_params(),
"job_input_params": input_params
| data_access_factory.get_input_params()
| execution_config.get_input_params(),
"job_output_stats": stats,
}
logger.debug(f"Saving job metadata: {metadata}.")
Expand All @@ -121,9 +129,14 @@ def orchestrate(
return 1


def _process_transforms(files: list[str], print_interval: int, data_access_factory: DataAccessFactoryBase,
statistics: TransformStatistics, transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform]) -> None:
def _process_transforms(
files: list[str],
print_interval: int,
data_access_factory: DataAccessFactoryBase,
statistics: TransformStatistics,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
) -> None:
"""
Process transforms sequentially
:param files: list of files to process
Expand All @@ -137,8 +150,12 @@ def _process_transforms(files: list[str], print_interval: int, data_access_facto
:return: None
"""
# create executor
executor = PythonTransformFileProcessor(data_access_factory=data_access_factory, statistics=statistics,
transform_params=transform_params, transform_class=transform_class)
executor = PythonTransformFileProcessor(
data_access_factory=data_access_factory,
statistics=statistics,
transform_params=transform_params,
transform_class=transform_class,
)
# process data
t_start = time.time()
completed = 0
Expand All @@ -157,9 +174,14 @@ def _process_transforms(files: list[str], print_interval: int, data_access_facto
logger.info(f"done flushing in {round(time.time() - start, 3)} sec")


def _process_transforms_multiprocessor(files: list[str], size: int, print_interval: int,
data_access_factory: DataAccessFactoryBase, transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform]) -> TransformStatistics:
def _process_transforms_multiprocessor(
files: list[str],
size: int,
print_interval: int,
data_access_factory: DataAccessFactoryBase,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
) -> TransformStatistics:
"""
Process transforms using multiprocessing pool
:param files: list of files to process
Expand All @@ -173,8 +195,9 @@ def _process_transforms_multiprocessor(files: list[str], size: int, print_interv
# result statistics
statistics = TransformStatistics()
# create processor
processor = PythonPoolTransformFileProcessor(data_access_factory=data_access_factory,
transform_params=transform_params, transform_class=transform_class)
processor = PythonPoolTransformFileProcessor(
data_access_factory=data_access_factory, transform_params=transform_params, transform_class=transform_class
)
completed = 0
t_start = time.time()
# create multiprocessing pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class AbstractTransformFileProcessor:
"""

def __init__(
self,
data_access_factory: DataAccessFactoryBase,
transform_parameters: dict[str, Any],
self,
data_access_factory: DataAccessFactoryBase,
transform_parameters: dict[str, Any],
):
"""
Init method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def sort_sem(files_df: pd.DataFrame, logger: Logger, title_column_name="new_titl
# Dependency based Topological Sort
start_time = time.time()
files_df_sorted = topological_sort_on_df(dep_graph, files_df, logger, title_column_name)
sort_time = round(time.time() - start_time,2)
sort_time = round(time.time() - start_time, 2)
logger.info(f"sort_sem: time taken topological sort - {sort_time}")

else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
from typing import Any

import pyarrow as pa
from data_processing.transform import (
AbstractTableTransform,
TransformConfiguration,
)
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import GB, CLIArgumentProvider, TransformUtils


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def get_test_transform_fixtures(self) -> list[tuple]:
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir))
fixtures = []
launcher = PythonTransformLauncher(NOOPPythonTransformConfiguration())
fixtures.append((
launcher,
{sleep_cli_param: 0, "runtime_num_processors": 2},
basedir + "/input", basedir + "/expected"))
fixtures.append(
(launcher, {sleep_cli_param: 0, "runtime_num_processors": 2}, basedir + "/input", basedir + "/expected")
)
return fixtures

0 comments on commit 75325f7

Please sign in to comment.