Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add common functions to anomaly detection and add special funct… #373

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 84 additions & 5 deletions geochemistrypi/data_mining/model/detection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-

import os
from typing import Dict, Optional, Union

import numpy as np
Expand All @@ -8,10 +8,13 @@
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

from ..utils.base import clear_output
from ..constants import MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH
from ..utils.base import clear_output, save_data, save_fig
from ._base import WorkflowBase
from .func.algo_anomalydetection._common import density_estimation, scatter2d, scatter3d
from .func.algo_anomalydetection._enum import AnormalyDetectionCommonFunction, LocalOutlierFactorSpecialFunction
from .func.algo_anomalydetection._iforest import isolation_forest_manual_hyper_parameters
from .func.algo_anomalydetection._local_outlier_factor import local_outlier_factor_manual_hyper_parameters
from .func.algo_anomalydetection._local_outlier_factor import local_outlier_factor_manual_hyper_parameters, plot_lof_scores


class AnomalyDetectionWorkflowBase(WorkflowBase):
Expand All @@ -22,6 +25,7 @@ class AnomalyDetectionWorkflowBase(WorkflowBase):
def __init__(self) -> None:
    """Set up the state shared by the anomaly detection workflows."""
    super().__init__()
    # Mode label of this family of workflows
    self.mode = "Anomaly Detection"
    # Detection result; stays None until a caller assigns it after prediction
    self.anomaly_detection_result = None

def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> None:
"""Fit the model by Scikit-learn framework."""
Expand Down Expand Up @@ -69,9 +73,65 @@ def _detect_data(X: pd.DataFrame, detect_label: np.ndarray) -> tuple[pd.DataFram

return X_anomaly_detection, X_normal, X_anomaly

@staticmethod
def _density_estimation(data: pd.DataFrame, labels: pd.DataFrame, graph_name: str, algorithm_name: str, local_path: str, mlflow_path: str) -> None:
    """Draw the density estimation diagram and hand the figure and its data to the save helpers.

    Parameters
    ----------
    data : pd.DataFrame
        The data passed to the plotting function.

    labels : pd.DataFrame
        The detection labels passed to the plotting function and appended to ``data`` column-wise before saving.

    graph_name : str
        The name of the diagram, printed as the section header.

    algorithm_name : str
        The name of the algorithm, used in the artifact name.

    local_path : str
        Forwarded to ``save_fig`` and ``save_data``.

    mlflow_path : str
        Forwarded to ``save_fig`` and ``save_data``.
    """
    artifact_name = f"{graph_name} - {algorithm_name}"
    print(f"-----* {graph_name} *-----")
    density_estimation(data, labels, algorithm_name=algorithm_name)
    save_fig(artifact_name, local_path, mlflow_path)
    # The saved table is the plotted data with the labels attached as extra columns
    save_data(pd.concat([data, labels], axis=1), artifact_name, local_path, mlflow_path)

@staticmethod
def _scatter2d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str, graph_name: str, local_path: str, mlflow_path: str) -> None:
    """Draw the two-dimensional diagram and hand the figure and its data to the save helpers.

    Parameters
    ----------
    data : pd.DataFrame
        The data passed to the plotting function.

    labels : pd.DataFrame
        The detection labels passed to the plotting function and appended to ``data`` column-wise before saving.

    algorithm_name : str
        The name of the algorithm, used in the artifact name.

    graph_name : str
        The name of the diagram, printed as the section header.

    local_path : str
        Forwarded to ``save_fig`` and ``save_data``.

    mlflow_path : str
        Forwarded to ``save_fig`` and ``save_data``.
    """
    artifact_name = f"{graph_name} - {algorithm_name}"
    print(f"-----* {graph_name} *-----")
    scatter2d(data, labels, algorithm_name=algorithm_name)
    save_fig(artifact_name, local_path, mlflow_path)
    # The saved table is the plotted data with the labels attached as extra columns
    save_data(pd.concat([data, labels], axis=1), artifact_name, local_path, mlflow_path)

