# comparison_tests.py
import os
import pickle

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt  # only used by the commented-out plots in train_model
from pykalman import KalmanFilter

from ai_model.losses import TestLoss, SequenceLoss
from dataset.kalman_smoother import KalmanSmoother
from dataset.load_dataset import LoadDataSet


class KalmanFilterComparison:
    """Baseline predictor: a Kalman filter with pre-fitted parameters, rolled forward to forecast positions."""

    def __init__(self, look_back, look_forth, data_file, params_file):
        self.look_back = look_back
        self.look_forth = look_forth
        with open(data_file + '.pkl', 'rb') as robots_f:
            self.robots_t = pickle.load(robots_f)
        with open(params_file + '.pkl', 'rb') as filter_params_f:
            params = pickle.load(filter_params_f)
        self.transition_matrix = params.A
        self.observation_matrix = params.C
        # params stores the inverse square roots of the noise covariances;
        # invert (V^-1/2 @ V^-1/2) and (W^-1/2 @ W^-1/2) to recover V and W.
        self.observation_covariance = np.linalg.inv(np.matmul(params.V_neg_sqrt, params.V_neg_sqrt))
        self.transition_covariance = np.linalg.inv(np.matmul(params.W_neg_sqrt, params.W_neg_sqrt))
        self.smoother = KalmanSmoother()
        self.smoother.load_params(params_file)
        self.true = []
        self.predicted = []

    def get_future(self, a_matrix, last_pos):
        # Roll the state forward look_forth steps using the transition matrix
        # alone (no observations); the state layout is [x, vx, y, vy], so the
        # positions are the 0th and 2nd components.
        res = []
        for i in range(self.look_forth):
            pos = np.inner(a_matrix, last_pos)
            last_pos = pos
            res.append([pos[0], pos[2]])
        return res
    def process_robots(self, robots):
        for k in range(0, len(robots)):
            for robot_id, series in robots[k].items():
                if len(series['x']) > 101:  # skip tracks that are too short
                    # The Kalman-smoothed trajectory serves as ground truth.
                    x_hat, _, _ = self.smoother.smooth(series['x'], series['y'], series['mask'])
                    x_sm = x_hat[:, 0]
                    y_sm = x_hat[:, 2]
                    ism = [series['x'][0], 0, series['y'][0], 0]
                    kf = KalmanFilter(transition_matrices=self.transition_matrix,
                                      observation_matrices=self.observation_matrix,
                                      initial_state_mean=ism,
                                      observation_covariance=self.observation_covariance,
                                      transition_covariance=self.transition_covariance)
                    # Warm up the filter on the first look_back observations ...
                    initial = np.array((series['x'][0:self.look_back], series['y'][0:self.look_back])).T
                    means, cov = kf.filter(initial)
                    self.true.append(np.array((x_sm[self.look_back:(self.look_back + self.look_forth)],
                                               y_sm[self.look_back:(self.look_back + self.look_forth)])).T)
                    self.predicted.append(np.array(self.get_future(kf.transition_matrices, means[-1])))
                    means, cov = means[-1], cov[-1]
                    # ... then slide forward one observation at a time, predicting
                    # look_forth steps ahead after each update.
                    for i in range(self.look_back + 1, len(series['x']) - self.look_forth - 1):
                        self.true.append(np.array((x_sm[(i + 1):(i + 1 + self.look_forth)],
                                                   y_sm[(i + 1):(i + 1 + self.look_forth)])).T)
                        means, cov = kf.filter_update(means, cov,
                                                      np.array((series['x'][i], series['y'][i])))
                        self.predicted.append(np.array(self.get_future(kf.transition_matrices, means)))
            break  # only the first group of tracks is evaluated

    def perform_test(self):
        self.process_robots(self.robots_t['blue'])
        self.process_robots(self.robots_t['yellow'])
        true = np.array(self.true)
        predicted = np.array(self.predicted)
        loss = TestLoss()
        loss(true, predicted)
        print("----Kalman filter results----")
        print(f'Look back: {self.look_back} | Look forth: {self.look_forth}')
        loss.print_error()
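
# Usage sketch (illustrative only; the pickle names and window sizes below are
# placeholders, not values defined by this module):
#
#   kf_cmp = KalmanFilterComparison(look_back=10, look_forth=5,
#                                   data_file='robots_eval',
#                                   params_file='filter_params')
#   kf_cmp.perform_test()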


class MLPBatchLogs(tf.keras.callbacks.Callback):
    """Collects the per-batch training losses and per-evaluation validation losses."""

    def __init__(self):
        super().__init__()
        self.batch_logs = []
        self.val_logs = []

    def on_train_batch_end(self, batch, logs=None):
        self.batch_logs.append(logs['loss'])

    def on_test_end(self, logs=None):
        self.val_logs.append(logs['loss'])


class MLPComparison:
    model = None

    def __init__(self, look_back, look_forth, output_dims, use_cuda=True):
        self.look_back = look_back
        self.output_dims = output_dims
        self.look_forth = look_forth
        os.environ['CUDA_VISIBLE_DEVICES'] = '0' if use_cuda else '-1'
        self.loader = LoadDataSet(look_back, look_forth)

    def create_model(self):
        # An MLP over the look_back window of 5 input features, flattened and
        # projected to a (look_forth, output_dims) sequence of predictions.
        data_input = tf.keras.Input(shape=(self.look_back, 5))
        x = tf.keras.layers.Dense(128, activation='relu')(data_input)
        x = tf.keras.layers.Dense(1024, activation='relu')(x)
        x = tf.keras.layers.Dense(128, activation='relu')(x)
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(self.output_dims * self.look_forth)(x)
        x = tf.keras.layers.Reshape((self.look_forth, self.output_dims))(x)
        return tf.keras.Model(inputs=data_input, outputs=x)

    def train_model(self, file_path: list, model_name):
        if self.model is None:
            self.model = self.create_model()
        robot_x, _, _, y = self.loader.load_data(file_path)
        batch_logs = MLPBatchLogs()
        self.model.compile(optimizer=tf.optimizers.Adam(), loss=SequenceLoss(), run_eagerly=False)
        self.model.fit(robot_x, y, epochs=10, batch_size=1024, callbacks=[batch_logs], validation_split=0.1)
        # Uncomment this to visualize training metrics:
        # plt.figure()
        # plt.plot(batch_logs.batch_logs)
        # plt.title('Batch loss during training')
        # plt.figure()
        # plt.plot(batch_logs.val_logs)
        # plt.title('Validation loss')
        # plt.show()
        # Re-compile with a built-in loss before saving, so the .h5 file can be
        # loaded later without supplying the custom SequenceLoss object.
        self.model.compile(optimizer=tf.optimizers.Adam(), loss=tf.losses.mean_squared_error)
        self.model.save(model_name + '.h5')

    def test_model(self, file_path: list, model_name):
        if self.model is None:
            self.model = tf.keras.models.load_model(model_name + '.h5')
        robot_x, _, _, y = self.loader.load_data(file_path, for_test=True)
        response = self.model.predict(robot_x)
        # Convert predictions and targets back to real coordinates before scoring.
        y_pred_conv = self.loader.convert_batch(robot_x, response)
        self.loader.convert_to_real(y)
        test_loss = TestLoss()
        test_loss(y[:, :, 0:2], y_pred_conv)
        test_loss.print_error()
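

# Minimal driver, as a sketch: the dataset lists, model name, and window sizes
# below are assumptions for illustration, not values defined by this module.
if __name__ == '__main__':
    mlp = MLPComparison(look_back=10, look_forth=5, output_dims=2)
    mlp.train_model(['train_set'], 'mlp_model')  # placeholder dataset / model names
    mlp.test_model(['test_set'], 'mlp_model')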