Source code for quantecon.arma

"""
Filename: arma.py
Authors: Doc-Jin Jang, Jerry Choi, Thomas Sargent, John Stachurski

Provides functions for working with and visualizing scalar ARMA processes.

TODO: 1. Fix warnings concerning casting complex variables back to floats

"""
import numpy as np
from numpy import conj, pi
import matplotlib.pyplot as plt
from scipy.signal import dimpulse, freqz, dlsim


[docs]class ARMA(object): r""" This class represents scalar ARMA(p, q) processes. If phi and theta are scalars, then the model is understood to be .. math:: X_t = \phi X_{t-1} + \epsilon_t + \theta \epsilon_{t-1} where :math:`epsilon_t` is a white noise process with standard deviation :math:`sigma`. If phi and theta are arrays or sequences, then the interpretation is the ARMA(p, q) model .. math:: X_t = \phi_1 X_{t-1} + ... + \phi_p X_{t-p} + \epsilon_t + \theta_1 \epsilon_{t-1} + ... + \theta_q \epsilon_{t-q} where * :math:`\phi = (\phi_1, \phi_2,..., \phi_p)` * :math:`\theta = (\theta_1, \theta_2,..., \theta_q)` * :math:`\sigma` is a scalar, the standard deviation of the white noise Parameters ---------- phi : scalar or iterable or array_like(float) Autocorrelation values for the autocorrelated variable. See above for explanation. theta : scalar or iterable or array_like(float) Autocorrelation values for the white noise of the model. See above for explanation sigma : scalar(float) The standard deviation of the white noise Attributes ---------- phi, theta, sigma : see Parmeters ar_poly : array_like(float) The polynomial form that is needed by scipy.signal to do the processing we desire. Corresponds with the phi values ma_poly : array_like(float) The polynomial form that is needed by scipy.signal to do the processing we desire. Corresponds with the theta values """ def __init__(self, phi, theta=0, sigma=1): self._phi, self._theta = phi, theta self.sigma = sigma self.set_params() def __repr__(self): m = "ARMA(phi=%s, theta=%s, sigma=%s)" return m % (self.phi, self.theta, self.sigma) def __str__(self): m = "An ARMA({p}, {q}) process" p = np.asarray(self.phi).size q = np.asarray(self.theta).size return m.format(p=p, q=q) # Special latex print method for working in notebook def _repr_latex_(self): m = r"$X_t = " phi = np.atleast_1d(self.phi) theta = np.atleast_1d(self.theta) rhs = "" for (tm, phi_p) in enumerate(phi): # don't include terms if they are equal to zero if abs(phi_p) > 1e-12: rhs += r"%+g X_{t-%i}" % (phi_p, tm+1) if rhs[0] == "+": rhs = rhs[1:] # remove initial `+` if phi_1 was positive rhs += r" + \epsilon_t" for (tm, th_q) in enumerate(theta): # don't include terms if they are equal to zero if abs(th_q) > 1e-12: rhs += r"%+g \epsilon_{t-%i}" % (th_q, tm+1) return m + rhs + "$" @property def phi(self): return self._phi @phi.setter def phi(self, new_value): self._phi = new_value self.set_params() @property def theta(self): return self._theta @theta.setter def theta(self, new_value): self._theta = new_value self.set_params()
[docs] def set_params(self): r""" Internally, scipy.signal works with systems of the form .. math:: ar_{poly}(L) X_t = ma_{poly}(L) \epsilon_t where L is the lag operator. To match this, we set .. math:: ar_{poly} = (1, -\phi_1, -\phi_2,..., -\phi_p) ma_{poly} = (1, \theta_1, \theta_2,..., \theta_q) In addition, ar_poly must be at least as long as ma_poly. This can be achieved by padding it out with zeros when required. """ # === set up ma_poly === # ma_poly = np.asarray(self._theta) self.ma_poly = np.insert(ma_poly, 0, 1) # The array (1, theta) # === set up ar_poly === # if np.isscalar(self._phi): ar_poly = np.array(-self._phi) else: ar_poly = -np.asarray(self._phi) self.ar_poly = np.insert(ar_poly, 0, 1) # The array (1, -phi) # === pad ar_poly with zeros if required === # if len(self.ar_poly) < len(self.ma_poly): temp = np.zeros(len(self.ma_poly) - len(self.ar_poly)) self.ar_poly = np.hstack((self.ar_poly, temp))
[docs] def impulse_response(self, impulse_length=30): """ Get the impulse response corresponding to our model. Returns ------- psi : array_like(float) psi[j] is the response at lag j of the impulse response. We take psi[0] as unity. """ sys = self.ma_poly, self.ar_poly, 1 times, psi = dimpulse(sys, n=impulse_length) psi = psi[0].flatten() # Simplify return value into flat array return psi
[docs] def spectral_density(self, two_pi=True, res=1200): r""" Compute the spectral density function. The spectral density is the discrete time Fourier transform of the autocovariance function. In particular, .. math:: f(w) = \sum_k \gamma(k) exp(-ikw) where gamma is the autocovariance function and the sum is over the set of all integers. Parameters ---------- two_pi : Boolean, optional Compute the spectral density function over [0, pi] if two_pi is False and [0, 2 pi] otherwise. Default value is True res : scalar or array_like(int), optional(default=1200) If res is a scalar then the spectral density is computed at `res` frequencies evenly spaced around the unit circle, but if res is an array then the function computes the response at the frequencies given by the array Returns ------- w : array_like(float) The normalized frequencies at which h was computed, in radians/sample spect : array_like(float) The frequency response """ w, h = freqz(self.ma_poly, self.ar_poly, worN=res, whole=two_pi) spect = h * conj(h) * self.sigma**2 return w, spect
[docs] def autocovariance(self, num_autocov=16): """ Compute the autocovariance function from the ARMA parameters over the integers range(num_autocov) using the spectral density and the inverse Fourier transform. Parameters ---------- num_autocov : scalar(int), optional(default=16) The number of autocovariances to calculate """ spect = self.spectral_density()[1] acov = np.fft.ifft(spect).real # num_autocov should be <= len(acov) / 2 return acov[:num_autocov]
[docs] def simulation(self, ts_length=90): """ Compute a simulated sample path assuming Gaussian shocks. Parameters ---------- ts_length : scalar(int), optional(default=90) Number of periods to simulate for Returns ------- vals : array_like(float) A simulation of the model that corresponds to this class """ sys = self.ma_poly, self.ar_poly, 1 u = np.random.randn(ts_length, 1) * self.sigma vals = dlsim(sys, u)[1] return vals.flatten()
[docs] def plot_impulse_response(self, ax=None, show=True): if show: fig, ax = plt.subplots() ax.set_title('Impulse response') yi = self.impulse_response() ax.stem(list(range(len(yi))), yi) ax.set_xlim(xmin=(-0.5)) ax.set_ylim(min(yi)-0.1, max(yi)+0.1) ax.set_xlabel('time') ax.set_ylabel('response') if show: plt.show()
[docs] def plot_spectral_density(self, ax=None, show=True): if show: fig, ax = plt.subplots() ax.set_title('Spectral density') w, spect = self.spectral_density(two_pi=False) ax.semilogy(w, spect) ax.set_xlim(0, pi) ax.set_ylim(0, np.max(spect)) ax.set_xlabel('frequency') ax.set_ylabel('spectrum') if show: plt.show()
[docs] def plot_autocovariance(self, ax=None, show=True): if show: fig, ax = plt.subplots() ax.set_title('Autocovariance') acov = self.autocovariance() ax.stem(list(range(len(acov))), acov) ax.set_xlim(-0.5, len(acov) - 0.5) ax.set_xlabel('time') ax.set_ylabel('autocovariance') if show: plt.show()
[docs] def plot_simulation(self, ax=None, show=True): if show: fig, ax = plt.subplots() ax.set_title('Sample path') x_out = self.simulation() ax.plot(x_out) ax.set_xlabel('time') ax.set_ylabel('state space') if show: plt.show()
[docs] def quad_plot(self): """ Plots the impulse response, spectral_density, autocovariance, and one realization of the process. """ num_rows, num_cols = 2, 2 fig, axes = plt.subplots(num_rows, num_cols, figsize=(12, 8)) plt.subplots_adjust(hspace=0.4) plot_functions = [self.plot_impulse_response, self.plot_spectral_density, self.plot_autocovariance, self.plot_simulation] for plot_func, ax in zip(plot_functions, axes.flatten()): plot_func(ax, show=False) plt.show()