@staticmethod
def _scatter3d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str, graph_name: str, local_path: str, mlflow_path: str) -> None:
    """Draw the three-dimensional diagram and hand the figure and its data to the save helpers.

    Parameters
    ----------
    data : pd.DataFrame
        The data passed to the plotting function.

    labels : pd.DataFrame
        The detection labels passed to the plotting function and appended to ``data`` column-wise before saving.

    algorithm_name : str
        The name of the algorithm, used in the artifact name.

    graph_name : str
        The name of the diagram, printed as the section header.

    local_path : str
        Forwarded to ``save_fig`` and ``save_data``.

    mlflow_path : str
        Forwarded to ``save_fig`` and ``save_data``.
    """
    artifact_name = f"{graph_name} - {algorithm_name}"
    print(f"-----* {graph_name} *-----")
    scatter3d(data, labels, algorithm_name=algorithm_name)
    save_fig(artifact_name, local_path, mlflow_path)
    # The saved table is the plotted data with the labels attached as extra columns
    save_data(pd.concat([data, labels], axis=1), artifact_name, local_path, mlflow_path)

def common_components(self) -> None:
    """Invoke all common application functions for anomaly detection algorithms by Scikit-learn framework.

    The local output directory is read from the ``GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH``
    environment variable. When the data has at least three columns, a 2D and a 3D scatter diagram
    are drawn on the columns returned by ``choose_dimension_data``. The density estimation diagram
    is drawn on the full data.
    """
    local_path = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH")

    # NOTE(review): the extent of this branch was inferred (the 3D plot needs three columns);
    # confirm that the density estimation below is meant to run for any number of columns.
    if self.X.shape[1] >= 3:
        # Only the selected data is used; the returned axis index is not needed here
        _, two_dimen_data = self.choose_dimension_data(self.X, 2)
        self._scatter2d(
            data=two_dimen_data,
            labels=self.anomaly_detection_result,
            algorithm_name=self.naming,
            graph_name=AnormalyDetectionCommonFunction.PLOT_SCATTER_2D.value,
            local_path=local_path,
            mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
        )

        _, three_dimen_data = self.choose_dimension_data(self.X, 3)
        self._scatter3d(
            data=three_dimen_data,
            labels=self.anomaly_detection_result,
            algorithm_name=self.naming,
            graph_name=AnormalyDetectionCommonFunction.PLOT_SCATTER_3D.value,
            local_path=local_path,
            mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
        )

    self._density_estimation(
        data=self.X,
        labels=self.anomaly_detection_result,
        algorithm_name=self.naming,
        graph_name=AnormalyDetectionCommonFunction.DENSITY_ESTIMATION.value,
        local_path=local_path,
        mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
    )


class IsolationForestAnomalyDetection(AnomalyDetectionWorkflowBase):
Expand Down Expand Up @@ -381,6 +441,25 @@ def manual_hyper_parameters(cls) -> Dict:
clear_output()
return hyper_parameters

@staticmethod
def _plot_lof_scores(X_train: pd.DataFrame, lof_scores: np.ndarray, graph_name: str, image_config: dict, algorithm_name: str, local_path: str, mlflow_path: str) -> None:
    """Draw the LOF scores bar diagram and hand the figure and its data to the save helpers.

    Parameters
    ----------
    X_train : pd.DataFrame
        The training data; only its index is used, as the bar labels.

    lof_scores : np.ndarray
        The LOF scores values.

    graph_name : str
        The name of the diagram, printed as the section header.

    image_config : dict
        The configuration of the image.

    algorithm_name : str
        The name of the algorithm, used in the artifact name.

    local_path : str
        Forwarded to ``save_fig`` and ``save_data``.

    mlflow_path : str
        Forwarded to ``save_fig`` and ``save_data``.
    """
    print(f"-----* {graph_name} *-----")
    artifact_name = f"{graph_name} - {algorithm_name}"
    # The row index of the training data labels the bars, one bar per sample
    score_table = plot_lof_scores(X_train.index, lof_scores, image_config)
    save_fig(artifact_name, local_path, mlflow_path)
    save_data(score_table, artifact_name, local_path, mlflow_path, True)

def special_components(self, **kwargs) -> None:
    """Invoke all special application functions for this algorithm by Scikit-learn framework.

    Draws the LOF score diagram from the fitted model's ``negative_outlier_factor_`` attribute.
    The local output directory is read from the ``GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH``
    environment variable. ``kwargs`` is accepted but not used here.
    """
    local_path = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH")
    lof_scores = self.model.negative_outlier_factor_
    self._plot_lof_scores(
        X_train=self.X_train,
        lof_scores=lof_scores,
        image_config=self.image_config,
        algorithm_name=self.naming,
        graph_name=LocalOutlierFactorSpecialFunction.PLOT_LOF_SCORE.value,
        local_path=local_path,
        mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
    )
