
Commit

add general updates
vietnguyengit committed Dec 9, 2024
1 parent 26f523f commit 3989793
Showing 7 changed files with 801 additions and 803 deletions.
6 changes: 6 additions & 0 deletions data_discovery_ai/__init__.py
@@ -0,0 +1,6 @@
import logging

# Logging config
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
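The new `__init__.py` above centralizes logging for the package: third-party `httpx`/`httpcore` noise is capped at WARNING and a single package-level `logger` is exposed for other modules to import. Below is a minimal sketch of how an entry point might wire that logger up; the `basicConfig` level and format are assumptions for illustration, not part of this commit.

```python
import logging

from data_discovery_ai import logger

# Assumed root-logger configuration; the commit itself only sets library levels
# and exposes the package logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

logger.info("pipeline starting")  # emitted through the shared package logger
```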
94 changes: 47 additions & 47 deletions data_discovery_ai/pipeline/pipeline.py
@@ -14,16 +14,11 @@

import numpy as np
import pandas as pd
from typing import Any, Dict, Tuple
from typing import Any, Dict
from dataclasses import dataclass
import logging
import tempfile
import os
import json


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
from data_discovery_ai import logger


@dataclass
@@ -38,9 +33,12 @@ class TrainTestData:


class BasePipeline:
def __init__(self, isDataChanged: bool, usePretrainedModel: bool, model_name: str):
self.isDataChanged = isDataChanged
self.usePretrainedModel = usePretrainedModel
def __init__(
self, is_data_changed: bool, use_pretrained_model: bool, model_name: str
):
self.config = ConfigUtil()
self.is_data_changed = is_data_changed
self.use_pretrained_model = use_pretrained_model
self.model_name = model_name
# validate model name with accepted values, defined in data_discovery_ai/common/constants.py
if not self.is_valid_model():
@@ -81,14 +79,13 @@ def fetch_raw_data(self) -> pd.DataFrame:

class DataDeliveryModeFilterPipeline(BasePipeline):
def __init__(
self, isDataChanged: bool, usePretrainedModel: bool, model_name: str
self, is_data_changed: bool, use_pretrained_model: bool, model_name: str
) -> None:
super().__init__(
isDataChanged=isDataChanged,
usePretrainedModel=usePretrainedModel,
is_data_changed=is_data_changed,
use_pretrained_model=use_pretrained_model,
model_name=model_name,
)
self.config = ConfigUtil()

# create temp folder
self.temp_dir = tempfile.mkdtemp()
@@ -103,7 +100,7 @@ def pipeline(self) -> None:
base_dir = self.config.base_dif
full_path = base_dir / "resources" / FILTER_FOLDER / FILTER_PREPROCESSED_FILE

if self.isDataChanged:
if self.is_data_changed:
raw_data = self.fetch_raw_data()
preprocessed_data = preprocessor.identify_ddm_sample(raw_data)
preprocessed_data_embedding = preprocessor.calculate_embedding(
@@ -113,13 +110,13 @@ def pipeline(self) -> None:
else:
# load preprocessed data from resource
preprocessed_data = preprocessor.load_from_file(full_path)
print(preprocessed_data)
logger.info(preprocessed_data)
return preprocessed_data


class KeywordClassifierPipeline(BasePipeline):
def __init__(
self, isDataChanged: bool, usePretrainedModel: bool, model_name: str
self, is_data_changed: bool, use_pretrained_model: bool, model_name: str
) -> None:
"""
Init the pipeline, load parameters from file.
@@ -130,11 +127,10 @@ def __init__(
"""
# extends the BasePipeline class
super().__init__(
isDataChanged=isDataChanged,
usePretrainedModel=usePretrainedModel,
is_data_changed=is_data_changed,
use_pretrained_model=use_pretrained_model,
model_name=model_name,
)
self.config = ConfigUtil()
self.params = self.config.load_keyword_config()

# create temp folder
@@ -150,7 +146,7 @@ def fetch_raw_data(self) -> pd.DataFrame:
def set_labels(self, labels):
self.labels = labels

def prepare_sampleSet(self, raw_data: pd.DataFrame) -> pd.DataFrame:
def prepare_sample_set(self, raw_data: pd.DataFrame) -> pd.DataFrame:
"""
Prepares a processed sample set from raw data via filtering, preprocessing and embedding calculations.
This method executes several processing steps on the raw data:
@@ -165,19 +161,21 @@ def prepare_sampleSet(self, raw_data: pd.DataFrame) -> pd.DataFrame:
preprocessed_sampleSet: pd.DataFrame. Representing the processed sample set, with an additional embedding column.
"""
vocabs = self.params["preprocessor"]["vocabs"].split(", ")
labelledDS = preprocessor.identify_km_sample(raw_data, vocabs)
preprocessed_samples = preprocessor.sample_preprocessor(labelledDS, vocabs)
sampleSet = preprocessor.calculate_embedding(preprocessed_samples)
labelled_ds = preprocessor.identify_km_sample(raw_data, vocabs)
preprocessed_samples = preprocessor.sample_preprocessor(labelled_ds, vocabs)
sample_set = preprocessor.calculate_embedding(preprocessed_samples)

# drop empty keywords rows
filtered_sampleSet = sampleSet[sampleSet["keywords"].apply(lambda x: x != [])]
filtered_sample_set = sample_set[
sample_set["keywords"].apply(lambda x: x != [])
]

full_path = os.path.join(self.temp_dir, KEYWORD_SAMPLE_FILE)

preprocessor.save_to_file(filtered_sampleSet, full_path)
return filtered_sampleSet
preprocessor.save_to_file(filtered_sample_set, full_path)
return filtered_sample_set

def prepare_train_test_sets(self, sampleSet: pd.DataFrame) -> TrainTestData:
def prepare_train_test_sets(self, sample_set: pd.DataFrame) -> TrainTestData:
"""
Prepares training and test sets from a given sample set by processing features and labels,
handling rare labels, and applying resampling techniques.
@@ -202,7 +200,7 @@ def prepare_train_test_sets(self, sampleSet: pd.DataFrame) -> TrainTestData:
"""

# Prepare feature matrix (X) and label matrix (Y) from the sample set
X, Y, Y_df, labels = preprocessor.prepare_X_Y(sampleSet)
X, Y, Y_df, labels = preprocessor.prepare_X_Y(sample_set)

self.labels = labels

@@ -214,6 +212,7 @@ def prepare_train_test_sets(self, sampleSet: pd.DataFrame) -> TrainTestData:
rare_label_threshold = self.params.getint(
"preprocessor", "rare_label_threshold"
)
# TODO fix type of "labels": not Dict from here: Expected type 'dict', got 'list[str]' instead
rare_label_index = preprocessor.identify_rare_labels(
Y_df, rare_label_threshold, labels
)
@@ -232,6 +231,7 @@ def prepare_train_test_sets(self, sampleSet: pd.DataFrame) -> TrainTestData:
label_weight_dict = model.get_class_weights(Y_train)

# Apply additional oversampling (Random Over Sampling) to the training set
# TODO: rare_keyword_index needs attention: Expected type 'list[int]', got 'None' instead
X_train_oversampled, Y_train_oversampled = preprocessor.resampling(
X_train=X_train, Y_train=Y_train, strategy="ROS", rare_keyword_index=None
)
@@ -274,12 +274,12 @@ def train_evaluate_model(self, train_test_data: TrainTestData) -> None:
predicted_labels = model.prediction(
train_test_data.X_test, trained_model, confidence, top_N
)
eval = model.evaluation(
eval_results = model.evaluation(
Y_test=train_test_data.Y_test, predictions=predicted_labels
)
print(eval)
logger.info(eval_results)

def make_prediction(self, description: str) -> str:
def make_prediction(self, description: str) -> list[Any]:
"""
Makes a prediction on the given description: generates predicted labels using the trained keyword
classifier model specified by self.model_name.
@@ -288,10 +288,10 @@ def make_prediction(self, description: str) -> str:
Output:
predicted_labels: str. The predicted keywords by the trained keyword classifier model
"""
predicted_labels = keywordClassifier.keywordClassifier(
predicted_labels = keywordClassifier.classify_keyword(
trained_model=self.model_name, description=description, labels=self.labels
)
print(predicted_labels)
logger.info(predicted_labels)
return predicted_labels

def pipeline(self, description: str) -> None:
@@ -305,36 +305,36 @@ def pipeline(self, description: str) -> None:
"""
# define resource files paths
base_dir = self.config.base_dif
full_sampleSet_path = (
full_sample_set_path = (
base_dir / "resources" / KEYWORD_FOLDER / KEYWORD_SAMPLE_FILE
)
full_labelMap_path = (
full_label_map_path = (
base_dir / "resources" / KEYWORD_FOLDER / KEYWORD_LABEL_FILE
)

# data not changed, so load the preprocessed data from resource
if not self.isDataChanged:
sampleSet = preprocessor.load_from_file(full_sampleSet_path)
if not self.is_data_changed:
sample_set = preprocessor.load_from_file(full_sample_set_path)

# usePretrainedModel = True
if self.usePretrainedModel:
predefinedLabels = preprocessor.load_from_file(full_labelMap_path)
self.set_labels(labels=predefinedLabels)
if self.use_pretrained_model:
predefined_labels = preprocessor.load_from_file(full_label_map_path)
self.set_labels(labels=predefined_labels)
# usePretrainedModel = False
else:
# retrain the model
train_test_data = self.prepare_train_test_sets(sampleSet)
preprocessor.save_to_file(self.labels, full_labelMap_path)
train_test_data = self.prepare_train_test_sets(sample_set)
preprocessor.save_to_file(self.labels, full_label_map_path)
self.train_evaluate_model(train_test_data)

# data changed, so start from the data preprocessing module
else:
raw_data = self.fetch_raw_data()
sampleSet = self.prepare_sampleSet(raw_data=raw_data)
preprocessor.save_to_file(sampleSet, full_sampleSet_path)
sample_set = self.prepare_sample_set(raw_data=raw_data)
preprocessor.save_to_file(sample_set, full_sample_set_path)

train_test_data = self.prepare_train_test_sets(sampleSet)
preprocessor.save_to_file(self.labels, full_labelMap_path)
train_test_data = self.prepare_train_test_sets(sample_set)
preprocessor.save_to_file(self.labels, full_label_map_path)

self.train_evaluate_model(train_test_data)
self.make_prediction(description)
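With the constructor arguments renamed to snake_case (`is_data_changed`, `use_pretrained_model`), callers would now instantiate the pipeline as in the hedged sketch below; the `model_name` value and the example description are assumptions and must still pass the pipeline's own `is_valid_model()` check.

```python
from data_discovery_ai.pipeline.pipeline import KeywordClassifierPipeline

keyword_pipeline = KeywordClassifierPipeline(
    is_data_changed=False,       # reuse the preprocessed sample set on disk
    use_pretrained_model=True,   # load a saved model rather than retraining
    model_name="default",        # assumed model name, validated against constants.py
)
keyword_pipeline.pipeline(
    description="Hourly sea surface temperature measurements from moored buoys"
)
```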
26 changes: 15 additions & 11 deletions data_discovery_ai/routes.py
@@ -1,34 +1,38 @@
from typing import Dict

from fastapi import APIRouter, Depends
from pydantic import BaseModel
import logging
from data_discovery_ai.common.constants import API_PREFIX
from data_discovery_ai.utils.api_utils import api_key_auth, validate_model_name
from data_discovery_ai.pipeline.pipeline import (
KeywordClassifierPipeline,
)
from data_discovery_ai import logger


router = APIRouter(prefix=API_PREFIX)
logger = logging.getLogger(__name__)


class PredictKeywordRequest(BaseModel):
selected_model: str = "default" # Default value
raw_input: str # Required field
selected_model: str
raw_input: str


class HealthCheckResponse(BaseModel):
status: str

@router.get("/hello", dependencies=[Depends(api_key_auth)])
async def hello():
logger.info("hello endpoint is called")
return {"content": "Hello World!"}

@router.get("/health", response_model=HealthCheckResponse)
async def health_check() -> HealthCheckResponse:
response = HealthCheckResponse(status="healthy")
return response


@router.post("/predict", dependencies=[Depends(api_key_auth)])
async def predict_keyword(payload: PredictKeywordRequest):
# selected_model = validate_model_name(payload.selected_model)
keyword_classifier_pipeline = KeywordClassifierPipeline(
isDataChanged=False, usePretrainedModel=True, model_name=payload.selected_model
is_data_changed=False,
use_pretrained_model=True,
model_name=payload.selected_model,
)
logger.info(
f"selected_model: {payload.selected_model}, raw_input: {payload.raw_input}"
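The request model now requires both fields (the previous `selected_model: str = "default"` fallback is removed), and a `/health` endpoint is added. A hedged client-side sketch follows; the host, port, `API_PREFIX` value, and API-key header name are assumptions for illustration only.

```python
import requests

API_PREFIX = "/api/v1/ml"  # assumption: the real value lives in data_discovery_ai/common/constants.py

response = requests.post(
    f"http://localhost:8000{API_PREFIX}/predict",
    headers={"X-API-Key": "<api-key>"},  # assumption: header name expected by api_key_auth
    json={
        "selected_model": "default",     # both fields are now required
        "raw_input": "Hourly sea surface temperature measurements from moored buoys",
    },
    timeout=60,
)
print(response.status_code, response.json())
```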
6 changes: 2 additions & 4 deletions data_discovery_ai/service/keywordClassifier.py
@@ -1,11 +1,10 @@
import data_discovery_ai.utils.preprocessor as preprocessor
import data_discovery_ai.model.keywordModel as model
from data_discovery_ai.utils.config_utils import ConfigUtil
from data_discovery_ai.common.constants import KEYWORD_LABEL_FILE
from typing import List, Any, Dict


def keywordClassifier(trained_model: str, description: str, labels: Dict) -> List[Any]:
def classify_keyword(trained_model: str, description: str, labels: Dict) -> List[Any]:
"""
The keyword classifier service for API use.
Input:
Expand All @@ -27,5 +26,4 @@ def keywordClassifier(trained_model: str, description: str, labels: Dict) -> Lis
params.getfloat("keywordModel", "confidence"),
params.getint("keywordModel", "top_N"),
)
prediction = model.get_predicted_keywords(target_predicted_labels, labels)
return prediction
return model.get_predicted_keywords(target_predicted_labels, labels)
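Callers of the renamed service function would now use `classify_keyword` instead of `keywordClassifier`. A hedged sketch, assuming a label map previously saved by the pipeline; the file path and model name below are hypothetical, not defined by this diff.

```python
import data_discovery_ai.utils.preprocessor as preprocessor
from data_discovery_ai.service.keywordClassifier import classify_keyword

# Assumed location of the label map the pipeline writes with save_to_file();
# the real path is built from KEYWORD_FOLDER / KEYWORD_LABEL_FILE in constants.py.
labels = preprocessor.load_from_file(
    "data_discovery_ai/resources/KeywordClassifier/keyword_label.pkl"
)

predicted = classify_keyword(
    trained_model="default",  # assumed model name
    description="Hourly sea surface temperature measurements from moored buoys",
    labels=labels,
)
print(predicted)
```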
19 changes: 9 additions & 10 deletions data_discovery_ai/utils/preprocessor.py
@@ -23,9 +23,7 @@
from typing import Dict
import tempfile
import json

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
from data_discovery_ai import logger


class Concept:
@@ -182,6 +180,7 @@ def prepare_X_Y(
X = np.array(sampleSet["embedding"].tolist())
Y_df, labels = prepare_Y_matrix(sampleSet)
Y = Y_df.to_numpy()
# TODO: labels set to be List not Dict but identify_rare_labels where labels being consumed expects Dict?
return X, Y, Y_df, labels


@@ -326,7 +325,7 @@ def prepare_train_test(
) -> Tuple[int, int, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Prepares the training and testing datasets using multi-label stratified splitting.
This function splits the feature matrix X and target matrix Y into training and testing sets based on parameters for multi-label stratified shuffling. It prints dataset information and returns the dimensions, number of labels, and split data for training and testing.
This function splits the feature matrix X and target matrix Y into training and testing sets based on parameters for multi-label stratified shuffling. It logs dataset information and returns the dimensions, number of labels, and split data for training and testing.
Input:
X: np.ndarray. Feature matrix of shape (n_samples, dimension).
Y: np.ndarray. Target matrix of shape (n_samples, n_labels).
@@ -435,10 +434,10 @@ def resampling(
[list(map(int, list(row))) for row in Y_combined_resampled]
)

print(" ======== After Resampling ========")
print(f"Total samples: {len(X_train_resampled)}")
print(f"Dimension: {X_train_resampled.shape[1]}")
print(f"No. of labels: {Y_train_resampled.shape[1]}")
print(f"X resampled set size: {X_train_resampled.shape[0]}")
print(f"Y resampled set size: {Y_train_resampled.shape[0]}")
logger.info(" ======== After Resampling ========")
logger.info(f"Total samples: {len(X_train_resampled)}")
logger.info(f"Dimension: {X_train_resampled.shape[1]}")
logger.info(f"No. of labels: {Y_train_resampled.shape[1]}")
logger.info(f"X resampled set size: {X_train_resampled.shape[0]}")
logger.info(f"Y resampled set size: {Y_train_resampled.shape[0]}")
return X_train_resampled, Y_train_resampled
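The resampling summary above is now routed through the shared package logger instead of `print`. As a small aside, sketched below under the same assumption of importing the package logger: the f-string form used in the diff builds the message eagerly, while the stdlib's lazy %-style formatting defers it until a handler actually emits the record; the lazy form is only an optional alternative, not something this commit adopts.

```python
from data_discovery_ai import logger

total_samples = 1234  # placeholder value, for illustration only

# f-string form, as used in the diff: the string is built even if INFO is disabled
logger.info(f"Total samples: {total_samples}")

# lazy %-style form: formatting happens only when the record is actually emitted
logger.info("Total samples: %s", total_samples)
```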