diff --git a/ltsm/data_reader/train_database_reader.py b/ltsm/data_reader/train_database_reader.py
index 80f5063..c1ed1d6 100644
--- a/ltsm/data_reader/train_database_reader.py
+++ b/ltsm/data_reader/train_database_reader.py
@@ -149,13 +149,11 @@ def retrieve_data_to_csv(conn, database, table_name, output_file):
         csv_files = [os.path.join(datapath, f) for f in os.listdir(datapath) if f.endswith('.csv')]
         tables = [os.path.splitext(os.path.basename(csv_file))[0] for csv_file in csv_files]
         for csv_file, table_name in zip(csv_files, tables):
-            table_name=f'train_{table_name}'
             insert_data_from_csv(conn, database, csv_file, table_name)
         if not os.path.exists(output_folder):
             os.makedirs(output_folder)
 
         for table_name in tables:
             output_file = os.path.join(output_folder, f"{table_name}.csv")
-            table_name=f'train_{table_name}'
             retrieve_data_to_csv(conn, database, table_name, output_file)
     finally:
diff --git a/ltsm/evaluate_pipeline/evaluation_pipeline.py b/ltsm/evaluate_pipeline/evaluation_pipeline.py
new file mode 100644
index 0000000..fd414d4
--- /dev/null
+++ b/ltsm/evaluate_pipeline/evaluation_pipeline.py
@@ -0,0 +1,271 @@
+import numpy as np
+import pandas as pd
+import os
+
+class EvaluationPipeline:
+    def __init__(self, x_test, y_test, bias_function="flat", gamma_function=None):
+        """
+        Initialize the evaluation pipeline with model predictions, ground-truth labels, and a user-defined bias.
+
+        Parameters:
+        x_test - Predicted anomaly ranges from the model (a list of (start, end) integer tuples, inclusive on both ends)
+        y_test - True anomaly ranges for the test set (a list of (start, end) integer tuples, inclusive on both ends)
+
+        bias_function - The bias function to use ('flat', 'front_end', 'back_end', 'middle')
+        gamma_function - The gamma function to use for the cardinality factor, default is 1/overlap_count
+        """
+        self.x_test = x_test
+        self.y_test = y_test
+        self.bias_function = self.get_bias_function(bias_function)
+        self.gamma_function = gamma_function if gamma_function else self.default_gamma
+        self.results = {}
+        self.recall_score = 0
+        self.precision_score = 0
+        self.f1_score = 0
+
+    def get_bias_function(self, bias_type):
+        """Return the selected bias function."""
+        bias_functions = {
+            "flat": self.flat_bias,
+            "front_end": self.front_end_bias,
+            "back_end": self.back_end_bias,
+            "middle": self.middle_bias,
+        }
+        return bias_functions.get(bias_type, self.flat_bias)
+
+    @staticmethod
+    def flat_bias(i, anomaly_length):
+        """Flat bias: every position in the range carries the same weight."""
+        return 1
+
+    @staticmethod
+    def front_end_bias(i, anomaly_length):
+        """Front-end bias: earlier positions in the range carry more weight."""
+        return anomaly_length - i + 1
+
+    @staticmethod
+    def back_end_bias(i, anomaly_length):
+        """Back-end bias: later positions in the range carry more weight."""
+        return i
+
+    @staticmethod
+    def middle_bias(i, anomaly_length):
+        """Middle bias: positions near the center of the range carry more weight."""
+        if i < anomaly_length / 2:
+            return i
+        else:
+            return anomaly_length - i + 1
+
+    @staticmethod
+    def default_gamma(overlap_count):
+        """Default gamma function for cardinality factor."""
+        return 1 / overlap_count
+
+    def overlap_size(self, range1, range2):
+        """
+        Compute the overlap reward ω.
+        range1 - (start, end) of one range (true or predicted)
+        range2 - (start, end) of the other range
+
+        Returns:
+        Overlap reward for the two ranges
+        """
+        overlap_start = max(range1[0], range2[0])
+        overlap_end = min(range1[1], range2[1])
+        if overlap_start > overlap_end:
+            return 0  # No overlap
+
+        anomaly_length = range1[1] - range1[0] + 1
+
+        my_value = 0
+        max_value = 0
+        for i in range(1, anomaly_length + 1):
+            bias = self.bias_function(i, anomaly_length)
+            max_value += bias
+            if overlap_start <= range1[0] + i - 1 <= overlap_end:
+                my_value += bias
+
+        return my_value / max_value
+
+    def cardinality_factor(self, target_range, comparison_ranges):
+        """
+        Compute the CardinalityFactor.
+        target_range - (start, end) of the range (true or predicted)
+        comparison_ranges - List of ranges to compare with (true or predicted)
+
+        Returns:
+        Cardinality factor for the target range
+        """
+        overlap_count = sum(
+            1 for comparison_range in comparison_ranges
+            if max(target_range[0], comparison_range[0]) <= min(target_range[1], comparison_range[1])
+        )
+        if overlap_count <= 1:
+            return 1
+        else:
+            return self.gamma_function(overlap_count)
+
+    def recall_t(self, Ri, P, alpha=0):
+        """
+        Compute RecallT(Ri, P).
+        Ri - A single true anomaly range
+        P - List of predicted anomaly ranges
+        alpha - Weight for the existence reward (default is 0)
+
+        Returns:
+        Recall_t - Recall score for a single true range
+        """
+        # Existence Reward
+        total_overlap = sum(
+            max(0, min(Ri[1], Pj[1]) - max(Ri[0], Pj[0]) + 1)
+            for Pj in P
+        )
+        existence_reward = 1 if total_overlap >= 1 else 0
+
+        # Overlap Reward
+        overlap_reward = self.cardinality_factor(Ri, P) * sum(
+            self.overlap_size(Ri, Pj) for Pj in P
+        )
+
+        return alpha * existence_reward + (1 - alpha) * overlap_reward
+
+    def precision_t(self, R, Pi):
+        """
+        Compute PrecisionT(R, Pi).
+        R - List of true anomaly ranges
+        Pi - A single predicted anomaly range
+
+        Returns:
+        precision_t - Precision score for a single predicted range
+        """
+        # Overlap Reward
+        overlap_reward = self.cardinality_factor(Pi, R) * sum(
+            self.overlap_size(Pi, Rj) for Rj in R
+        )
+
+        return overlap_reward
+
+    def evaluate_recall_score(self):
+        """
+        Compute the range-based recall of the predicted ranges (self.x_test) against the
+        true ranges (self.y_test) and store it in the class instance.
+
+        Returns:
+        Range-based recall score
+        """
+        Nr = len(self.y_test)
+        if Nr == 0:
+            return 0
+        else:
+            recall_score = sum(self.recall_t(Ri, self.x_test) for Ri in self.y_test) / Nr
+            self.recall_score = recall_score
+            return recall_score
+
+    def evaluate_precision_score(self):
+        """
+        Compute the range-based precision of the predicted ranges (self.x_test) against the
+        true ranges (self.y_test) and store it in the class instance.
+
+        Returns:
+        Range-based precision score
+        """
+        Np = len(self.x_test)
+        if Np == 0:
+            return 0
+        else:
+            precision_score = sum(self.precision_t(self.y_test, Pi) for Pi in self.x_test) / Np
+            self.precision_score = precision_score
+            return precision_score
+
+    def evaluate_f1_score(self):
+        """
+        Compute the range-based F1 score of the predicted ranges (self.x_test) against the
+        true ranges (self.y_test) and store it in the class instance.
+
+        Returns:
+        Range-based F1 score
+        """
+        recall = self.evaluate_recall_score()
+        precision = self.evaluate_precision_score()
+        if precision + recall == 0:
+            self.f1_score = 0
+            return 0
+        else:
+            f1_score = 2 * (precision * recall) / (precision + recall)
+            self.f1_score = f1_score
+            return f1_score
+
+    def evaluate(self, dataset_name='', model_name=''):
+        """
+        Evaluate the model on test data and store the results in the class instance.
+
+        Parameters:
+        dataset_name - Name of the dataset
+        model_name - Name of the model
+
+        Returns:
+        Dictionary of evaluation results
+        """
+        recall = self.evaluate_recall_score()
+        precision = self.evaluate_precision_score()
+        if precision + recall == 0:
+            self.f1_score = 0
+        else:
+            self.f1_score = 2 * (precision * recall) / (precision + recall)
+
+        # Store results
+        self.results = {
+            "dataset_name": dataset_name,
+            "model_name": model_name,
+            "precision": self.precision_score,
+            "recall": self.recall_score,
+            "f1_score": self.f1_score
+        }
+        print("Evaluation results:", self.results)
+        print("Use the 'save_results()' method to log the results to a CSV file.")
+        return self.results
+
+    def get_results(self):
+        """
+        Get the evaluation results.
+
+        Returns:
+        Dictionary of evaluation results
+        """
+        return self.results
+
+    def save_results(self, csv_path="./evaluation_result.csv"):
+        """
+        Log the evaluation results to a CSV file. If the file doesn't exist, it creates a new one;
+        if it exists, it appends the new results.
+
+        Parameters:
+        csv_path - Path to the CSV file for logging (default is "./evaluation_result.csv")
+
+        Returns:
+        Updated DataFrame with the appended results
+        """
+        new_results_df = pd.DataFrame([self.results])
+
+        # Check if the CSV file exists
+        if os.path.exists(csv_path):
+            existing_df = pd.read_csv(csv_path)
+            updated_df = pd.concat([existing_df, new_results_df], ignore_index=True)
+        else:
+            updated_df = new_results_df
+        updated_df.to_csv(csv_path, index=False)
+
+        return updated_df
+
diff --git a/tests/evaluate_pipeline/evaluation_pipeline_test.py b/tests/evaluate_pipeline/evaluation_pipeline_test.py
new file mode 100644
index 0000000..d54c954
--- /dev/null
+++ b/tests/evaluate_pipeline/evaluation_pipeline_test.py
@@ -0,0 +1,121 @@
+import unittest
+from ltsm.evaluate_pipeline.evaluation_pipeline import EvaluationPipeline
+import random
+
+
+class TestEvaluationPipeline(unittest.TestCase):
+    def setUp(self):
+        # Complex data for testing
+        # Predicted ranges with overlaps, gaps, and exact matches
+        self.x_test = [
+            (0, 10), (15, 25), (30, 40), (45, 55), (60, 70)
+        ]
+        # True ranges with partial overlaps, non-overlapping ranges, and complete overlaps
+        self.y_test = [
+            (5, 15), (20, 30), (35, 50), (65, 75)
+        ]
+
+        # Large, random test set
+        self.x_test_large = self.generate_random_ranges(100, 0, 1000)
+        self.y_test_large = self.generate_random_ranges(100, 0, 1000)
+
+    @staticmethod
+    def generate_random_ranges(count, range_start, range_end):
+        """Generate random range pairs."""
+        ranges = []
+        for _ in range(count):
+            start = random.randint(range_start, range_end - 10)
+            end = start + random.randint(5, 20)
+            ranges.append((start, end))
+        return ranges
+
+    def test_overlap(self):
+        # Test overlap size with varying ranges
+        pipeline = EvaluationPipeline(self.x_test, self.y_test)
+
+        # Overlap between first predicted and first true range
+        overlap_1 = pipeline.overlap_size((0, 10), (5, 15))
+        self.assertAlmostEqual(overlap_1, 0.545, places=2)  # Partial overlap
+
+        overlap_2 = pipeline.overlap_size((17, 29), (23, 33))
+        self.assertAlmostEqual(overlap_2, 0.5384615384615384, places=2)  # Partial overlap
+
+        # Single-point overlap at the range boundary
+        overlap_3 = pipeline.overlap_size((16, 20), (20, 35))
+        self.assertAlmostEqual(overlap_3, 0.2, places=2)
+
+        # No overlap
+        overlap_4 = pipeline.overlap_size((15, 25), (35, 50))
+        self.assertEqual(overlap_4, 0)
+
+        # Complete overlap
+        overlap_5 = pipeline.overlap_size((30, 40), (30, 40))
+        self.assertEqual(overlap_5, 1.0)
+
+    def test_cardinality_factor(self):
+        # Test cardinality factor with overlapping ranges
+        pipeline = EvaluationPipeline(self.x_test, self.y_test)
+
+        # A range with overlaps
+        cardinality_1 = pipeline.cardinality_factor((35, 50), self.y_test)
+        self.assertGreaterEqual(cardinality_1, 1.0)
+
+        # A range with no overlaps
+        cardinality_2 = pipeline.cardinality_factor((100, 110), self.y_test)
+        self.assertEqual(cardinality_2, 1.0)
+
+    def test_large_random_data(self):
+        # Test pipeline with large randomized data
+        pipeline = EvaluationPipeline(self.x_test_large, self.y_test_large)
+
+        recall = pipeline.evaluate_recall_score()
+        precision = pipeline.evaluate_precision_score()
+        f1_score = pipeline.evaluate_f1_score()
+
+        # Check scores are within bounds
+        self.assertGreaterEqual(recall, 0.0)
+        self.assertGreaterEqual(precision, 0.0)
+        self.assertGreaterEqual(f1_score, 0.0)
+        self.assertLessEqual(recall, 1.0)
+        self.assertLessEqual(precision, 1.0)
+        self.assertLessEqual(f1_score, 1.0)
+
+    def test_edge_case_empty_inputs(self):
+        # Edge case: Empty inputs
+        pipeline = EvaluationPipeline([], [])
+        recall = pipeline.evaluate_recall_score()
+        precision = pipeline.evaluate_precision_score()
+        f1_score = pipeline.evaluate_f1_score()
+
+        self.assertEqual(recall, 0.0)
+        self.assertEqual(precision, 0.0)
+        self.assertEqual(f1_score, 0.0)
+
+    def test_edge_case_no_overlap(self):
+        # Edge case: No overlaps
+        x_test_no_overlap = [(0, 10), (20, 30)]
+        y_test_no_overlap = [(40, 50), (60, 70)]
+        pipeline = EvaluationPipeline(x_test_no_overlap, y_test_no_overlap)
+        recall = pipeline.evaluate_recall_score()
+        precision = pipeline.evaluate_precision_score()
+        f1_score = pipeline.evaluate_f1_score()
+
+        self.assertEqual(recall, 0.0)
+        self.assertEqual(precision, 0.0)
+        self.assertEqual(f1_score, 0.0)
+
+    def test_f1_score_consistency(self):
+        # Validate that F1 score is consistent with precision and recall
+        pipeline = EvaluationPipeline(self.x_test, self.y_test)
+        recall = pipeline.evaluate_recall_score()
+        precision = pipeline.evaluate_precision_score()
+        f1_score = pipeline.evaluate_f1_score()
+
+        if precision + recall > 0:
+            self.assertAlmostEqual(f1_score, 2 * (precision * recall) / (precision + recall), places=2)
+        else:
+            self.assertEqual(f1_score, 0.0)
+
+
+if __name__ == "__main__":
+    unittest.main()
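
The sketch below is not part of the diff; it is a minimal usage example showing how the new EvaluationPipeline might be driven end to end, assuming the package added above is importable as ltsm.evaluate_pipeline.evaluation_pipeline. The toy ranges, the "toy"/"demo" names, and the quoted scores are illustrative only; the numbers follow from the default flat bias, the default gamma of 1/overlap_count, and alpha = 0.

from ltsm.evaluate_pipeline.evaluation_pipeline import EvaluationPipeline

# Predicted and true anomaly ranges as inclusive (start, end) integer tuples.
x_test = [(0, 10), (15, 25)]   # hypothetical model predictions
y_test = [(5, 15)]             # hypothetical ground truth

# Flat bias weights every position in a range equally.
pipeline = EvaluationPipeline(x_test, y_test, bias_function="flat")

recall = pipeline.evaluate_recall_score()        # 7/22 ≈ 0.318 for this toy input
precision = pipeline.evaluate_precision_score()  # also 7/22 ≈ 0.318 here
f1 = pipeline.evaluate_f1_score()                # harmonic mean of the two

# Bundle the scores with dataset/model names, then append them to a CSV log.
results = pipeline.evaluate(dataset_name="toy", model_name="demo")
pipeline.save_results("./evaluation_result.csv")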