104 changes: 104 additions & 0 deletions geochemistrypi/data_mining/model/func/algo_anomalydetection/_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import pandas as pd


def density_estimation(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str) -> None:
    """
    Draw the kernel density estimation plot for anomaly detection results.

    Parameters
    ----------
    data : pd.DataFrame (n_samples, n_components)
        The features of the data.

    labels : pd.DataFrame (n_samples,)
        Labels of each point (1 for normal, -1 for anomaly).

    algorithm_name : str
        The name of the algorithm
    """
    # seaborn is not imported at module level, so it is brought in locally
    import seaborn as sns

    # Same label convention as the scatter plots in this module: 1 is normal, -1 is anomaly.
    # Filtering on 0/1 would leave the normal group empty and plot the normal points as anomalies.
    normal_data = data[labels == 1]
    anomaly_data = data[labels == -1]

    plt.figure(figsize=(10, 6))

    # Using Kernel Density Estimation (KDE) for density estimation
    sns.kdeplot(data=normal_data, fill=True, label="Normal Data", color="blue")
    sns.kdeplot(data=anomaly_data, fill=True, label="Anomaly Data", color="red")

    plt.title(f"Density Estimation for {algorithm_name}")
    plt.xlabel("Feature Space")
    plt.ylabel("Density")
    plt.legend()


def scatter2d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str) -> None:
    """
    Draw the 2D scatter plot for anomaly detection results.

    The first two columns of ``data`` are used as the x and y axes.

    Parameters
    ----------
    data : pd.DataFrame (n_samples, n_components)
        The features of the data.

    labels : pd.DataFrame (n_samples,)
        Labels of each point (1 for normal, -1 for anomaly).

    algorithm_name : str
        The name of the algorithm
    """
    # One entry per group: (label value, legend text, colour, marker)
    groups = (
        (-1, "Anomaly", "#1f77b4", "o"),
        (1, "Normal", "#d62728", "x"),
    )

    canvas = plt.figure()
    canvas.set_size_inches(18, 10)
    plt.subplot(111)

    for label_value, legend_text, group_color, group_marker in groups:
        subset = data[labels == label_value]
        x_values = subset.iloc[:, 0]
        y_values = subset.iloc[:, 1]
        plt.scatter(x_values, y_values, c=group_color, marker=group_marker, label=legend_text)

    plt.xlabel(f"{data.columns[0]}")
    plt.ylabel(f"{data.columns[1]}")
    plt.title(f"Anomaly Detection 2D Scatter Plot - {algorithm_name}")
    plt.legend()


def scatter3d(data: pd.DataFrame, labels: pd.DataFrame, algorithm_name: str) -> None:
    """
    Draw the 3D scatter plot for anomaly detection results.

    The figure has two panels built from the first three columns of ``data``:
    the left one shows all points, the right one shows them split by label.

    Parameters
    ----------
    data : pd.DataFrame (n_samples, n_components)
        The features of the data.

    labels : pd.DataFrame (n_samples,)
        Labels of each point (1 for normal, -1 for anomaly).

    algorithm_name : str
        The name of the algorithm
    """
    fig = plt.figure(figsize=(12, 6), facecolor="w")
    plt.subplots_adjust(left=0.05, right=0.95, bottom=0.05, top=0.9)

    # Left panel: the raw data without any labelling
    ax = fig.add_subplot(121, projection="3d")
    ax.scatter(data.iloc[:, 0], data.iloc[:, 1], data.iloc[:, 2], alpha=0.3, c="#FF0000", marker=".")
    ax.set_xlabel(data.columns[0])
    ax.set_ylabel(data.columns[1])
    ax.set_zlabel(data.columns[2])
    ax.grid(True)
    ax.set_title(f"Base Data 3D Plot - {algorithm_name}")

    # Right panel: one scatter call per group: (label value, legend text, colour, marker)
    ax2 = fig.add_subplot(122, projection="3d")
    groups = (
        (-1, "Anomaly", "#1f77b4", "o"),
        (1, "Normal", "#d62728", "x"),
    )
    for label, legend_text, color, marker in groups:
        subset = data[labels == label]
        # No ``cmap`` here: ``c`` is a literal colour, so a colormap would be ignored with a warning
        ax2.scatter(
            subset.iloc[:, 0],
            subset.iloc[:, 1],
            subset.iloc[:, 2],
            c=color,
            marker=marker,
            s=6,
            edgecolors="none",
            label=legend_text,
        )

    ax2.set_xlabel(data.columns[0])
    ax2.set_ylabel(data.columns[1])
    ax2.set_zlabel(data.columns[2])
    ax2.grid(True)
    ax2.set_title(f"Anomaly Detection 3D Plot - {algorithm_name}")
    ax2.legend()
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from enum import Enum


