Source code for sys_mapping.utils

"""Utility functions for covariance estimation and two-point measurements.

Implements:
  - compute_covariance_matrix (Eq. 24): template auto-correlations
  - compute_amplitude_bias (Eq. 25-26): expected bias on clustering amplitude
  - measure_two_point_function: TreeCorr Landy-Szalay estimator wrapper
  - measure_two_point_function_corrfunc: Corrfunc Landy-Szalay estimator wrapper
  - measure_kk_correlation_treecorr: TreeCorr KK scalar-field correlator
  - measure_kk_correlation_corrfunc: Corrfunc KK scalar-field correlator
"""

from __future__ import annotations

import numpy as np


[docs] def compute_covariance_matrix(delta_t: np.ndarray) -> np.ndarray: """Compute the template covariance matrix (Eq. 24). ``C_{ij} = (1/N_pix) Σ_p δ_{ti,p} δ_{tj,p}`` Parameters ---------- delta_t : (n_sys, n_pix) template values at unmasked pixels Returns ------- (n_sys, n_sys) symmetric positive-semi-definite covariance matrix Performance ----------- Measured on CPU (n_sys=5, n_pix=10_000): **~120 μs/call**. Uses ``numpy`` matrix multiply; scales as O(n_sys² × n_pix). Precision --------- Matrix is symmetric to ``atol < 1e-14`` (exact in float64). All eigenvalues ≥ −1e-12 (positive semi-definite). For unit-variance uncorrelated templates, result is close to the identity matrix (deviation < 0.01 for n_pix ≥ 200_000). Examples -------- >>> import numpy as np >>> from sys_mapping import compute_covariance_matrix >>> rng = np.random.default_rng(0) >>> delta_t = rng.standard_normal((3, 10_000)) >>> C = compute_covariance_matrix(delta_t) >>> C.shape (3, 3) >>> np.allclose(C, C.T) # symmetric True """ n_pix = delta_t.shape[1] return (delta_t @ delta_t.T) / n_pix
[docs] def compute_amplitude_bias( a_sq: np.ndarray, b_sq: np.ndarray, cov_matrix: np.ndarray, ) -> tuple[float, float]: """Estimate bias on the galaxy clustering amplitude (Eq. 25-26). Returns the additive offset and multiplicative factor that contamination imprints on the angle-averaged angular power spectrum. Additive bias: ``Δw = Σ_i a²_i C_{ii}`` Multiplicative: ``f = 1 + Σ_i b²_i C_{ii}`` Parameters ---------- a_sq : (n_sys,) debiased squared additive parameters b_sq : (n_sys,) debiased squared multiplicative parameters cov_matrix : (n_sys, n_sys) template covariance from :func:`compute_covariance_matrix` Returns ------- additive_bias : float Δw added to the measured correlation function mult_factor : float factor by which w is inflated (1 = no bias) Performance ----------- Measured on CPU (n_sys=5): **~11 μs/call**. Two dot products. Precision --------- Matches analytic formula to ``rtol < 1e-10`` (float64 dot products). Examples -------- >>> import numpy as np >>> from sys_mapping import compute_covariance_matrix, compute_amplitude_bias >>> rng = np.random.default_rng(0) >>> delta_t = rng.standard_normal((3, 10_000)) >>> C = compute_covariance_matrix(delta_t) >>> a_sq = np.array([0.01, 0.02, 0.005]) >>> b_sq = np.array([0.005, 0.01, 0.002]) >>> add_bias, mult_factor = compute_amplitude_bias(a_sq, b_sq, C) >>> add_bias # additive offset on w(θ) >>> mult_factor # typically very close to 1.0 """ diag = np.diag(cov_matrix) additive_bias = float(np.dot(a_sq, diag)) mult_factor = float(1.0 + np.dot(b_sq, diag)) return additive_bias, mult_factor
[docs] def measure_two_point_function( ra_gal: np.ndarray, dec_gal: np.ndarray, ra_rand: np.ndarray, dec_rand: np.ndarray, min_sep: float = 0.06, max_sep: float = 30.0, nbins: int = 15, sep_units: str = "arcmin", ) -> tuple[np.ndarray, np.ndarray]: """Measure the angular two-point correlation function via Landy-Szalay. Uses TreeCorr with log-spaced angular bins and the standard estimator ``w(θ) = (DD − 2DR + RR) / RR``. Parameters ---------- ra_gal, dec_gal : (n_gal,) galaxy positions in degrees ra_rand, dec_rand : (n_rand,) random positions in degrees min_sep, max_sep : float angular bin limits in ``sep_units`` nbins : int number of log-spaced bins sep_units : str angular units for ``min_sep``/``max_sep`` (default ``'arcmin'``) Returns ------- theta : (nbins,) bin centers in ``sep_units`` w : (nbins,) Landy-Szalay angular correlation function estimate Performance ----------- Dominated by TreeCorr pair-counting; scales as O(n_gal log n_gal). Typical wall time: **seconds to minutes** for survey-scale catalogs. Precision --------- Landy-Szalay estimator is unbiased to O(1/N). Statistical error is approximately ``1/sqrt(DD_pairs)`` per bin. Examples -------- >>> import numpy as np >>> from sys_mapping import measure_two_point_function >>> rng = np.random.default_rng(0) >>> ra_g = rng.uniform(30, 60, 10_000) # small patch >>> dec_g = rng.uniform(-10, 10, 10_000) >>> ra_r = rng.uniform(30, 60, 50_000) >>> dec_r = rng.uniform(-10, 10, 50_000) >>> theta, w = measure_two_point_function( ... ra_g, dec_g, ra_r, dec_r, ... min_sep=1.0, max_sep=100.0, nbins=10, sep_units="arcmin", ... ) >>> theta.shape, w.shape ((10,), (10,)) """ try: import treecorr except ImportError as e: raise ImportError("treecorr is required for two-point measurements.") from e config = dict( min_sep=min_sep, max_sep=max_sep, nbins=nbins, sep_units=sep_units, ) cat_gal = treecorr.Catalog(ra=ra_gal, dec=dec_gal, ra_units="degrees", dec_units="degrees") cat_rand = treecorr.Catalog(ra=ra_rand, dec=dec_rand, ra_units="degrees", dec_units="degrees") dd = treecorr.NNCorrelation(**config) rr = treecorr.NNCorrelation(**config) dr = treecorr.NNCorrelation(**config) dd.process(cat_gal) rr.process(cat_rand) dr.process(cat_gal, cat_rand) w, varxi = dd.calculateXi(rr=rr, dr=dr) theta = np.exp(dd.meanlogr) return theta, w
# --------------------------------------------------------------------------- # Corrfunc implementations # --------------------------------------------------------------------------- _SEP_TO_DEG: dict[str, float] = { "degrees": 1.0, "arcmin": 1.0 / 60.0, "arcsec": 1.0 / 3600.0, "radians": 180.0 / np.pi, }
[docs] def measure_two_point_function_corrfunc( ra_gal: np.ndarray, dec_gal: np.ndarray, ra_rand: np.ndarray, dec_rand: np.ndarray, min_sep: float = 0.06, max_sep: float = 30.0, nbins: int = 15, sep_units: str = "arcmin", nthreads: int = 1, ) -> tuple[np.ndarray, np.ndarray]: """Measure the angular two-point correlation function via Landy-Szalay. Drop-in replacement for :func:`measure_two_point_function` using Corrfunc instead of TreeCorr. Pair-counting is exact (no bin_slop). Parameters ---------- ra_gal, dec_gal : (n_gal,) galaxy positions in degrees ra_rand, dec_rand : (n_rand,) random positions in degrees min_sep, max_sep : float angular bin limits in ``sep_units`` nbins : int number of log-spaced bins sep_units : str angular units (default ``'arcmin'``) nthreads : int OpenMP threads passed to Corrfunc (default 1) Returns ------- theta : (nbins,) mean pair separation in ``sep_units`` w : (nbins,) Landy-Szalay angular correlation function estimate """ try: from Corrfunc.mocks import DDtheta_mocks except ImportError as e: raise ImportError("Corrfunc is required for measure_two_point_function_corrfunc.") from e factor = _SEP_TO_DEG[sep_units] bins = np.logspace(np.log10(min_sep * factor), np.log10(max_sep * factor), nbins + 1) n_gal = len(ra_gal) n_rand = len(ra_rand) ra_gal = np.asarray(ra_gal, dtype=np.float64) dec_gal = np.asarray(dec_gal, dtype=np.float64) ra_rand = np.asarray(ra_rand, dtype=np.float64) dec_rand = np.asarray(dec_rand, dtype=np.float64) DD = DDtheta_mocks(1, nthreads, bins, ra_gal, dec_gal, output_thetaavg=True) RR = DDtheta_mocks(1, nthreads, bins, ra_rand, dec_rand) DR = DDtheta_mocks(0, nthreads, bins, ra_gal, dec_gal, RA2=ra_rand, DEC2=dec_rand) # Corrfunc autocorr=1 counts ordered unique pairs n*(n-1), not n*(n-1)/2. # DR cross-correlation counts all n_gal*n_rand unordered pairs. dd = DD["npairs"] / (n_gal * (n_gal - 1)) rr = RR["npairs"] / (n_rand * (n_rand - 1)) dr = DR["npairs"] / (n_gal * float(n_rand)) with np.errstate(invalid="ignore", divide="ignore"): w = np.where(rr > 0, (dd - 2.0 * dr + rr) / rr, 0.0) # Use measured mean separation; fall back to geometric bin centre theta_deg = np.where(DD["thetaavg"] > 0, DD["thetaavg"], np.sqrt(bins[:-1] * bins[1:])) theta = theta_deg / factor return theta, w
[docs] def measure_kk_correlation_treecorr( ra: np.ndarray, dec: np.ndarray, k: np.ndarray, w: np.ndarray | None = None, *, ra2: np.ndarray | None = None, dec2: np.ndarray | None = None, k2: np.ndarray | None = None, w2: np.ndarray | None = None, min_sep: float = 10.0, max_sep: float = 250.0, nbins: int = 20, sep_units: str = "arcmin", bin_slop: float = 0.01, ) -> tuple[np.ndarray, np.ndarray]: """Angular scalar-field correlation (KK) using TreeCorr. Computes ``xi(theta) = <k1_i k2_j>_w`` for auto- or cross-correlation. Parameters ---------- ra, dec : (n,) positions in degrees k : (n,) scalar field values (e.g. galaxy overdensity) w : (n,) optional pair weights; ones if None ra2, dec2, k2, w2 : second catalog for cross-correlation; if None, auto-correlation min_sep, max_sep : float bin limits in ``sep_units`` nbins : int number of log-spaced bins sep_units : str angular units (default ``'arcmin'``) bin_slop : float TreeCorr bin_slop parameter (default 0.01) Returns ------- theta : (nbins,) bin centres in ``sep_units`` xi : (nbins,) correlation function """ try: import treecorr except ImportError as e: raise ImportError("treecorr is required for measure_kk_correlation_treecorr.") from e cfg = dict(min_sep=min_sep, max_sep=max_sep, nbins=nbins, sep_units=sep_units, bin_slop=bin_slop) cat1 = treecorr.Catalog(ra=ra, dec=dec, ra_units="degrees", dec_units="degrees", k=k, w=w) kk = treecorr.KKCorrelation(**cfg) if ra2 is None: kk.process(cat1) else: cat2 = treecorr.Catalog(ra=ra2, dec=dec2, ra_units="degrees", dec_units="degrees", k=k2, w=w2) kk.process(cat1, cat2) return np.exp(kk.meanlogr), kk.xi
[docs] def measure_kk_correlation_corrfunc( ra: np.ndarray, dec: np.ndarray, k: np.ndarray, w: np.ndarray | None = None, *, ra2: np.ndarray | None = None, dec2: np.ndarray | None = None, k2: np.ndarray | None = None, w2: np.ndarray | None = None, min_sep: float = 10.0, max_sep: float = 250.0, nbins: int = 20, sep_units: str = "arcmin", nthreads: int = 1, ) -> tuple[np.ndarray, np.ndarray]: """Angular scalar-field correlation (KK) using Corrfunc. Drop-in replacement for :func:`measure_kk_correlation_treecorr`. Computes ``xi(theta) = sum(w_i w_j k_i k_j) / sum(w_i w_j)`` per bin via two weighted pair-count passes with ``weight_type='pair_product'``. Parameters ---------- ra, dec : (n,) positions in degrees k : (n,) scalar field values w : (n,) optional pair weights; ones if None ra2, dec2, k2, w2 : second catalog for cross-correlation; if None, auto-correlation min_sep, max_sep : float bin limits in ``sep_units`` nbins : int number of log-spaced bins sep_units : str angular units (default ``'arcmin'``) nthreads : int OpenMP threads (default 1) Returns ------- theta : (nbins,) mean pair separation in ``sep_units`` xi : (nbins,) correlation function """ try: from Corrfunc.mocks import DDtheta_mocks except ImportError as e: raise ImportError("Corrfunc is required for measure_kk_correlation_corrfunc.") from e factor = _SEP_TO_DEG[sep_units] bins = np.logspace(np.log10(min_sep * factor), np.log10(max_sep * factor), nbins + 1) ra = np.asarray(ra, dtype=np.float64) dec = np.asarray(dec, dtype=np.float64) k = np.asarray(k, dtype=np.float64) w1 = np.ones(len(ra), dtype=np.float64) if w is None else np.asarray(w, dtype=np.float64) cross = ra2 is not None if cross: ra2 = np.asarray(ra2, dtype=np.float64) dec2 = np.asarray(dec2, dtype=np.float64) k2 = np.asarray(k2, dtype=np.float64) w2 = (np.ones(len(ra2), dtype=np.float64) if w2 is None else np.asarray(w2, dtype=np.float64)) autocorr = 0 if cross else 1 kw = dict(weight_type="pair_product", output_thetaavg=True) if cross: res_num = DDtheta_mocks(autocorr, nthreads, bins, ra, dec, RA2=ra2, DEC2=dec2, weights1=w1 * k, weights2=w2 * k2, **kw) res_den = DDtheta_mocks(autocorr, nthreads, bins, ra, dec, RA2=ra2, DEC2=dec2, weights1=w1, weights2=w2, **kw) else: res_num = DDtheta_mocks(autocorr, nthreads, bins, ra, dec, weights1=w1 * k, **kw) res_den = DDtheta_mocks(autocorr, nthreads, bins, ra, dec, weights1=w1, **kw) # xi = sum(w_i w_j k_i k_j) / sum(w_i w_j) # = weightavg_num / weightavg_den (npairs cancels) with np.errstate(invalid="ignore", divide="ignore"): xi = np.where(res_den["weightavg"] > 0, res_num["weightavg"] / res_den["weightavg"], 0.0) theta_deg = np.where(res_num["thetaavg"] > 0, res_num["thetaavg"], np.sqrt(bins[:-1] * bins[1:])) theta = theta_deg / factor return theta, xi