Online Changepoint Detection in Univariate and Multivariate Data Streams using FOCUS algorithms
Project description
focus_cpt
- Features
- Installation
- Quick Start: Offline vs Online Usage
- Available Functions
- Notes
- Examples
- Flexibility: Statistics Independent of Detector Type
- C++ Integration
- Performance Comparison
- References
- Authors and Contributors
- License
import numpy as np
import matplotlib.pyplot as plt
from focus_cpt import focus_offline, Detector
focus_cpt is a high-performance Python package for online changepoint detection in univariate and multivariate data streams. It provides efficient C++ implementations of the FOCuS and md-FOCuS algorithms with Python bindings for real-time monitoring and offline analysis.
Features
- Multiple distributions: supports Gaussian change-in-mean, Poisson change-in-rate, Gamma/Exponential change-in-scale, Bernoulli change-in-probability, and Non-parametric detectors. New models and cost functions are easy to extend.
- Univariate and multivariate detection
- Known or unknown pre-change parameters
- One-sided and two-sided detection
- C++ backend optimized for speed and scalability
Installation
You can install the development version of focus from source with:
pip install git+https://github.com/gtromano/unified_focus.git#subdirectory=focus_cpt
Or, if you have the local source directory:
pip install path/to/focus_cpt
Quick Start: Offline vs Online Usage
The Python bindings provide two complementary modes:
Offline Mode (focus_offline)
All computations occur inside C++ for maximum efficiency. This mode is ideal for benchmarking, batch processing, and full statistic trajectories.
Key behaviors:
- Stops immediately when the threshold is exceeded.
- Use
threshold=np.infto compute statistics for all samples (useful for visualization).
np.random.seed(123)
Y = np.concatenate([np.random.normal(0, 1, 500), np.random.normal(2, 1, 500)])
# Offline detection (entirely in C++)
res = focus_offline(Y, threshold=20, type="univariate", family="gaussian")
plt.plot(res["stat"], lw=2)
plt.title("FOCuS Detection Statistic (Offline)")
plt.xlabel("Time")
plt.ylabel("Statistic")
plt.axhline(res["threshold"], color="red", linestyle="--", lw=2)
plt.show()
Online Mode (Sequential Updates)
The online implementation allows step-by-step updates in Python. It’s slower (each update crosses the Python–C++ boundary) but supports real-time or adaptive control.
# Create detector
detector = Detector(type='univariate')
# Update sequentially
stat_trace = []
threshold = 20.0
for i, y in enumerate(Y, start=1):
detector.update(y)
result = detector.get_statistics(family='gaussian')
stat_trace.append(result['stat'])
if result['stat'] > threshold:
print("Detection at time", i, "with changepoint estimate τ =", result['changepoint'])
break
# Plot results
plt.plot(stat_trace)
plt.title("FOCuS Detection Statistic (Online)")
plt.xlabel("Time")
plt.ylabel("Statistic")
plt.axhline(threshold, color="red", linestyle="--", linewidth=2)
plt.show()
Detection at time 504 with changepoint estimate τ = 500
Both modes yield the same statistical results. The offline mode is typically tens to hundreds of times faster.
Available Functions
Offline Mode Interface
focus_offline(Y, threshold, type, family, ...)
Run the FOCuS detector in batch/offline mode with all cycles handled in C++ for maximum efficiency. Stops at detection by default; usethreshold = np.infto compute statistics for all observations.Y: Observation data (1D array or 2D array)threshold: Detection threshold(s). Can be:- Scalar: Single threshold applied to all statistics
- Vector: (In case of multiple values returned per statistics, see Notes below)
type: One of"univariate","univariate_one_sided","multivariate","npfocus", or"arp"family: Distribution family -"gaussian","poisson","bernoulli","gamma","npfocus", or"arp"theta0: (Optional) Baseline parameter vector for cost computationshape: (Optional) Shape parameter forfamily = "gamma"(required for gamma)dim_indexes: (Optional) List of dimension index vectors for multivariate projectionsquantiles: (Optional) Quantile vector fortype = "npfocus"rho: (Optional) AR coefficients array fortype = "arp"mu0_arp: (Optional) Pre-change mean fortype = "arp"pruning_mult,pruning_offset: Pruning parameters (default: 2, 1)side: Pruning side -"right"or"left"(default:"right")- Returns: dict-like object with
stat(2D numpy array where each row is a time and each column is a statistic),changepoint,detection_time,detected_changepoint,candidates,threshold,n,type,family, andshape(if gamma)
Online/Sequential Interface
-
Detector(type, ...)
Create a new online detector object. Returns a Detector instance.type: One of"univariate","univariate_one_sided","multivariate","npfocus", or"arp"dim_indexes: (Optional) List of dimension index vectors for multivariate projectionsquantiles: (Optional) Quantile vector fortype = "npfocus"rho: (Optional) AR coefficients array fortype = "arp"mu0_arp: (Optional) Pre-change mean fortype = "arp"pruning_mult,pruning_offset: Pruning parameters (default: 2, 1)side: Pruning side -"right"or"left"(default:"right")
-
Detector.update(y)
Update the detector with a new observation vectory.y: scalar (univariate) or 1D numpy array (multivariate)
-
Detector.get_statistics(family, theta0=None, shape=None)
Compute changepoint statistics for the current state. Returns a dict-like result with keysstopping_time,changepoint, andstat. -
Inspection helpers:
Detector.get_n_candidates()- Get number of candidate segmentsDetector.get_n()- Get number of observations processedDetector.get_sn()- Get cumulative sum stateDetector.get_candidates()- Get all candidate changepoints as a dict-like object
Notes
- Multiple Statistics: Some detectors (e.g.,
family = "npfocus") return multiple statistics. Infocus_offline(), thestatreturn value is a 2D array where each row corresponds to a time point and each column corresponds to a statistic.- For single-statistic families, the matrix has one column (use
res['stat'].flatten()when needed) - For multi-statistic families, use vectorized thresholds or a single threshold (with warning)
- For single-statistic families, the matrix has one column (use
- Gamma Family: When using
family = "gamma", you must provide a positiveshapeparameter. The gamma cost function assumes this shape is known.
Examples
Gaussian Univariate Detection
Unknown Pre-change Mean
np.random.seed(45)
Y = np.concatenate([np.random.normal(0, 1, 1000), np.random.normal(-1, 1, 500)])
res = focus_offline(Y, threshold=20, type="univariate", family="gaussian")
print("Detection time:", res["detection_time"])
print("Estimated changepoint:", res["detected_changepoint"])
plt.plot(res["stat"], lw=2)
plt.title("FOCuS: Unknown Pre-change Mean")
plt.xlabel("Time")
plt.ylabel("Statistic")
plt.axhline(res["threshold"], color="red", linestyle="--")
plt.axvline(res["detection_time"], color="blue", linestyle="--")
plt.axvline(1000, color="green", linestyle=":")
plt.show()
Detection time: 1023
Estimated changepoint: 1008
Known Pre-change Mean
np.random.seed(45)
theta0 = 0
Y = np.concatenate([np.random.normal(theta0, 1, 1000),
np.random.normal(theta0 - 1, 1, 500)])
res_known = focus_offline(Y, threshold=np.inf, type="univariate",
family="gaussian", theta0=theta0)
res_unknown = focus_offline(Y, threshold=np.inf, type="univariate",
family="gaussian")
plt.subplot(1, 2, 1)
plt.plot(res_known["stat"], lw=2)
plt.title("Known θ₀ = 0")
plt.axvline(1000, color="green", linestyle=":")
plt.subplot(1, 2, 2)
plt.plot(res_unknown["stat"], lw=2)
plt.title("Unknown θ₀")
plt.axvline(1000, color="green", linestyle=":")
plt.tight_layout()
plt.show()
One-sided Detection
np.random.seed(789)
Y = np.concatenate([np.random.normal(0, 1, 800),
np.random.normal(1.5, 1, 400)])
res_right = focus_offline(Y, threshold=30,
type="univariate_one_sided",
family="gaussian", side="right")
res_left = focus_offline(Y, threshold=30,
type="univariate_one_sided",
family="gaussian", side="left")
print("Right-sided detection:", res_right["detection_time"])
print("Left-sided detection:", res_left["detection_time"])
plt.subplot(1, 2, 1)
plt.plot(res_right["stat"], lw=2)
plt.axhline(30, color="red", linestyle="--")
plt.title("Right-sided (detects increases)")
plt.subplot(1, 2, 2)
plt.plot(res_left["stat"], lw=2)
plt.axhline(30, color="red", linestyle="--")
plt.title("Left-sided (no detection)")
plt.tight_layout()
plt.show()
Right-sided detection: 817
Left-sided detection: None
Anomaly Detection and Intensity Filtering
The anomaly_intensity parameter focuses on detecting anomalies
(epidemic changepoints) and more transient changes, while ignoring
longer, less intense changes. This is particularly useful when we have
an expected background rate, and we seek any deviations from it.
When anomaly_intensity is set to a positive value, candidates are
retained only if they show sufficient “signal intensity” — i.e., the
magnitude of the change relative to the segment length is at least
anomaly_intensity. This filtering occurs during candidate pruning and
helps reduce the number of spurious changepoints.
The following example runs the detector sequentially and flags an alarm while we’re in the anomalous period:
# Create synthetic data with brief anomalies
np.random.seed(999)
n = 1000
Y_anom = np.concatenate([
np.random.normal(0, 1, n//2),
np.random.normal(-3, 1, 10), # Brief, weak anomaly
np.random.normal(0, 1, n//2),
np.random.normal(5, 1, 10), # Stronger brief anomaly
np.random.normal(0, 1, n//2)
])
# Online (sequential) detection
detector = Detector(type="univariate", anomaly_intensity=2)
threshold = 20.0
in_anom = False
starts = []
ends = []
for i, y in enumerate(Y_anom, start=1):
detector.update(y)
res = detector.get_statistics(family="gaussian", theta0=0)
stat = res["stat"] if res["stat"] is not None else 0
if not in_anom and stat > threshold:
in_anom = True
starts.append(res["changepoint"])
print(f"anomaly starting at {i}")
if in_anom and stat <= threshold:
in_anom = False
ends.append(res["changepoint"])
print(f"anomaly ending at {i}")
# If we were still in an anomaly at the end, close it
if in_anom:
ends.append(len(Y_anom))
print(f"anomaly ending at {len(Y_anom)} (end of series)")
# Plot the data and mark starts/ends
plt.figure(figsize=(12, 5))
plt.plot(Y_anom, lw=1.2)
for s in starts:
if s is not None:
plt.axvline(s, color="green", linestyle="--", label="an. start" if s == starts[0] else "")
for e in ends:
if e is not None:
plt.axvline(e, color="red", linestyle="--", label="an. end" if e == ends[0] else "")
plt.title("Data with Detected Anomalies")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.tight_layout()
plt.show()
anomaly starting at 503
anomaly ending at 513
anomaly starting at 1011
anomaly ending at 1034
- Default behavior (
anomaly_intensity = None): All candidates are retained based on standard pruning rules. - When set to a positive value: Candidates are filtered based on the
infinity norm of the signal-to-length ratio. A candidate survives only
if at least one dimension has a signal magnitude ≥
anomaly_intensity.
Multivariate Gaussian Detection
np.random.seed(42)
n, p = 1500, 3
Y1 = np.random.normal(0, 1, (1000, p))
Y2 = np.random.normal(1.2, 1, (500, p))
Y = np.vstack([Y1, Y2])
res_multi = focus_offline(Y, threshold=30,
type="multivariate", family="gaussian")
print("Detection time:", res_multi["detection_time"])
print("Estimated changepoint:", res_multi["detected_changepoint"])
plt.plot(res_multi["stat"], lw=2)
plt.axhline(res_multi["threshold"], color="red", linestyle="--")
plt.axvline(res_multi["detection_time"], color="blue", linestyle="--")
plt.axvline(1000, color="green", linestyle=":")
plt.title("Multivariate FOCuS (p=3)")
plt.show()
Detection time: 1013
Estimated changepoint: 1001
Bernoulli Example
np.random.seed(123)
n = 2000
Y = np.concatenate([
np.random.binomial(1, 0.2, n//2),
np.random.binomial(1, 0.5, n//2)
])
res = focus_offline(Y, threshold=np.inf, type="univariate", family="bernoulli")
plt.plot(res["stat"])
plt.title("Bernoulli: Change in Success Probability")
plt.show()
Poisson Example
np.random.seed(101)
n = 2000
Y = np.concatenate([
np.random.poisson(2, n//2),
np.random.poisson(6, n//2)
])
res = focus_offline(Y, threshold=np.inf, type="univariate", family="poisson")
plt.plot(res["stat"])
plt.title("Poisson: Change in Rate")
plt.show()
Gamma Example
np.random.seed(124)
n = 2000
Y = np.concatenate([
np.random.gamma(2, 2, n//2),
np.random.gamma(2, 0.5, n//2)
])
res = focus_offline(Y, threshold=np.inf, type="univariate",
family="gamma", shape=2, theta0=2)
plt.plot(res["stat"])
plt.title("Gamma: Change in Scale (shape=2)")
plt.show()
Non-parametric Detection (NPFOCuS)
np.random.seed(123)
Y = np.concatenate([np.random.normal(0, 1, 1000),
np.random.standard_cauchy(200)])
quants = np.quantile(np.random.normal(size=10000), np.linspace(0.01, 0.99, 5))
res = focus_offline(Y=Y, threshold=[80, 25],
type="npfocus", family="npfocus",
quantiles=quants)
plt.subplot(3, 1, 1)
plt.plot(Y)
plt.subplot(3, 1, 2)
plt.plot(res["stat"][:, 0])
plt.title("NPFOCuS Statistic (sum)")
plt.subplot(3, 1, 3)
plt.plot(res["stat"][:, 1])
plt.title("NPFOCuS Statistic (max)")
plt.tight_layout()
plt.show()
AutoRegressive Process (ARP) changepoint detection
The library also supports changepoint detection in AutoRegressive (AR) processes. This is useful for detecting changes in the mean of AR(p) processes while accounting for the temporal dependencies. The AR coefficients can be specified (or estimated from data). As in the iid cases, the pre-change mean can be provided if known, however this has to be done when creating the detector (as the parameter is necessary for pruning logic).
np.random.seed(123)
from statsmodels.tsa.arima_process import ArmaProcess
# Generate AR(2) process with changepoint
n_pre = 500
n_post = 500
ar_coefs = np.array([0.7, -0.3]) # AR(2) coefficients
# Create AR(2) process
ar_process = ArmaProcess(np.concatenate([np.array([1]), -ar_coefs]), [1])
Y_pre = ar_process.generate_sample(nsample=n_pre)
Y_post = 2 + ar_process.generate_sample(nsample=n_post)
Y = np.concatenate([Y_pre, Y_post])
# Estimate AR parameters from pre-change data (in practice, use historical data)
from statsmodels.tsa.ar_model import AutoReg
ar_model = AutoReg(ar_process.generate_sample(nsample=300), lags=2).fit()
rho_est = ar_model.params[1:] # Estimated AR coefficients
# Run offline ARP detection
res = focus_offline(
Y=Y,
threshold=20,
type="arp",
family="arp",
rho=rho_est
)
# Plot results
fig, axes = plt.subplots(2, 1, figsize=(10, 6))
axes[0].plot(Y)
axes[0].set_title("AR(2) Process with Mean Shift")
axes[0].set_xlabel("Time")
axes[0].set_ylabel("Value")
axes[0].axvline(n_pre, color="red", linestyle="--", lw=2)
axes[1].plot(res["stat"])
axes[1].set_title("ARP Detection Statistic")
axes[1].set_xlabel("Time")
axes[1].set_ylabel("Statistic")
axes[1].axhline(res["threshold"], color="blue", linestyle="--")
if res["detection_time"] is not None:
axes[1].axvline(res["detection_time"], color="red", linestyle="--", lw=2)
plt.tight_layout()
plt.show()
# Show detection results
print(f"Detection time: {res['detection_time']}")
print(f"Estimated changepoint: {res['detected_changepoint']}")
print(f"True changepoint: {n_pre}")
Detection time: 505
Estimated changepoint: 500
True changepoint: 500
And in the online setting:
np.random.seed(123)
# Create online ARP detector
detector = Detector(type="arp", rho=rho_est)
stat_trace = []
for i, y in enumerate(Y, start=1):
detector.update(y)
result = detector.get_statistics(family="arp")
stat_trace.append(result["stat"])
if result["stat"] > 20:
print(f"Detection at time {i} with changepoint estimate τ = {result['changepoint']}")
break
# Plot results
plt.plot(stat_trace, lw=2)
plt.title("Online ARP Detection Statistic")
plt.xlabel("Time")
plt.ylabel("Statistic")
plt.show()
Detection at time 505 with changepoint estimate τ = 500
Flexibility: Statistics Independent of Detector Type
A key feature of the library is that the detector (how candidate segments are managed) is independent of the statistical model (how costs/statistics are computed). You can create a detector once, feed it data, and then compute different statistics (Gaussian, Poisson, etc.) on the same detector state.
Example: simulate Poisson counts, update a univariate detector, and compare Gaussian vs Poisson statistics on the same detector state.
np.random.seed(2024)
# Generate Poisson count data with a rate change
Y_counts = np.concatenate([np.random.poisson(10, 500), np.random.poisson(15, 500)])
# Create a univariate detector and update with all data
det = Detector(type='univariate')
for y in Y_counts:
det.update(y)
# Compute statistics on the SAME detector state
result_gaussian = det.get_statistics(family='gaussian')
result_poisson = det.get_statistics(family='poisson', theta0=10)
print('Using Gaussian statistic:')
print(' Changepoint:', result_gaussian['changepoint'])
print(' Statistic:', result_gaussian['stat'])
print('\nUsing Poisson statistic (more appropriate for count data):')
print(' Changepoint:', result_poisson['changepoint'])
print(' Statistic:', result_poisson['stat'])
# Full trajectories for comparison (offline mode is still faster)
det2 = Detector(type='univariate')
stat_gaussian = []
stat_poisson = []
for y in Y_counts:
det2.update(y)
stat_gaussian.append(det2.get_statistics(family='gaussian')['stat'])
stat_poisson.append(det2.get_statistics(family='poisson', theta0=10)['stat'])
import matplotlib.pyplot as plt
plt.figure(figsize=(10,4))
plt.plot(stat_gaussian, label='Gaussian')
plt.plot(stat_poisson, label='Poisson')
plt.legend()
plt.title('Gaussian vs Poisson statistics on same detector state')
plt.show()
Using Gaussian statistic:
Changepoint: 500
Statistic: 6922.160999999964
Using Poisson statistic (more appropriate for count data):
Changepoint: 500
Statistic: 557.7242432563144
C++ Integration
If you wish to use the library entirely in C++ (for maximum speed or
integration into other C++ projects), you can follow the same patterns
used in the R and Python bindings. The core classes are Info (and
derived classes), cost functions like compute_costs_gaussian, and the
ChangepointResult structure.
Example C++ usage:
#include "Info.h"
#include "Costs.h"
// Create detector
auto info = std::make_shared<UnivariateInfo>();
// Update with data
for (const auto& y : data) {
info->update({y});
auto result = compute_costs_gaussian(*info, {theta0});
if (result.stat.value() > threshold) {
// Detection!
break;
}
}
Performance Comparison
np.random.seed(999)
n = 100_000
Y = np.concatenate([np.random.normal(0, 1, n//2),
np.random.normal(1, 1, n//2)])
import time
t0 = time.time()
res_offline = focus_offline(Y, threshold=np.inf, type="univariate", family="gaussian")
offline_time = time.time() - t0
t1 = time.time()
det = Detector(type="univariate")
stat_online = []
for y in Y:
det.update(y)
stat_online.append(det.get_statistics(family='gaussian')["stat"])
online_time = time.time() - t1
print(f"Offline time: {offline_time:.2f}s")
print(f"Online time: {online_time:.2f}s")
print(f"Offline is {online_time / offline_time:.1f}× faster")
Offline time: 0.22s
Online time: 0.35s
Offline is 1.6× faster
References
Pishchagina, Liudmila, Gaetano Romano, Paul Fearnhead, Vincent Runge, and Guillem Rigaill. 2025. “Online Multivariate Changepoint Detection: Leveraging Links with Computational Geometry.” Journal of the Royal Statistical Society Series B: Statistical Methodology: qkaf046. https://doi.org/10.1093/jrsssb/qkaf046
Romano, Gaetano, Idris A. Eckley, and Paul Fearnhead. 2024. “A Log-Linear Nonparametric Online Changepoint Detection Algorithm Based on Functional Pruning.” IEEE Transactions on Signal Processing 72: 594–606. https://doi.org/10.1109/tsp.2023.3343550.
Romano, Gaetano, Idris A Eckley, Paul Fearnhead, and Guillem Rigaill. 2023. “Fast Online Changepoint Detection via Functional Pruning CUSUM Statistics.” Journal of Machine Learning Research 24 (81): 1–36. https://www.jmlr.org/papers/v24/21-1230.html.
Ward, Kes, Gaetano Romano, Idris Eckley, and Paul Fearnhead. 2024. “A Constant-Per-Iteration Likelihood Ratio Test for Online Changepoint Detection for Exponential Family Models.” Statistics and Computing 34 (3): 1–11.
Authors and Contributors
-
Gaetano Romano: email (Author) (Maintainer) (Creator) (Translator)
-
Kes Ward: email (Author)
-
Yuntang Fan: email (Author)
-
Guillem Rigaill: email (Author)
-
Vincent Runge: email (Author)
-
Paul Fearnhead: email (Author)
-
Idris A. Eckley: email (Author)
License
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see https://www.gnu.org/licenses/gpl-3.0.txt.
External libraries
The Python package depends on the Qhull library (http://www.qhull.org/), from C.B. Barber and The Geometry Center. If the library is not found, the user will be notified with instructions to install.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file focus_cpt-0.1.2.tar.gz.
File metadata
- Download URL: focus_cpt-0.1.2.tar.gz
- Upload date:
- Size: 667.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7711f476360a3a723721718fa5001b990d4532cc2f7245f4fde780871e788584
|
|
| MD5 |
58b9139b8aa0c1f4a2dc5e79eddd482e
|
|
| BLAKE2b-256 |
aec84ef3b782ce7ca38685661a705c278c6500f27ae41b94231883c068e97589
|