fix: Datetime formatter in small dataset and improve performance #244

Merged · 2 commits · Nov 22, 2024
31 changes: 12 additions & 19 deletions sdgx/data_processors/formatters/datetime.py
@@ -125,36 +125,29 @@ def convert_datetime_columns(datetime_column_list, datetime_formats, processed_d
             - result_data (pd.DataFrame): Processed table data with datetime columns converted to timestamp
         """

-        def convert_single_column_datetime_to_timestamp(
-            column_data: pd.Series, datetime_format: str
-        ):
+        def datetime_formatter(each_value, datetime_format):
             """
             convert each single column datetime string to timestamp int value.
             """
-            res = []
-            for each_value in column_data:
-                try:
-                    datetime_obj = datetime.strptime(str(each_value), datetime_format)
-                    each_stamp = datetime.timestamp(datetime_obj)
-                except Exception as e:
-                    logger.warning(f"An error occured when convert str to timestamp {e}.")
-                    logger.warning(f"Input parameters: ({str(each_value)}, {datetime_format})")
-                    logger.warning(f"Input type: ({type(each_value)}, {type(datetime_format)})")
-                    each_stamp = 0
-                res.append(each_stamp)
-            return pd.Series(res)
+            try:
+                datetime_obj = datetime.strptime(str(each_value), datetime_format)
+                each_stamp = datetime.timestamp(datetime_obj)
+            except Exception as e:
+                logger.warning(f"An error occured when convert str to timestamp {e}.")
+                logger.warning(f"Input parameters: ({str(each_value)}, {datetime_format})")
+                logger.warning(f"Input type: ({type(each_value)}, {type(datetime_format)})")
+                each_stamp = 0
+            return each_stamp

         # Make a copy of processed_data to avoid modifying the original data
         result_data = processed_data.copy()

         # Convert each datetime column in datetime_column_list to timestamp
         for column in datetime_column_list:
             # Convert datetime to timestamp (int)
-            timestamp_col = convert_single_column_datetime_to_timestamp(
-                processed_data[column], datetime_formats[column]
+            result_data[column] = result_data[column].apply(
+                datetime_formatter, datetime_format=datetime_formats[column]
             )
-            result_data[column] = timestamp_col

         return result_data

     def reverse_convert(self, processed_data: pd.DataFrame) -> pd.DataFrame:
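In short, the refactor drops the nested per-column helper that built a Python list and wrapped it in a new pd.Series, and instead applies a per-value formatter with pandas' Series.apply, writing the result straight back into result_data[column]. The standalone sketch below illustrates that apply pattern outside of sdgx; to_timestamp and the sample series are hypothetical stand-ins, not library code:

from datetime import datetime

import pandas as pd


def to_timestamp(value, fmt):
    # Parse one datetime string; fall back to 0 on malformed input,
    # mirroring the error handling in the formatter above.
    try:
        return datetime.strptime(str(value), fmt).timestamp()
    except Exception:
        return 0


dates = pd.Series(["2024-11-22", "2024-01-01", "not-a-date"])
# Extra keyword arguments to Series.apply are forwarded to the function.
timestamps = dates.apply(to_timestamp, fmt="%Y-%m-%d")
print(timestamps)  # the malformed entry becomes 0

Avoiding the intermediate list and the extra Series allocation per column appears to be the performance improvement the PR title refers to.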
60 changes: 60 additions & 0 deletions tests/optmize/test_generator_connector_with_datetime_formatter.py
@@ -0,0 +1,60 @@
import faker
import pandas as pd
import pytest
from typing_extensions import Generator

from sdgx.data_connectors.generator_connector import GeneratorConnector
from sdgx.data_loader import DataLoader
from sdgx.data_models.metadata import Metadata
from sdgx.data_processors.formatters.datetime import DatetimeFormatter

CHUNK_SIZE = 100


@pytest.fixture
def datetime_test_df():
    total_row = 150
    ff = faker.Faker()
    df = pd.DataFrame([ff.date() for i in range(total_row)], columns=["date"])
    return df


def test_datetime_formatter_test_df(datetime_test_df: pd.DataFrame):
    def df_generator():
        yield datetime_test_df

    data_processors = [DatetimeFormatter()]
    dataconnector = GeneratorConnector(df_generator)
    dataloader = DataLoader(dataconnector, chunksize=CHUNK_SIZE)

    metadata = Metadata.from_dataloader(dataloader)
    metadata.datetime_columns = ["date"]
    metadata.discrete_columns = []
    metadata.datetime_format = {"date": "%Y-%m-%d"}

    for d in data_processors:
        d.fit(metadata=metadata, tabular_data=dataloader)

    def chunk_generator() -> Generator[pd.DataFrame, None, None]:
        for chunk in dataloader.iter():
            for d in data_processors:
                chunk = d.convert(chunk)

            assert not chunk.isna().any().any()
            assert not chunk.isnull().any().any()
            yield chunk

    processed_dataloader = DataLoader(
        GeneratorConnector(chunk_generator), identity=dataloader.identity
    )

    df = processed_dataloader.load_all()

    assert not df.isna().any().any()
    assert not df.isnull().any().any()

    reverse_converted_df = df
    for d in data_processors:
        reverse_converted_df = d.reverse_convert(df)

    assert reverse_converted_df.eq(datetime_test_df).all().all()
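The fixture generates 150 fake dates while CHUNK_SIZE is 100, so the second chunk the DataLoader yields holds only 50 rows — presumably the small-dataset case named in the PR title. The assertions confirm that convert introduces no NaN values in any chunk and that reverse_convert restores the original date strings.

Conceptually, the round trip the final assertion checks boils down to strptime/timestamp one way and fromtimestamp/strftime the other. A minimal sketch, assuming a plain local-time date format (the sample value and format string are illustrative, not taken from the test):

from datetime import datetime

fmt = "%Y-%m-%d"
original = "2024-11-22"

# Forward: parse the date string and take its POSIX timestamp (local time).
stamp = datetime.strptime(original, fmt).timestamp()

# Reverse: rebuild the formatted string from the timestamp.
restored = datetime.fromtimestamp(stamp).strftime(fmt)

assert restored == original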