-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
307 lines (258 loc) · 13.1 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#!/usr/bin/python
"""
Sudhanva Sreesha,
28-Mar-2018
Gonzalo Ferrer,
"""
import contextlib
import os
from argparse import ArgumentParser
from warnings import warn
import numpy as np
from matplotlib import animation as anim
from matplotlib import pyplot as plt
from progress.bar import FillingCirclesBar
from filters.ekf import EKF
from filters.pf import PF
from tools.data import generate_data as generate_input_data
from tools.data import load_data
from tools.data import save_data as save_input_data
from tools.objects import FilterTrajectory
from tools.objects import Gaussian
from tools.plot import plot2dcov
from tools.plot import plot_field
from tools.plot import plot_observation
from tools.plot import plot_robot
@contextlib.contextmanager
def get_dummy_context_mgr():
    """
    No-op stand-in for a real context manager.

    Lets a `with` statement be written unconditionally even when there is
    nothing to manage (e.g. when no movie file is being written).

    :return: A context manager that binds `None` and does nothing on exit.
    """

    # A bare `yield` hands `None` to the `with ... as` target.
    yield
def get_cli_args():
    """
    Builds the command-line interface of the simulation and parses `sys.argv`.

    :return: The parsed arguments (an `argparse.Namespace`). Notable attributes: `input_data_file`,
             `num_steps`, `filter_name`, `num_particles`, `global_localization`, `alphas`, `bearing_std`,
             `dt`, `animate`, `show_particles`, `show_trajectory`, `plot_pause_len`, `movie_file`,
             `movie_fps` and `output_dir`.
    """

    # The text is the program *description*; passing it positionally would set `prog` instead
    # and end up in the usage line.
    parser = ArgumentParser(description='Runs localization filters (EKF or PF) on generated, simulation data.')

    parser.add_argument('-i',
                        '--input-data-file',
                        type=str,
                        action='store',
                        help='File with generated data to simulate the filter '
                             'against. Supported format: "npy", and "mat".')
    parser.add_argument('-n',
                        '--num-steps',
                        type=int,
                        action='store',
                        help='The number of time steps to generate data for the simulation. '
                             'This option overrides the data file argument.',
                        default=100)
    parser.add_argument('-f',
                        '--filter',
                        dest='filter_name',
                        choices=['ekf', 'pf'],
                        action='store',
                        help='The localization filter to use for the simulation.',
                        default='ekf')
    parser.add_argument('--num-particles',
                        type=int,
                        action='store',
                        help='The number of particles to use in the PF.',
                        default=100)
    # Both spellings were previously registered as two separate options with the same destination;
    # a single action with two option strings keeps every existing command line working.
    parser.add_argument('--global-localization',
                        '--global_localization',
                        dest='global_localization',
                        action='store_true',
                        help='Task E, global localization (only for PF): uniformly distributes the '
                             'particles around the field at the beginning of the simulation.')
    # `type=float` is required: without it user-supplied values are strings and cannot be squared later.
    parser.add_argument('-a',
                        '--alphas',
                        type=float,
                        nargs=4,
                        metavar=('A1', 'A2', 'A3', 'A4'),
                        action='store',
                        help='Squared root of alphas, used for transition noise in action space (M_t). (format: a1 a2 a3 a4).',
                        default=(0.05, 0.001, 0.05, 0.01))
    parser.add_argument('-b',
                        '--bearing_std',
                        type=float,
                        action='store',
                        help='Diagonal of Standard deviations of the Observation noise Q. (format: rad).',
                        default=0.35)
    parser.add_argument('--dt', type=float, action='store', help='Time step (in seconds).', default=0.1)
    # TODO remove animate and let just -s
    parser.add_argument('--animate', action='store_true', help='Show and animation of the simulation, in real-time.')
    parser.add_argument('--show-particles',
                        action='store_true',
                        help='Show the particles when using the particle filter.')
    parser.add_argument('-s', '--show-trajectory',
                        action='store_true',
                        help='Shows the full robot trajectory as estimated by the filter. '
                             'If --show-particles is also specified with the particle filter, '
                             'this option will show one trajectory per particle (warn: this '
                             'can be chaotic to look at).')
    parser.add_argument('--plot-pause-len',
                        type=float,
                        action='store',
                        help='Time (in seconds) to pause the plot animation for between frames.',
                        default=0.01)
    parser.add_argument('-m',
                        '--movie-file',
                        type=str,
                        help='The full path to movie file to write the simulation animation to.',
                        default=None)
    parser.add_argument('--movie-fps',
                        type=float,
                        action='store',
                        help='The FPS rate of the movie to write.',
                        default=10.)
    parser.add_argument('-o',
                        '--output-dir',
                        type=str,
                        default=None,
                        action='store',
                        help='The output directory to which the input and '
                             'output data from the simulation will be stored.')

    return parser.parse_args()
