"""Systematic contamination injection, FITS I/O, and w(θ) recovery.
This module provides the end-to-end infrastructure for simulation-based
validation of the decontamination pipeline:
1. Load reference catalogs (Uchuu lightcone or GLASS mock).
2. Load LSDR10 systematic maps.
3. Inject contamination into catalog positions using the pixel-level forward
model from ``sys_mapping.contamination``.
4. Save contaminated catalogs to FITS.
5. Run all fast decontamination methods and compare recovered w(θ) to truth.
Contamination injection strategy
---------------------------------
For a catalog of N galaxies, the true overdensity at each pixel is estimated
from the catalog itself (galaxy counts / random counts − 1). The forward
contamination model (Eq. 11-13 of Berlfein et al. 2024) is applied to yield a
contaminated overdensity field. The per-galaxy contamination weight is then:
WEIGHT_CONT(p) = (1 + δ_cont(p)) / (1 + δ_g(p))
When computing w(θ) with TreeCorr, passing WEIGHT_CONT as galaxy weights
exactly reproduces the effect that the contamination model would have on the
angular correlation function, without physically removing or adding galaxies.
The "truth" w(θ) is computed with uniform weights (no contamination).
References
----------
Berlfein et al. 2024, MNRAS 531, 4954. arXiv:2401.12293
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Sequence
import healpy as hp
import jax.numpy as jnp
import numpy as np
from .contamination import apply_contamination
from .maps import (
assign_template_values,
compute_overdensity,
load_real_template,
pixelize_catalog,
)
from .regression import run_decontamination
# ── Systematic map catalogue ───────────────────────────────────────────────────
# Maps present in ~/data/legacysurvey/dr10/systematics/{nside_dir}/
# Each entry: (filename_template, column_name, nside_zero_padding)
_MAP_SPECS: dict[str, tuple[str, str, int]] = {
"LS10_EBV": ("LS10_EBV_NSIDE_{nside}.fits", "EBV", 4),
"LS10_GALDEPTH_Z": ("LS10_GALDEPTH_Z_NSIDE_{nside}.fits", "GALDEPTH_Z", 4),
"LS10_PSFSIZE_R": ("LS10_PSFSIZE_R_NSIDE_{nside}.fits", "PSFSIZE_R", 4),
"LS10_NOBS_R": ("LS10_NOBS_R_NSIDE_{nside}.fits", "NOBS_R", 4),
"GAIA_nstar_faint": ("GAIA_nstar_faint_NSIDE_{nside}.fits", "nstar_faint", 5),
}
DEFAULT_MAP_NAMES: list[str] = [
"LS10_EBV",
"LS10_GALDEPTH_Z",
"LS10_PSFSIZE_R",
"LS10_NOBS_R",
"GAIA_nstar_faint",
]
# Contamination amplitude magnitudes for each level
LEVELS: dict[str, float] = {"low": 0.02, "medium": 0.05, "high": 0.10}
DEFAULT_METHODS: list[str] = ["OLS", "ISD-1", "ElasticNet"]
# ── Data structures ────────────────────────────────────────────────────────────
[docs]
@dataclass
class ContaminationConfig:
"""Parameters for a single contamination scenario.
Attributes
----------
level:
Human-readable amplitude label: ``"low"``, ``"medium"``, or ``"high"``.
scenario:
Contamination model: ``"additive"``, ``"multiplicative"``, or
``"combined"``.
a_true:
Additive amplitudes (shape ``(n_sys,)``). Zero for multiplicative.
b_true:
Multiplicative amplitudes (shape ``(n_sys,)``). Zero for additive.
"""
level: str
scenario: str
a_true: np.ndarray
b_true: np.ndarray
@property
def n_sys(self) -> int:
return len(self.a_true)
[docs]
def make_contamination_grid(
n_sys: int,
seed: int = 0,
) -> list[ContaminationConfig]:
"""Build a 3×3 grid of contamination configurations.
Returns 9 ``ContaminationConfig`` objects: one for each combination of
three amplitude levels (low/medium/high) × three scenarios
(additive/multiplicative/combined).
The *signs* of the amplitudes are drawn once from the given seed and
shared across levels so that the impact of amplitude magnitude can be
cleanly isolated.
Parameters
----------
n_sys:
Number of systematic templates.
seed:
Random seed for the sign draws.
Returns
-------
List of 9 ``ContaminationConfig`` objects.
Examples
--------
>>> from sys_mapping.simulation import make_contamination_grid
>>> cfgs = make_contamination_grid(n_sys=3)
>>> len(cfgs)
9
>>> {c.level for c in cfgs} == {'low', 'medium', 'high'}
True
>>> {c.scenario for c in cfgs} == {'additive', 'multiplicative', 'combined'}
True
"""
rng = np.random.default_rng(seed)
signs_a = rng.choice([-1, 1], size=n_sys).astype(float)
signs_b = rng.choice([-1, 1], size=n_sys).astype(float)
configs: list[ContaminationConfig] = []
for level_name, mag in LEVELS.items():
a_base = signs_a * mag
b_base = signs_b * mag
for scenario in ("additive", "multiplicative", "combined"):
a = a_base.copy() if scenario in ("additive", "combined") else np.zeros(n_sys)
b = b_base.copy() if scenario in ("multiplicative", "combined") else np.zeros(n_sys)
configs.append(ContaminationConfig(level=level_name, scenario=scenario, a_true=a, b_true=b))
return configs
# ── I/O functions ──────────────────────────────────────────────────────────────
[docs]
def load_uchuu_mock(data_fits: str | Path, rand_fits: str | Path) -> dict:
"""Load Uchuu lightcone FITS catalog.
Parameters
----------
data_fits:
Path to the ``*_DATA.fits`` file. Expected columns: ``RA``, ``DEC``,
``redshift_S``.
rand_fits:
Path to the ``*_RAND.fits`` file. Expected columns: ``RA``, ``DEC``.
Returns
-------
dict with keys ``ra``, ``dec``, ``z`` (galaxies) and ``ra_rand``,
``dec_rand`` (randoms).
Examples
--------
>>> import os
>>> p = os.path.expanduser(
... '~/data/Uchuu/FullSky/mock_catalogues/'
... 'MOCK_VLIM_ANY_10.65_Mstar_12.0_0.05_z_0.26_N_0923373/'
... 'MOCK_VLIM_ANY_10.65_Mstar_12.0_0.05_z_0.26_N_0923373_DATA.fits')
>>> # doctest: +SKIP
>>> cat = load_uchuu_mock(p, p.replace('_DATA', '_RAND'))
"""
from astropy.io import fits as afits
with afits.open(str(data_fits)) as hdul:
data = hdul[1].data
ra = np.asarray(data["RA"], dtype=float)
dec = np.asarray(data["DEC"], dtype=float)
z = np.asarray(data["redshift_S"], dtype=float)
with afits.open(str(rand_fits)) as hdul:
rand = hdul[1].data
ra_rand = np.asarray(rand["RA"], dtype=float)
dec_rand = np.asarray(rand["DEC"], dtype=float)
return {"ra": ra, "dec": dec, "z": z, "ra_rand": ra_rand, "dec_rand": dec_rand}
[docs]
def load_systematic_maps(
syst_dir: str | Path,
nside: int,
map_names: list[str] | None = None,
) -> tuple[np.ndarray, list[str], np.ndarray]:
"""Load LSDR10 systematic maps at a given HEALPix resolution.
Parameters
----------
syst_dir:
Directory containing NSIDE subdirectories, e.g.
``~/data/legacysurvey/dr10/systematics/``. The function looks for
files in ``{syst_dir}/{nside:04d}/`` (NSIDE in 4-digit zero-padded
subdirectory).
nside:
Target HEALPix resolution.
map_names:
Which maps to load. Defaults to ``DEFAULT_MAP_NAMES`` (5 maps).
Returns
-------
templates : (n_sys, 12*nside²) array, zero-mean and unit-std over valid pixels.
names : list of map name strings.
footprint : (12*nside²,) bool array — True where *all* loaded maps are valid.
Use this to restrict mock catalogs to the survey footprint before
injecting systematics or measuring w(θ).
Examples
--------
>>> # doctest: +SKIP
>>> t, names, footprint = load_systematic_maps('~/data/legacysurvey/dr10/systematics/', 64)
>>> t.shape
(5, 49152)
>>> footprint.sum() # number of pixels inside the LS10 footprint
...
"""
syst_dir = Path(syst_dir).expanduser()
if map_names is None:
map_names = DEFAULT_MAP_NAMES
nside_dir = syst_dir / f"{nside:04d}"
templates = []
valid_names = []
footprint = np.ones(hp.nside2npix(nside), dtype=bool)
for name in map_names:
fname_tmpl, column, zfill = _MAP_SPECS[name]
nside_str = str(nside).zfill(zfill)
fname = fname_tmpl.replace("{nside}", nside_str)
fpath = nside_dir / fname
if not fpath.exists():
raise FileNotFoundError(f"Systematic map not found: {fpath}")
tmpl, valid_mask = load_real_template(fpath, column=column, nside=nside)
templates.append(tmpl)
valid_names.append(name)
footprint &= valid_mask
return np.stack(templates, axis=0), valid_names, footprint
# ── Contamination injection ────────────────────────────────────────────────────
[docs]
def inject_systematics(
ra: np.ndarray,
dec: np.ndarray,
ra_rand: np.ndarray,
dec_rand: np.ndarray,
templates: np.ndarray,
a: np.ndarray,
b: np.ndarray,
nside: int,
) -> np.ndarray:
"""Compute per-galaxy contamination weights.
The function estimates the pixel-level galaxy overdensity from the input
catalog, applies the forward contamination model, and returns per-galaxy
weights that reproduce the contaminated angular correlation function when
passed to TreeCorr.
Parameters
----------
ra, dec:
Galaxy positions in degrees.
ra_rand, dec_rand:
Random catalog positions in degrees.
templates:
Systematic template maps (shape ``(n_sys, n_pix)``).
a:
Additive contamination amplitudes (shape ``(n_sys,)``).
b:
Multiplicative contamination amplitudes (shape ``(n_sys,)``).
nside:
HEALPix NSIDE used for pixelisation.
Returns
-------
WEIGHT_CONT : (n_gal,) per-galaxy weights. Equal to 1 when a=b=0.
Clipped to a minimum of 0.01 to prevent unphysical negative weights.
Notes
-----
The weight is defined as::
WEIGHT_CONT(p) = (1 + δ_cont(p)) / (1 + δ_g(p))
where δ_cont is the contaminated overdensity and δ_g is the observed
overdensity from the input catalog. Pixels where 1 + δ_g(p) ≤ 0 are
assigned weight 1.
Examples
--------
>>> import numpy as np
>>> from sys_mapping.simulation import inject_systematics
>>> rng = np.random.default_rng(0)
>>> n_gal, n_rand = 5000, 50000
>>> ra = rng.uniform(0, 360, n_gal)
>>> dec = rng.uniform(-90, 90, n_gal)
>>> ra_r = rng.uniform(0, 360, n_rand)
>>> dec_r = rng.uniform(-90, 90, n_rand)
>>> templates = rng.standard_normal((2, 12 * 32**2))
>>> a = np.array([0.0, 0.0])
>>> b = np.array([0.0, 0.0])
>>> w = inject_systematics(ra, dec, ra_r, dec_r, templates, a, b, nside=32)
>>> w.shape == (n_gal,)
True
>>> np.allclose(w, 1.0, atol=0.15) # near unity for zero contamination
True
"""
n_pix = hp.nside2npix(nside)
gal_counts = pixelize_catalog(ra, dec, nside)
rand_counts = pixelize_catalog(ra_rand, dec_rand, nside)
# Full-map overdensity (don't apply good-pixel mask here)
rand_norm = rand_counts * (gal_counts.sum() / rand_counts.sum())
with np.errstate(invalid="ignore", divide="ignore"):
delta_g_full = np.where(rand_norm > 0, gal_counts / rand_norm - 1.0, 0.0)
# Apply contamination model on the full sky
delta_cont_full = np.asarray(
apply_contamination(
jnp.asarray(delta_g_full),
jnp.asarray(templates),
jnp.asarray(a),
jnp.asarray(b),
)
)
# Per-galaxy weight from per-pixel ratio
theta = np.radians(90.0 - dec)
phi = np.radians(ra)
pix_idx = hp.ang2pix(nside, theta, phi)
denom = 1.0 + delta_g_full[pix_idx]
numer = 1.0 + delta_cont_full[pix_idx]
weight = np.where(np.abs(denom) > 0.01, numer / denom, 1.0)
return np.clip(weight, 0.01, None)
# ── FITS I/O ───────────────────────────────────────────────────────────────────
[docs]
def save_simulation_catalog(
ra: np.ndarray,
dec: np.ndarray,
z: np.ndarray,
weight_cont: np.ndarray,
config: ContaminationConfig,
output_path: str | Path,
) -> None:
"""Write a contaminated simulation catalog to a FITS BinTableHDU.
Parameters
----------
ra, dec:
Galaxy positions in degrees.
z:
Galaxy redshifts.
weight_cont:
Per-galaxy contamination weights (from ``inject_systematics``).
config:
Contamination configuration stored in the FITS header.
output_path:
Destination FITS file path. Parent directory is created if needed.
"""
from astropy.io import fits as afits
from astropy.table import Table
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
tbl = Table(
{
"RA": ra.astype(np.float32),
"DEC": dec.astype(np.float32),
"Z": z.astype(np.float32),
"WEIGHT_CONT": weight_cont.astype(np.float32),
}
)
hdu = afits.BinTableHDU(tbl)
for key, val in config.to_header_dict().items():
hdu.header[key] = val
hdul = afits.HDUList([afits.PrimaryHDU(), hdu])
hdul.writeto(str(output_path), overwrite=True)
[docs]
def load_simulation_catalog(fits_path: str | Path) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, ContaminationConfig]:
"""Load a contaminated simulation catalog from FITS.
Returns
-------
ra, dec, z, weight_cont, config
"""
from astropy.io import fits as afits
with afits.open(str(fits_path)) as hdul:
hdr = hdul[1].header
data = hdul[1].data
ra = np.asarray(data["RA"], dtype=float)
dec = np.asarray(data["DEC"], dtype=float)
z = np.asarray(data["Z"], dtype=float)
weight_cont = np.asarray(data["WEIGHT_CONT"], dtype=float)
level = hdr.get("LEVEL", "unknown")
scenario = hdr.get("SCENARIO", "unknown")
n_sys = sum(1 for k in hdr if k.startswith("A_TRUE_"))
a_true = np.array([hdr[f"A_TRUE_{i}"] for i in range(n_sys)])
b_true = np.array([hdr[f"B_TRUE_{i}"] for i in range(n_sys)])
config = ContaminationConfig(level=level, scenario=scenario, a_true=a_true, b_true=b_true)
return ra, dec, z, weight_cont, config
# ── Weighted two-point function ────────────────────────────────────────────────
def _measure_wtheta(
ra: np.ndarray,
dec: np.ndarray,
ra_rand: np.ndarray,
dec_rand: np.ndarray,
weights: np.ndarray | None = None,
min_sep: float = 0.1,
max_sep: float = 10.0,
nbins: int = 10,
sep_units: str = "degrees",
) -> tuple[np.ndarray, np.ndarray]:
"""Landy-Szalay w(θ) optionally weighted per galaxy.
Galaxy weights are passed to TreeCorr via ``Catalog(w=weights)``.
Randoms always have uniform weight.
"""
import treecorr
cat_gal = treecorr.Catalog(ra=ra, dec=dec, w=weights, ra_units="degrees", dec_units="degrees")
cat_rand = treecorr.Catalog(ra=ra_rand, dec=dec_rand, ra_units="degrees", dec_units="degrees")
cfg = dict(min_sep=min_sep, max_sep=max_sep, nbins=nbins, sep_units=sep_units, metric="Arc")
dd = treecorr.NNCorrelation(**cfg)
dr = treecorr.NNCorrelation(**cfg)
rr = treecorr.NNCorrelation(**cfg)
dd.process(cat_gal)
dr.process(cat_gal, cat_rand)
rr.process(cat_rand)
xi, _ = dd.calculateXi(rr=rr, dr=dr)
theta = np.exp(dd.meanlogr)
return theta, xi
# ── w(θ) recovery pipeline ────────────────────────────────────────────────────
[docs]
def run_wtheta_recovery(
catalog: dict,
templates: np.ndarray,
template_names: list[str],
config: ContaminationConfig,
nside: int = 64,
methods: list[str] | None = None,
min_sep: float = 0.1,
max_sep: float = 10.0,
nbins: int = 10,
) -> dict:
"""Run full decontamination pipeline and return w(θ) at each stage.
Given a clean catalog and contamination parameters, this function:
1. Computes the reference (truth) w(θ) from the clean catalog.
2. Injects contamination to build per-galaxy ``WEIGHT_CONT``.
3. Computes the contaminated w(θ) using those weights.
4. For each decontamination method, fits the contamination model and
applies per-galaxy correction weights; computes the recovered w(θ).
Parameters
----------
catalog:
Dict with keys ``ra``, ``dec``, ``z``, ``ra_rand``, ``dec_rand``.
templates:
Full-sky systematic maps (shape ``(n_sys, n_pix)``).
template_names:
Names of the template maps (for metadata in output).
config:
Contamination configuration to inject.
nside:
HEALPix resolution for overdensity computation.
methods:
Decontamination methods to run. Default: OLS, ISD-1, ElasticNet.
min_sep, max_sep:
Angular separation range in degrees.
nbins:
Number of angular bins.
Returns
-------
dict with keys:
- ``theta``: bin centres in degrees
- ``w_true``: w(θ) of clean catalog (truth)
- ``w_contaminated``: w(θ) with WEIGHT_CONT applied
- ``w_recovered_{method}``: w(θ) after decontamination for each method
- ``params_{method}``: dict of recovered a_hat, b_hat
- ``config``: the ``ContaminationConfig`` used
- ``n_gal``, ``n_rand``: catalog sizes
"""
if methods is None:
methods = DEFAULT_METHODS
ra = catalog["ra"]
dec = catalog["dec"]
ra_rand = catalog["ra_rand"]
dec_rand = catalog["dec_rand"]
# 1. Truth w(θ) — uniform galaxy weights
theta, w_true = _measure_wtheta(
ra, dec, ra_rand, dec_rand,
min_sep=min_sep, max_sep=max_sep, nbins=nbins,
)
# 2. Contamination weights
weight_cont = inject_systematics(
ra, dec, ra_rand, dec_rand, templates, config.a_true, config.b_true, nside
)
# 3. Contaminated w(θ)
_, w_cont = _measure_wtheta(
ra, dec, ra_rand, dec_rand,
weights=weight_cont,
min_sep=min_sep, max_sep=max_sep, nbins=nbins,
)
# 4. Decontamination and recovery
gal_counts = pixelize_catalog(ra, dec, nside, weights=weight_cont)
rand_counts = pixelize_catalog(ra_rand, dec_rand, nside)
delta_g_obs, good_pixels = compute_overdensity(gal_counts, rand_counts)
delta_t_masked = assign_template_values(templates, good_pixels)
result: dict = {
"theta": theta,
"w_true": w_true,
"w_contaminated": w_cont,
"config": config,
"n_gal": len(ra),
"n_rand": len(ra_rand),
"template_names": template_names,
}
# Precompute per-galaxy pixel assignment once (independent of method)
theta_pix = np.radians(90.0 - dec)
phi_pix = np.radians(ra)
pix_gal = hp.ang2pix(nside, theta_pix, phi_pix)
for method in methods:
res = run_decontamination(method, delta_g_obs, delta_t_masked)
weights_sys = res["weights"] # (n_good_pix,)
# Map good-pixel correction weights back to per-galaxy
n_full = hp.nside2npix(nside)
weight_full = np.ones(n_full)
weight_full[good_pixels] = weights_sys
weight_recovered = weight_full[pix_gal]
# Apply the correction ON TOP of the contamination weight.
# The contaminated catalog is represented as original galaxies with
# weight_cont; decontamination must act on that contaminated sample,
# not on the original clean galaxies.
weight_decontaminated = weight_cont * weight_recovered
_, w_rec = _measure_wtheta(
ra, dec, ra_rand, dec_rand,
weights=weight_decontaminated,
min_sep=min_sep, max_sep=max_sep, nbins=nbins,
)
result[f"w_recovered_{method}"] = w_rec
result[f"params_{method}"] = {
"a_hat": res.get("a_hat"),
"b_hat": res.get("b_hat"),
"elapsed_s": res.get("elapsed_s"),
}
return result