-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtracker.py
264 lines (189 loc) · 7.41 KB
/
tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import numpy as np
from queue import Queue
from scipy.misc import imresize
from operator import add
class Tracker:
"""
Generic tracking model. A location is represented by an affine transformation (e.g., Xt−1), which warps the
coordinate system so that the target lies within the unit square. Particles representing possible target locations Xt,
at time t are sampled according to P(Xt|Xt−1), which in this case is a diagonal-covariance Gaussian centered at Xt−1.
Where:
Xt = (xt, yt, θt, st, αt, φt)
denote x, y translation, rotation angle, scale, aspect ratio, and skew direction at time t.
P(Xt|Xt−1) = N (Xt; Xt−1, Ψ)
where Ψ is a diagonal covariance matrix whose elements are the corresponding variances of affine parameters, assumes the variance of each affine parameter does not change over time
See 3.3.1 Dynamic model in http://www.cs.toronto.edu/~dross/ivt/RossLimLinYang_ijcv.pdf for reference
Particle filter calss"""
def __init__(self, init_location,):
self.conf_q = Queue(maxsize=50)
self.pre_M_q = Queue(maxsize=50)
self.last_two_loc_q = Queue(maxsize=2)
self.location = init_location
self.params = self._init_params(init_location)
def _init_params(self, init_location):
"""Initialize tracker's parameters"""
params = {'p_sz': 64, 'p_num': 700, 'min_conf': 0.2,
'mv_thr': 0.1, 'up_thr': 0.35, 'roi_scale': 2}
diag_s = np.ceil((init_location[2]**2 + init_location[3]**2)**0.5/7)
params['aff_sig'] = [diag_s, diag_s, 0.004, 0.0, 0.0, 0]
params['ratio'] = init_location[2] / params['p_sz']
return params
def _qs_full(self):
if self.conf_q.full() and self.pre_M_q.full():
return True
else:
return False
def gen_best_M(self):
"""Returns the best pre_M in records."""
#assert self._qs_full()
pre_Ms = [self.pre_M_q.get() for _ in range(20)]
confs = [self.conf_q.get() for _ in range(20)]
idx = np.argmax(confs)
return pre_Ms[idx]
def draw_particles(self):
"""
Generates particles according to
P(Xt|Xt−1) = N (Xt; Xt−1, Ψ)
Args:
aff_params: affine parameters, see class doc string for
specific element definition.
[cx, cy, w/p_sz, 0, h/w, 0] for 6 degrees of freendom
[tlx, tly, w, h] for 4 degrees of freedom.
.
Returns:
aff_params_M : self.p_num*dof size matrix,
where rows are updated randomly drawed affine
params, columns repersent each particles.
"""
pass
def predict_location(self, pre_M, gt_last, resize_factor, t):
"""
Predict location for each particle. It is calculated by
1. compute the confidence of the i-th candidate, which is
the summation of all the heatmap values within the candidate region.
2. the candidate with the highest confidence value is predicted as target.
Args:
img_siz: tuple(image height, image width)
pre_M: predicted heat map
t: index of current frame
"""
pass
def get_most_conf_M(self):
"""Returns the most confidence heat maps."""
# Pull self.conf_records all out, and retrive
# the most confident heat map.
return updated_gt_M
def linear_prediction(self):
"""
Predicts current location linnearly according
to las two frames location. This may boost the
robustnesss of obejct blocking.
"""
pass
def distracted(self):
"""Distracter detection."""
# up-sampling pre_M
# Compute confidence according to
# S = with_in / with_out
conf_within = self.compute_conf(self.pre_M_resized, self.best_p_i_loc)
conf_all = np.sum(self.pre_M_resized)
distracter_score = conf_within / conf_all
print('The probability of been distracted is %s'%distracter_score)
if distracter_score > self.params['min_conf']:
return False
else:
return True
@classmethod
def compute_conf(self, roi, loc_p):
"""Helper func for computing confidence"""
x,y,w,h = loc_p
conf = np.sum(roi[y-int(0.5*h): y+int(0.5*h), \
x-int(0.5*w):x+int(0.5*w)])
return conf
@classmethod
def aff2loc(self, las_loc, aff_param):
"""Convert affine params to location."""
assert len(aff_param)==4, 'This method only works for dof 4 aff space.'
cur_loc = [i+j for i,j in zip(las_loc, aff_param)]
return cur_loc
class TrackerVanilla(Tracker):
"""Vanilla tracker
The covariance matrix has only 4 degrees of freedom,
specified by vertical, horizontal translation of the central
point, variance of the width, variance of the w/h ratio.
The corrresponding actual senarios are object replacment,
object zoom in/out, object rotaion. Should be sufficient
for most cases of car tracking.
"""
def __init__(self, init_location):
super(TrackerVanilla, self).__init__(init_location)
self._update_params()
def _update_params(self):
"""Update aff_sig param."""
self.params['aff_sig'] = [10, 10, 0.04, 0.04]
def draw_particles(self, last_location):
"""
The covariance matrix has only 4 degrees of freedom,
specified by vertical, horizontal translation of the central
point, variance of the width, variance of the w/h ratio.
The corrresponding actual senarios are object replacment,
object zoom in/out, object rotaion. Should be sufficient
for most cases of car tracking.
Args:
last_location: [tlx, tly, w, h]
"""
# Define degrees of freedom
dof = len(self.params['aff_sig'])
# Construct an p_num*6 size matrix with with each
# column repersents one particle
#aff_params_M = np.kron(np.ones((self.params['p_num'],1)), np.array(aff_params))
# First onstruct a p_num*dof size normal distribution with
# mean 0 and sigma 1
rand_norml_M = np.array([np.random.standard_normal(dof) for _ in range(self.params['p_num'])])
# Then construct a affine sigma matrix
aff_sig_M = np.kron(np.ones((self.params['p_num'], 1)), self.params['aff_sig'])
# Update particles
self.aff_params_M = rand_norml_M * aff_sig_M
def predict_location(self, pre_M, gt_last, resize_factor, t, roi_size):
"""
Predict location for each particle. It is calculated by
1. compute the confidence of the i-th candidate, which is
the summation of all the heatmap values within the candidate region.
2. the candidate with the highest confidence value is predicted as target.
Args:
img_siz: tuple(image height, image width)
pre_M: predicted heat map
t: index of current frame
"""
# transform self.aff_params_M to location_M with each column
# repersent [cx, cy, w, h] in the pre_M heat map
loc_M = np.zeros(self.aff_params_M.shape)
tlx, tly, w, h = gt_last
cx, cy = roi_size // 2, roi_size // 2
loc_M[:, 0] = cx
loc_M[:, 1] = cy
loc_M[:, 2] = resize_factor * w
loc_M[:, 3] = resize_factor * h
loc_M += self.aff_params_M
loc_M = loc_M.astype(np.int)
# Upsampling pre_M bicubicly to roi_size
pre_M_resized = imresize(pre_M[0,:,:,0], [roi_size, roi_size], interp='bicubic')
# Compute conf for each particle
conf_lsit = []
for p_i_loc in loc_M:
conf_i = self.compute_conf(pre_M_resized, p_i_loc)
conf_lsit += [conf_i]
# Get index and conf score of of the most confident one
idx = np.argmax(conf_lsit)
self.cur_best_conf = conf_lsit[idx]
self.conf_q.put(conf_lsit[idx])
# Store values for computing distraction
self.best_p_i_loc = loc_M[idx]
self.pre_M_resized = pre_M_resized
# Get the corresponding aff_param which is then
# used to predicted the cureent best location
best_aff = self.aff_params_M[idx]
print('The affine paramters: [dx,dy,dw,dh] is {0}'.format(best_aff))
self.pre_location = self.aff2loc(gt_last, best_aff)
# Stack into records queue
return self.pre_location