diff --git a/src/traffic/algorithms/filters.py b/src/traffic/algorithms/filters.py index 4b67ae50..61bc8791 100644 --- a/src/traffic/algorithms/filters.py +++ b/src/traffic/algorithms/filters.py @@ -15,7 +15,7 @@ ) from impunity import impunity -from scipy import signal +from scipy import linalg, signal from typing_extensions import Annotated, NotRequired import numpy as np @@ -598,6 +598,229 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame: return data.assign(**self.postprocess(filtered)) +class ProcessXYZZFilterBase(FilterBase): + @impunity + def preprocess(self, df: pd.DataFrame) -> pd.DataFrame: + groundspeed: Annotated[Any, "kts"] = df.groundspeed + # the angle is wrapped between 0 and 2pi, we need to unwrap it + math_angle: Annotated[Any, "radians"] = np.unwrap( + np.radians(90 - df.track) + ) + velocity: Annotated[Any, "m/s"] = groundspeed + + x: Annotated[Any, "m"] = df.x + y: Annotated[Any, "m"] = df.y + + altitude: Annotated[Any, "ft"] = df.altitude + geoaltitude: Annotated[Any, "ft"] = df.geoaltitude + alt_baro: Annotated[Any, "m"] = altitude + alt_geo: Annotated[Any, "m"] = geoaltitude + + vertical_rate: Annotated[Any, "ft/min"] = df.vertical_rate + vert_rate: Annotated[Any, "m/s"] = vertical_rate + + return pd.DataFrame( + { + "x": x, + "y": y, + "alt_baro": alt_baro, + "alt_geo": alt_geo, + "math_angle": math_angle, + "velocity": velocity, + "vert_rate": vert_rate, + } + ).set_index(df["timestamp"]) + + @impunity + def postprocess( + self, df: pd.DataFrame + ) -> Dict[str, npt.NDArray[np.float64]]: + x: Annotated[Any, "m"] = df.x + y: Annotated[Any, "m"] = df.y + alt_baro: Annotated[Any, "m"] = df.alt_baro + alt_geo: Annotated[Any, "m"] = df.alt_geo + math_angle: Annotated[Any, "radians"] = df.math_angle + velocity: Annotated[Any, "m/s"] = df.velocity + vert_rate: Annotated[Any, "m/s"] = df.vert_rate + + altitude: Annotated[Any, "ft"] = alt_baro + geoaltitude: Annotated[Any, "ft"] = alt_geo + track: Annotated[Any, "degree"] = (90 - np.degrees(math_angle)) % 360 + groundspeed: Annotated[Any, "kts"] = velocity + vertical_rate: Annotated[Any, "ft/min"] = vert_rate + + return dict( + x=x, + y=y, + altitude=altitude, + geoaltitude=geoaltitude, + track=track, + groundspeed=groundspeed, + vertical_rate=vertical_rate, + ) + + +class EKF7D(ProcessXYZZFilterBase): + @staticmethod + def state_transition_function(state: pd.Series, dt: float) -> pd.Series: + # Unpack the state vector + _x, _y, _alt_baro, _alt_geo, math_angle, velocity, vert_rate = state + + # Compute the derivatives + x_dot = velocity * np.cos(math_angle) + y_dot = velocity * np.sin(math_angle) + altitude_dot = vert_rate + geoaltitude_dot = vert_rate + + # Compute the predicted state + state_pred = state.copy() + state_pred.loc["x"] += x_dot * dt + state_pred.loc["y"] += y_dot * dt + state_pred.loc["alt_baro"] += altitude_dot * dt + state_pred.loc["alt_geo"] += geoaltitude_dot * dt + # Other state variables (math_angle, velocity, vert_rate) are assumed + # to be constant over the time step + return state_pred + + @staticmethod + def jacobian_state_transition( + x: pd.Series, dt: float + ) -> npt.NDArray[np.float64]: + # Unpack the state vector + _, _, _, _, math_angle, velocity, _ = x + + # Compute the Jacobian matrix + F_jacobian = np.eye(7) + # Partial derivative of x_dot w.r.t math_angle: + F_jacobian[0, 4] = -velocity * np.sin(math_angle) * dt + # Partial derivative of x_dot w.r.t velocity: + F_jacobian[0, 5] = np.cos(math_angle) * dt + # Partial derivative of y_dot w.r.t math_angle: + F_jacobian[1, 4] = velocity * np.cos(math_angle) * dt + # Partial derivative of y_dot w.r.t velocity: + F_jacobian[1, 5] = np.sin(math_angle) * dt + # Partial derivative of altitude_dot w.r.t vertical_rate + F_jacobian[2, 6] = dt + # Partial derivative of geoaltitude_dot w.r.t vertical_rate + F_jacobian[3, 6] = dt + + return F_jacobian + + def __init__(self, smooth=True, reject_sigma: int = 3) -> None: + super().__init__() + self.reject_sigma = reject_sigma + self.smooth = smooth + + def apply(self, data: pd.DataFrame) -> pd.DataFrame: + measurements = self.preprocess(data) + + # initial state + x0 = measurements.iloc[0] # Initial state + P = np.eye(7) # Initial covariance + + std_dev_gps = 0 + std_dev_baro = 0 + window_size = 17 + std_dev_track = 0 + std_dev_gps_speed = 0 + std_dev_baro_speed = 0 + R = ( + np.diag( + [ + ( + ( + measurements.x + - measurements.x.rolling(window_size).mean() + ).std() + ** 2 + + std_dev_gps**2 + ) + ** 0.5, + ( + ( + measurements.y + - measurements.y.rolling(window_size).mean() + ).std() + ** 2 + + std_dev_gps**2 + ) + ** 0.5, + ( + ( + measurements.alt_baro + - measurements.alt_baro.rolling(window_size).mean() + ).std() + ** 2 + + std_dev_baro**2 + ) + ** 0.5, + ( + ( + measurements.alt_geo + - measurements.alt_geo.rolling(window_size).mean() + ).std() + ** 2 + + std_dev_gps**2 + ) + ** 0.5, + ( + ( + measurements.math_angle + - measurements.math_angle.rolling( + window_size + ).mean() + ).std() + ** 2 + + std_dev_track**2 + ) + ** 0.5, + ( + ( + measurements.velocity + - measurements.velocity.rolling(window_size).mean() + ).std() + ** 2 + + std_dev_gps_speed**2 + ) + ** 0.5, + ( + ( + measurements.vert_rate + - measurements.vert_rate.rolling(window_size).mean() + ).std() + ** 2 + + std_dev_baro_speed**2 + ) + ** 0.5, + ] + ) + ** 2 + ) + + Q = np.diag([0.1, 0.1, 0.01, 0.01, 0.3, 1, 0.5]) * R + filtered_states, filtered_covariances = extended_kalman_filter( + measurements=measurements, + initial_state=x0, + initial_covariance=P, + Q=Q, + R=R, + jacobian_state_transition=EKF7D.jacobian_state_transition, + state_transition_function=EKF7D.state_transition_function, + reject_sigma=self.reject_sigma, + ) + if self.smooth: + filtered_states = rts_smoother( + filtered_states, + filtered_covariances, + Q, + measurements.index, + EKF7D.jacobian_state_transition, + EKF7D.state_transition_function, + ) + + return data.assign(**self.postprocess(filtered_states)) + + class KalmanSmoother6D(ProcessXYZFilterBase): # Descriptors are convenient to store the evolution of the process x_mes: TrackVariable[npt.NDArray[np.float64]] = TrackVariable() @@ -620,7 +843,6 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame: # initial state _id6 = np.eye(df.shape[1]) - dt = 1 self.x_mes = df.iloc[0].values self.x1_cor = df.iloc[0].values @@ -648,6 +870,9 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame: # >>> First FORWARD <<< for i in range(1, df.shape[0]): + dt = ( + data.iloc[i]["timestamp"] - data.iloc[i - 1]["timestamp"] + ).total_seconds() # measurement self.x_mes = df.iloc[i].values @@ -704,10 +929,12 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame: # >>> Now BACKWARD <<< self.x2_cor = self.x1_cor self.p2_cor = 100 * self.p1_cor - dt = -dt for i in range(df.shape[0] - 1, 0, -1): # measurement + dt = ( + data.iloc[i - 1]["timestamp"] - data.iloc[i]["timestamp"] + ).total_seconds() self.x_mes = df.iloc[i].values # replace NaN values with crazy values # they will be filtered out because out of the 3 \sigma enveloppe @@ -928,3 +1155,105 @@ def apply(self, data: pd.DataFrame) -> pd.DataFrame: ) return data.assign(**self.postprocess(filtered)) + + +def extended_kalman_filter( + measurements: pd.DataFrame, + initial_state: pd.Series, + initial_covariance: npt.NDArray[np.float64], + Q: npt.NDArray[np.float64], + R: npt.NDArray[np.float64], + jacobian_state_transition: Callable[ + [pd.Series, float], npt.NDArray[np.float64] + ], + state_transition_function: Callable[[pd.Series, float], pd.Series], + reject_sigma: float = 3, +) -> pd.DataFrame: + num_states = len(initial_state) + states = np.repeat( + initial_state.values.reshape(1, -1), measurements.shape[0], axis=0 + ) + covariances = np.zeros((measurements.shape[0], num_states, num_states)) + + x = initial_state + P = initial_covariance + timestamps = measurements.index.to_series() + + for i in range(1, len(timestamps)): + dt = (timestamps[i] - timestamps[i - 1]).total_seconds() + + # Prediction Step + F = jacobian_state_transition(x, dt) + # Predicted (a priori) state estimate: + x = state_transition_function(x, dt) + # Predicted (a priori) estimate covariance: + P = F @ P @ F.T + Q + + # Measurement update with rejection mechanism + measurement = measurements.iloc[i] + H = np.eye(num_states) + # Innovation or measurement pre-fit residual: + nu = measurement - x + # Innovation (or pre-fit residual) covariance: + S = H @ P @ H.T + R + std_devs = np.sqrt(np.diag(S)) + + # Component-wise standard deviation check (gating) + for j in range(num_states): + if abs(nu[j]) > abs(reject_sigma * std_devs[j]): + print( + f"Rejecting measurement {timestamps[i]} for state {measurements.columns[j]}" + ) + measurement.iloc[j] = x.iloc[j] # Replace faulty measurement + H[j, j] = 0 # Ignore this component in the update + + # Here we recompute the matrix in case something was rejected: + # Innovation (or pre-fit residual) covariance: + S = H @ P @ H.T + R + # Optimal Kalman gain: + K = linalg.solve(S, H @ P, assume_a="pos").T + # Updated (a posteriori) state estimate: + x = x + K @ nu + # Updated (a posteriori) estimate covariance: + P = (np.eye(num_states) - K @ H) @ P + + states[i] = x + covariances[i] = P + + return pd.DataFrame(states, columns=measurements.columns), covariances + + +def rts_smoother( + states: pd.DataFrame, + covariances: npt.NDArray[np.float64], + Q: npt.NDArray[np.float64], + timestamps: pd.Series, + jacobian_state_transition: Callable[ + [pd.Series, float], npt.NDArray[np.float64] + ], + state_transition_function: Callable[[pd.Series, float], pd.Series], +): + num_time_steps = states.shape[0] + smoothed_states = states.copy() + smoothed_covariances = covariances.copy() + + for i in range(num_time_steps - 2, -1, -1): + dt = (timestamps[i + 1] - timestamps[i]).total_seconds() + + F = jacobian_state_transition(states.iloc[i], dt) + + # predicted state + predicted_state = state_transition_function(states.iloc[i], dt) + # predicted covariance + Pp = F @ covariances[i] @ F.T + Q + + # G = linalg.solve(Pp, F @ covariances[i], assume_a="pos").T + G = covariances[i] @ F.T @ np.linalg.inv(Pp) + smoothed_states.iloc[i] = states.iloc[i] + G @ ( + states.iloc[i + 1] - predicted_state + ) + smoothed_covariances[i] = ( + covariances[i] + G @ (covariances[i + 1] - Pp) @ G.T + ) + + return smoothed_states diff --git a/src/traffic/core/traffic.py b/src/traffic/core/traffic.py index 2f385b3d..0237b00b 100644 --- a/src/traffic/core/traffic.py +++ b/src/traffic/core/traffic.py @@ -39,6 +39,7 @@ from ..core.cache import property_cache from ..core.structure import Airport from ..core.time import time_or_delta, timelike, to_datetime +from . import tqdm from .flight import Flight from .intervals import Interval, IntervalCollection from .lazy import LazyTraffic, lazy_evaluation diff --git a/src/traffic/data/datasets/__init__.py b/src/traffic/data/datasets/__init__.py index 8f2c5a9f..cf682cf8 100644 --- a/src/traffic/data/datasets/__init__.py +++ b/src/traffic/data/datasets/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations + from typing import Any from ... import config @@ -7,6 +8,7 @@ from ._squawk7700 import Squawk7700Dataset from .default import Default, Entry + datasets: dict[str, Entry] = dict( paris_toulouse_2017=dict( url="https://ndownloader.figshare.com/files/20291055",