diff --git a/geochemistrypi/data_mining/model/detection.py b/geochemistrypi/data_mining/model/detection.py index e380b04..8848bea 100644 --- a/geochemistrypi/data_mining/model/detection.py +++ b/geochemistrypi/data_mining/model/detection.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- - +import os from typing import Dict, Optional, Union import numpy as np @@ -8,10 +8,13 @@ from sklearn.ensemble import IsolationForest from sklearn.neighbors import LocalOutlierFactor -from ..utils.base import clear_output +from ..constants import MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH +from ..utils.base import clear_output, save_data, save_fig from ._base import WorkflowBase +from .func.algo_anomalydetection._common import density_estimation, scatter2d, scatter3d +from .func.algo_anomalydetection._enum import AnormalyDetectionCommonFunction, LocalOutlierFactorSpecialFunction from .func.algo_anomalydetection._iforest import isolation_forest_manual_hyper_parameters -from .func.algo_anomalydetection._local_outlier_factor import local_outlier_factor_manual_hyper_parameters +from .func.algo_anomalydetection._local_outlier_factor import local_outlier_factor_manual_hyper_parameters, plot_lof_scores class AnomalyDetectionWorkflowBase(WorkflowBase): @@ -22,6 +25,7 @@ class AnomalyDetectionWorkflowBase(WorkflowBase): def __init__(self) -> None: super().__init__() self.mode = "Anomaly Detection" + self.anomaly_detection_result = None def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> None: """Fit the model by Scikit-learn framework.""" @@ -69,9 +73,65 @@ def _detect_data(X: pd.DataFrame, detect_label: np.ndarray) -> tuple[pd.DataFram return X_anomaly_detection, X_normal, X_anomaly + @staticmethod + def _density_estimation(data: pd.DataFrame, labels: pd.DataFrame, graph_name: str, algorithm_name: str, local_path: str, mlflow_path: str) -> None: + """Plot the density estimation diagram of the anomaly detection result.""" + print(f"-----* {graph_name} *-----") + 
density_estimation(data, labels, algorithm_name=algorithm_name) + save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path) + data_with_labels = pd.concat([data, labels], axis=1) + save_data(data_with_labels, f"{graph_name} - {algorithm_name}", local_path, mlflow_path) + + @staticmethod + def _scatter2d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str, graph_name: str, local_path: str, mlflow_path: str) -> None: + """Plot the two-dimensional diagram of the anomaly detection result.""" + print(f"-----* {graph_name} *-----") + scatter2d(data, labels, algorithm_name=algorithm_name) + save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path) + data_with_labels = pd.concat([data, labels], axis=1) + save_data(data_with_labels, f"{graph_name} - {algorithm_name}", local_path, mlflow_path) + + @staticmethod + def _scatter3d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str, graph_name: str, local_path: str, mlflow_path: str) -> None: + """Plot the three-dimensional diagram of the anomaly detection result.""" + print(f"-----* {graph_name} *-----") + scatter3d(data, labels, algorithm_name=algorithm_name) + save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path) + data_with_labels = pd.concat([data, labels], axis=1) + save_data(data_with_labels, f"{graph_name} - {algorithm_name}", local_path, mlflow_path) + def common_components(self) -> None: """Invoke all common application functions for anomaly detection algorithms by Scikit-learn framework.""" - pass + GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH") + if self.X.shape[1] >= 3: + two_dimen_axis_index, two_dimen_data = self.choose_dimension_data(self.X, 2) + self._scatter2d( + data=two_dimen_data, + labels=self.anomaly_detection_result, + algorithm_name=self.naming, + graph_name=AnormalyDetectionCommonFunction.PLOT_SCATTER_2D.value, + local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH, + 
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH, + ) + + three_dimen_axis_index, three_dimen_data = self.choose_dimension_data(self.X, 3) + self._scatter3d( + data=three_dimen_data, + labels=self.anomaly_detection_result, + algorithm_name=self.naming, + graph_name=AnormalyDetectionCommonFunction.PLOT_SCATTER_3D.value, + local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH, + mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH, + ) + + self._density_estimation( + data=self.X, + labels=self.anomaly_detection_result, + algorithm_name=self.naming, + graph_name=AnormalyDetectionCommonFunction.DENSITY_ESTIMATION.value, + local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH, + mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH, + ) class IsolationForestAnomalyDetection(AnomalyDetectionWorkflowBase): @@ -381,6 +441,25 @@ def manual_hyper_parameters(cls) -> Dict: clear_output() return hyper_parameters + @staticmethod + def _plot_lof_scores(X_train: pd.DataFrame, lof_scores: np.ndarray, graph_name: str, image_config: dict, algorithm_name: str, local_path: str, mlflow_path: str) -> None: + """Draw the LOF scores bar diagram.""" + print(f"-----* {graph_name} *-----") + columns_name = X_train.index + data = plot_lof_scores(columns_name, lof_scores, image_config) + save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path) + save_data(data, f"{graph_name} - {algorithm_name}", local_path, mlflow_path, True) + def special_components(self, **kwargs) -> None: """Invoke all special application functions for this algorithms by Scikit-learn framework.""" - pass + GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH") + lof_scores = self.model.negative_outlier_factor_ + self._plot_lof_scores( + X_train=self.X_train, + lof_scores=lof_scores, + image_config=self.image_config, + algorithm_name=self.naming, + graph_name=LocalOutlierFactorSpecialFunction.PLOT_LOF_SCORE.value, + 
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH, + mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH, + ) diff --git a/geochemistrypi/data_mining/model/func/algo_anomalydetection/_common.py b/geochemistrypi/data_mining/model/func/algo_anomalydetection/_common.py new file mode 100644 index 0000000..747b351 --- /dev/null +++ b/geochemistrypi/data_mining/model/func/algo_anomalydetection/_common.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +import matplotlib.pyplot as plt +import pandas as pd + + +def density_estimation(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str) -> None: + """Generate a density estimation plot for anomaly detection.""" + # Assuming the labels contain '0' for normal and '1' for anomalies. + normal_data = data[labels == 0] + anomaly_data = data[labels == 1] + + # Using Kernel Density Estimation (KDE) for density estimation + import matplotlib.pyplot as plt + import seaborn as sns + + plt.figure(figsize=(10, 6)) + + sns.kdeplot(data=normal_data, fill=True, label="Normal Data", color="blue") + sns.kdeplot(data=anomaly_data, fill=True, label="Anomaly Data", color="red") + + plt.title(f"Density Estimation for {algorithm_name}") + plt.xlabel("Feature Space") + plt.ylabel("Density") + plt.legend() + + +def scatter2d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str) -> None: + """ + Draw the 2D scatter plot for anomaly detection results. + + Parameters + ---------- + data : pd.DataFrame (n_samples, n_components) + The features of the data. + + labels : pd.DataFrame (n_samples,) + Labels of each point (1 for normal, -1 for anomaly). 
+ + algorithm_name : str + The name of the algorithm + """ + markers = ["o", "x"] + colors = ["#1f77b4", "#d62728"] + + fig = plt.figure() + fig.set_size_inches(18, 10) + plt.subplot(111) + + for i, label in enumerate([-1, 1]): + anomaly_data = data[labels == label] + color = colors[i] + marker = markers[i] + plt.scatter(anomaly_data.iloc[:, 0], anomaly_data.iloc[:, 1], c=color, marker=marker, label="Anomaly" if label == -1 else "Normal") + + plt.xlabel(f"{data.columns[0]}") + plt.ylabel(f"{data.columns[1]}") + plt.title(f"Anomaly Detection 2D Scatter Plot - {algorithm_name}") + plt.legend() + + +def scatter3d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str) -> None: + """ + Draw the 3D scatter plot for anomaly detection results. + + Parameters + ---------- + data : pd.DataFrame (n_samples, n_components) + The features of the data. + + labels : pd.DataFrame (n_samples,) + Labels of each point (1 for normal, -1 for anomaly). + + algorithm_name : str + The name of the algorithm + """ + fig = plt.figure(figsize=(12, 6), facecolor="w") + plt.subplots_adjust(left=0.05, right=0.95, bottom=0.05, top=0.9) + + ax = fig.add_subplot(121, projection="3d") + ax.scatter(data.iloc[:, 0], data.iloc[:, 1], data.iloc[:, 2], alpha=0.3, c="#FF0000", marker=".") + ax.set_xlabel(data.columns[0]) + ax.set_ylabel(data.columns[1]) + ax.set_zlabel(data.columns[2]) + plt.grid(True) + + ax2 = fig.add_subplot(122, projection="3d") + markers = ["o", "x"] + colors = ["#1f77b4", "#d62728"] + + for i, label in enumerate([-1, 1]): + anomaly_data = data[labels == label] + color = colors[i] + marker = markers[i] + ax2.scatter( + anomaly_data.iloc[:, 0], anomaly_data.iloc[:, 1], anomaly_data.iloc[:, 2], c=color, marker=marker, s=6, cmap=plt.cm.Paired, edgecolors="none", label="Anomaly" if label == -1 else "Normal" + ) + + ax2.set_xlabel(data.columns[0]) + ax2.set_ylabel(data.columns[1]) + ax2.set_zlabel(data.columns[2]) + plt.grid(True) + ax.set_title(f"Base Data 3D Plot - 
{algorithm_name}") + ax2.set_title(f"Anomaly Detection 3D Plot - {algorithm_name}") + plt.legend() diff --git a/geochemistrypi/data_mining/model/func/algo_anomalydetection/_enum.py b/geochemistrypi/data_mining/model/func/algo_anomalydetection/_enum.py new file mode 100644 index 0000000..f771608 --- /dev/null +++ b/geochemistrypi/data_mining/model/func/algo_anomalydetection/_enum.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class AnormalyDetectionCommonFunction(Enum): + PLOT_SCATTER_2D = "Anomaly Detection Two-Dimensional Diagram" + PLOT_SCATTER_3D = "Anomaly Detection Three-Dimensional Diagram" + DENSITY_ESTIMATION = "Anomaly Detection Density Estimation" + + +class LocalOutlierFactorSpecialFunction(Enum): + PLOT_LOF_SCORE = "Lof Score Diagram" diff --git a/geochemistrypi/data_mining/model/func/algo_anomalydetection/_local_outlier_factor.py b/geochemistrypi/data_mining/model/func/algo_anomalydetection/_local_outlier_factor.py index 984b375..56e5831 100644 --- a/geochemistrypi/data_mining/model/func/algo_anomalydetection/_local_outlier_factor.py +++ b/geochemistrypi/data_mining/model/func/algo_anomalydetection/_local_outlier_factor.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- from typing import Dict +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd from rich import print from ....constants import SECTION @@ -37,3 +40,65 @@ def local_outlier_factor_manual_hyper_parameters() -> Dict: "n_jobs": n_jobs, } return hyper_parameters + + +def plot_lof_scores(columns_name: pd.Index, lof_scores: np.ndarray, image_config: dict) -> pd.DataFrame: + """Draw the LOF scores bar diagram. + + Parameters + ---------- + columns_name : pd.Index + The name of the columns. + + lof_scores : np.ndarray + The LOF scores values. + + image_config : dict + The configuration of the image. + + Returns + ------- + lof_scores_df : pd.DataFrame + The LOF scores values. 
+ """ + # create drawing canvas + fig, ax = plt.subplots(figsize=(image_config["width"], image_config["height"]), dpi=image_config["dpi"]) + + # # print the LOF scores value orderly + # for feature_name, score in zip(list(columns_name), lof_scores): + # print(feature_name, ":", score) + + # draw the main content + lof_scores_df = pd.DataFrame({"Feature": columns_name, "LOF Score": lof_scores}) + lof_scores_df = lof_scores_df.sort_values(["LOF Score"], ascending=True) + lof_scores_df["LOF Score"] = lof_scores_df["LOF Score"].astype(float) + lof_scores_df = lof_scores_df.sort_values(["LOF Score"]) + lof_scores_df.set_index("Feature", inplace=True) + lof_scores_df.plot.barh(alpha=image_config["alpha2"], rot=0) + + # automatically optimize picture layout structure + fig.tight_layout() + xmin, xmax = ax.get_xlim() + ymin, ymax = ax.get_ylim() + x_adjustment = (xmax - xmin) * 0.01 + y_adjustment = (ymax - ymin) * 0.01 + ax.axis([xmin - x_adjustment, xmax + x_adjustment, ymin - y_adjustment, ymax + y_adjustment]) + + # convert the font of the axes + x1_label = ax.get_xticklabels() # adjust the axis label font + [x1_label_temp.set_fontname(image_config["axislabelfont"]) for x1_label_temp in x1_label] + y1_label = ax.get_yticklabels() + [y1_label_temp.set_fontname(image_config["axislabelfont"]) for y1_label_temp in y1_label] + + ax.set_title( + label=image_config["title_label"], + fontdict={ + "size": image_config["title_size"], + "color": image_config["title_color"], + "family": image_config["title_font"], + }, + loc=image_config["title_location"], + pad=image_config["title_pad"], + ) + + return lof_scores_df diff --git a/geochemistrypi/data_mining/process/detect.py b/geochemistrypi/data_mining/process/detect.py index c0424f4..d2141c1 100644 --- a/geochemistrypi/data_mining/process/detect.py +++ b/geochemistrypi/data_mining/process/detect.py @@ -56,7 +56,7 @@ def activate( self.ad_workflow.fit(X) y_predict = self.ad_workflow.predict(X) X_anomaly_detection, X_normal, 
X_anomaly = self.ad_workflow._detect_data(X, y_predict) - + self.ad_workflow.anomaly_detection_result = X_anomaly_detection self.ad_workflow.data_upload(X=X, y=y, X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test) # Save the model hyper-parameters