-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patheval_methods.py
129 lines (115 loc) · 4.46 KB
/
eval_methods.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- coding: utf-8 -*-
import numpy as np
###################################################################
#
# The following part is copied from OmniAnomaly, which is used to evaluate the model performance.
#
###################################################################
def calc_point2point(predict, actual):
"""
calculate f1 score by predict and actual.
Args:
predict (np.ndarray): the predict label
actual (np.ndarray): np.ndarray
"""
TP = np.sum(predict * actual)
TN = np.sum((1 - predict) * (1 - actual))
FP = np.sum(predict * (1 - actual))
FN = np.sum((1 - predict) * actual)
precision = TP / (TP + FP + 0.00001)
recall = TP / (TP + FN + 0.00001)
f1 = 2 * precision * recall / (precision + recall + 0.00001)
return f1, precision, recall, TP, TN, FP, FN
def adjust_predicts(score, label,
threshold=None,
pred=None,
calc_latency=False):
"""
Calculate adjusted predict labels using given `score`, `threshold` (or given `pred`) and `label`.
Args:
score (np.ndarray): The anomaly score
label (np.ndarray): The ground-truth label
threshold (float): The threshold of anomaly score.
A point is labeled as "anomaly" if its score is lower than the threshold.
pred (np.ndarray or None): if not None, adjust `pred` and ignore `score` and `threshold`,
calc_latency (bool):
Returns:
np.ndarray: predict labels
"""
if len(score) != len(label):
raise ValueError("score and label must have the same length")
score = np.asarray(score)
label = np.asarray(label)
latency = 0
if pred is None:
# 预测的分数小于阈值的为1,否则为0
# 1表示异常,0表示正常
# 如果重构误差来表示正常与异常
# 那么应该重构误差越大,越异常
# 因此如果分数大于阈值,则应该被认为是异常
# 因此这里的大小关系应该反过来
#predict = score < threshold
predict = score > threshold
else:
predict = pred
actual = label > 0.1
anomaly_state = False
anomaly_count = 0
for i in range(len(score)):
if actual[i] and predict[i] and not anomaly_state:
# 如果预测的值和真实的值同样为异常值,并且anomaly_state为False
anomaly_state = True
anomaly_count += 1
for j in range(i, 0, -1):
if not actual[j]: # 从当前真实值开始向前循环,如果真实值是异常值,则将预测值同样置为异常值
break
else:
if not predict[j]:
predict[j] = True
latency += 1
elif not actual[i]:
anomaly_state = False
if anomaly_state:
predict[i] = True
if calc_latency:
return predict, latency / (anomaly_count + 1e-4)
else:
return predict
def calc_seq(score, label, threshold, calc_latency=False):
"""
Calculate f1 score for a score sequence
"""
if calc_latency:
predict, latency = adjust_predicts(score, label, threshold, calc_latency=calc_latency)
t = list(calc_point2point(predict, label))
t.append(latency)
return t
else:
predict = adjust_predicts(score, label, threshold, calc_latency=calc_latency)
return calc_point2point(predict, label)
def bf_search(score, label, start, end=None, step_num=1, display_freq=1, verbose=True):
"""
Find the best-f1 score by searching best `threshold` in [`start`, `end`).
Returns:
list: list for results
float: the `threshold` for best-f1
"""
if step_num is None or end is None:
end = start
step_num = 1
search_step, search_range, search_lower_bound = step_num, end - start, start
if verbose:
print("search range: ", search_lower_bound, search_lower_bound + search_range)
threshold = search_lower_bound
m = (-1., -1., -1.)
m_t = 0.0
for i in range(search_step):
threshold += search_range / float(search_step)
target = calc_seq(score, label, threshold, calc_latency=True)
if target[0] > m[0]:
m_t = threshold
m = target
if verbose and i % display_freq == 0:
print("cur thr: ", threshold, target, m, m_t)
print(m, m_t)
return m, m_t