class AnormalyDetectionCommonFunction(Enum):
    """Names of the diagrams drawn by the common components of the anomaly detection workflows."""

    # Name of the 2D scatter diagram
    PLOT_SCATTER_2D = "Anomaly Detection Two-Dimensional Diagram"
    # Name of the 3D scatter diagram
    PLOT_SCATTER_3D = "Anomaly Detection Three-Dimensional Diagram"
    # Name of the density estimation diagram
    DENSITY_ESTIMATION = "Anomaly Detection Density Estimation"


class LocalOutlierFactorSpecialFunction(Enum):
    """Names of the diagrams drawn only for the Local Outlier Factor workflow."""

    # Name of the LOF score bar diagram
    PLOT_LOF_SCORE = "Lof Score Diagram"
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
from typing import Dict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from rich import print

from ....constants import SECTION
Expand Down Expand Up @@ -37,3 +40,65 @@ def local_outlier_factor_manual_hyper_parameters() -> Dict:
"n_jobs": n_jobs,
}
return hyper_parameters


def plot_lof_scores(columns_name: pd.Index, lof_scores: np.ndarray, image_config: dict) -> pd.DataFrame:
    """Draw the LOF scores bar diagram.

    Parameters
    ----------
    columns_name : pd.Index
        The labels of the bars, one per score. They are stored under the "Feature" index
        of the returned table.

    lof_scores : np.ndarray
        The LOF scores values.

    image_config : dict
        The configuration of the image. The keys read here are "width", "height", "dpi",
        "alpha2", "axislabelfont", "title_label", "title_size", "title_color", "title_font",
        "title_location" and "title_pad".

    Returns
    -------
    lof_scores_df : pd.DataFrame
        The LOF scores values, sorted in ascending order and indexed by "Feature".
    """
    # create drawing canvas
    fig, ax = plt.subplots(figsize=(image_config["width"], image_config["height"]), dpi=image_config["dpi"])

    # build the score table, sorted in ascending order
    lof_scores_df = pd.DataFrame({"Feature": columns_name, "LOF Score": lof_scores})
    lof_scores_df["LOF Score"] = lof_scores_df["LOF Score"].astype(float)
    lof_scores_df = lof_scores_df.sort_values(["LOF Score"], ascending=True)
    lof_scores_df = lof_scores_df.set_index("Feature")

    # draw the main content on the canvas created above; without ``ax`` pandas opens a second
    # figure, and the layout, fonts and title below would be applied to an empty one
    lof_scores_df.plot.barh(ax=ax, alpha=image_config["alpha2"], rot=0)

    # automatically optimize picture layout structure
    fig.tight_layout()
    xmin, xmax = ax.get_xlim()
    ymin, ymax = ax.get_ylim()
    x_adjustment = (xmax - xmin) * 0.01
    y_adjustment = (ymax - ymin) * 0.01
    ax.axis([xmin - x_adjustment, xmax + x_adjustment, ymin - y_adjustment, ymax + y_adjustment])

    # convert the font of the tick labels on both axes
    for tick_label in [*ax.get_xticklabels(), *ax.get_yticklabels()]:
        tick_label.set_fontname(image_config["axislabelfont"])

    ax.set_title(
        label=image_config["title_label"],
        fontdict={
            "size": image_config["title_size"],
            "color": image_config["title_color"],
            "family": image_config["title_font"],
        },
        loc=image_config["title_location"],
        pad=image_config["title_pad"],
    )

    return lof_scores_df
2 changes: 1 addition & 1 deletion geochemistrypi/data_mining/process/detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def activate(
self.ad_workflow.fit(X)
y_predict = self.ad_workflow.predict(X)
X_anomaly_detection, X_normal, X_anomaly = self.ad_workflow._detect_data(X, y_predict)

self.ad_workflow.anomaly_detection_result = X_anomaly_detection
self.ad_workflow.data_upload(X=X, y=y, X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test)

# Save the model hyper-parameters
Expand Down
Loading