def validate_cli_args(args):
    """
    Sanity-checks the parsed command-line arguments.

    Hard errors are raised for unusable input; option combinations that will
    simply be ignored only produce warnings.

    :param args: The parsed arguments, as returned by `get_cli_args()`.
    :raises OSError: If an input data file was given but does not exist.
    :raises RuntimeError: If neither an input data file nor a number of steps was given.
    """

    data_file = args.input_data_file
    if data_file:
        if not os.path.exists(data_file):
            raise OSError('The input data file {} does not exist.'.format(data_file))
    elif not args.num_steps:
        raise RuntimeError('Neither `--input-data-file` nor `--num-steps` were present in the arguments.')

    using_pf = args.filter_name == 'pf'

    if not using_pf and args.global_localization:
        warn('Global localization is only supported for the particle filter. Ignoring the flag.')

    if not args.animate:
        # Display-only options have no effect without the animation.
        display_only_options = (
            (args.show_trajectory,
             'Since animation for the simulation was not enabled, ignoring `--show-trajectory`.'),
            (args.show_particles,
             'Since animation for the simulation was not enabled, ignoring `--show-particles`.'),
        )
        for requested, message in display_only_options:
            if requested:
                warn(message)

    if args.show_particles:
        if not using_pf:
            warn('Since the simulation is not running the particle filter, ignoring `--show-particles`.')
        if args.output_dir:
            warn('Since `--output-dir` is specified, ignoring `--show-particles` to generate just one trajectory.')
