Les filtres de Kalman
Les filtres de Kalman permettent d’estimer l’état d’un système dynamique pour lequel on connait :
- les règles / lois d’évolution
- les actions qui lui sont appliquées
- une série d’observations / mesures potentiellement incomplètes ou bruitées.
Dans sa version la plus simple, l’ensemble des transformations du système ainsi que les observations réalisées peuvent être modélisées sous forme de transformations linéaires appliquées à l’état du système.
Supposons un système dynamique dont l’état à l’instant est .
À tout moment, l’estimation de l’état est connue avec une certaine incertitude représentée par une matrice de covariance .
Les gaussiennes multivariées
Ce sont des distributions bien connues avec de nombreuses propriétés mathématiques pratiques:
- marginalisation
- conditionnement
- etc.
Pour ce type de formule, vous pouvez par exemple utiliser le « Matrix CookBook »
%matplotlib inline
import numpy as np
from scipy.linalg import eigh
from scipy.stats import multivariate_normal
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from sklearn.datasets import make_spd_matrix
plt.ion()
def compute_ellipse(cov):
w, u = eigh(cov)
std = np.sqrt(w)
angle = np.arctan2(u[1, 0], u[0, 0])
return 3 * std[0], 3 * std[1], angle * 180. / np.pi
def plot_gaussian(mu, cov):
"""Plot the mean and covariance of a 2D Gaussian distribution.
Parameters
----------
mu : array_like, shape (2,)
Mean vector of the distribution.
cov: array_like, shape (2, 2)
Covariance matrix of the distribution.
"""
# Plot the mean vector.
sc = plt.scatter(mu[0], mu[1], label='Mean')
width, height, angle = compute_ellipse(cov[:2, :2])
# Plot the covariance matrix as an ellipse.
ellipse = Ellipse(xy=mu, width=width, height=height, angle=angle,
alpha=0.2, label='Covariance')
plt.gca().add_artist(ellipse)
return sc, ellipse
C = make_spd_matrix(2, random_state=1)
mu = np.array([1, 2])
X = multivariate_normal(mu, C).rvs(1000)
plt.scatter(*X.T, s=1)
plot_gaussian(mu, C);
Distribution jointe
Décomposons notre état distribuée selon une loi Gaussienne, en deux sous-parties, la distribution jointe des deux sous partie est :
Avec .
Marginalisation
Conditionnement
Si on connait la valeur de alors:
Combinaison linéaire
Si on a deux variable aléatoires indépendantes :
Alors la combinaison linéaire des deux est distribuée comme:
Propagation de la dynamique du système
Les filtres de Kalman modélisent le système en temps discret et l’évolution entre deux pas de temps est donnée par la loi linéaire :
avec :
* : L’estimtation de l’état au pas après avoir pris en compte les éventuelles observations
* : L’estimation de l’état au pas par propagation de la dynamique du système depuis l’estimation au pas précédent.
* : matrice de transition qui représente l’évolution « naturelle » du système (pas nécessairement constante)
* : une commande (ou contrôle) appliquée au système
* : l’effet de la commande sur le système (pas nécessairement constante)
* : le bruit du processus, c’est-à-dire ce qu’on ne modélise pas bien.
Exemple
Point mobile dans le plan
Appliquons cela à un objet ponctuel en mouvement sans frottement dans un plan.
L’état de l’objet à un instant est encodé par sa position et sa vitesse :
Le contrôle sur l’objet se fait par l’intermédiaire de l’équivalent d’une poussée :
On peut donc appliquer les équations du mouvement de Newton pour modéliser l’évolution du mobile entre deux instants proches. Si on considère que l’accélération est constante entre est , la vitesse va être données par:
Soit la vitesse initiale à laquelle on ajoute la variation de vitesse due à la poussée.
Et le déplacement peut se calculer par:
Où l’on retrouve la position à l’instant précédent, plus la variation de poistion due à la vitesse initiale, plus la variation de position liée à la poussée.
Mise en équation de la propagation du filtre de Kalman
Pour se caler dans le formalisme des filtre de Kalman, je suppose que l’état du système, au pas de temps , est donné par sa position et sa vitesse :
On peut ensuite séparer dans les équations de mouvement. Les parties qui dépendent de l’état précédent (position et vitesse ) et les parties qui dépendent du contrôle (ici l’accélération) et on peut ainsi construire les différents termes de l’étape de propagation:
Le contrôle corresond ici à la poussée:
Et l’évolution due au contrôle est donnée par:
Simulons un tel systeme:
delta_t = 0.1
t = np.arange(0, 10., delta_t)
# L'accélération est modifée à 3 reprises pendant 10 pas de temps dans des directions différentes
acceleration = np.zeros((len(t), 2))
acceleration[10: 20] = (0, 0.4)
acceleration[30: 40] = (0, -0.6)
acceleration[50: 60] = (0.1, 0.3)
# Matrice d'évolution de l'état
F = np.identity(4)
F[0, 2] = delta_t
F[1, 3] = delta_t
# Matrice donnant l'effet du contrôle sur système
G = np.zeros((4, 2))
G[0, 0] = G[1, 1] = 0.5 * delta_t**2
G[2, 0] = G[3, 1] = delta_t
# Estimation du bruit lié au processus d'évolution
Q = np.diag([0.001, 0.001, 0.002, 0.002]) ** 2
# Etat initial
state_actual = np.zeros((len(t), 4))
state_actual[0] = (0, 0, 0.1, 0)
state_mean = np.zeros((len(t), 4))
state_mean[0] = (0, 0, 0.1, 0)
# Evolution dans le temps
np.random.seed(0)
for i in range(1, len(t)):
state_actual[i] = multivariate_normal(F @ state_actual[i - 1] + G @ acceleration[i], Q).rvs()
state_mean[i] = F @ state_mean[i - 1] + G @ acceleration[i]
from itertools import chain
from IPython.display import HTML
from matplotlib.animation import FuncAnimation
class AnimResult:
def __init__(self, state_actual, acceleration, state_mean, covariance=None, observations=None):
self.state_actual = state_actual
self.acceleration = acceleration
self.state_mean = state_mean
self.covariance = covariance
self.observations = observations
def plot_init(self):
plt.plot(*state_mean.T[:2], "--", color="tab:blue", label="Trajectoire estimée")
plt.plot(*state_actual.T[:2], "-", color="tab:orange", label="Trajectoire simulée")
if self.observations is not None:
self.plot_observations()
def plot_acceleration(self):
if self.acceleration is None:
self.accel_arrow = None
else:
self.accel_arrow = plt.arrow(
*chain(self.state_mean[0, :2],
0.5 * self.acceleration[0]),
width=0.01,
visible=np.any(self.acceleration[i] != 0)
)
def plot_estimate(self):
self.actual_scatter = plt.scatter(*self.state_actual[[0], :2].T, color="tab:orange", label="Position simulée")
if self.covariance is None:
self.mean_scatter = plt.scatter(*self.state_mean[[0], :2].T, color="tab:blue", label="Position estimée")
self.cov_ellipse = None
else:
self.mean_scatter, self.cov_ellipse = plot_gaussian(self.state_mean[0, :2], self.covariance[0, :2, :2])
def update(self, i):
self.actual_scatter.set_offsets(*self.state_actual[[i], :2])
self.mean_scatter.set_offsets(*self.state_mean[[i], :2])
if self.cov_ellipse is not None:
width, height, angle = compute_ellipse(self.covariance[i, :2, :2])
self.cov_ellipse.set_center(self.state_mean[i, :2])
self.cov_ellipse.set_width(width)
self.cov_ellipse.set_height(height)
self.cov_ellipse.set_angle(angle)
if self.acceleration is not None:
self.accel_arrow.set_data(
x=self.state_mean[i, 0],
y=self.state_mean[i, 1],
dx=0.5 * self.acceleration[i, 0],
dy=0.5 * self.acceleration[i, 1]
)
self.accel_arrow.set_visible(np.any(self.acceleration[i] != 0))
if self.observations is not None:
self.update_observations(i)
def plot_observations(self, i):
pass
def update_observations(self, i):
pass
def animate(self):
self.plot_init()
self.plot_estimate()
plt.legend()
self.plot_acceleration()
ani = FuncAnimation(
plt.gcf(), self.update, frames=np.arange(len(t)), interval=100
)
return HTML(ani.to_html5_video())
plt.ioff()
plt.figure()
plt.gca().set_aspect("equal")
AnimResult(state_actual=state_actual, acceleration=acceleration, state_mean=state_mean).animate()
Propagation de l’incertitude
Puisque nous n’avons que des lois normales et des transformations linéaires, l’erreur peut également être propagée avec, en notant est l’estimation de la covariance de l’état :
Puisque:
# Etat initial
state_actual = np.zeros((len(t), 4))
state_actual[0] = (0, 0, 0.1, 0)
state_mean = np.zeros((len(t), 4))
state_mean[0] = (0, 0, 0.1, 0)
state_cov = np.zeros((len(t), 4, 4))
state_cov[0] = Q * 5 ** 2
# Evolution dans le temps
np.random.seed(0)
for i in range(1, len(t)):
state_actual[i] = multivariate_normal(F @ state_actual[i - 1] + G @ acceleration[i], Q).rvs()
state_mean[i] = F @ state_mean[i - 1] + G @ acceleration[i]
state_cov[i] = F @ state_cov[i - 1] @ F.T + Q
plt.ioff()
plt.figure()
plt.gca().set_aspect("equal")
AnimResult(state_actual=state_actual, acceleration=acceleration, state_mean=state_mean, covariance=state_cov).animate()
On voit donc ici le mouvement estimé du système connaissant uniquement l’état initial et les contrôles appliqués sur le système. L’incertitude sur l’état (ici, on ne visualise que l’incertitude sur la position) augmente continuellement au cours du temps.
Pour la diminuer, on peut avoir recours à des observations.
Observations
Théorie
Par ailleurs, une observation se modélise par:
avec:
- : la mesure issue de l’observation
- : la matrice modélisation le processus d’observation
- : le bruit d’observation.
Pour calculer l’information gagnée après une observation, on part de la distribution jointe de l’état courant et du résultat de l’observation:
On peut ensuite appliquer les formules associées aux gaussiennes multivariées:
Soit ici, en supposant qu’on a observé :
Simulons ici une observation correspondant à la mesure de la position et de la vitesse selon l’axe uniquement:
cela donne:
from scipy.stats import multivariate_normal
# Etat initial
state_actual = np.zeros((len(t), 4))
state_actual[0] = (0, 0, 0.1, 0)
state_mean = np.zeros((len(t), 4))
state_mean[0] = (0, 0, 0.1, 0)
state_cov = np.zeros((len(t), 4, 4))
state_cov[0] = Q * 5 ** 2
# Transformation leading to the observation
H = np.array([
[1, 0, 0, 0],
[0, 0, 1, 0]
])
# Observation noise
R = (0.01 ** 2, 0.1 ** 2) * np.identity(2)
observation_times = [40, 60]
observation_results = {}
# Evolution dans le temps
np.random.seed(0)
for i in range(1, len(t)):
state_actual[i] = multivariate_normal(F @ state_actual[i - 1] + G @ acceleration[i], Q).rvs()
state_mean[i] = F @ state_mean[i - 1] + G @ acceleration[i]
state_cov[i] = F @ state_cov[i - 1] @ F.T + Q
if i in observation_times:
# Observation process
observation_results[i] = obs_i = multivariate_normal(H @ state_actual[i], R).rvs()
# update the estimates
z_i = H @ state_mean[i]
cov_state_obs = state_cov[i] @ H.T
cov_obs_obs = H @ state_cov[i] @ H.T + R
state_mean[i] += cov_state_obs @ np.linalg.solve(cov_obs_obs, obs_i - z_i)
state_cov[i] -= cov_state_obs @ np.linalg.solve(cov_obs_obs, cov_state_obs.T)
class AnimLinearObs(AnimResult):
def plot_observations(self):
ymin, ymax = plt.ylim()
dx = 3 * np.sqrt(R.flatten())[0]
self.observation_lines = {}
self.observation_fill = {}
for i, obs_i in self.observations.items():
xi = obs_i[0]
self.observation_lines[i] = plt.vlines(xi, ymin, ymax, color="tab:green", linestyle="--", alpha=0.1)
self.observation_fill[i] = plt.fill_between([xi - dx, xi + dx], [ymin, ymin], [ymax, ymax], color="lightgreen", alpha=0.1)
plt.ylim(ymin, ymax)
def update_observations(self, i):
for iobs, line in self.observation_lines.items():
line.set_alpha(0.8 if i == iobs else 0.1)
self.observation_fill[iobs].set_alpha(0.8 if i == iobs else 0.1)
plt.ioff()
plt.figure()
plt.gca().set_aspect("equal")
AnimLinearObs(state_actual, acceleration, state_mean, state_cov, observation_results).animate()
On peut voir la contraction de l’incertitude selon liée à l’observation.
Extended Kalman Filters
Si les lois d’évolution et d’observation ne sont plus linéaires, il est tout de même possible d’appliquer le filtre de Kalman en le linéarisant localement:
Il suffit alors de remplacer les formules de mise à jour des covariances en utilisant la jacobienne des fonctions et :
Ici, on simule une observation, où l’on peut mesurer la distance et l’angle d’observation depuis l’origine du repère:
La loi de mouvement reste inchangée, donc la jacobienne est identique à la matrice donnée plus tôt:
import jax
import jax.numpy as jnp
def movement(state, control):
return F @ state + G @ control
def observation(state):
x, y = state[:2]
r = jnp.sqrt(x ** 2 + y ** 2)
angle = jnp.arctan2(y, x)
return jnp.array([r, angle * 180 / np.pi])
R = (np.identity(2) * (0.025, 0.5)) ** 2
# Etat initial
state_actual = np.zeros((len(t), 4))
state_actual[0] = (0, 0, 0.1, 0)
state_mean = np.zeros((len(t), 4))
state_mean[0] = (0, 0, 0.1, 0)
state_cov = np.zeros((len(t), 4, 4))
state_cov[0] = Q * 5 ** 2
observation_results = {}
# Evolution dans le temps
np.random.seed(0)
for i in range(1, len(t)):
state_actual[i] = multivariate_normal(movement(state_actual[i - 1], acceleration[i]), Q).rvs()
state_mean[i] = movement(state_mean[i - 1], acceleration[i])
F_i = jax.jacobian(movement)(state_mean[i - 1], acceleration[i])
state_cov[i] = F_i @ state_cov[i - 1] @ F_i.T + Q
if i in observation_times:
# Observation process
observation_results[i] = obs_i = multivariate_normal(observation(state_actual[i]), R).rvs()
# update the estimates
z_i = observation(state_mean[i])
H_i = jax.jacobian(observation)(state_mean[i])
cov_state_obs = state_cov[i] @ H_i.T
cov_obs_obs = H_i @ state_cov[i] @ H_i.T + R
state_mean[i] += cov_state_obs @ np.linalg.solve(cov_obs_obs, obs_i - z_i)
state_cov[i] -= cov_state_obs @ np.linalg.solve(cov_obs_obs, cov_state_obs.T)
def draw_polar_section(r, angle, dr, dangle, circle_grid=10):
x0 = r * np.cos(angle / 180. * np.pi)
y0 = r * np.sin(angle / 180. * np.pi)
rs = np.array([ r - dr, r + dr ] + [r + dr ] * circle_grid + [r - dr] + [r - dr] * circle_grid)
angles = np.array(
[angle - dangle, angle - dangle] +
[(angle - dangle) + (i + 1) / circle_grid * 2 * dangle for i in range(circle_grid)] +
[angle + dangle ] +
[(angle + dangle) - (i + 1) / circle_grid * 2 * dangle for i in range(circle_grid)]
)
polygon_xs = rs * np.cos(angles / 180. * np.pi)
polygon_ys = rs * np.sin(angles / 180. * np.pi)
return plt.scatter([x0], [y0], c="tab:green", alpha=0.1), plt.fill(polygon_xs, polygon_ys, alpha=0.1, color="lightgreen")
class AnimNonLinearObs(AnimResult):
def plot_observations(self):
dr, dangle = 3 * np.sqrt(R.diagonal())
self.observation_patches = {
i: draw_polar_section(*obs_i, dr, dangle) for i, obs_i in self.observations.items()
}
def update_observations(self, i):
for iobs, (obs_center, obs_poly) in self.observation_patches.items():
alpha = 0.8 if i == iobs else 0.1
obs_center.set_alpha(alpha)
for p in obs_poly: p.set_alpha(alpha)
plt.ioff()
plt.figure()
plt.gca().set_aspect("equal")
AnimNonLinearObs(state_actual, acceleration, state_mean, state_cov, observation_results).animate()
Filtres de Kalman sans parfum (Unscented Kalman Filter)
La transformation sans parfum
L’idée de cette transformation et d’estimer l’effet de transformations non-linéaires en utilisant des échantillons bien choisis appelés « points sigma ».
On fait passer ces points à travers les transformations d’évolution temporelle et d’observation et on estime ensuite, à partir de ces points, les paramètres de la distribution résultante.
L’idée est de construire un ensemble de paires de points qui pointent dans les directions principales de la distribution Gaussienne. Si on décompose la covariance en :
Alors, on construit les points sigma comme suit :
à chaque point est associé un poids:
où est un paramètre d’échelle.
%matplotlib inline
import numpy as np
from scipy.linalg import eigh
from math import gcd
def sigma_points(mu, sigma, lambda_):
n_dims = len(mu)
w, u = eigh(sigma)
w = np.maximum(w, 0)
L = (u * np.sqrt(w)).T
X = np.vstack(
[ mu ] +
[ mu + np.sqrt(n_dims + lambda_) * L[i] for i in range(n_dims) ] +
[ mu - np.sqrt(n_dims + lambda_) * L[i] for i in range(n_dims) ]
)
w = np.hstack([lambda_ / (n_dims + lambda_)] + [0.5 / (n_dims + lambda_)] * (2 * n_dims))
return X, w
C = np.array([[1.0, 0.8], [0.8, 1]])
mu = np.array([1, 2])
X_sigma, w_sigma = sigma_points(mu, C, lambda_=1)
plt.ion()
plt.figure()
plot_gaussian(mu, C);
plt.scatter(*X_sigma.T, s=10);
Application to the Kalman Filter
On passe ces points à travers la transformation d’évolution temporelle et à travers la fonction d’observation et on estime ensuite les différentes quantité d’intérêt en intégrant les poids:
# Apply on sigma points matrix instead of single vector
def movement(state, control):
return state @ F.T + G @ control
def observation(state):
x, y = state.T[:2]
r = np.sqrt(x ** 2 + y ** 2)
angle = np.arctan2(y, x)
return np.vstack([r, angle * 180 / np.pi]).T
# Etat initial
state_actual = np.zeros((len(t), 4))
state_actual[0] = (0, 0, 0.1, 0)
state_mean = np.zeros((len(t), 4))
state_mean[0] = (0, 0, 0.1, 0)
state_cov = np.zeros((len(t), 4, 4))
state_cov[0] = Q * 5 ** 2
observation_results = {}
lambda_ = 1e-1
# Evolution dans le temps
np.random.seed(0)
for i in range(1, len(t)):
state_actual[i] = multivariate_normal(movement(state_actual[[i - 1]], acceleration[i])[0], Q).rvs()
state_sigma, weight_sigma = sigma_points(state_mean[i - 1], state_cov[i - 1], lambda_)
state_sigma_tr = movement(state_sigma, acceleration[i])
state_mean[i] = np.einsum("ij, i -> j", state_sigma_tr, weight_sigma)
state_diff = state_sigma_tr - state_mean[i]
state_cov[i] = np.einsum("ij, ik, i -> jk", state_diff, state_diff, weight_sigma)
if i in observation_times:
# Observation process
observation_results[i] = obs_i = multivariate_normal(observation(state_actual[[i]])[0], R).rvs()
# update the estimates
z_sigma = observation(state_sigma_tr)
z_i = np.einsum("ij, i -> j", z_sigma, weight_sigma)
z_diff = z_sigma - z_i
cov_state_obs = np.einsum("ij, ik, i -> jk", state_diff, z_diff, weight_sigma)
cov_obs_obs = np.einsum("ij, ik, i -> jk", z_diff, z_diff, weight_sigma)
state_mean[i] += cov_state_obs @ np.linalg.solve(cov_obs_obs, obs_i - z_i)
state_cov[i] -= cov_state_obs @ np.linalg.solve(cov_obs_obs, cov_state_obs.T)
plt.ioff()
plt.figure()
plt.gca().set_aspect("equal")
AnimNonLinearObs(state_actual, acceleration, state_mean, state_cov, observation_results).animate()