Source code for sys_mapping.bootstrap

"""Spatial covariance estimation: block bootstrap and jack-knife (Sec. 6.2).

Block bootstrap (Berlfein+2024): resamples spatial patches with replacement.
Jack-knife (Ross+2011, Ho+2012): leave-one-patch-out deterministic estimator.
Both capture spatial correlations that pixel-level bootstrap misses.
"""

from __future__ import annotations

from typing import Callable

import numpy as np
import healpy as hp


def _assign_patches(
    good_pixels: np.ndarray,
    nside: int,
    n_patches: int,
) -> np.ndarray:
    """Assign unmasked pixels to spatial patches using HEALPix coarsening.

    Coarsens to a lower NSIDE so that the sky is divided into roughly
    ``n_patches`` equal-area regions, then relabels to compact integers.

    Parameters
    ----------
    good_pixels : (n_pix,) boolean mask (True = unmasked)
    nside : int  full-resolution HEALPix NSIDE
    n_patches : int  desired number of patches (approximate)

    Returns
    -------
    patch_ids : (n_good_pix,) integer patch label 0..K-1 for each unmasked pixel

    Precision
    ---------
    Every unmasked pixel receives a label; ``len(patch_ids) == good_pixels.sum()``.

    Examples
    --------
    >>> import numpy as np
    >>> from sys_mapping.bootstrap import _assign_patches
    >>> good = np.ones(12 * 16**2, dtype=bool)
    >>> patch_ids = _assign_patches(good, nside=16, n_patches=8)
    >>> len(patch_ids) == good.sum()
    True
    """
    pixel_indices = np.where(good_pixels)[0]
    # Coarsen to lower NSIDE to define patch boundaries
    nside_patch = max(1, nside // int(np.sqrt(hp.nside2npix(nside) / n_patches)))
    # Use the ring-scheme pixel index at the coarser resolution
    theta, phi = hp.pix2ang(nside, pixel_indices)
    coarse_pix = hp.ang2pix(nside_patch, theta, phi)
    # Relabel to compact integers 0..K-1
    unique, inverse = np.unique(coarse_pix, return_inverse=True)
    patch_ids = inverse
    return patch_ids


[docs] def block_bootstrap_variance( delta_g_obs: np.ndarray, delta_t: np.ndarray, good_pixels: np.ndarray, nside: int, estimator: Callable[[np.ndarray, np.ndarray], np.ndarray], n_bootstrap: int = 100, n_patches: int = 10, seed: int = 0, ) -> np.ndarray: """Estimate variance of an estimator via spatial block bootstrap. Divides the footprint into ~``n_patches`` spatial regions using HEALPix coarsening, then resamples regions with replacement ``n_bootstrap`` times. This captures spatial correlations that pixel-level bootstrap misses. Parameters ---------- delta_g_obs : (n_good_pix,) observed overdensity at unmasked pixels delta_t : (n_sys, n_good_pix) template values at unmasked pixels good_pixels : (n_pix,) boolean mask that defines pixel sky positions nside : int full-resolution HEALPix NSIDE estimator : callable(delta_g, delta_t) -> (n_out,) array The quantity whose variance is to be estimated. Called once per bootstrap resample. n_bootstrap : int number of bootstrap resamples (default 100) n_patches : int approximate number of spatial blocks (default 10) seed : int random seed for reproducibility Returns ------- variance : (n_out,) bootstrap variance estimate (ddof=1) Performance ----------- Measured on CPU (n_boot=50, n_patches=8, nside=32, simple mean estimator): **~32 ms/call**. Total cost scales as ``O(n_bootstrap × cost(estimator))``. Precision --------- Bootstrap variance is non-negative by construction. For the sample mean, variance decreases monotonically as n_pix increases. Convergence to the true variance typically requires n_bootstrap ≥ 100. Examples -------- >>> import numpy as np >>> from sys_mapping import block_bootstrap_variance >>> rng = np.random.default_rng(0) >>> nside = 16 >>> n_pix = 12 * nside ** 2 >>> good = np.ones(n_pix, dtype=bool) >>> dg = rng.standard_normal(n_pix) * 0.1 >>> dt = rng.standard_normal((3, n_pix)) >>> def mean_estimator(dg, dt): ... return np.array([np.mean(dg)]) >>> var = block_bootstrap_variance(dg, dt, good, nside, ... mean_estimator, ... n_bootstrap=100, n_patches=8, seed=1) >>> var.shape (1,) >>> float(var[0]) >= 0 True """ rng = np.random.default_rng(seed) patch_ids = _assign_patches(good_pixels, nside, n_patches) unique_patches = np.unique(patch_ids) K = len(unique_patches) bootstrap_estimates = [] for _ in range(n_bootstrap): # Resample patches with replacement sampled_patches = rng.choice(unique_patches, size=K, replace=True) pixel_mask = np.concatenate([ np.where(patch_ids == p)[0] for p in sampled_patches ]) dg_boot = delta_g_obs[pixel_mask] dt_boot = delta_t[:, pixel_mask] estimate = estimator(dg_boot, dt_boot) bootstrap_estimates.append(np.asarray(estimate)) estimates_arr = np.stack(bootstrap_estimates, axis=0) return np.var(estimates_arr, axis=0, ddof=1)
[docs] def jackknife_covariance( delta_g_obs: np.ndarray, delta_t: np.ndarray, good_pixels: np.ndarray, nside: int, estimator: Callable[[np.ndarray, np.ndarray], np.ndarray], n_patches: int = 10, ) -> np.ndarray: """Estimate the covariance matrix of an estimator via spatial jack-knife. Divides the footprint into ``n_patches`` spatial regions (using the same HEALPix coarsening as :func:`block_bootstrap_variance`), then runs a leave-one-patch-out jack-knife. The standard jack-knife prefactor ``(K-1)/K`` is applied so the estimate is unbiased. Parameters ---------- delta_g_obs : (n_good_pix,) observed overdensity at unmasked pixels delta_t : (n_sys, n_good_pix) template values at unmasked pixels good_pixels : (n_pix,) boolean mask that defines pixel sky positions nside : int full-resolution HEALPix NSIDE estimator : callable(delta_g, delta_t) -> (n_out,) array The quantity whose covariance is to be estimated. Called ``K`` times (once per patch dropped). n_patches : int approximate number of spatial patches (default 10) Returns ------- cov : (n_out, n_out) jack-knife covariance matrix Notes ----- The jack-knife covariance is: .. math:: C_{ij}^{\\rm JK} = \\frac{K-1}{K} \\sum_{k=1}^{K} (\\hat\\theta_k - \\bar\\theta)_i\\, (\\hat\\theta_k - \\bar\\theta)_j where :math:`\\hat\\theta_k` is the estimator with patch :math:`k` dropped and :math:`\\bar\\theta` is the mean over all leave-one-out estimates. Unlike block bootstrap, jack-knife is deterministic (no random resampling). For small footprints or fewer than 5 patches the estimate may be noisy; a warning is issued in that case. Performance ----------- Cost: ``O(K × cost(estimator))``. Much cheaper than bootstrap since ``K`` (number of patches) is typically 10–30 vs. 100+ bootstrap resamples. Precision --------- For a simple mean estimator, jack-knife and block bootstrap agree to within an order of magnitude. Jack-knife tends to be slightly conservative. Examples -------- >>> import numpy as np >>> from sys_mapping import jackknife_covariance >>> rng = np.random.default_rng(0) >>> nside = 16 >>> n_pix = 12 * nside ** 2 >>> good = np.ones(n_pix, dtype=bool) >>> dg = rng.standard_normal(n_pix) * 0.1 >>> dt = rng.standard_normal((3, n_pix)) >>> def mean_estimator(dg, dt): ... return np.array([np.mean(dg)]) >>> cov = jackknife_covariance(dg, dt, good, nside, ... mean_estimator, n_patches=8) >>> cov.shape (1, 1) >>> float(cov[0, 0]) >= 0 True References ---------- Ross et al. 2011, MNRAS 417, 1350. Ho et al. 2012, ApJ 761, 14. """ import warnings if n_patches < 5: warnings.warn( f"n_patches={n_patches} is small; jack-knife estimate may be noisy. " "Consider using n_patches >= 5.", stacklevel=2, ) patch_ids = _assign_patches(good_pixels, nside, n_patches) unique_patches = np.unique(patch_ids) K = len(unique_patches) leave_one_out = [] for k in unique_patches: keep = patch_ids != k dg_k = delta_g_obs[keep] dt_k = delta_t[:, keep] estimate_k = estimator(dg_k, dt_k) leave_one_out.append(np.asarray(estimate_k)) estimates_arr = np.stack(leave_one_out, axis=0) # (K, n_out) mean_est = estimates_arr.mean(axis=0) residuals = estimates_arr - mean_est[np.newaxis, :] # (K, n_out) cov = (K - 1) / K * (residuals.T @ residuals) return cov