def main():
    """
    Runs the localization simulation end to end.

    Parses and validates the command-line arguments, loads (or generates) the input data, runs the
    predict/update cycle of the selected filter over every time step, optionally animates the run
    and/or records it to a movie file, and optionally stores the input data together with the
    filtered trajectory in `--output-dir`.

    :raises RuntimeError: If neither an input data file nor a number of steps is available.
    """

    args = get_cli_args()
    validate_cli_args(args)

    # weights for covariance action noise R and observation noise Q
    # variance of noise R proportional to alphas, see tools/tasks@get_motion_noise_covariance()
    # The explicit float dtype keeps the squaring valid even if the values arrive as strings.
    alphas = np.array(args.alphas, dtype=float) ** 2
    bearing_std = args.bearing_std  # see also filters/localization_filter.py

    mean_prior = np.array([180., 50., 0.])
    Sigma_prior = 1e-12 * np.eye(3, 3)
    initial_state = Gaussian(mean_prior, Sigma_prior)

    if args.input_data_file:
        data = load_data(args.input_data_file)
    elif args.num_steps:
        # Generate data, assuming `--num-steps` was present in the CL args.
        data = generate_input_data(initial_state.mu.T, args.num_steps, alphas, bearing_std, args.dt)
    else:
        raise RuntimeError('Neither `--input-data-file` nor `--num-steps` were present in the arguments.')

    store_sim_data = bool(args.output_dir)
    show_plots = bool(args.animate)
    write_movie = bool(args.movie_file)
    show_trajectory = bool(args.animate and args.show_trajectory)
    show_particles = bool(args.show_particles and args.animate and args.filter_name == 'pf')
    update_mean_trajectory = show_trajectory or store_sim_data
    update_plots = show_plots or write_movie
    # When results are stored, a single (mean) trajectory is kept instead of one per particle.
    one_trajectory_per_particle = show_particles and not store_sim_data

    if store_sim_data:
        if not os.path.exists(args.output_dir):
            os.makedirs(args.output_dir)

        save_input_data(data, os.path.join(args.output_dir, 'input_data.npy'))

    # ---------------------------------------------------------------------------------------------------
    # Student's task: You will fill these function inside 'filters/.py'
    # ---------------------------------------------------------------------------------------------------
    localization_filter = None
    if args.filter_name == 'ekf':
        localization_filter = EKF(initial_state, alphas, bearing_std)
    elif args.filter_name == 'pf':
        localization_filter = PF(initial_state, alphas, bearing_std, args.num_particles, args.global_localization)

    fig = None
    if update_plots:
        fig = plt.figure(1)
    if show_plots:
        plt.ion()

    # Initialize the trajectory if user opted-in to display.
    sim_trajectory = None
    if update_mean_trajectory:
        if one_trajectory_per_particle:
            mean_trajectory = np.zeros((data.num_steps, localization_filter.state_dim, args.num_particles))
        else:
            mean_trajectory = np.zeros((data.num_steps, localization_filter.state_dim))

        sim_trajectory = FilterTrajectory(mean_trajectory)

    if store_sim_data:
        # Pre-allocate the memory to store the covariance matrix of the trajectory at each time step.
        sim_trajectory.covariance = np.zeros((localization_filter.state_dim,
                                              localization_filter.state_dim,
                                              data.num_steps))

    # Initialize the movie writer if `--movie-file` was present in the CL args.
    movie_writer = None
    if write_movie:
        get_ff_mpeg_writer = anim.writers['ffmpeg']
        metadata = dict(title='Localization Filter', artist='matplotlib', comment='PS2')
        movie_fps = args.movie_fps
        if args.plot_pause_len > 0:
            # Cap the movie rate at the on-screen frame rate; a zero pause would divide by zero.
            movie_fps = min(movie_fps, 1. / args.plot_pause_len)
        movie_writer = get_ff_mpeg_writer(fps=movie_fps, metadata=metadata)

    progress_bar = FillingCirclesBar('Simulation Progress', max=data.num_steps)

    # The third argument of `saving` is the DPI of the recorded frames, so the figure's own DPI is used
    # (the number of simulation steps is unrelated to the resolution).
    with movie_writer.saving(fig, args.movie_file, fig.dpi) if write_movie else get_dummy_context_mgr():
        for t in range(data.num_steps):
            # Used as means to include the t-th time-step while plotting.
            tp1 = t + 1

            # Control at the current step.
            u = data.filter.motion_commands[t]
            # Observation at the current step.
            z = data.filter.observations[t]

            localization_filter.predict(u)
            localization_filter.update(z)

            if update_mean_trajectory:
                if one_trajectory_per_particle:
                    sim_trajectory.mean[t, :, :] = localization_filter.X.T
                else:
                    sim_trajectory.mean[t] = localization_filter.mu

            if store_sim_data:
                sim_trajectory.covariance[:, :, t] = localization_filter.Sigma

            progress_bar.next()

            if not update_plots:
                continue

            plt.cla()
            plot_field(z[1])
            plot_robot(data.debug.real_robot_path[t])
            plot_observation(data.debug.real_robot_path[t],
                             data.debug.noise_free_observations[t],
                             data.filter.observations[t])

            plt.plot(data.debug.real_robot_path[1:tp1, 0], data.debug.real_robot_path[1:tp1, 1], 'g')
            plt.plot(data.debug.noise_free_robot_path[1:tp1, 0], data.debug.noise_free_robot_path[1:tp1, 1], 'm')

            plt.plot([data.debug.noise_free_robot_path[t, 0]], [data.debug.noise_free_robot_path[t, 1]], '*m')

            if show_particles:
                samples = localization_filter.X.T
                plt.scatter(samples[0], samples[1], s=2)
            else:
                plot2dcov(localization_filter.mu_bar[:-1],
                          localization_filter.Sigma_bar[:-1, :-1],
                          'red', 3,
                          legend='{} -'.format(args.filter_name.upper()))
                plot2dcov(localization_filter.mu[:-1],
                          localization_filter.Sigma[:-1, :-1],
                          'blue', 3,
                          legend='{} +'.format(args.filter_name.upper()))
                plt.legend()

            if show_trajectory:
                # Slice up to `tp1` so that the estimate computed at this step is drawn as well.
                if len(sim_trajectory.mean.shape) > 2:
                    # This means that we probably intend to show the trajectory for every particle.
                    # The slices are kept 2-D (steps x particles) so each column is drawn as its own
                    # line; squeezing a single-row slice would merge all particles into one polyline.
                    x = sim_trajectory.mean[0:tp1, 0, :]
                    y = sim_trajectory.mean[0:tp1, 1, :]
                    plt.plot(x, y)
                else:
                    plt.plot(sim_trajectory.mean[0:tp1, 0], sim_trajectory.mean[0:tp1, 1], 'blue')

            if show_plots:
                # Draw all the plots and pause to create an animation effect.
                plt.draw()
                plt.pause(args.plot_pause_len)

            if write_movie:
                movie_writer.grab_frame()

    progress_bar.finish()

    if show_plots:
        # Keep the final frame on screen until the window is closed.
        plt.show(block=True)

    if store_sim_data:
        file_path = os.path.join(args.output_dir, 'output_data.npy')
        with open(file_path, 'wb') as data_file:
            np.savez(data_file,
                     mean_trajectory=sim_trajectory.mean,
                     covariance_trajectory=sim_trajectory.covariance)
# Run the simulation only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()