Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended Kalman filter #396

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 332 additions & 3 deletions src/traffic/algorithms/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)

from impunity import impunity
from scipy import signal
from scipy import linalg, signal
from typing_extensions import Annotated, NotRequired

import numpy as np
Expand Down Expand Up @@ -598,6 +598,229 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame:
return data.assign(**self.postprocess(filtered))


class ProcessXYZZFilterBase(FilterBase):
    """Base class converting between trajectory DataFrame columns and the
    7-dimensional filter state (x, y, alt_baro, alt_geo, math_angle,
    velocity, vert_rate), all expressed in SI units.

    The ``@impunity`` decorator performs the actual unit conversions based
    on the ``Annotated`` unit hints on each assignment, so the annotations
    here are part of the behavior, not mere documentation.
    """

    @impunity
    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the SI-unit state DataFrame, indexed by timestamp."""
        groundspeed: Annotated[Any, "kts"] = df.groundspeed
        # the angle is wrapped between 0 and 2pi, we need to unwrap it
        # (track is clockwise from North; 90 - track is the trigonometric
        # angle, counter-clockwise from East)
        math_angle: Annotated[Any, "radians"] = np.unwrap(
            np.radians(90 - df.track)
        )
        velocity: Annotated[Any, "m/s"] = groundspeed

        # x/y are assumed already projected to a metric plane upstream
        x: Annotated[Any, "m"] = df.x
        y: Annotated[Any, "m"] = df.y

        altitude: Annotated[Any, "ft"] = df.altitude
        geoaltitude: Annotated[Any, "ft"] = df.geoaltitude
        alt_baro: Annotated[Any, "m"] = altitude
        alt_geo: Annotated[Any, "m"] = geoaltitude

        vertical_rate: Annotated[Any, "ft/min"] = df.vertical_rate
        vert_rate: Annotated[Any, "m/s"] = vertical_rate

        return pd.DataFrame(
            {
                "x": x,
                "y": y,
                "alt_baro": alt_baro,
                "alt_geo": alt_geo,
                "math_angle": math_angle,
                "velocity": velocity,
                "vert_rate": vert_rate,
            }
        ).set_index(df["timestamp"])

    @impunity
    def postprocess(
        self, df: pd.DataFrame
    ) -> Dict[str, npt.NDArray[np.float64]]:
        """Convert the SI-unit state back to the original column units,
        as a dict suitable for ``DataFrame.assign``."""
        x: Annotated[Any, "m"] = df.x
        y: Annotated[Any, "m"] = df.y
        alt_baro: Annotated[Any, "m"] = df.alt_baro
        alt_geo: Annotated[Any, "m"] = df.alt_geo
        math_angle: Annotated[Any, "radians"] = df.math_angle
        velocity: Annotated[Any, "m/s"] = df.velocity
        vert_rate: Annotated[Any, "m/s"] = df.vert_rate

        altitude: Annotated[Any, "ft"] = alt_baro
        geoaltitude: Annotated[Any, "ft"] = alt_geo
        # invert the preprocess mapping and re-wrap the track into [0, 360)
        track: Annotated[Any, "degree"] = (90 - np.degrees(math_angle)) % 360
        groundspeed: Annotated[Any, "kts"] = velocity
        vertical_rate: Annotated[Any, "ft/min"] = vert_rate

        return dict(
            x=x,
            y=y,
            altitude=altitude,
            geoaltitude=geoaltitude,
            track=track,
            groundspeed=groundspeed,
            vertical_rate=vertical_rate,
        )


class EKF7D(ProcessXYZZFilterBase):
    """Extended Kalman filter over a 7-dimensional aircraft state.

    The state vector is (x, y, alt_baro, alt_geo, math_angle, velocity,
    vert_rate). Horizontal position integrates velocity along the heading,
    both altitudes integrate the vertical rate; heading, speed and vertical
    rate are modelled as constant over a time step.
    """

    @staticmethod
    def state_transition_function(state: pd.Series, dt: float) -> pd.Series:
        """Propagate the state over ``dt`` seconds with the constant
        speed/heading/vertical-rate motion model."""
        # Unpack the state vector
        _x, _y, _alt_baro, _alt_geo, math_angle, velocity, vert_rate = state

        # Compute the derivatives of the position-like components
        x_dot = velocity * np.cos(math_angle)
        y_dot = velocity * np.sin(math_angle)
        altitude_dot = vert_rate
        geoaltitude_dot = vert_rate

        # Compute the predicted state: integrate positions, keep the rest
        # (math_angle, velocity, vert_rate) constant over the time step
        state_pred = state.copy()
        state_pred.loc["x"] += x_dot * dt
        state_pred.loc["y"] += y_dot * dt
        state_pred.loc["alt_baro"] += altitude_dot * dt
        state_pred.loc["alt_geo"] += geoaltitude_dot * dt
        return state_pred

    @staticmethod
    def jacobian_state_transition(
        x: pd.Series, dt: float
    ) -> npt.NDArray[np.float64]:
        """Jacobian of :meth:`state_transition_function` w.r.t. the state."""
        # Only heading and velocity appear in off-diagonal terms
        _, _, _, _, math_angle, velocity, _ = x

        F_jacobian = np.eye(7)
        # Partial derivative of x_dot w.r.t math_angle:
        F_jacobian[0, 4] = -velocity * np.sin(math_angle) * dt
        # Partial derivative of x_dot w.r.t velocity:
        F_jacobian[0, 5] = np.cos(math_angle) * dt
        # Partial derivative of y_dot w.r.t math_angle:
        F_jacobian[1, 4] = velocity * np.cos(math_angle) * dt
        # Partial derivative of y_dot w.r.t velocity:
        F_jacobian[1, 5] = np.sin(math_angle) * dt
        # Partial derivative of altitude_dot w.r.t vertical_rate
        F_jacobian[2, 6] = dt
        # Partial derivative of geoaltitude_dot w.r.t vertical_rate
        F_jacobian[3, 6] = dt

        return F_jacobian

    def __init__(
        self,
        smooth: bool = True,
        reject_sigma: int = 3,
        window_size: int = 17,
        std_dev_gps: float = 0,
        std_dev_baro: float = 0,
        std_dev_track: float = 0,
        std_dev_gps_speed: float = 0,
        std_dev_baro_speed: float = 0,
    ) -> None:
        """
        :param smooth: apply an RTS smoother after the forward filter pass
        :param reject_sigma: gating threshold, in standard deviations, for
            rejecting outlier measurement components
        :param window_size: rolling window size used to estimate the
            measurement noise empirically from the data
        :param std_dev_gps: extra sensor noise (std dev, metres) on x, y
            and geometric altitude
        :param std_dev_baro: extra sensor noise (metres) on barometric
            altitude
        :param std_dev_track: extra sensor noise (radians) on the heading
        :param std_dev_gps_speed: extra sensor noise (m/s) on groundspeed
        :param std_dev_baro_speed: extra sensor noise (m/s) on vertical rate
        """
        super().__init__()
        self.reject_sigma = reject_sigma
        self.smooth = smooth
        self.window_size = window_size
        self.std_dev_gps = std_dev_gps
        self.std_dev_baro = std_dev_baro
        self.std_dev_track = std_dev_track
        self.std_dev_gps_speed = std_dev_gps_speed
        self.std_dev_baro_speed = std_dev_baro_speed

    def apply(self, data: pd.DataFrame) -> pd.DataFrame:
        """Run the EKF (and optional smoother) on the trajectory data and
        return a copy of ``data`` with filtered columns assigned."""
        measurements = self.preprocess(data)

        # initial state and covariance
        x0 = measurements.iloc[0]
        P = np.eye(7)

        # Per-component sensor noise, in preprocess column order
        sensor_std = {
            "x": self.std_dev_gps,
            "y": self.std_dev_gps,
            "alt_baro": self.std_dev_baro,
            "alt_geo": self.std_dev_gps,
            "math_angle": self.std_dev_track,
            "velocity": self.std_dev_gps_speed,
            "vert_rate": self.std_dev_baro_speed,
        }
        # Measurement noise covariance: variance of the residual against a
        # rolling mean, plus the squared sensor noise added in quadrature.
        # (Equivalent to the former sqrt-then-square formulation, without
        # the round trip.)
        R = np.diag(
            [
                (
                    measurements[column]
                    - measurements[column].rolling(self.window_size).mean()
                ).std()
                ** 2
                + std**2
                for column, std in sensor_std.items()
            ]
        )

        # Process noise, scaled relative to the measurement noise
        Q = np.diag([0.1, 0.1, 0.01, 0.01, 0.3, 1, 0.5]) * R
        filtered_states, filtered_covariances = extended_kalman_filter(
            measurements=measurements,
            initial_state=x0,
            initial_covariance=P,
            Q=Q,
            R=R,
            jacobian_state_transition=EKF7D.jacobian_state_transition,
            state_transition_function=EKF7D.state_transition_function,
            reject_sigma=self.reject_sigma,
        )
        if self.smooth:
            filtered_states = rts_smoother(
                filtered_states,
                filtered_covariances,
                Q,
                measurements.index,
                EKF7D.jacobian_state_transition,
                EKF7D.state_transition_function,
            )

        return data.assign(**self.postprocess(filtered_states))


class KalmanSmoother6D(ProcessXYZFilterBase):
# Descriptors are convenient to store the evolution of the process
x_mes: TrackVariable[npt.NDArray[np.float64]] = TrackVariable()
Expand All @@ -620,7 +843,6 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame:

# initial state
_id6 = np.eye(df.shape[1])
dt = 1

self.x_mes = df.iloc[0].values
self.x1_cor = df.iloc[0].values
Expand Down Expand Up @@ -648,6 +870,9 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame:
# >>> First FORWARD <<<

for i in range(1, df.shape[0]):
dt = (
data.iloc[i]["timestamp"] - data.iloc[i - 1]["timestamp"]
).total_seconds()
# measurement
self.x_mes = df.iloc[i].values

Expand Down Expand Up @@ -704,10 +929,12 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame:
# >>> Now BACKWARD <<<
self.x2_cor = self.x1_cor
self.p2_cor = 100 * self.p1_cor
dt = -dt

for i in range(df.shape[0] - 1, 0, -1):
# measurement
dt = (
data.iloc[i - 1]["timestamp"] - data.iloc[i]["timestamp"]
).total_seconds()
self.x_mes = df.iloc[i].values
# replace NaN values with crazy values
# they will be filtered out because out of the 3 \sigma enveloppe
Expand Down Expand Up @@ -928,3 +1155,105 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame:
)

return data.assign(**self.postprocess(filtered))


def extended_kalman_filter(
    measurements: pd.DataFrame,
    initial_state: pd.Series,
    initial_covariance: npt.NDArray[np.float64],
    Q: npt.NDArray[np.float64],
    R: npt.NDArray[np.float64],
    jacobian_state_transition: Callable[
        [pd.Series, float], npt.NDArray[np.float64]
    ],
    state_transition_function: Callable[[pd.Series, float], pd.Series],
    reject_sigma: float = 3,
) -> "tuple[pd.DataFrame, npt.NDArray[np.float64]]":
    """Run a forward extended Kalman filter with sigma gating.

    :param measurements: one row per time step, indexed by timestamp;
        the measurement model is the identity (H = I)
    :param initial_state: state at the first time step
    :param initial_covariance: covariance at the first time step
    :param Q: process noise covariance
    :param R: measurement noise covariance
    :param jacobian_state_transition: (state, dt) -> Jacobian matrix
    :param state_transition_function: (state, dt) -> predicted state
    :param reject_sigma: components whose innovation exceeds this many
        standard deviations are ignored in the update step
    :return: (filtered states as a DataFrame, stacked covariances)
    """
    num_states = len(initial_state)
    states = np.repeat(
        initial_state.values.reshape(1, -1), measurements.shape[0], axis=0
    )
    covariances = np.zeros((measurements.shape[0], num_states, num_states))

    x = initial_state
    P = initial_covariance
    # positional access on the DatetimeIndex (integer [] on a Series with a
    # datetime index is deprecated in pandas)
    index = measurements.index

    for i in range(1, len(index)):
        dt = (index[i] - index[i - 1]).total_seconds()

        # Prediction Step
        F = jacobian_state_transition(x, dt)
        # Predicted (a priori) state estimate:
        x = state_transition_function(x, dt)
        # Predicted (a priori) estimate covariance:
        P = F @ P @ F.T + Q

        # Measurement update with rejection mechanism
        # (copy: we may patch components without mutating the caller's data)
        measurement = measurements.iloc[i].copy()
        H = np.eye(num_states)
        # Innovation or measurement pre-fit residual:
        nu = measurement - x
        # Innovation (or pre-fit residual) covariance:
        S = H @ P @ H.T + R
        std_devs = np.sqrt(np.diag(S))

        # Component-wise standard deviation check (gating)
        for j in range(num_states):
            if abs(nu.iloc[j]) > abs(reject_sigma * std_devs[j]):
                print(
                    f"Rejecting measurement {index[i]} "
                    f"for state {measurements.columns[j]}"
                )
                measurement.iloc[j] = x.iloc[j]  # Replace faulty measurement
                H[j, j] = 0  # Ignore this component in the update

        # Here we recompute the matrix in case something was rejected:
        # Innovation (or pre-fit residual) covariance:
        S = H @ P @ H.T + R
        # Optimal Kalman gain (zeroed H rows zero the matching gain columns,
        # so stale components of nu do not contribute):
        K = linalg.solve(S, H @ P, assume_a="pos").T
        # Updated (a posteriori) state estimate:
        x = x + K @ nu
        # Updated (a posteriori) estimate covariance:
        P = (np.eye(num_states) - K @ H) @ P

        states[i] = x
        covariances[i] = P

    return pd.DataFrame(states, columns=measurements.columns), covariances


def rts_smoother(
    states: pd.DataFrame,
    covariances: npt.NDArray[np.float64],
    Q: npt.NDArray[np.float64],
    timestamps: "pd.Index",
    jacobian_state_transition: Callable[
        [pd.Series, float], npt.NDArray[np.float64]
    ],
    state_transition_function: Callable[[pd.Series, float], pd.Series],
) -> pd.DataFrame:
    """Rauch-Tung-Striebel smoother over forward-filtered estimates.

    :param states: filtered states, one row per time step
    :param covariances: stacked filtered covariances, same length
    :param Q: process noise covariance used in the forward pass
    :param timestamps: timestamps of the steps (positionally indexed)
    :param jacobian_state_transition: (state, dt) -> Jacobian matrix
    :param state_transition_function: (state, dt) -> predicted state
    :return: the smoothed states (covariances are not returned)
    """
    num_time_steps = states.shape[0]
    smoothed_states = states.copy()
    smoothed_covariances = covariances.copy()

    for i in range(num_time_steps - 2, -1, -1):
        dt = (timestamps[i + 1] - timestamps[i]).total_seconds()

        F = jacobian_state_transition(states.iloc[i], dt)

        # predicted state and covariance at step i+1, from step i
        predicted_state = state_transition_function(states.iloc[i], dt)
        Pp = F @ covariances[i] @ F.T + Q

        # Smoother gain G = P_f F^T Pp^{-1}, computed through a solve
        # (Pp is symmetric) rather than an explicit inverse
        G = np.linalg.solve(Pp, F @ covariances[i]).T
        # The backward recursion must use the already-smoothed estimate at
        # i+1, not the filtered one (RTS recursion):
        smoothed_states.iloc[i] = states.iloc[i] + G @ (
            smoothed_states.iloc[i + 1] - predicted_state
        )
        smoothed_covariances[i] = (
            covariances[i] + G @ (smoothed_covariances[i + 1] - Pp) @ G.T
        )

    return smoothed_states
1 change: 1 addition & 0 deletions src/traffic/core/traffic.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from ..core.cache import property_cache
from ..core.structure import Airport
from ..core.time import time_or_delta, timelike, to_datetime
from . import tqdm
from .flight import Flight
from .intervals import Interval, IntervalCollection
from .lazy import LazyTraffic, lazy_evaluation
Expand Down
2 changes: 2 additions & 0 deletions src/traffic/data/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations


from typing import Any

from ... import config
from ...core import Traffic
from ._squawk7700 import Squawk7700Dataset
from .default import Default, Entry


datasets: dict[str, Entry] = dict(
paris_toulouse_2017=dict(
url="https://ndownloader.figshare.com/files/20291055",
Expand Down