Merge pull request #27 from aodn/filtering
Refactor project for Data Delivery Mode Filter
vietnguyengit authored Dec 9, 2024
2 parents 29a178f + 70289b9 commit 89d979c
Showing 18 changed files with 1,101 additions and 736 deletions.
13 changes: 13 additions & 0 deletions README.md
@@ -118,6 +118,19 @@ syntax is :/....


# File Structure
```
data_discovery_ai/
├── common/ # Common utilities, including shared configurations and constants, used across modules
├── model/ # Core ML logic, including model training, evaluation, and inference implementations
├── pipeline/ # Data pipelines for using ML models
├── resources/ # Stored assets such as pretrained models, sample datasets, and other resources required for model inference
├── services/ # Service modules providing functions for API use
├── utils/ # Utility functions and helper scripts for various tasks
├── extras/ # Supplementary files
├── notebooks/ # Jupyter notebooks documenting the design, experiments, and practical usage of AI features
├── tests/ # Unit tests for critical functions
```

## Required Configuration Files
1. Elasticsearch configuration file
File name `esManager.ini`, saved under the folder `data_discovery_ai/common`. Specific fields & values required:
3 changes: 3 additions & 0 deletions data_discovery_ai/common/constants.py
@@ -3,8 +3,11 @@
AVAILABLE_MODELS = ["development", "staging", "production", "experimental", "benchmark"]
KEYWORD_CONFIG = "keyword_classification_parameters.ini"
ELASTICSEARCH_CONFIG = "esManager.ini"
KEYWORD_FOLDER = "KeywordClassifier"
KEYWORD_SAMPLE_FILE = "keyword_sample.pkl"
KEYWORD_LABEL_FILE = "keyword_label.pkl"
FILTER_PREPROCESSED_FILE = "filter_preprocessed.pkl"
FILTER_FOLDER = "DataDeliveryModeFilter"

# global constants for es_connector
BATCH_SIZE = 100
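The two new filter constants are composed into a resource path by the refactored pipeline (see pipeline/pipeline.py below). A minimal sketch of that composition, assuming the package root as the base directory (the real base comes from `ConfigUtil`):

```python
from pathlib import Path

# Values mirror data_discovery_ai/common/constants.py as added above.
FILTER_FOLDER = "DataDeliveryModeFilter"
FILTER_PREPROCESSED_FILE = "filter_preprocessed.pkl"

# Assumed base directory for illustration; pipeline.py uses self.config.base_dif.
base_dir = Path("data_discovery_ai")
full_path = base_dir / "resources" / FILTER_FOLDER / FILTER_PREPROCESSED_FILE
# -> data_discovery_ai/resources/DataDeliveryModeFilter/filter_preprocessed.pkl
print(full_path)
```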
data_discovery_ai/common/keyword_classification_parameters.ini
@@ -12,8 +12,8 @@ fl_gamma = 2
fl_alpha = 0.7
epoch = 100
batch = 32
early_stopping_patience = 5
early_stopping_patience = 3
reduce_lr_patience = 5
validation_split = 0.2
confidence = 0.4
confidence = 0.5
top_N = 2
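The retuned `early_stopping_patience`, `confidence`, and `top_N` govern training patience and prediction thresholding. The prediction logic itself is not shown in this diff; a plausible sketch of how `confidence` and `top_N` could interact, assuming labels above the threshold are kept and `top_N` is the fallback when none qualify:

```python
import numpy as np

def select_labels(probs: np.ndarray, confidence: float = 0.5, top_n: int = 2) -> np.ndarray:
    """Illustrative only: pick predicted label indices for one record."""
    above = np.flatnonzero(probs >= confidence)
    if above.size > 0:
        return above
    # Fallback: no probability clears the threshold, take the top_n highest.
    return np.argsort(probs)[::-1][:top_n]

# With confidence raised from 0.4 to 0.5, a 0.45 score no longer passes
# on its own, so the top_N fallback applies.
print(select_labels(np.array([0.45, 0.30, 0.10])))  # [0 1]
```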
18 changes: 12 additions & 6 deletions data_discovery_ai/model/keywordModel.py
@@ -28,13 +28,15 @@
from typing import Dict, Callable, Any, Tuple, Optional, List
import os
from pathlib import Path
import json

os.environ["TF_USE_LEGACY_KERAS"] = "1"

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# TODO: Delete this line after fixing the 'module not exist' issue in notebooks
KEYWORD_FOLDER = "KeywordClassifier"


def get_class_weights(Y_train: np.ndarray) -> Dict[int, float]:
"""
@@ -158,12 +160,13 @@ def keyword_model(
callbacks=[early_stopping, reduce_lr],
)
model_file_path = (
Path(__file__).resolve().parent.parent / "resources" / model_name
Path(__file__).resolve().parent.parent
/ "resources"
/ KEYWORD_FOLDER
/ model_name
).with_suffix(".keras")
# make sure folder exist
model_file_path.parent.mkdir(
parents=True, exist_ok=True
) # Ensure the folder exists
model_file_path.parent.mkdir(parents=True, exist_ok=True)

model.save(model_file_path)

@@ -288,7 +291,10 @@ def load_saved_model(trained_model: str) -> Optional[load_model]:
Optional[keras_load_model]: The loaded Keras model if successful, otherwise `None`.
"""
model_file_path = (
Path(__file__).resolve().parent.parent / "resources" / trained_model
Path(__file__).resolve().parent.parent
/ "resources"
/ KEYWORD_FOLDER
/ trained_model
).with_suffix(".keras")
try:
saved_model = load_model(model_file_path, compile=False)
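With `KEYWORD_FOLDER` added to the path, saved models now resolve to `resources/KeywordClassifier/<model_name>.keras`. A minimal usage sketch, assuming a model named "development" has already been trained and saved:

```python
from data_discovery_ai.model.keywordModel import load_saved_model

# Resolves to data_discovery_ai/resources/KeywordClassifier/development.keras;
# per the docstring above, returns None if loading fails.
model = load_saved_model("development")
if model is not None:
    model.summary()
```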
File renamed without changes.
202 changes: 119 additions & 83 deletions data_discovery_ai/pipeline.py → data_discovery_ai/pipeline/pipeline.py
@@ -1,18 +1,12 @@
# ML pipeline; needs to be deployed to Prefect, etc. in the future
from data_discovery_ai.utils.preprocessor import *
from data_discovery_ai.utils import preprocessor as preprocessor

import data_discovery_ai.model.keywordModel as model
import data_discovery_ai.utils.es_connector as connector
import data_discovery_ai.service.keywordClassifier as keywordClassifier
from data_discovery_ai.utils.config_utils import ConfigUtil
from data_discovery_ai.common.constants import (
AVAILABLE_MODELS,
KEYWORD_SAMPLE_FILE,
KEYWORD_LABEL_FILE,
BATCH_SIZE,
SLEEP_TIME,
ES_INDEX_NAME,
)
from data_discovery_ai.common.constants import *

import sys

@@ -43,35 +37,16 @@ class TrainTestData:
n_labels: int


class KeywordClassifierPipeline:
def __init__(
self, isDataChanged: bool, usePretrainedModel: bool, model_name: str
) -> None:
"""
Init the pipeline, load parameters from file.
Input:
isDataChanged: bool. A flag indicating whether the data (metadata records) has changed significantly. Set to True if the data changed, meaning the sample set needs to be re-preprocessed and the model re-trained.
usePretrainedModel: bool. Whether to use a pretrained model or to train one first. If set to True, model_name must be given.
model_name: str. The name of the model saved as a .keras file.
"""
self.config = ConfigUtil()
self.params = self.config.load_keyword_config()
class BasePipeline:
def __init__(self, isDataChanged: bool, usePretrainedModel: bool, model_name: str):
self.isDataChanged = isDataChanged
self.usePretrainedModel = usePretrainedModel
# validate model name with accepted values, defined in data_discovery_ai/common/constants.py
self.model_name = model_name
# validate model name with accepted values, defined in data_discovery_ai/common/constants.py
if not self.is_valid_model():
raise ValueError(
'Available model name: ["development", "staging", "production", "experimental", "benchmark"]'
)
# create temp folder
self.temp_dir = tempfile.mkdtemp()

# define labels for prediction
self.labels = None

def set_labels(self, labels):
self.labels = labels

"""
Validate model name within fixed selections
@@ -103,11 +78,83 @@ def fetch_raw_data(self) -> pd.DataFrame:
)
return raw_data


class DataDeliveryModeFilterPipeline(BasePipeline):
def __init__(
self, isDataChanged: bool, usePretrainedModel: bool, model_name: str
) -> None:
super().__init__(
isDataChanged=isDataChanged,
usePretrainedModel=usePretrainedModel,
model_name=model_name,
)
self.config = ConfigUtil()

# create temp folder
self.temp_dir = tempfile.mkdtemp()

# extends the fetch_raw_data method from BasePipeline
def fetch_raw_data(self) -> pd.DataFrame:
return super().fetch_raw_data()

def pipeline(self) -> pd.DataFrame:

# define resource files paths
base_dir = self.config.base_dif
full_path = base_dir / "resources" / FILTER_FOLDER / FILTER_PREPROCESSED_FILE

if self.isDataChanged:
raw_data = self.fetch_raw_data()
preprocessed_data = preprocessor.identify_ddm_sample(raw_data)
preprocessed_data_embedding = preprocessor.calculate_embedding(
preprocessed_data
)
preprocessor.save_to_file(preprocessed_data_embedding, full_path)
else:
# load preprocessed data from resource
preprocessed_data = preprocessor.load_from_file(full_path)
print(preprocessed_data)
return preprocessed_data


class KeywordClassifierPipeline(BasePipeline):
def __init__(
self, isDataChanged: bool, usePretrainedModel: bool, model_name: str
) -> None:
"""
Init the pipeline, load parameters from file.
Input:
isDataChanged: bool. A flag indicating whether the data (metadata records) has changed significantly. Set to True if the data changed, meaning the sample set needs to be re-preprocessed and the model re-trained.
usePretrainedModel: bool. Whether to use a pretrained model or to train one first. If set to True, model_name must be given.
model_name: str. The name of the model saved as a .keras file.
"""
# extends the BasePipeline class
super().__init__(
isDataChanged=isDataChanged,
usePretrainedModel=usePretrainedModel,
model_name=model_name,
)
self.config = ConfigUtil()
self.params = self.config.load_keyword_config()

# create temp folder
self.temp_dir = tempfile.mkdtemp()

# define labels for prediction
self.labels = None

# extends the fetch_raw_data method from BasePipeline
def fetch_raw_data(self) -> pd.DataFrame:
return super().fetch_raw_data()

def set_labels(self, labels):
self.labels = labels

def prepare_sampleSet(self, raw_data: pd.DataFrame) -> pd.DataFrame:
"""
Prepares a processed sample set from raw data via filtering, preprocessing and embedding calculations.
This method executes several processing steps on the raw data:
1. identify_sample: Identifies samples containing specific vocabulary terms from the "vocabs" parameter.
1. identify_km_sample: Identifies samples containing specific vocabulary terms from the "vocabs" parameter.
2. sample_preprocessor: Cleans and preprocesses the identified sample set by reformatting labels and removing empty records.
3. calculate_embedding: Calculates embeddings for each entry in the preprocessed samples.
4. Saves the processed sample set to a file, then reloads it for subsequent use.
@@ -118,7 +165,7 @@ def prepare_sampleSet(self, raw_data: pd.DataFrame) -> pd.DataFrame:
preprocessed_sampleSet: pd.DataFrame. Representing the processed sample set, with an additional embedding column.
"""
vocabs = self.params["preprocessor"]["vocabs"].split(", ")
labelledDS = preprocessor.identify_sample(raw_data, vocabs)
labelledDS = preprocessor.identify_km_sample(raw_data, vocabs)
preprocessed_samples = preprocessor.sample_preprocessor(labelledDS, vocabs)
sampleSet = preprocessor.calculate_embedding(preprocessed_samples)

@@ -247,58 +294,47 @@ def make_prediction(self, description: str) -> str:
print(predicted_labels)
return predicted_labels

def pipeline(self, description: str) -> None:
"""
The keyword classifier pipeline.
Input:
    description: str. The item description used for making the prediction.
"""
# define resource files paths
base_dir = self.config.base_dif
full_sampleSet_path = (
base_dir / "resources" / KEYWORD_FOLDER / KEYWORD_SAMPLE_FILE
)
full_labelMap_path = (
base_dir / "resources" / KEYWORD_FOLDER / KEYWORD_LABEL_FILE
)

def pipeline(
isDataChanged: bool, usePretrainedModel: bool, description: str, selected_model: str
) -> None:
"""
The keyword classifier pipeline.
Inputs:
isDataChanged: bool. The indicator to call the data preprocessing module or not.
usePretrainedModel: bool. The indicator to use the pretrained model or not.
description: str. The item description which is used for making prediction.
selected_model: str. The model name for a selected pretrained model.
"""
keyword_classifier_pipeline = KeywordClassifierPipeline(
isDataChanged=isDataChanged,
usePretrainedModel=usePretrainedModel,
model_name=selected_model,
)

# define resource files paths
base_dir = keyword_classifier_pipeline.config.base_dif
full_sampleSet_path = base_dir / "resources" / KEYWORD_SAMPLE_FILE
full_labelMap_path = base_dir / "resources" / KEYWORD_LABEL_FILE

# data not changed, so load the preprocessed data from resource
if not isDataChanged:
sampleSet = preprocessor.load_from_file(full_sampleSet_path)

# usePretrainedModel = True
if keyword_classifier_pipeline.usePretrainedModel:
predefinedLabels = preprocessor.load_from_file(full_labelMap_path)
keyword_classifier_pipeline.set_labels(labels=predefinedLabels)
# usePretrainedModel = False
# data not changed, so load the preprocessed data from resource
if not self.isDataChanged:
sampleSet = preprocessor.load_from_file(full_sampleSet_path)

# usePretrainedModel = True
if self.usePretrainedModel:
predefinedLabels = preprocessor.load_from_file(full_labelMap_path)
self.set_labels(labels=predefinedLabels)
# usePretrainedModel = False
else:
# retrain the model
train_test_data = self.prepare_train_test_sets(sampleSet)
preprocessor.save_to_file(self.labels, full_labelMap_path)
self.train_evaluate_model(train_test_data)

# data changed, so start from the data preprocessing module
else:
# retrain the model
train_test_data = keyword_classifier_pipeline.prepare_train_test_sets(
sampleSet
)
preprocessor.save_to_file(
keyword_classifier_pipeline.labels, full_labelMap_path
)
keyword_classifier_pipeline.train_evaluate_model(train_test_data)
raw_data = self.fetch_raw_data()
sampleSet = self.prepare_sampleSet(raw_data=raw_data)
preprocessor.save_to_file(sampleSet, full_sampleSet_path)

# data changed, so start from the data preprocessing module
else:
raw_data = keyword_classifier_pipeline.fetch_raw_data()
sampleSet = keyword_classifier_pipeline.prepare_sampleSet(raw_data=raw_data)
preprocessor.save_to_file(sampleSet, full_sampleSet_path)

train_test_data = keyword_classifier_pipeline.prepare_train_test_sets(sampleSet)
preprocessor.save_to_file(
keyword_classifier_pipeline.labels, full_labelMap_path
)
train_test_data = self.prepare_train_test_sets(sampleSet)
preprocessor.save_to_file(self.labels, full_labelMap_path)

keyword_classifier_pipeline.train_evaluate_model(train_test_data)
keyword_classifier_pipeline.make_prediction(description)
self.train_evaluate_model(train_test_data)
self.make_prediction(description)
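After this refactor, callers instantiate a pipeline class instead of calling the removed module-level `pipeline()` function. A minimal sketch of both entry points, assuming the Elasticsearch and keyword configuration files described in the README are in place:

```python
from data_discovery_ai.pipeline.pipeline import (
    DataDeliveryModeFilterPipeline,
    KeywordClassifierPipeline,
)

# Keyword classification: reuse cached data and the pretrained "development" model.
kw = KeywordClassifierPipeline(
    isDataChanged=False, usePretrainedModel=True, model_name="development"
)
kw.pipeline(description="Sea surface temperature observations collected by moored buoys")

# Data delivery mode filter: load the cached preprocessed sample set.
ddm = DataDeliveryModeFilterPipeline(
    isDataChanged=False, usePretrainedModel=False, model_name="development"
)
preprocessed = ddm.pipeline()
```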
data_discovery_ai/resources/README.md
@@ -1,12 +1,9 @@
# Resources
This document explain the artifacts generated/used for the keyword classification model.
This document explains the artifacts generated and used for the ML models supporting Data Discovery AI features. These include:
1. Keyword Classification: Provides keyword suggestions for metadata records that lack keyword information. The related resources are saved in the `KeywordClassifier` folder.
2. Data Delivery Mode: Assists with classifying data delivery modes used by the AODN portal filter. The related resources are saved in the `DataDeliveryModeFilter` folder.

## `resouces/artifacts/*`
This folder saves artifacts used for notebooks, which includes the following files:

- `keyword_sampke.pkl`

## `resources/*.keras`
## `resources/*/*.keras`
Files with the suffix `.keras` are the pretrained models. The file names (without suffix) are used for selecting models and are strictly restricted to these options:

| Option | Purpose | Typical Use |
Binary file removed data_discovery_ai/resources/development.keras
Binary file removed data_discovery_ai/resources/keyword_label.pkl
4 changes: 3 additions & 1 deletion data_discovery_ai/routes.py
@@ -5,7 +5,9 @@
import logging
from data_discovery_ai.common.constants import API_PREFIX
from data_discovery_ai.utils.api_utils import api_key_auth, validate_model_name
from data_discovery_ai.pipeline import KeywordClassifierPipeline
from data_discovery_ai.pipeline.pipeline import (
KeywordClassifierPipeline,
)

router = APIRouter(prefix=API_PREFIX)
logger = logging.getLogger(__name__)
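The endpoint bodies are truncated in this diff, so the following is only one plausible wiring of the imported pieces, assuming `api_key_auth` works as a FastAPI dependency and `validate_model_name` raises on invalid names (both are assumptions about helpers not shown here):

```python
from fastapi import Depends

# Hypothetical route for illustration; the repository's actual endpoint is not
# shown in this diff.
@router.post("/keyword-classification", dependencies=[Depends(api_key_auth)])
async def classify_keywords(description: str, selected_model: str = "development"):
    validate_model_name(selected_model)  # assumed to raise on bad names
    pipeline = KeywordClassifierPipeline(
        isDataChanged=False, usePretrainedModel=True, model_name=selected_model
    )
    pipeline.pipeline(description=description)  # loads the model, then predicts
    return {"model": selected_model, "description": description}
```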