from __future__ import annotations

from typing import Sequence

import numpy as np
import pandas as pd
from pandas.util._decorators import Appender, Substitution
from scipy import stats
from statsmodels.iolib.summary import Summary, fmt_2cols, fmt_params
from statsmodels.iolib.table import SimpleTable
from statsmodels.regression.linear_model import OLS, RegressionResults

import arch.covariance.kernel as lrcov
from arch.typing import ArrayLike1D, ArrayLike2D, Float64Array, Literal, UnitRootTrend
from arch.unitroot._engle_granger import EngleGrangerTestResults, engle_granger
from arch.unitroot._phillips_ouliaris import (
    CriticalValueWarning,
    PhillipsOuliarisTestResults,
    phillips_ouliaris,
)
from arch.unitroot._shared import (
    KERNEL_ERR,
    KERNEL_ESTIMATORS,
    _check_cointegrating_regression,
    _check_kernel,
    _cross_section,
)
from arch.unitroot.unitroot import SHORT_TREND_DESCRIPTION
from arch.utility.array import ensure2d
from arch.utility.io import pval_format, str_format
from arch.utility.timeseries import add_trend
from arch.vendor import cached_property

__all__ = [
    "engle_granger",
    "EngleGrangerTestResults",
    "DynamicOLS",
    "DynamicOLSResults",
    "phillips_ouliaris",
    "PhillipsOuliarisTestResults",
    "CriticalValueWarning",
]


class _CommonCointegrationResults:
    def __init__(
        self,
        params: pd.Series,
        cov: pd.DataFrame,
        resid: pd.Series,
        kernel_est: lrcov.CovarianceEstimator,
        num_x: int,
        trend: UnitRootTrend,
        df_adjust: bool,
        r2: float,
        adj_r2: float,
        estimator_type: str,
    ):
        self._params = params
        self._cov = cov
        self._resid = resid
        self._bandwidth = kernel_est.bandwidth
        self._kernel = kernel_est.__class__.__name__
        self._kernel_est = kernel_est
        self._num_x = num_x
        self._trend = trend
        self._df_adjust = df_adjust
        self._ci_size = params.shape[0]
        self._rsquared = r2
        self._rsquared_adj = adj_r2
        self._estimator_type = estimator_type

    @property
    def params(self) -> pd.Series:
        """The estimated parameters of the cointegrating vector"""
        return self._params.iloc[: self._ci_size]

    @cached_property
    def std_errors(self) -> pd.Series:
        """
        Standard errors of the parameters in the cointegrating vector
        """
        se = np.sqrt(np.diag(self.cov))
        return pd.Series(se, index=self.params.index, name="std_errors")

    @cached_property
    def tvalues(self) -> pd.Series:
        """
        T-statistics of the parameters in the cointegrating vector
        """
        return pd.Series(self.params / self.std_errors, name="tvalues")

    @cached_property
    def pvalues(self) -> pd.Series:
        """
        P-values of the parameters in the cointegrating vector
        """
        return pd.Series(2 * (1 - stats.norm.cdf(np.abs(self.tvalues))), name="pvalues")

    @property
    def cov(self) -> pd.DataFrame:
        """The estimated parameter covariance of the cointegrating vector"""
        return self._cov.iloc[: self._ci_size, : self._ci_size]

    @property
    def resid(self) -> pd.Series:
        """The model residuals"""
        return self._resid

    @property
    def kernel(self) -> str:
        """The kernel used to estimate the covariance"""
        return self._kernel

    @property
    def bandwidth(self) -> float:
        """The bandwidth used in the parameter covariance estimation"""
        return self._bandwidth

    @property
    def rsquared(self) -> float:
        """The model R²"""
        return self._rsquared

    @property
    def rsquared_adj(self) -> float:
        """The degree-of-freedom adjusted R²"""
        return self._rsquared_adj

    @cached_property
    def _cov_est(self) -> lrcov.CovarianceEstimate:
        r = np.asarray(self._resid)
        kern_class = self._kernel_est.__class__
        bw = self._bandwidth
        force_int = self._kernel_est.force_int
        cov_est = kern_class(r, bandwidth=bw, center=False, force_int=force_int)
        return cov_est.cov

    @property
    def _df_scale(self) -> float:
        if not self._df_adjust:
            return 1.0
        nobs = self._resid.shape[0]
        nvar = self.params.shape[0]
        return nobs / (nobs - nvar)

    @property
    def residual_variance(self) -> float:
        r"""
        The variance of the regression residual.

        Returns
        -------
        float
            The estimated residual variance.

        Notes
        -----
        The residual variance only accounts for the short-run variance of the
        residual and does not account for any autocorrelation. It is defined
        as

        .. math::

            \hat{\sigma}^2 = T^{-1} \sum _{t=p}^{T-q} \hat{\epsilon}_t^2

        If `df_adjust` is True, then the estimator is rescaled by T/(T-m) where
        m is the number of regressors in the model.
        """
        return self._df_scale * self._cov_est.short_run[0, 0]

    @property
    def long_run_variance(self) -> float:
        """
        The long-run variance of the regression residual.

        Returns
        -------
        float
            The estimated long-run variance of the residual.

        Notes
        -----
        The long-run variance is estimated from the model residuals
        using the same kernel used to estimate the parameter
        covariance.

        If `df_adjust` is True, then the estimator is rescaled by T/(T-m) where
        m is the number of regressors in the model.
        """
        return self._df_scale * self._cov_est.long_run[0, 0]

    @staticmethod
    def _top_table(
        top_left: Sequence[tuple[str, str]],
        top_right: Sequence[tuple[str, str]],
        title: str,
    ) -> SimpleTable:
        stubs = []
        vals = []
        for stub, val in top_left:
            stubs.append(stub)
            vals.append([val])
        table = SimpleTable(vals, txt_fmt=fmt_2cols, title=title, stubs=stubs)

        fmt = fmt_2cols.copy()
        fmt["data_fmts"][1] = "%18s"

        top_right = [("%-21s" % ("  " + k), v) for k, v in top_right]
        stubs = []
        vals = []
        for stub, val in top_right:
            stubs.append(stub)
            vals.append([val])
        table.extend_right(SimpleTable(vals, stubs=stubs))

        return table

    def _top_right(self) -> list[tuple[str, str]]:
        top_right = [
            ("No. Observations:", str(self._resid.shape[0])),
            ("R²:", str_format(self.rsquared)),
            ("Adjusted. R²:", str_format(self.rsquared_adj)),
            ("Residual Variance:", str_format(self.residual_variance)),
            ("Long-run Variance:", str_format(self.long_run_variance)),
            ("", ""),
        ]
        return top_right

    @staticmethod
    def _param_table(
        params: Float64Array,
        se: Float64Array,
        tstats: Float64Array,
        pvalues: Float64Array,
        stubs: list[str],
        title: str,
    ) -> SimpleTable:
        ci = params[:, None] + se[:, None] * stats.norm.ppf([[0.025, 0.975]])
        param_data = np.column_stack([params, se, tstats, pvalues, ci])
        data = []
        for row in param_data:
            txt_row = []
            for i, v in enumerate(row):
                f = str_format
                if i == 3:
                    f = pval_format
                txt_row.append(f(v))
            data.append(txt_row)

        header = ["Parameter", "Std. Err.", "T-stat", "P-value", "Lower CI", "Upper CI"]
        table = SimpleTable(
            data, stubs=stubs, txt_fmt=fmt_params, headers=header, title=title
        )
        return table

    def summary(self) -> Summary:
        """
        Summary of the model, containing estimated parameters and std. errors

        Returns
        -------
        Summary
            A summary instance with methods that support export to text, csv,
            or LaTeX.
        """
        if self._bandwidth != int(self._bandwidth):
            bw = str_format(self._bandwidth)
        else:
            bw = str(int(self._bandwidth))

        top_left = [
            ("Trend:", SHORT_TREND_DESCRIPTION[self._trend]),
            ("Kernel:", str(self._kernel)),
            ("Bandwidth:", bw),
            ("", ""),
            ("", ""),
            ("", ""),
        ]

        top_right = self._top_right()
        smry = Summary()
        title = self._estimator_type
        table = self._top_table(top_left, top_right, title)
        # Top table
        smry.tables.append(table)
        # Parameter table
        params = np.asarray(self.params)
        stubs = list(self.params.index)
        se = np.asarray(self.std_errors)
        tstats = np.asarray(self.tvalues)
        pvalues = np.asarray(self.pvalues)

        title = "Cointegrating Vector"
        table = self._param_table(params, se, tstats, pvalues, stubs, title)
        smry.tables.append(table)

        return smry


class DynamicOLSResults(_CommonCointegrationResults):
    """
    Estimation results for Dynamic OLS models

    Parameters
    ----------
    params : Series
        The estimated model parameters.
    cov : DataFrame
        The estimated parameter covariance.
    resid : Series
        The model residuals.
    lags : int
        The number of lags included in the model.
    leads : int
        The number of leads included in the model.
    cov_type : str
        The type of the parameter covariance estimator used.
    kernel_est : CovarianceEstimator
        The covariance estimator instance used to estimate the parameter
        covariance.
    num_x : int
        The number of cross-sectional (x) variables in the cointegrating
        vector.
    trend : {"n", "c", "ct", "ctt"}
        The trend included in the cointegrating regression.
    reg_results : RegressionResults
        Regression results from fitting the statsmodels OLS model.
    df_adjust : bool
        Whether to degree-of-freedom adjust the estimator.
    """

    def __init__(
        self,
        params: pd.Series,
        cov: pd.DataFrame,
        resid: pd.Series,
        lags: int,
        leads: int,
        cov_type: str,
        kernel_est: lrcov.CovarianceEstimator,
        num_x: int,
        trend: UnitRootTrend,
        reg_results: RegressionResults,
        df_adjust: bool,
    ) -> None:
        super().__init__(
            params,
            cov,
            resid,
            kernel_est,
            num_x,
            trend,
            df_adjust,
            r2=reg_results.rsquared,
            adj_r2=reg_results.rsquared_adj,
            estimator_type="Dynamic OLS",
        )
        self._leads = leads
        self._lags = lags
        self._cov_type = cov_type
        self._ci_size = params.shape[0] - self._num_x * (leads + lags + 1)

    @property
    def full_params(self) -> pd.Series:
        """The complete set of parameters, including leads and lags"""
        return self._params

    @property
    def full_cov(self) -> pd.DataFrame:
        """
        Parameter covariance of all model parameters, including leads and lags
        """
        return self._cov

    @property
    def lags(self) -> int:
        """The number of lags included in the model"""
        return self._lags

    @property
    def leads(self) -> int:
        """The number of leads included in the model"""
        return self._leads

    @property
    def cov_type(self) -> str:
        """The type of parameter covariance estimator used"""
        return self._cov_type

    @property
    def _df_scale(self) -> float:
        if not self._df_adjust:
            return 1.0
        nobs = self._resid.shape[0]
        nvar = self.full_params.shape[0]
        return nobs / (nobs - nvar)

    def summary(self, full: bool = False) -> Summary:
        """
        Summary of the model, containing estimated parameters and std. errors

        Parameters
        ----------
        full : bool, default False
            Flag indicating whether to include all estimated parameters
            (True) or only the parameters of the cointegrating vector (False).

        Returns
        -------
        Summary
            A summary instance with methods that support export to text, csv,
            or LaTeX.
        """
        if self._bandwidth != int(self._bandwidth):
            bw = str_format(self._bandwidth)
        else:
            bw = str(int(self._bandwidth))

        top_left = [
            ("Trend:", SHORT_TREND_DESCRIPTION[self._trend]),
            ("Leads:", str(self._leads)),
            ("Lags:", str(self._lags)),
            ("Cov Type:", str(self._cov_type)),
            ("Kernel:", str(self._kernel)),
            ("Bandwidth:", bw),
        ]

        top_right = self._top_right()
        smry = Summary()
        typ = "Cointegrating Vector" if not full else "Model"
        title = f"Dynamic OLS {typ} Summary"
        table = self._top_table(top_left, top_right, title)
        # Top table
        smry.tables.append(table)
        # Parameter table
        if full:
            params = np.asarray(self.full_params)
            stubs = list(self.full_params.index)
            se = np.sqrt(np.diag(self.full_cov))
            tstats = params / se
            pvalues = 2 * (1 - stats.norm.cdf(np.abs(tstats)))
        else:
            params = np.asarray(self.params)
            stubs = list(self.params.index)
            se = np.asarray(self.std_errors)
            tstats = np.asarray(self.tvalues)
            pvalues = np.asarray(self.pvalues)

        title = "Cointegrating Vector" if not full else "Model Parameters"
        assert isinstance(se, np.ndarray)
        table = self._param_table(params, se, tstats, pvalues, stubs, title)
        smry.tables.append(table)

        return smry


class DynamicOLS:
    r"""
    Dynamic OLS (DOLS) cointegrating vector estimation

    Parameters
    ----------
    y : array_like
        The left-hand-side variable in the cointegrating regression.
    x : array_like
        The right-hand-side variables in the cointegrating regression.
    trend : {"n","c","ct","ctt"}, default "c"
        Trend to include in the cointegrating regression. Trends are:

        * "n": No deterministic terms
        * "c": Constant
        * "ct": Constant and linear trend
        * "ctt": Constant, linear and quadratic trends
    lags : int, default None
        The number of lags to include in the model.  If None, the optimal
        number of lags is chosen using the criterion set by ``method``.
    leads : int, default None
        The number of leads to include in the model.  If None, the optimal
        number of leads is chosen using the criterion set by ``method``.
    common : bool, default False
        Flag indicating that lags and leads should be restricted to the same
        value. When common is True, lags must equal leads and max_lag must
        equal max_lead.
    max_lag : int, default None
        The maximum lag to consider. See Notes for value used when None.
    max_lead : int, default None
        The maximum lead to consider. See Notes for value used when None.
    method : {"aic","bic","hqic"}, default "bic"
        The method used to select lag length when lags or leads is None.

        * "aic" - Akaike Information Criterion
        * "hqic" - Hannan-Quinn Information Criterion
        * "bic" - Schwartz/Bayesian Information Criterion

    Notes
    -----
    The cointegrating vector is estimated from the regression

    .. math ::

       Y_t = D_t \delta + X_t \beta + \Delta X_{t} \gamma
             + \sum_{i=1}^p \Delta X_{t-i} \kappa_i
             + \sum _{j=1}^q \Delta X_{t+j} \lambda_j + \epsilon_t

    where p is the lag length and q is the lead length.  :math:`D_t` is a
    vector containing the deterministic terms, if any. All specifications
    include the contemporaneous difference :math:`\Delta X_{t}`.

    When lag lengths are not provided, the optimal lag length is chosen to
    minimize an Information Criterion of the form

    .. math::

        \ln\left(\hat{\sigma}^2\right) + k\frac{c}{T}

    where c is 2 for Akaike, :math:`2\ln\ln T` for Hannan-Quinn and
    :math:`\ln T` for Schwarz/Bayesian.

    See [1]_ and [2]_ for further details.

    References
    ----------
    .. [1] Saikkonen, P. (1992). Estimation and testing of cointegrated
       systems by an autoregressive approximation. Econometric theory,
       8(1), 1-27.
    .. [2] Stock, J. H., & Watson, M. W. (1993). A simple estimator of
       cointegrating vectors in higher order integrated systems.
       Econometrica: Journal of the Econometric Society, 783-820.
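
    Examples
    --------
    A minimal sketch using simulated data; the simulated series and the
    coefficient values below are illustrative only:

    >>> import numpy as np
    >>> import pandas as pd
    >>> from arch.unitroot.cointegration import DynamicOLS
    >>> rs = np.random.RandomState(0)
    >>> x = pd.DataFrame(
    ...     rs.standard_normal((500, 2)).cumsum(0), columns=["x1", "x2"]
    ... )
    >>> y = pd.Series(x @ np.array([1.0, -0.5]) + rs.standard_normal(500), name="y")
    >>> res = DynamicOLS(y, x, trend="c").fit()
    >>> res.params  # doctest: +SKIP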
    """

    def __init__(
        self,
        y: ArrayLike1D,
        x: ArrayLike2D,
        trend: UnitRootTrend = "c",
        lags: int | None = None,
        leads: int | None = None,
        common: bool = False,
        max_lag: int | None = None,
        max_lead: int | None = None,
        method: Literal["aic", "bic", "hqic"] = "bic",
    ) -> None:
        setup = _check_cointegrating_regression(y, x, trend)
        self._y = setup.y
        self._x = setup.x
        self._trend = setup.trend
        self._lags = lags
        self._leads = leads
        self._max_lag = max_lag
        self._max_lead = max_lead
        self._method = method
        self._common = bool(common)
        self._y_df = pd.DataFrame(self._y)
        self._check_inputs()

    def _check_inputs(self) -> None:
        """Validate the inputs"""
        if not isinstance(self._method, str) or self._method.lower() not in (
            "aic",
            "bic",
            "hqic",
        ):
            raise ValueError('method must be one of "aic", "bic", or "hqic"')
        max_lag = self._max_lag
        self._max_lag = int(max_lag) if max_lag is not None else max_lag
        max_lead = self._max_lead
        self._max_lead = int(max_lead) if max_lead is not None else max_lead
        self._leads = int(self._leads) if self._leads is not None else self._leads
        self._lags = int(self._lags) if self._lags is not None else self._lags

        if self._common and self._leads != self._lags:
            raise ValueError(
                "common is specified but leads and lags have different values"
            )
        if self._common and self._max_lead != self._max_lag:
            raise ValueError(
                "common is specified but max_lead and max_lag have different values"
            )
        max_ll = self._max_lead_lag()

        obs_remaining = self._y.shape[0] - 1
        obs_remaining -= max_ll if max_lag is None else max_lag
        obs_remaining -= max_ll if max_lead is None else max_lead
        if obs_remaining <= 0:
            raise ValueError(
                "max_lag and max_lead are too large for the amount of "
                "data. The largest model specification in the search "
                "cannot be estimated."
            )

    def _format_variables(
        self, leads: int, lags: int
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Format the variables for the regression"""
        x = self._x
        y = self._y_df
        delta_x = x.diff()
        data = [y, x]

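        # Offsets run from -lags to +leads: negative offsets are lagged
        # differences of x, positive offsets are leads, and 0 is the
        # contemporaneous difference included in every specification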
        for lag in range(-lags, leads + 1):
            lag_data = delta_x.shift(-lag)
            typ = "LAG" if lag < 0 else "LEAD"
            lag_data.columns = pd.Index(
                [f"D.{c}.{typ}{abs(lag)}" for c in lag_data.columns]
            )
            if lag == 0:
                lag_data.columns = pd.Index([f"D.{c}" for c in lag_data.columns])
            data.append(lag_data)
        data_df: pd.DataFrame = pd.concat(data, axis=1).dropna()
        lhs, rhs = data_df.iloc[:, :1], data_df.iloc[:, 1:]
        nrhs = rhs.shape[1]
        with_trend = add_trend(rhs, trend=self._trend, prepend=True)
        assert isinstance(with_trend, pd.DataFrame)
        rhs = with_trend
        ntrend = rhs.shape[1] - nrhs
        if ntrend:
            nx = x.shape[1]
            trend = rhs.iloc[:, :ntrend]
            rhs = pd.concat(
                [rhs.iloc[:, ntrend : ntrend + nx], trend, rhs.iloc[:, ntrend + nx :]],
                axis=1,
            )
        return lhs, rhs

    def _ic(self, resids: Float64Array, nparam: int) -> float:
        """Compute an info criterion"""
        nobs = resids.shape[0]
        sigma2 = float(resids.T @ resids / nobs)
        if self._method == "aic":
            penalty = 2.0
        elif self._method == "hqic":
            penalty = 2.0 * float(np.log(np.log(nobs)))
        else:  # bic
            penalty = float(np.log(nobs))
        return np.log(sigma2) + nparam * penalty / nobs

    def _max_lead_lag(self) -> int:
        nobs = self._y.shape[0]
        return int(np.ceil(12.0 * (nobs / 100) ** (1 / 4)))

    def _leads_and_lags(self) -> tuple[int, int]:
        """Select the optimal number of leads and lags"""
        if self._lags is not None and self._leads is not None:
            return self._leads, self._lags
        max_lead_lag = self._max_lead_lag()
        if self._lags is None:
            max_lag = max_lead_lag if self._max_lag is None else self._max_lag
            min_lag = 0
        else:
            min_lag = max_lag = self._lags
        if self._leads is None:
            max_lead = max_lead_lag if self._max_lead is None else self._max_lead
            min_lead = 0
        else:
            min_lead = max_lead = self._leads
        variables = self._format_variables(max_lead, max_lag)
        lhs = np.asarray(variables[0])
        rhs = np.asarray(variables[1])
        nx = self._x.shape[1]
        # +1 to account for the Delta X(t) (not a lead or a lag)
        lead_lag_offset = rhs.shape[1] - (max_lead + max_lag + 1) * nx
        always_loc = np.arange(lead_lag_offset)
        best_ic = np.inf
        best_leads_and_lags = (0, 0)
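        # Grid search over all admissible (lead, lag) pairs, keeping the
        # pair that minimizes the information criterion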
        for lag in range(min_lag, max_lag + 1):
            for lead in range(min_lead, max_lead + 1):
                if self._common and lag != lead:
                    continue
                lag_start = max_lag - lag
                # +1 so the contemporaneous difference is included in all
                # regressions
                lead_end = max_lag + 1 + lead
                lead_lag_locs = np.arange(lag_start * nx, lead_end * nx)
                lead_lag_locs += lead_lag_offset
                locs = np.r_[always_loc, lead_lag_locs]
                _rhs = rhs[:, locs]
                params = np.linalg.lstsq(_rhs, lhs, rcond=None)[0]
                resid = np.squeeze(lhs - _rhs @ params)
                ic = self._ic(resid, params.shape[0])
                if ic < best_ic:
                    best_ic = ic
                    best_leads_and_lags = (lead, lag)
        return best_leads_and_lags

    def fit(
        self,
        cov_type: Literal[
            "unadjusted", "homoskedastic", "robust", "kernel"
        ] = "unadjusted",
        kernel: str = "bartlett",
        bandwidth: int | None = None,
        force_int: bool = False,
        df_adjust: bool = False,
    ) -> DynamicOLSResults:
        r"""
        Estimate the Dynamic OLS regression

        Parameters
        ----------
        cov_type : str, default "unadjusted"
            Either "unadjusted" (or its equivalent "homoskedastic") or
            "robust" (or its equivalent "kernel").
        kernel : str, default "bartlett"
            The string name of any known kernel-based long-run covariance
            estimator. Common choices are "bartlett" for the Bartlett kernel
            (Newey-West), "parzen" for the Parzen kernel and
            "quadratic-spectral" for the Quadratic Spectral kernel.
        bandwidth : int, default None
            The bandwidth to use. If not provided, the optimal bandwidth is
            estimated from the data. Setting the bandwidth to 0 and using
            "unadjusted" produces the classic OLS covariance estimator.
            Setting the bandwidth to 0 and using "robust" produces White's
            covariance estimator.
        force_int : bool, default False
            Whether to force the estimated optimal bandwidth to be an integer.
        df_adjust : bool, default False
            Whether to adjust the parameter covariance to account for the
            number of parameters estimated in the regression. If True, the
            parameter covariance estimator is multiplied by T/(T-k) where
            k is the number of regressors in the model.

        Returns
        -------
        DynamicOLSResults
            The estimation results.

        See Also
        --------
        arch.unitroot.cointegration.engle_granger
            Cointegration testing using the Engle-Granger methodology
        statsmodels.regression.linear_model.OLS
            Ordinary Least Squares regression.

        Notes
        -----
        When using the unadjusted covariance, the parameter covariance is
        estimated as

        .. math::

            T^{-1} \hat{\sigma}^2_{HAC} \hat{\Sigma}_{ZZ}^{-1}

        where :math:`\hat{\sigma}^2_{HAC}` is an estimator of the long-run
        variance of the regression error and
        :math:`\hat{\Sigma}_{ZZ}=T^{-1}Z'Z`. :math:`Z_t` is a vector that
        includes all terms in the regression (i.e., deterministics,
        cross-sectional, leads and lags). When using the robust covariance,
        the parameter covariance is estimated as

        .. math::

            T^{-1} \hat{\Sigma}_{ZZ}^{-1} \hat{S}_{HAC} \hat{\Sigma}_{ZZ}^{-1}

        where :math:`\hat{S}_{HAC}` is a Heteroskedasticity-Autocorrelation
        Consistent estimator of the covariance of the regression scores
        :math:`Z_t\epsilon_t`.
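
        Examples
        --------
        A sketch of the covariance options, assuming ``y`` and ``x`` are
        cointegrated series such as those in the class-level example:

        >>> mod = DynamicOLS(y, x)  # doctest: +SKIP
        >>> res = mod.fit(cov_type="robust", kernel="parzen")  # doctest: +SKIP
        >>> res.cov_type  # doctest: +SKIP
        'robust'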
        """
        leads, lags = self._leads_and_lags()
        # TODO: Rank check and drop??
        lhs, rhs = self._format_variables(leads, lags)
        mod = OLS(lhs, rhs)
        res = mod.fit()
        coeffs = np.asarray(res.params)
        resid = lhs.squeeze() - (rhs @ coeffs).squeeze()
        resid.name = "resid"
        cov, est = self._cov(
            cov_type, kernel, bandwidth, force_int, df_adjust, rhs, resid
        )
        params = pd.Series(np.squeeze(coeffs), index=rhs.columns, name="params")
        num_x = self._x.shape[1]
        return DynamicOLSResults(
            params,
            cov,
            resid,
            lags,
            leads,
            cov_type,
            est,
            num_x,
            self._trend,
            res,
            df_adjust,
        )

    @staticmethod
    def _cov(
        cov_type: Literal["unadjusted", "homoskedastic", "robust", "kernel"],
        kernel: str,
        bandwidth: int | None,
        force_int: bool,
        df_adjust: bool,
        rhs: pd.DataFrame,
        resids: pd.Series,
    ) -> tuple[pd.DataFrame, lrcov.CovarianceEstimator]:
        """Estimate the covariance"""
        kernel = kernel.lower().replace("-", "").replace("_", "")
        if kernel not in KERNEL_ESTIMATORS:
            raise ValueError(KERNEL_ERR)
        x = np.asarray(rhs)
        eps = ensure2d(np.asarray(resids), "eps")
        nobs, nx = x.shape
        sigma_xx = x.T @ x / nobs
        sigma_xx_inv = np.linalg.inv(sigma_xx)
        kernel_est = KERNEL_ESTIMATORS[kernel]
        scale = nobs / (nobs - nx) if df_adjust else 1.0
        if cov_type in ("unadjusted", "homoskedastic"):
            est = kernel_est(eps, bandwidth, center=False, force_int=force_int)
            sigma2 = np.squeeze(est.cov.long_run)
            cov = (scale * sigma2) * sigma_xx_inv / nobs
        elif cov_type in ("robust", "kernel"):
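            # Sandwich (robust) estimator: the HAC long-run covariance of
            # the scores x_t * eps_t between inverted design moment matrices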
            scores = x * eps
            est = kernel_est(scores, bandwidth, center=False, force_int=force_int)
            s = est.cov.long_run
            cov = scale * sigma_xx_inv @ s @ sigma_xx_inv / nobs
        else:
            raise ValueError("Unknown cov_type")
        cov_df = pd.DataFrame(cov, columns=rhs.columns, index=rhs.columns)
        return cov_df, est


class CointegrationAnalysisResults(_CommonCointegrationResults):
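    """
    Estimation results for FM-OLS and CCR cointegrating vector estimators
    """
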
    def __init__(
        self,
        params: pd.Series,
        cov: pd.DataFrame,
        resid: pd.Series,
        omega_112: float,
        kernel_est: lrcov.CovarianceEstimator,
        num_x: int,
        trend: UnitRootTrend,
        df_adjust: bool,
        rsquared: float,
        rsquared_adj: float,
        estimator_type: str,
    ):
        super().__init__(
            params,
            cov,
            resid,
            kernel_est,
            num_x,
            trend,
            df_adjust,
            rsquared,
            rsquared_adj,
            estimator_type,
        )
        self._omega_112 = omega_112

    @property
    def long_run_variance(self) -> float:
        """
        Long-run variance estimate used in the parameter covariance estimator
        """
        return self._omega_112


COMMON_DOCSTRING = r"""
    %(method)s cointegrating vector estimation.

    Parameters
    ----------
    y : array_like
        The left-hand-side variable in the cointegrating regression.
    x : array_like
        The right-hand-side variables in the cointegrating regression.
    trend : {"n","c","ct","ctt"}, default "c"
        Trend to include in the cointegrating regression. Trends are:

        * "n": No deterministic terms
        * "c": Constant
        * "ct": Constant and linear trend
        * "ctt": Constant, linear and quadratic trends
    x_trend : {None,"c","ct","ctt"}, default None
        Trends that affect the x-data but do not appear in the
        cointegrating regression. x_trend must be at least as large as
        trend, so that if trend is "ct", x_trend must be either "ct" or
        "ctt".

    Notes
    -----
    The cointegrating vector is estimated from the regressions

    .. math::

       Y_t & = D_{1t} \delta + X_t \beta + \eta_{1t} \\
       X_t & = D_{1t} \Gamma_1 + D_{2t}\Gamma_2 + \epsilon_{2t} \\
       \eta_{2t} & = \Delta \epsilon_{2t}

    or if estimated in differences, the last two lines are

    .. math::

       \Delta X_t = \Delta D_{1t} \Gamma_1 + \Delta D_{2t} \Gamma_2 + \eta_{2t}

    Define the vector of residuals as :math:`\eta = (\eta_{1t},\eta'_{2t})'`, and the
    long-run covariance

    .. math::

       \Omega = \sum_{h=-\infty}^{\infty} E[\eta_t\eta_{t-h}']

    and the one-sided long-run covariance matrix

    .. math::

       \Lambda_0 = \sum_{h=0}^\infty E[\eta_t\eta_{t-h}']

    The covariance matrices are partitioned into a block form

    .. math::

       \Omega = \left[\begin{array}{cc}
                         \omega_{11} & \omega_{12} \\
                         \omega'_{12} & \Omega_{22}
                \end{array} \right]

    The cointegrating vector is then estimated using modified data

%(estimator)s
    """

CCR_METHOD = "Canonical Cointegrating Regression"

CCR_ESTIMATOR = r"""
    .. math::

        X^\star_t & = X_t - \hat{\Lambda}_2'\hat{\Sigma}^{-1}\hat{\eta}_t \\
        Y^\star_t & = Y_t - (\hat{\Sigma}^{-1} \hat{\Lambda}_2 \hat{\beta}
                           + \hat{\kappa})' \hat{\eta}_t

    where :math:`\hat{\kappa} = (0,\hat{\Omega}_{22}^{-1}\hat{\Omega}'_{12})` and
    the regression

    .. math::

       Y^\star_t = D_{1t} \delta + X^\star_t \beta + \eta^\star_{1t}

    See [1]_ for further details.

    References
    ----------
    .. [1] Park, J. Y. (1992). Canonical cointegrating regressions. Econometrica:
       Journal of the Econometric Society, 119-143.
"""

FMOLS_METHOD = "Fully Modified OLS"

FMOLS_ESTIMATOR = r"""
    .. math::

       Y^\star_t = Y_t - \hat{\omega}_{12}\hat{\Omega}^{-1}_{22}\hat{\eta}_{2t}

    as

    .. math::

        \hat{\theta} = \left[\begin{array}{c}\hat{\gamma}_1 \\ \hat{\beta} \end{array}\right]
                     = \left(\sum_{t=2}^T Z_tZ'_t\right)^{-1}
                       \left(\sum_{t=2}^T Z_t Y^\star_t -
                       T \left[\begin{array}{c} 0 \\ \lambda^{\star\prime}_{12}
                       \end{array}\right]\right)

    where the bias term is defined

    .. math::

       \lambda^\star_{12} = \hat{\lambda}_{12}
                            - \hat{\omega}_{12}\hat{\Omega}^{-1}_{22}\hat{\Lambda}_{22}

    See [1]_ for further details.

    References
    ----------
    .. [1] Hansen, B. E., & Phillips, P. C. (1990). Estimation and inference in models of
       cointegration: A simulation study. Advances in Econometrics, 8(1989), 225-248.
"""


@Substitution(method=FMOLS_METHOD, estimator=FMOLS_ESTIMATOR)
@Appender(COMMON_DOCSTRING)
class FullyModifiedOLS:
    def __init__(
        self,
        y: ArrayLike1D,
        x: ArrayLike2D,
        trend: UnitRootTrend = "c",
        x_trend: UnitRootTrend | None = None,
    ) -> None:
        setup = _check_cointegrating_regression(y, x, trend)
        self._y = setup.y
        self._x = setup.x
        self._trend = setup.trend
        self._x_trend = x_trend
        self._y_df = pd.DataFrame(self._y)

    def _common_fit(
        self, kernel: str, bandwidth: float | None, force_int: bool, diff: bool
    ) -> tuple[lrcov.CovarianceEstimator, Float64Array, Float64Array]:
        kernel = _check_kernel(kernel)
        res = _cross_section(self._y, self._x, self._trend)
        x = np.asarray(self._x)
        eta_1 = np.asarray(res.resid)
        if self._x_trend is not None:
            x_trend = self._x_trend
        else:
            x_trend = self._trend
        tr = add_trend(nobs=x.shape[0], trend=x_trend)
        if tr.shape[1] > 1 and diff:
            delta_tr = np.diff(tr[:, 1:], axis=0)
            delta_x = np.diff(x, axis=0)
            gamma = np.linalg.lstsq(delta_tr, delta_x, rcond=None)[0]
            eta_2 = delta_x - delta_tr @ gamma
        else:
            if tr.shape[1]:
                gamma = np.linalg.lstsq(tr, x, rcond=None)[0]
                eps = x - tr @ gamma
            else:
                eps = x
            eta_2 = np.diff(eps, axis=0)
        eta = np.column_stack([eta_1[1:], eta_2])
        kernel = _check_kernel(kernel)
        kern_est = KERNEL_ESTIMATORS[kernel]
        cov_est = kern_est(eta, bandwidth=bandwidth, center=False, force_int=force_int)
        beta = np.asarray(res.params)[: x.shape[1]]
        return cov_est, eta, beta

    def _final_statistics(self, theta: pd.Series) -> tuple[pd.Series, float, float]:
        z = add_trend(self._x, self._trend)
        nobs, nvar = z.shape
        resid = self._y - np.asarray(z @ theta)
        resid.name = "resid"
        center = 0.0
        tss_df = 0
        if "c" in self._trend:
            center = float(self._y.mean())
            tss_df = 1
        y_centered = self._y - center
        ssr = resid.T @ resid
        tss = y_centered.T @ y_centered
        r2 = 1.0 - ssr / tss
        r2_adj = 1.0 - (ssr / (nobs - nvar)) / (tss / (nobs - tss_df))
        return resid, r2, r2_adj

    def fit(
        self,
        kernel: str = "bartlett",
        bandwidth: float | None = None,
        force_int: bool = True,
        diff: bool = False,
        df_adjust: bool = False,
    ) -> CointegrationAnalysisResults:
        """
        Estimate the cointegrating vector.

        Parameters
        ----------
        kernel : str, default "bartlett"
            The string name of any known kernel-based long-run covariance
            estimator. Common choices are "bartlett" for the Bartlett kernel
            (Newey-West), "parzen" for the Parzen kernel and
            "quadratic-spectral" for the Quadratic Spectral kernel.
        bandwidth : float, default None
            The bandwidth to use. If not provided, the optimal bandwidth is
            estimated from the data.
        force_int : bool, default True
            Whether to force the estimated optimal bandwidth to be an integer.
        diff : bool, default False
            Whether to use differenced data to estimate the residuals used
            in the long-run covariance estimation.
        df_adjust : bool, default False
            Whether to adjust the parameter covariance to account for the
            number of parameters estimated in the regression. If True, the
            parameter covariance estimator is multiplied by T/(T-k) where
            k is the number of regressors in the model.

        Returns
        -------
        CointegrationAnalysisResults
            The estimation results instance.
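
        Examples
        --------
        A minimal sketch using simulated data; the data-generating process
        and parameter values are illustrative only:

        >>> import numpy as np
        >>> import pandas as pd
        >>> from arch.unitroot.cointegration import FullyModifiedOLS
        >>> rs = np.random.RandomState(42)
        >>> x = pd.DataFrame(rs.standard_normal((250, 1)).cumsum(0), columns=["x"])
        >>> y = pd.Series(2.0 * x["x"] + rs.standard_normal(250), name="y")
        >>> res = FullyModifiedOLS(y, x).fit()
        >>> res.params  # doctest: +SKIP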
        """
        cov_est, eta, _ = self._common_fit(kernel, bandwidth, force_int, diff)
        omega = np.asarray(cov_est.cov.long_run)
        lmbda = np.asarray(cov_est.cov.one_sided)
        omega_12 = omega[:1, 1:]
        omega_22 = omega[1:, 1:]
        omega_22_inv = np.linalg.inv(omega_22)
        eta_2 = eta[:, 1:]
        y, x = np.asarray(self._y_df), np.asarray(self._x)
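        # Endogeneity correction: y*_t = y_t - omega_12 @ inv(Omega_22) @ eta_2t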
        y_dot = y[1:] - eta_2 @ omega_22_inv @ omega_12.T

        lmbda_12 = lmbda[:1, 1:]
        lmbda_22 = lmbda[1:, 1:]
        lmbda_12_dot = lmbda_12 - omega_12 @ omega_22_inv @ lmbda_22
        with_trend = add_trend(self._x, trend=self._trend)
        assert isinstance(with_trend, pd.DataFrame)
        z_df = with_trend
        z_df = z_df.iloc[1:]
        z = np.asarray(z_df)
        zpz = z.T @ z

        nobs, nvar = z.shape
        bias = np.zeros((nvar, 1))
        kx = x.shape[1]
        bias[:kx] = lmbda_12_dot.T
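        # Serial-correlation bias correction: subtract T times the one-sided
        # term lambda*_12 from the cross-product of z and y*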
        zpydot = z.T @ y_dot - nobs * bias

        params = np.squeeze(np.linalg.solve(zpz, zpydot))
        omega_11 = omega[:1, :1]
        scale = 1.0 if not df_adjust else nobs / (nobs - nvar)
        omega_112 = scale * (omega_11 - omega_12 @ omega_22_inv @ omega_12.T)
        zpz_inv = np.linalg.inv(zpz)
        param_cov = omega_112 * zpz_inv
        cols = z_df.columns
        params_s = pd.Series(params.squeeze(), index=cols, name="params")
        param_cov = pd.DataFrame(param_cov, columns=cols, index=cols)
        resid, r2, r2_adj = self._final_statistics(params_s)
        resid_kern = KERNEL_ESTIMATORS[kernel](
            resid, bandwidth=cov_est.bandwidth, force_int=cov_est.force_int
        )
        return CointegrationAnalysisResults(
            params_s,
            param_cov,
            resid,
            omega_112[0, 0],
            resid_kern,
            kx,
            self._trend,
            df_adjust,
            r2,
            r2_adj,
            "Fully Modified OLS",
        )


@Substitution(method=CCR_METHOD, estimator=CCR_ESTIMATOR)
@Appender(COMMON_DOCSTRING)
class CanonicalCointegratingReg(FullyModifiedOLS):
    def __init__(
        self,
        y: ArrayLike1D,
        x: ArrayLike2D,
        trend: UnitRootTrend = "c",
        x_trend: UnitRootTrend | None = None,
    ) -> None:
        super().__init__(y, x, trend, x_trend)

    @Appender(FullyModifiedOLS.fit.__doc__)
    def fit(
        self,
        kernel: str = "bartlett",
        bandwidth: float | None = None,
        force_int: bool = True,
        diff: bool = False,
        df_adjust: bool = False,
    ) -> CointegrationAnalysisResults:
        cov_est, eta, beta = self._common_fit(kernel, bandwidth, force_int, diff)
        omega = np.asarray(cov_est.cov.long_run)
        lmbda = np.asarray(cov_est.cov.one_sided)
        sigma = np.asarray(cov_est.cov.short_run)

        lmbda2 = lmbda[:, 1:]
        sigma_inv = np.linalg.inv(sigma)
        y, x = np.asarray(self._y_df), np.asarray(self._x)
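        # CCR transformation of the regressors:
        # x*_t = x_t - (inv(Sigma) @ lmbda2)' @ eta_t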
        x_star = x[1:] - eta @ (sigma_inv @ lmbda2)

        kx = x.shape[1]
        omega_12 = omega[:1, 1:]
        omega_22 = omega[1:, 1:]
        omega_22_inv = np.linalg.inv(omega_22)
        bias = np.zeros((kx + 1, 1))
        bias[1:] = omega_22_inv @ omega_12.T
        # sigma_inv @ lmbda2 @ beta[:, None] and bias are both (kx + 1) by 1,
        # so eta @ (...) is the (T - 1) by 1 endogeneity correction
        y_star = y[1:] - eta @ (sigma_inv @ lmbda2 @ beta[:, None] + bias)
        z_star = add_trend(x_star, trend=self._trend)
        params = np.linalg.lstsq(z_star, y_star, rcond=None)[0]

        omega_11 = omega[:1, :1]
        nobs, nvar = z_star.shape
        scale = 1.0 if not df_adjust else nobs / (nobs - nvar)
        omega_112 = scale * (omega_11 - omega_12 @ omega_22_inv @ omega_12.T)
        param_cov = omega_112 * np.linalg.inv(z_star.T @ z_star)
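        # add_trend is called on a short slice only to recover the column
        # labels; the values themselves are not used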
        with_trend = add_trend(self._x.iloc[:10], self._trend)
        assert isinstance(with_trend, pd.DataFrame)
        cols = with_trend.columns
        params = pd.Series(params.squeeze(), index=cols, name="params")
        param_cov = pd.DataFrame(param_cov, columns=cols, index=cols)
        resid, r2, r2_adj = self._final_statistics(params)
        resid_kern = KERNEL_ESTIMATORS[kernel](
            resid, bandwidth=cov_est.bandwidth, force_int=cov_est.force_int
        )
        return CointegrationAnalysisResults(
            params,
            param_cov,
            resid,
            omega_112[0, 0],
            resid_kern,
            kx,
            self._trend,
            df_adjust,
            r2,
            r2_adj,
            "Fully Modified OLS",
        )