"""
Filename: kalman.py
Reference: http://quant-econ.net/py/kalman.html
Implements the Kalman filter for a linear Gaussian state space model.
"""
from textwrap import dedent
import numpy as np
from numpy import dot
from scipy.linalg import inv
from quantecon.lss import LinearStateSpace
from quantecon.matrix_eqn import solve_discrete_riccati
[docs]class Kalman(object):
r"""
Implements the Kalman filter for the Gaussian state space model
x_{t+1} = A x_t + C w_{t+1}
y_t = G x_t + H v_t.
Here x_t is the hidden state and y_t is the measurement. The shocks
w_t and v_t are iid standard normals. Below we use the notation
Q := CC'
R := HH'
Parameters
-----------
ss : instance of LinearStateSpace
An instance of the quantecon.lss.LinearStateSpace class
x_hat : scalar(float) or array_like(float), optional(default=None)
An n x 1 array representing the mean x_hat and covariance
matrix Sigma of the prior/predictive density. Set to zero if
not supplied.
Sigma : scalar(float) or array_like(float), optional(default=None)
An n x n array representing the covariance matrix Sigma of
the prior/predictive density. Must be positive definite.
Set to the identity if not supplied.
Attributes
----------
Sigma, x_hat : as above
Sigma_infinity : array_like or scalar(float)
The infinite limit of Sigma_t
K_infinity : array_like or scalar(float)
The stationary Kalman gain.
References
----------
http://quant-econ.net/py/kalman.html
"""
def __init__(self, ss, x_hat=None, Sigma=None):
self.ss = ss
self.set_state(x_hat, Sigma)
self.K_infinity = None
self.Sigma_infinity = None
[docs] def set_state(self, x_hat, Sigma):
if Sigma is None:
Sigma = np.identity(self.ss.n)
else:
self.Sigma = np.atleast_2d(Sigma)
if x_hat is None:
x_hat = np.zeros((self.ss.n, 1))
else:
self.x_hat = np.atleast_2d(x_hat)
self.x_hat.shape = self.ss.n, 1
def __repr__(self):
return self.__str__()
def __str__(self):
m = """\
Kalman filter:
- dimension of state space : {n}
- dimension of observation equation : {k}
"""
return dedent(m.format(n=self.ss.n, k=self.ss.k))
[docs] def whitener_lss(self):
r"""
This function takes the linear state space system
that is an input to the Kalman class and it converts
that system to the time-invariant whitener represenation
given by
\tilde{x}_{t+1}^* = \tilde{A} \tilde{x} + \tilde{C} v
a = \tilde{G} \tilde{x}
where
\tilde{x}_t = [x+{t}, \hat{x}_{t}, v_{t}]
and
\tilde{A} = [A 0 0
KG A-KG KH
0 0 0]
\tilde{C} = [C 0
0 0
0 I]
\tilde{G} = [G -G H]
with A, C, G, H coming from the linear state space system
that defines the Kalman instance
Returns
-------
whitened_lss : LinearStateSpace
This is the linear state space system that represents
the whitened system
"""
# Check for steady state Sigma and K
if self.K_infinity is None:
Sig, K = self.stationary_values()
self.Sigma_infinity = Sig
self.K_infinity = K
else:
K = self.K_infinity
# Get the matrix sizes
n, k, m, l = self.ss.n, self.ss.k, self.ss.m, self.ss.l
A, C, G, H = self.ss.A, self.ss.C, self.ss.G, self.ss.H
Atil = np.vstack([np.hstack([A, np.zeros((n, n)), np.zeros((n, l))]),
np.hstack([dot(K, G), A-dot(K, G), dot(K, H)]),
np.zeros((l, 2*n + l))])
Ctil = np.vstack([np.hstack([C, np.zeros((n, l))]),
np.zeros((n, m+l)),
np.hstack([np.zeros((l, m)), np.eye(l)])])
Gtil = np.hstack([G, -G, H])
whitened_lss = LinearStateSpace(Atil, Ctil, Gtil)
self.whitened_lss = whitened_lss
return whitened_lss
[docs] def prior_to_filtered(self, y):
r"""
Updates the moments (x_hat, Sigma) of the time t prior to the
time t filtering distribution, using current measurement y_t.
The updates are according to
x_{hat}^F = x_{hat} + Sigma G' (G Sigma G' + R)^{-1}
(y - G x_{hat})
Sigma^F = Sigma - Sigma G' (G Sigma G' + R)^{-1} G
Sigma
Parameters
----------
y : scalar or array_like(float)
The current measurement
"""
# === simplify notation === #
G, H = self.ss.G, self.ss.H
R = np.dot(H, H.T)
# === and then update === #
y = np.atleast_2d(y)
y.shape = self.ss.k, 1
E = dot(self.Sigma, G.T)
F = dot(dot(G, self.Sigma), G.T) + R
M = dot(E, inv(F))
self.x_hat = self.x_hat + dot(M, (y - dot(G, self.x_hat)))
self.Sigma = self.Sigma - dot(M, dot(G, self.Sigma))
[docs] def filtered_to_forecast(self):
"""
Updates the moments of the time t filtering distribution to the
moments of the predictive distribution, which becomes the time
t+1 prior
"""
# === simplify notation === #
A, C = self.ss.A, self.ss.C
Q = np.dot(C, C.T)
# === and then update === #
self.x_hat = dot(A, self.x_hat)
self.Sigma = dot(A, dot(self.Sigma, A.T)) + Q
[docs] def update(self, y):
"""
Updates x_hat and Sigma given k x 1 ndarray y. The full
update, from one period to the next
Parameters
----------
y : np.ndarray
A k x 1 ndarray y representing the current measurement
"""
self.prior_to_filtered(y)
self.filtered_to_forecast()
[docs] def stationary_values(self):
"""
Computes the limit of Sigma_t as t goes to infinity by
solving the associated Riccati equation. Computation is via the
doubling algorithm (see the documentation in
`matrix_eqn.solve_discrete_riccati`).
Returns
-------
Sigma_infinity : array_like or scalar(float)
The infinite limit of Sigma_t
K_infinity : array_like or scalar(float)
The stationary Kalman gain.
"""
# === simplify notation === #
A, C, G, H = self.ss.A, self.ss.C, self.ss.G, self.ss.H
Q, R = np.dot(C, C.T), np.dot(H, H.T)
# === solve Riccati equation, obtain Kalman gain === #
Sigma_infinity = solve_discrete_riccati(A.T, G.T, Q, R)
temp1 = dot(dot(A, Sigma_infinity), G.T)
temp2 = inv(dot(G, dot(Sigma_infinity, G.T)) + R)
K_infinity = dot(temp1, temp2)
# == record as attributes and return == #
self.Sigma_infinity, self.K_infinity = Sigma_infinity, K_infinity
return Sigma_infinity, K_infinity
[docs] def stationary_coefficients(self, j, coeff_type='ma'):
"""
Wold representation moving average or VAR coefficients for the
steady state Kalman filter.
Parameters
----------
j : int
The lag length
coeff_type : string, either 'ma' or 'var' (default='ma')
The type of coefficent sequence to compute. Either 'ma' for
moving average or 'var' for VAR.
"""
# == simplify notation == #
A, G = self.ss.A, self.ss.G
K_infinity = self.K_infinity
# == make sure that K_infinity has actually been computed == #
if K_infinity is None:
S, K_infinity = self.stationary_values()
# == compute and return coefficients == #
coeffs = []
i = 1
if coeff_type == 'ma':
coeffs.append(np.identity(self.ss.k))
P_mat = A
P = np.identity(self.ss.n) # Create a copy
elif coeff_type == 'var':
coeffs.append(dot(G, K_infinity))
P_mat = A - dot(K_infinity, G)
P = np.copy(P_mat) # Create a copy
else:
raise ValueError("Unknown coefficient type")
while i <= j:
coeffs.append(dot(dot(G, P), K_infinity))
P = dot(P, P_mat)
i += 1
return coeffs
[docs] def stationary_innovation_covar(self):
# == simplify notation == #
H, G = self.ss.H, self.ss.G
R = np.dot(H, H.T)
Sigma_infinity = self.Sigma_infinity
# == make sure that Sigma_infinity has been computed == #
if Sigma_infinity is None:
Sigma_infinity, K = self.stationary_values()
return dot(G, dot(Sigma_infinity, G.T)) + R