# detect.py
# common imports
import os
import pandas as pd
import numpy as np
from sklearn import preprocessing
import seaborn as sns
sns.set(color_codes=True)
import matplotlib.pyplot as plt
data_dir = '2nd_test'
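# Assumption: data_dir points at the '2nd_test' set of the NASA IMS bearing
# run-to-failure data, i.e. one tab-separated file per one-second vibration
# snapshot with four channels, and the timestamp encoded in the filename.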
merged_data = pd.DataFrame()
for filename in os.listdir(data_dir):
    print(filename)
    dataset = pd.read_csv(os.path.join(data_dir, filename), sep='\t')
    dataset_mean_abs = np.array(dataset.abs().mean())
    dataset_mean_abs = pd.DataFrame(dataset_mean_abs.reshape(1, 4))
    dataset_mean_abs.index = [filename]
    merged_data = pd.concat([merged_data, dataset_mean_abs])
merged_data.columns = ['Bearing 1','Bearing 2','Bearing 3','Bearing 4']
merged_data.index = pd.to_datetime(merged_data.index, format='%Y.%m.%d.%H.%M.%S')
merged_data = merged_data.sort_index()
merged_data.to_csv('merged_dataset_BearingTest_2.csv')
print(merged_data.head())
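# On later runs the aggregated features can be reloaded instead of re-parsing
# every raw snapshot (a sketch):
# merged_data = pd.read_csv('merged_dataset_BearingTest_2.csv',
#                           index_col=0, parse_dates=True)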
# define train/test data
dataset_train = merged_data.loc['2004-02-12 11:02:39':'2004-02-13 23:52:39']
dataset_test = merged_data.loc['2004-02-13 23:52:39':]
dataset_train.plot(figsize=(12, 6))
# normalize
scaler = preprocessing.MinMaxScaler()
X_train = pd.DataFrame(scaler.fit_transform(dataset_train), columns=dataset_train.columns, index=dataset_train.index)
# Randomly shuffle the training data (the result must be assigned back,
# otherwise the call is a no-op)
X_train = X_train.sample(frac=1, random_state=0)
X_test = pd.DataFrame(scaler.transform(dataset_test), columns=dataset_test.columns, index=dataset_test.index)
# PCA model
from sklearn.decomposition import PCA
pca = PCA(n_components=2, svd_solver='full')
X_train_PCA = pca.fit_transform(X_train)
X_train_PCA = pd.DataFrame(X_train_PCA)
X_train_PCA.index = X_train.index
X_test_PCA = pca.transform(X_test)
X_test_PCA = pd.DataFrame(X_test_PCA)
X_test_PCA.index = X_test.index
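# Sanity-check sketch (not part of the original flow): how much of the variance
# the two retained components explain.
print('Explained variance ratio:', pca.explained_variance_ratio_.sum())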
# covariance matrix
def cov_matrix(data, verbose=False):
    covariance_matrix = np.cov(data, rowvar=False)
    if is_pos_def(covariance_matrix):
        inv_covariance_matrix = np.linalg.inv(covariance_matrix)
        if is_pos_def(inv_covariance_matrix):
            return covariance_matrix, inv_covariance_matrix
        else:
            print("Error: Inverse of Covariance Matrix is not positive definite!")
    else:
        print("Error: Covariance Matrix is not positive definite!")
# mahalanobis distance
def MahalanobisDist(inv_cov_matrix, mean_distr, data, verbose=False):
    inv_covariance_matrix = inv_cov_matrix
    vars_mean = mean_distr
    diff = data - vars_mean
    md = []
    for i in range(len(diff)):
        md.append(np.sqrt(diff[i].dot(inv_covariance_matrix).dot(diff[i])))
    return md
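# Equivalent vectorized form (a sketch, not part of the original flow; assumes
# `data` is an (n, k) NumPy array), useful if the Python loop above becomes a
# bottleneck:
def MahalanobisDistVectorized(inv_cov_matrix, mean_distr, data):
    diff = data - mean_distr
    return np.sqrt(np.einsum('ij,jk,ik->i', diff, inv_cov_matrix, diff))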
# detect outliers
def MD_detectOutliers(dist, extreme=False, verbose=False):
    k = 3. if extreme else 2.
    threshold = np.mean(dist) * k
    outliers = []
    for i in range(len(dist)):
        if dist[i] >= threshold:
            outliers.append(i)  # index of the outlier
    return np.array(outliers)
# threshold
def MD_threshold(dist, extreme=False, verbose=False):
    k = 3. if extreme else 2.
    threshold = np.mean(dist) * k
    return threshold
# check if matrix is positive definite
def is_pos_def(A):
    if np.allclose(A, A.T):
        try:
            np.linalg.cholesky(A)
            return True
        except np.linalg.LinAlgError:
            return False
    else:
        return False
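# Quick sanity check of the helper, e.g.:
#   is_pos_def(np.eye(2))                      -> True
#   is_pos_def(np.array([[0., 1.], [1., 0.]])) -> False (eigenvalues +1 and -1)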
# convert the PCA-projected data to NumPy arrays
data_train = np.array(X_train_PCA.values)
data_test = np.array(X_test_PCA.values)
# calculate covariance matrix and its inverse (named so we do not shadow the
# cov_matrix function defined above)
cov_mat, inv_cov_matrix = cov_matrix(data_train)
# mean value
mean_distr = data_train.mean(axis=0)
# Using the covariance matrix and its inverse, we can calculate the Mahalanobis
# distance for the training data that defines "normal conditions", and find the
# threshold value for flagging data points as anomalies. We can then calculate
# the Mahalanobis distance for the points in the test set and compare it with
# the anomaly threshold.
dist_test = MahalanobisDist(inv_cov_matrix, mean_distr, data_test, verbose=False)
dist_train = MahalanobisDist(inv_cov_matrix, mean_distr, data_train, verbose=False)
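# The MD_detectOutliers helper defined above can flag indices directly,
# e.g. (a usage sketch):
# outlier_idx = MD_detectOutliers(dist_test, extreme=True)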
threshold = MD_threshold(dist_train, extreme=True)
# threshold value for flagging anomalies
#
# The square of the Mahalanobis distance to the centroid of the distribution
# should follow a chi-squared distribution if the assumption of normally
# distributed input variables is fulfilled. This is also the assumption behind
# the above calculation of the threshold value for flagging anomalies. As this
# assumption is not always true, it is good to visualise the distribution in
# order to decide on a good threshold value.
plt.figure()
sns.histplot(np.square(dist_train), bins=10, kde=False)
plt.xlim([0.0, 15])
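# As an alternative to reading a cut-off from the histogram, it can be derived
# from the chi-squared distribution itself (a sketch; assumes the squared
# distances are chi-squared distributed with df = 2, the number of PCA components):
from scipy import stats
threshold_chi2 = np.sqrt(stats.chi2.ppf(0.999, df=2))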
# visualise the Mahalanobis distance itself
plt.figure()
sns.histplot(dist_train, bins=10, kde=True, color='green')
plt.xlim([0.0, 5])
plt.xlabel('Mahalanobis dist')
# From the above we decide on a threshold of roughly 3.8 (here we keep the
# MD_threshold value computed from the training data). We can then save the
# Mahalanobis distance, the threshold value and the anomaly flag for both
# train and test data in a dataframe.
anomaly_train = pd.DataFrame()
anomaly_train['Mob dist'] = dist_train
anomaly_train['Thresh'] = threshold
# if mob dist above threshold -> flag as anomaly
anomaly_train['Anomaly'] = anomaly_train['Mob dist'] > anomaly_train['Thresh']
anomaly_train.index = X_train_PCA.index
# repeat with test data
anomaly = pd.DataFrame()
anomaly['Mob dist'] = dist_test
anomaly['Thresh'] = threshold
# If Mob dist above threshold: Flag as anomaly
anomaly['Anomaly'] = anomaly['Mob dist'] > anomaly['Thresh']
anomaly.index = X_test_PCA.index
print(anomaly.head())
# we can now merge the data into a single dataframe and save as csv
anomaly_alldata = pd.concat([anomaly_train, anomaly]).sort_index()
anomaly_alldata.to_csv('Anomaly_distance.csv')
# verifying PCA model on test data
anomaly_alldata.plot(
logy=True,
figsize=(10,6),
ylim=[1e-1, 1e3],
color=['green','red']
)
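# Render all figures when running as a plain script (not needed in a notebook).
plt.show()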