diff --git a/src/ruptures/base.py b/src/ruptures/base.py index 5becb9db..ec166c09 100644 --- a/src/ruptures/base.py +++ b/src/ruptures/base.py @@ -5,6 +5,7 @@ """ import abc +from typing_extensions import Self from ruptures.utils import pairwise @@ -18,17 +19,17 @@ class BaseEstimator(metaclass=abc.ABCMeta): """ @abc.abstractmethod - def fit(self, *args, **kwargs): + def fit(self, *args, **kwargs) -> Self: """To call the segmentation algorithm.""" pass @abc.abstractmethod - def predict(self, *args, **kwargs): + def predict(self, *args, **kwargs) -> list[int]: """To call the segmentation algorithm.""" pass @abc.abstractmethod - def fit_predict(self, *args, **kwargs): + def fit_predict(self, *args, **kwargs) -> list[int]: """To call the segmentation algorithm.""" pass @@ -43,17 +44,17 @@ class BaseCost(object, metaclass=abc.ABCMeta): """ @abc.abstractmethod - def fit(self, *args, **kwargs): + def fit(self, *args, **kwargs) -> Self: """Set the parameters of the cost function, for instance the Gram matrix, etc.""" pass @abc.abstractmethod - def error(self, start, end): + def error(self, start: int, end: int) -> float: """Returns the cost on segment [start:end].""" pass - def sum_of_costs(self, bkps): + def sum_of_costs(self, bkps: list[int]) -> float: """Returns the sum of segments cost for the given segmentation. Args: @@ -67,5 +68,5 @@ def sum_of_costs(self, bkps): @property @abc.abstractmethod - def model(self): + def model(self) -> str: pass diff --git a/src/ruptures/costs/costautoregressive.py b/src/ruptures/costs/costautoregressive.py index 5086ee0e..b8326b44 100644 --- a/src/ruptures/costs/costautoregressive.py +++ b/src/ruptures/costs/costautoregressive.py @@ -1,7 +1,9 @@ import numpy as np from numpy.lib.stride_tricks import as_strided from numpy.linalg import lstsq +from numpy.typing import NDArray from copy import deepcopy +from typing_extensions import Self from ruptures.base import BaseCost from ruptures.costs import NotEnoughPoints @@ -12,7 +14,7 @@ class CostAR(BaseCost): model = "ar" - def __init__(self, order=4): + def __init__(self, order: int = 4) -> None: """Initialize the object. Args: @@ -23,7 +25,7 @@ def __init__(self, order=4): self.min_size = max(5, order + 1) self.order = order - def fit(self, signal): + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. The signal must be 1D. Args: @@ -49,7 +51,7 @@ def fit(self, signal): self.signal[: self.order] = self.signal[self.order] return self - def error(self, start, end): + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costclinear.py b/src/ruptures/costs/costclinear.py index 5858d5fc..89f6baeb 100644 --- a/src/ruptures/costs/costclinear.py +++ b/src/ruptures/costs/costclinear.py @@ -1,6 +1,8 @@ r"""Continuous linear change.""" import numpy as np +from numpy.typing import NDArray +from typing_extensions import Self from ruptures.base import BaseCost from ruptures.costs import NotEnoughPoints @@ -11,12 +13,12 @@ class CostCLinear(BaseCost): model = "clinear" - def __init__(self): + def __init__(self) -> None: """Initialize the object.""" self.signal = None self.min_size = 3 - def fit(self, signal) -> "CostCLinear": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. Args: @@ -32,7 +34,7 @@ def fit(self, signal) -> "CostCLinear": return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costcosine.py b/src/ruptures/costs/costcosine.py index dcacea76..bc2fee2b 100644 --- a/src/ruptures/costs/costcosine.py +++ b/src/ruptures/costs/costcosine.py @@ -1,6 +1,8 @@ r"""CostCosine (kernel change point detection with the cosine similarity)""" +from typing_extensions import Self import numpy as np +from numpy.typing import NDArray from ruptures.base import BaseCost from ruptures.costs import NotEnoughPoints from scipy.spatial.distance import pdist, squareform @@ -11,14 +13,14 @@ class CostCosine(BaseCost): model = "cosine" - def __init__(self): + def __init__(self) -> None: """Initialize the object.""" self.signal = None self.min_size = 1 self._gram = None @property - def gram(self): + def gram(self) -> NDArray[np.number]: """Generate the gram matrix (lazy loading). Only access this function after a `.fit()` (otherwise @@ -28,7 +30,7 @@ def gram(self): self._gram = squareform(1 - pdist(self.signal, metric="cosine")) return self._gram - def fit(self, signal) -> "CostCosine": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. Args: @@ -43,7 +45,7 @@ def fit(self, signal) -> "CostCosine": self.signal = signal return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costl1.py b/src/ruptures/costs/costl1.py index 80c3a6f9..08f16702 100644 --- a/src/ruptures/costs/costl1.py +++ b/src/ruptures/costs/costl1.py @@ -1,6 +1,9 @@ r"""CostL1 (least absolute deviation)""" +from typing_extensions import Self + import numpy as np +from numpy.typing import NDArray from ruptures.base import BaseCost from ruptures.costs import NotEnoughPoints @@ -16,7 +19,7 @@ def __init__(self) -> None: self.signal = None self.min_size = 2 - def fit(self, signal) -> "CostL1": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. Args: @@ -32,7 +35,7 @@ def fit(self, signal) -> "CostL1": return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costl2.py b/src/ruptures/costs/costl2.py index f1d81f52..6d286ae0 100644 --- a/src/ruptures/costs/costl2.py +++ b/src/ruptures/costs/costl2.py @@ -1,5 +1,10 @@ r"""CostL2 (least squared deviation)""" +from numpy.typing import NDArray +import numpy as np +from typing_extensions import Self + + from ruptures.costs import NotEnoughPoints from ruptures.base import BaseCost @@ -10,12 +15,12 @@ class CostL2(BaseCost): model = "l2" - def __init__(self): + def __init__(self) -> None: """Initialize the object.""" self.signal = None self.min_size = 1 - def fit(self, signal) -> "CostL2": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. Args: @@ -31,7 +36,7 @@ def fit(self, signal) -> "CostL2": return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costlinear.py b/src/ruptures/costs/costlinear.py index 7464288a..d0fca31e 100644 --- a/src/ruptures/costs/costlinear.py +++ b/src/ruptures/costs/costlinear.py @@ -1,6 +1,10 @@ r"""Linear model change.""" +from typing_extensions import Self + +import numpy as np from numpy.linalg import lstsq +from numpy.typing import NDArray from ruptures.base import BaseCost from ruptures.costs import NotEnoughPoints @@ -11,13 +15,13 @@ class CostLinear(BaseCost): model = "linear" - def __init__(self): + def __init__(self) -> None: """Initialize the object.""" self.signal = None self.covar = None self.min_size = 2 - def fit(self, signal) -> "CostLinear": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. The first column contains the observed variable. The other columns contains the covariates. @@ -33,7 +37,7 @@ def fit(self, signal) -> "CostLinear": self.covar = signal[:, 1:] return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costml.py b/src/ruptures/costs/costml.py index 2c91a448..e8064df1 100644 --- a/src/ruptures/costs/costml.py +++ b/src/ruptures/costs/costml.py @@ -1,7 +1,11 @@ r"""Change detection with a Mahalanobis-type metric.""" import numpy as np +from numpy.typing import NDArray + from numpy.linalg import inv +from typing import Optional +from typing_extensions import Self from ruptures.base import BaseCost from ruptures.exceptions import NotEnoughPoints @@ -12,7 +16,7 @@ class CostMl(BaseCost): model = "mahalanobis" - def __init__(self, metric=None): + def __init__(self, metric: Optional[NDArray[np.number]] = None) -> None: """Create a new instance. Args: @@ -25,7 +29,7 @@ def __init__(self, metric=None): self.gram = None self.min_size = 2 - def fit(self, signal) -> "CostMl": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. Args: @@ -47,7 +51,7 @@ def fit(self, signal) -> "CostMl": return self - def error(self, start, end): + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costnormal.py b/src/ruptures/costs/costnormal.py index 9e69803e..9504697e 100644 --- a/src/ruptures/costs/costnormal.py +++ b/src/ruptures/costs/costnormal.py @@ -1,5 +1,8 @@ r"""Gaussian process changes (CostNormal)""" +from typing import Optional +from numpy.typing import NDArray +from typing_extensions import Self import warnings import numpy as np @@ -13,7 +16,7 @@ class CostNormal(BaseCost): model = "normal" - def __init__(self, add_small_diag=True): + def __init__(self, add_small_diag: Optional[bool] = True) -> None: """Initialize the object. Args: @@ -32,7 +35,7 @@ def __init__(self, add_small_diag=True): UserWarning, ) - def fit(self, signal) -> "CostNormal": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. Args: @@ -49,7 +52,7 @@ def fit(self, signal) -> "CostNormal": self.n_samples, self.n_dims = self.signal.shape return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costrank.py b/src/ruptures/costs/costrank.py index 322a7ea6..3e471b18 100644 --- a/src/ruptures/costs/costrank.py +++ b/src/ruptures/costs/costrank.py @@ -1,7 +1,9 @@ r"""Rank-based cost function (CostRank)""" +from typing_extensions import Self import numpy as np from numpy.linalg import pinv, LinAlgError +from numpy.typing import NDArray from scipy.stats.mstats import rankdata from ruptures.base import BaseCost @@ -13,13 +15,13 @@ class CostRank(BaseCost): model = "rank" - def __init__(self): + def __init__(self) -> None: """Initialize the object.""" self.inv_cov = None self.ranks = None self.min_size = 2 - def fit(self, signal) -> "CostRank": + def fit(self, signal: NDArray[np.number]) -> Self: """Set parameters of the instance. Args: @@ -55,7 +57,7 @@ def fit(self, signal) -> "CostRank": return self - def error(self, start, end): + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/costrbf.py b/src/ruptures/costs/costrbf.py index 788b86ca..a247061e 100644 --- a/src/ruptures/costs/costrbf.py +++ b/src/ruptures/costs/costrbf.py @@ -1,6 +1,9 @@ r"""Kernelized mean change.""" +from typing import Optional +from typing_extensions import Self import numpy as np +from numpy.typing import NDArray from scipy.spatial.distance import pdist, squareform from ruptures.exceptions import NotEnoughPoints @@ -12,14 +15,14 @@ class CostRbf(BaseCost): model = "rbf" - def __init__(self, gamma=None): + def __init__(self, gamma: Optional[float] = None) -> None: """Initialize the object.""" self.min_size = 1 self.gamma = gamma self._gram = None @property - def gram(self): + def gram(self) -> NDArray[np.number]: """Generate the gram matrix (lazy loading). Only access this function after a `.fit()` (otherwise @@ -39,7 +42,7 @@ def gram(self): self._gram = np.exp(squareform(-K)) return self._gram - def fit(self, signal) -> "CostRbf": + def fit(self, signal: NDArray[np.number]) -> Self: """Sets parameters of the instance. Args: @@ -61,7 +64,7 @@ def fit(self, signal) -> "CostRbf": return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/src/ruptures/costs/factory.py b/src/ruptures/costs/factory.py index 8f1dab74..803a7f8c 100644 --- a/src/ruptures/costs/factory.py +++ b/src/ruptures/costs/factory.py @@ -3,7 +3,7 @@ from ruptures.base import BaseCost -def cost_factory(model, *args, **kwargs): +def cost_factory(model: str, *args, **kwargs) -> BaseCost: for cls in BaseCost.__subclasses__(): if cls.model == model: return cls(*args, **kwargs) diff --git a/src/ruptures/datasets/pw_constant.py b/src/ruptures/datasets/pw_constant.py index 429ba271..88006857 100644 --- a/src/ruptures/datasets/pw_constant.py +++ b/src/ruptures/datasets/pw_constant.py @@ -1,13 +1,20 @@ """Piecewise constant signal (with noise)""" +from typing import Optional import numpy as np +from numpy.typing import NDArray from ruptures.utils import draw_bkps def pw_constant( - n_samples=200, n_features=1, n_bkps=3, noise_std=None, delta=(1, 10), seed=None -): + n_samples: int = 200, + n_features: int = 1, + n_bkps: int = 3, + noise_std: Optional[float] = None, + delta: tuple[int, int] = (1, 10), + seed: Optional[int] = None, +) -> tuple[NDArray[np.number], list[int]]: """Return a piecewise constant signal and the associated changepoints. Args: diff --git a/src/ruptures/datasets/pw_linear.py b/src/ruptures/datasets/pw_linear.py index 1a71976b..5128b0e4 100644 --- a/src/ruptures/datasets/pw_linear.py +++ b/src/ruptures/datasets/pw_linear.py @@ -1,11 +1,19 @@ r"""Shift in linear model.""" import numpy as np +from typing import Optional +from numpy.typing import NDArray from . import pw_constant -def pw_linear(n_samples=200, n_features=1, n_bkps=3, noise_std=None, seed=None): +def pw_linear( + n_samples: int = 200, + n_features: int = 1, + n_bkps: int = 3, + noise_std: Optional[float] = None, + seed: Optional[int] = None, +) -> tuple[NDArray[np.floating], list[int]]: """Return piecewise linear signal and the associated changepoints. Args: diff --git a/src/ruptures/datasets/pw_normal.py b/src/ruptures/datasets/pw_normal.py index ab2149ae..d2008a32 100644 --- a/src/ruptures/datasets/pw_normal.py +++ b/src/ruptures/datasets/pw_normal.py @@ -1,13 +1,17 @@ """2D piecewise Gaussian process (pw_normal)""" from itertools import cycle +from typing import Optional import numpy as np +from numpy.typing import NDArray from ruptures.utils import draw_bkps -def pw_normal(n_samples=200, n_bkps=3, seed=None): +def pw_normal( + n_samples: int = 200, n_bkps: int = 3, seed: Optional[int] = None +) -> tuple[NDArray[np.floating], list[int]]: """Return a 2D piecewise Gaussian signal and the associated changepoints. Args: diff --git a/src/ruptures/datasets/pw_wavy.py b/src/ruptures/datasets/pw_wavy.py index b55eb6e0..e153b6f7 100644 --- a/src/ruptures/datasets/pw_wavy.py +++ b/src/ruptures/datasets/pw_wavy.py @@ -1,13 +1,20 @@ """Piecewise sinusoidal (pw_wavy)""" from itertools import cycle +from typing import Optional import numpy as np +from numpy.typing import NDArray from ruptures.utils import draw_bkps -def pw_wavy(n_samples=200, n_bkps=3, noise_std=None, seed=None): +def pw_wavy( + n_samples: int = 200, + n_bkps: int = 3, + noise_std: Optional[float] = None, + seed: Optional[int] = None, +) -> tuple[NDArray[np.floating], list[int]]: """Return a 1D piecewise wavy signal and the associated changepoints. Args: diff --git a/src/ruptures/detection/binseg.py b/src/ruptures/detection/binseg.py index ecdae799..86fa825c 100644 --- a/src/ruptures/detection/binseg.py +++ b/src/ruptures/detection/binseg.py @@ -1,8 +1,11 @@ r"""Binary segmentation.""" from functools import lru_cache +from typing import Any, Optional, Union +from typing_extensions import Self import numpy as np +from numpy.typing import NDArray from ruptures.base import BaseCost, BaseEstimator from ruptures.costs import cost_factory from ruptures.exceptions import BadSegmentationParameters @@ -12,7 +15,14 @@ class Binseg(BaseEstimator): """Binary segmentation.""" - def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None): + def __init__( + self, + model: str = "l2", + custom_cost: Optional[BaseCost] = None, + min_size: int = 2, + jump: int = 5, + params: Optional[dict[str, Any]] = None, + ) -> None: """Initialize a Binseg instance. Args: @@ -34,7 +44,12 @@ def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None self.n_samples = None self.signal = None - def _seg(self, n_bkps=None, pen=None, epsilon=None): + def _seg( + self, + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> dict[tuple[int, int], float]: """Computes the binary segmentation. The stopping rule depends on the parameter passed to the function. @@ -81,7 +96,7 @@ def _seg(self, n_bkps=None, pen=None, epsilon=None): return partition @lru_cache(maxsize=None) - def single_bkp(self, start, end): + def single_bkp(self, start: int, end: int) -> tuple[Union[int, None], float]: """Return the optimal breakpoint of [start:end] (if it exists).""" segment_cost = self.cost.error(start, end) if np.isinf(segment_cost) and segment_cost < 0: # if cost is -inf @@ -101,7 +116,7 @@ def single_bkp(self, start, end): return None, 0 return bkp, gain - def fit(self, signal) -> "Binseg": + def fit(self, signal: NDArray[np.number]) -> Self: """Compute params to segment signal. Args: @@ -121,7 +136,12 @@ def fit(self, signal) -> "Binseg": return self - def predict(self, n_bkps=None, pen=None, epsilon=None): + def predict( + self, + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> list[int]: """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the @@ -157,7 +177,13 @@ def predict(self, n_bkps=None, pen=None, epsilon=None): bkps = sorted(e for s, e in partition.keys()) return bkps - def fit_predict(self, signal, n_bkps=None, pen=None, epsilon=None): + def fit_predict( + self, + signal: NDArray[np.number], + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> list[int]: """Fit to the signal and return the optimal breakpoints. Helper method to call fit and predict once diff --git a/src/ruptures/detection/bottomup.py b/src/ruptures/detection/bottomup.py index 7ec34afa..ffec0b7c 100644 --- a/src/ruptures/detection/bottomup.py +++ b/src/ruptures/detection/bottomup.py @@ -3,7 +3,11 @@ import heapq from bisect import bisect_left from functools import lru_cache +from typing import Optional, Any +from typing_extensions import Self +import numpy as np +from numpy.typing import NDArray from ruptures.base import BaseCost, BaseEstimator from ruptures.costs import cost_factory from ruptures.utils import Bnode, pairwise, sanity_check @@ -13,7 +17,14 @@ class BottomUp(BaseEstimator): """Bottom-up segmentation.""" - def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None): + def __init__( + self, + model: str = "l2", + custom_cost: Optional[BaseCost] = None, + min_size: int = 2, + jump: int = 5, + params: Optional[dict[str, Any]] = None, + ) -> None: """Initialize a BottomUp instance. Args: @@ -66,7 +77,7 @@ def _grow_tree(self): return leaves @lru_cache(maxsize=None) - def merge(self, left, right): + def merge(self, left: Bnode, right: Bnode) -> Bnode: """Merge two contiguous segments.""" assert left.end == right.start, "Segments are not contiguous." start, end = left.start, right.end @@ -74,7 +85,12 @@ def merge(self, left, right): node = Bnode(start, end, val, left=left, right=right) return node - def _seg(self, n_bkps=None, pen=None, epsilon=None): + def _seg( + self, + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ): """Compute the bottom-up segmentation. The stopping rule depends on the parameter passed to the function. @@ -144,7 +160,7 @@ def _seg(self, n_bkps=None, pen=None, epsilon=None): partition = {(leaf.start, leaf.end): leaf.val for leaf in leaves} return partition - def fit(self, signal) -> "BottomUp": + def fit(self, signal: NDArray[np.number]) -> Self: """Compute params to segment signal. Args: @@ -164,7 +180,12 @@ def fit(self, signal) -> "BottomUp": self.leaves = self._grow_tree() return self - def predict(self, n_bkps=None, pen=None, epsilon=None): + def predict( + self, + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> list[int]: """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the signal passed @@ -200,7 +221,13 @@ def predict(self, n_bkps=None, pen=None, epsilon=None): bkps = sorted(e for s, e in partition.keys()) return bkps - def fit_predict(self, signal, n_bkps=None, pen=None, epsilon=None): + def fit_predict( + self, + signal: NDArray[np.number], + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> list[int]: """Fit to the signal and return the optimal breakpoints. Helper method to call fit and predict once diff --git a/src/ruptures/detection/dynp.py b/src/ruptures/detection/dynp.py index 8afa935d..9511f9ea 100644 --- a/src/ruptures/detection/dynp.py +++ b/src/ruptures/detection/dynp.py @@ -1,7 +1,11 @@ r"""Dynamic programming.""" from functools import lru_cache +from typing import Any, Optional +from typing_extensions import Self +import numpy as np +from numpy.typing import NDArray from ruptures.utils import sanity_check from ruptures.costs import cost_factory from ruptures.base import BaseCost, BaseEstimator @@ -15,7 +19,14 @@ class Dynp(BaseEstimator): sum of errors is minimum. """ - def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None): + def __init__( + self, + model: str = "l2", + custom_cost: Optional[BaseCost] = None, + min_size: int = 2, + jump: int = 5, + params: Optional[dict[str, Any]] = None, + ) -> None: """Creates a Dynp instance. Args: @@ -38,7 +49,7 @@ def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None self.n_samples = None @lru_cache(maxsize=None) - def seg(self, start, end, n_bkps): + def seg(self, start: int, end: int, n_bkps: int) -> dict[tuple[int, int], float]: """Recurrence to find the optimal partition of signal[start:end]. This method is to be memoized and then used. @@ -92,7 +103,7 @@ def seg(self, start, end, n_bkps): # Find the optimal partition return min(sub_problems, key=lambda d: sum(d.values())) - def fit(self, signal) -> "Dynp": + def fit(self, signal: NDArray[np.number]) -> Self: """Create the cache associated with the signal. Dynamic programming is a recurrence; intermediate results are cached to speed up @@ -111,7 +122,7 @@ def fit(self, signal) -> "Dynp": self.n_samples = signal.shape[0] return self - def predict(self, n_bkps): + def predict(self, n_bkps: int) -> list[int]: """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the signal passed @@ -139,7 +150,7 @@ def predict(self, n_bkps): bkps = sorted(e for s, e in partition.keys()) return bkps - def fit_predict(self, signal, n_bkps): + def fit_predict(self, signal: NDArray[np.number], n_bkps: int) -> list[int]: """Fit to the signal and return the optimal breakpoints. Helper method to call fit and predict once diff --git a/src/ruptures/detection/kernelcpd.py b/src/ruptures/detection/kernelcpd.py index 970ac7f8..a465c994 100644 --- a/src/ruptures/detection/kernelcpd.py +++ b/src/ruptures/detection/kernelcpd.py @@ -5,6 +5,9 @@ from ruptures.utils import from_path_matrix_to_bkps_list, sanity_check from ruptures.exceptions import BadSegmentationParameters import numpy as np +from numpy.typing import NDArray +from typing import Optional, Any +from typing_extensions import Self from ._detection.ekcpd import ( ekcpd_cosine, @@ -27,7 +30,13 @@ class KernelCPD(BaseEstimator): more information. """ - def __init__(self, kernel="linear", min_size=2, jump=1, params=None): + def __init__( + self, + kernel: str = "linear", + min_size: int = 2, + jump: int = 1, + params: Optional[dict[str, Any]] = None, + ) -> None: r"""Creates a KernelCPD instance. Available kernels: @@ -62,7 +71,7 @@ def __init__(self, kernel="linear", min_size=2, jump=1, params=None): self.n_samples = None self.segmentations_dict = dict() # {n_bkps: bkps_list} - def fit(self, signal) -> "KernelCPD": + def fit(self, signal: NDArray[np.number]) -> Self: """Update some parameters (no computation in this function). Args: @@ -77,7 +86,9 @@ def fit(self, signal) -> "KernelCPD": self.n_samples = signal.shape[0] return self - def predict(self, n_bkps=None, pen=None): + def predict( + self, n_bkps: Optional[int] = None, pen: Optional[float] = None + ) -> list[int]: """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the signal passed to @@ -149,7 +160,12 @@ def predict(self, n_bkps=None, pen=None): ind = path_matrix[ind] return my_bkps[::-1] - def fit_predict(self, signal, n_bkps=None, pen=None): + def fit_predict( + self, + signal: NDArray[np.number], + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + ) -> list[int]: """Fit to the signal and return the optimal breakpoints. Helper method to call fit and predict once diff --git a/src/ruptures/detection/pelt.py b/src/ruptures/detection/pelt.py index ec8ea657..f2a5b7fa 100644 --- a/src/ruptures/detection/pelt.py +++ b/src/ruptures/detection/pelt.py @@ -1,7 +1,11 @@ r"""Pelt.""" from math import floor +from typing import Any, Optional +from typing_extensions import Self +import numpy as np +from numpy.typing import NDArray from ruptures.costs import cost_factory from ruptures.base import BaseCost, BaseEstimator from ruptures.exceptions import BadSegmentationParameters @@ -15,7 +19,14 @@ class Pelt(BaseEstimator): minimizes the constrained sum of approximation errors. """ - def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None): + def __init__( + self, + model="l2", + custom_cost: Optional[BaseCost] = None, + min_size: int = 2, + jump: int = 5, + params: Optional[dict[str, Any]] = None, + ) -> None: """Initialize a Pelt instance. Args: @@ -36,7 +47,7 @@ def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None self.jump = jump self.n_samples = None - def _seg(self, pen): + def _seg(self, pen: float) -> dict[tuple[int, int], float]: """Computes the segmentation for a given penalty using PELT (or a list of penalties). @@ -85,7 +96,7 @@ def _seg(self, pen): del best_partition[(0, 0)] return best_partition - def fit(self, signal) -> "Pelt": + def fit(self, signal: NDArray[np.number]) -> Self: """Set params. Args: @@ -103,7 +114,7 @@ def fit(self, signal) -> "Pelt": self.n_samples = n_samples return self - def predict(self, pen): + def predict(self, pen: float) -> list[int]: """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the signal passed @@ -132,7 +143,7 @@ def predict(self, pen): bkps = sorted(e for s, e in partition.keys()) return bkps - def fit_predict(self, signal, pen): + def fit_predict(self, signal: NDArray[np.number], pen: float) -> list[int]: """Fit to the signal and return the optimal breakpoints. Helper method to call fit and predict once diff --git a/src/ruptures/detection/window.py b/src/ruptures/detection/window.py index 354ac562..dc669279 100644 --- a/src/ruptures/detection/window.py +++ b/src/ruptures/detection/window.py @@ -1,6 +1,9 @@ r"""Window-based change point detection.""" +from typing import Any, Optional +from typing_extensions import Self import numpy as np +from numpy.typing import NDArray from scipy.signal import argrelmax from ruptures.base import BaseCost, BaseEstimator @@ -13,8 +16,14 @@ class Window(BaseEstimator): """Window sliding method.""" def __init__( - self, width=100, model="l2", custom_cost=None, min_size=2, jump=5, params=None - ): + self, + width: int = 100, + model: str = "l2", + custom_cost: Optional[BaseCost] = None, + min_size: int = 2, + jump: int = 5, + params: Optional[dict[str, Any]] = None, + ) -> None: """Instanciate with window length. Args: @@ -40,7 +49,12 @@ def __init__( self.cost = cost_factory(model=model, **params) self.score = list() - def _seg(self, n_bkps=None, pen=None, epsilon=None): + def _seg( + self, + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> list[int]: """Sequential peak search. The stopping rule depends on the parameter passed to the function. @@ -99,7 +113,7 @@ def _seg(self, n_bkps=None, pen=None, epsilon=None): return bkps - def fit(self, signal) -> "Window": + def fit(self, signal: NDArray[np.number]) -> Self: """Compute params to segment signal. Args: @@ -136,7 +150,12 @@ def fit(self, signal) -> "Window": self.score = np.array(score) return self - def predict(self, n_bkps=None, pen=None, epsilon=None): + def predict( + self, + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> list[int]: """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the signal passed @@ -171,7 +190,13 @@ def predict(self, n_bkps=None, pen=None, epsilon=None): bkps = self._seg(n_bkps=n_bkps, pen=pen, epsilon=epsilon) return bkps - def fit_predict(self, signal, n_bkps=None, pen=None, epsilon=None): + def fit_predict( + self, + signal: NDArray[np.number], + n_bkps: Optional[int] = None, + pen: Optional[float] = None, + epsilon: Optional[float] = None, + ) -> list[int]: """Helper method to call fit and predict once.""" self.fit(signal) return self.predict(n_bkps=n_bkps, pen=pen, epsilon=epsilon) diff --git a/src/ruptures/utils/bnode.py b/src/ruptures/utils/bnode.py index a4c6f304..f99a0305 100644 --- a/src/ruptures/utils/bnode.py +++ b/src/ruptures/utils/bnode.py @@ -1,6 +1,9 @@ """Binary node.""" +from __future__ import annotations + import functools +from typing import Optional import numpy as np @@ -11,7 +14,15 @@ class Bnode: In binary segmentation, each segment [start, end) is a binary node. """ - def __init__(self, start, end, val, left=None, right=None, parent=None): + def __init__( + self, + start: int, + end: int, + val: np.number, + left: Optional[Bnode] = None, + right: Optional[Bnode] = None, + parent: Optional[Bnode] = None, + ) -> None: self.start = start self.end = end self.val = val @@ -20,7 +31,7 @@ def __init__(self, start, end, val, left=None, right=None, parent=None): self.parent = parent @property - def gain(self): + def gain(self) -> float: """Return the cost decrease when splitting this node.""" if self.left is None or self.right is None: return 0 @@ -28,15 +39,15 @@ def gain(self): return 0 return self.val - (self.left.val + self.right.val) - def __lt__(self, other): + def __lt__(self, other: object) -> bool: return self.start < other.start - def __eq__(self, other): + def __eq__(self, other: object) -> bool: return ( isinstance(other, self.__class__) and self.start == other.start and self.end == other.end ) - def __hash__(self): + def __hash__(self) -> int: return hash((self.__class__, self.start, self.end)) diff --git a/src/ruptures/utils/drawbkps.py b/src/ruptures/utils/drawbkps.py index 5789c125..073e5b56 100644 --- a/src/ruptures/utils/drawbkps.py +++ b/src/ruptures/utils/drawbkps.py @@ -1,9 +1,12 @@ r"""Draw a random partition.""" +from typing import Optional import numpy as np -def draw_bkps(n_samples=100, n_bkps=3, seed=None): +def draw_bkps( + n_samples: int = 100, n_bkps: int = 3, seed: Optional[int] = None +) -> list[int]: """Draw a random partition with specified number of samples and specified number of changes.""" rng = np.random.default_rng(seed=seed) diff --git a/src/ruptures/utils/utils.py b/src/ruptures/utils/utils.py index 6db4fc7b..f9dd0661 100644 --- a/src/ruptures/utils/utils.py +++ b/src/ruptures/utils/utils.py @@ -1,22 +1,25 @@ """Miscellaneous functions for ruptures.""" +from typing import Iterable, TypeVar, Iterator from itertools import tee from math import ceil +T = TypeVar("T") -def pairwise(iterable): + +def pairwise(iterable: Iterable[T]) -> Iterator[tuple[T, T]]: """S -> (s0,s1), (s1,s2), (s2, s3), ...""" a, b = tee(iterable) next(b, None) return zip(a, b) -def unzip(seq): +def unzip(seq: Iterable[tuple[T, ...]]) -> Iterator[tuple[T, ...]]: """Reverse zip.""" return zip(*seq) -def sanity_check(n_samples, n_bkps, jump, min_size): +def sanity_check(n_samples: int, n_bkps: int, jump: int, min_size: int) -> bool: """Check if a partition if possible given some segmentation parameters. Args: