Coverage for python/lsst/cell_coadds/_coadd_ap_corr_map.py: 23%
82 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:02 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:02 +0000
1# This file is part of cell_coadds.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "CoaddApCorrMapStacker",
26 "EMPTY_AP_CORR_MAP",
27)
30from collections.abc import Iterable
32import numpy as np
33from frozendict import frozendict
35from lsst.afw.image import ApCorrMap
36from lsst.afw.math import BoundedField
37from lsst.geom import Point2D
39from .typing_helpers import SingleCellCoaddApCorrMap
41EMPTY_AP_CORR_MAP: SingleCellCoaddApCorrMap = frozendict()
42"""Default empty aperture correction map for a single cell coadd."""
45class CoaddApCorrMapStacker:
46 """Online aperture correction map for a cell-based coadd.
48 This class is responsible for implementing the logic to coadd the
49 aperture correction values and their uncertainties.
51 Parameters
52 ----------
53 evaluation_point : `~lsst.geom.Point2D`
54 The point at which the input aperture correction is evaluated.
55 do_coadd_inverse_ap_corr : `bool`, optional
56 If True, the inverse aperture correction is applied to the coadd.
58 Notes
59 -----
60 At least one of class variables are set dynamically the first time the
61 ``add`` method is called on any instance of this class. This behavior
62 is based on the practical assumption that all ``ApCorrMap`` instances will
63 have the same set of field names during the entire processing. A schema
64 is therefore not expected at the time of initialization.
65 """
67 def __init__(self, evaluation_point: Point2D, do_coadd_inverse_ap_corr: bool = True) -> None:
68 # Initialize frozen attributes.
69 self._evaluation_point = evaluation_point
70 self._do_coadd_inverse_ap_corr = do_coadd_inverse_ap_corr
72 # Initialize mutable attributes.
73 self._total_weight = 0.0
74 self._intermediate_ap_corr_map: dict[str, float] = {}
75 self._ap_corr_names: Iterable[str] = ()
76 # An iterable of algorithm names that have aperture correction values.
77 # This is set when the first time the add method is called on any
78 # instance.
80 def _setup_ap_corr_names(self, ap_corr_map: ApCorrMap) -> None:
81 """Set up the aperture correction name set.
83 Parameters
84 ----------
85 ap_corr_map : `~lsst.meas.base.ApCorrMap`
86 The aperture correction map to add.
88 Raises
89 ------
90 RuntimeError
91 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or
92 "_instFluxErr".
93 """
94 ap_corr_name_set = set()
95 for field_name in ap_corr_map:
96 algorithm_name, suffix = field_name.split("_instFlux")
97 if suffix not in ("", "Err"):
98 raise RuntimeError(f"Invalid field name {field_name} in aperture correction map.")
100 ap_corr_name_set.add(algorithm_name)
102 self._ap_corr_names = tuple(sorted(ap_corr_name_set))
104 def reset(self) -> None:
105 """Reset to the post-initialization state."""
106 self._total_weight = 0.0
107 self._intermediate_ap_corr_map = {}
108 self._ap_corr_names = ()
110 @property
111 def evaluation_point(self) -> Point2D:
112 """The point at which the aperture correction is evaluated."""
113 return self._evaluation_point
115 @property
116 def do_coadd_inverse_ap_corr(self) -> bool:
117 """If True, the inverse aperture correction is applied to the coadd."""
118 return self._do_coadd_inverse_ap_corr
120 @property
121 def ap_corr_names(self) -> Iterable[str]:
122 """Iterable of algorithm names that have aperture correction values."""
123 return self._ap_corr_names
125 @property
126 def total_weight(self) -> float:
127 """The total weight of the aperture correction map."""
128 return self._total_weight
130 def add(self, ap_corr_map: ApCorrMap, weight: float) -> None:
131 """Add an aperture correction map to the coadd.
133 Parameters
134 ----------
135 ap_corr_map : `~lsst.meas.base.ApCorrMap`
136 The aperture correction map to add.
137 weight : `float`
138 The weight to apply to the aperture correction map.
140 Raises
141 ------
142 RuntimeError
143 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or
144 "_instFluxErr".
145 ValueError
146 Raised if the aperture correction value or its error is missing.
147 """
148 if not self.ap_corr_names:
149 # Lazily initialize the aperture correction name set.
150 self._setup_ap_corr_names(ap_corr_map)
152 if not self._intermediate_ap_corr_map:
153 self._intermediate_ap_corr_map = dict.fromkeys(
154 [f"{algorithm_name}_instFlux" for algorithm_name in self.ap_corr_names]
155 + [f"{algorithm_name}_instFluxErr" for algorithm_name in self.ap_corr_names],
156 0.0,
157 )
159 # Accumulate the aperture correction values in a temporary dict.
160 # This is so that if we error out in the middle, we don't leave the
161 # aperture correction map in an inconsistent state.
162 temp_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0)
164 for algorithm_name in self.ap_corr_names:
165 # Accumulate the aperture correction values.
166 ap_corr_field: BoundedField | None
167 if (ap_corr_field := ap_corr_map.get(f"{algorithm_name}_instFlux")) is None:
168 ap_corr_value = np.nan
169 else:
170 ap_corr_value = ap_corr_field.evaluate(self.evaluation_point)
172 # Calculate the term to accumulate depending on the boolean.
173 if self.do_coadd_inverse_ap_corr:
174 if ap_corr_value == 0:
175 raise ValueError("This should not have happened. ap_corr_value is zero.")
176 else:
177 term = weight / ap_corr_value
178 else:
179 term = weight * ap_corr_value
181 temp_ap_corr_map[f"{algorithm_name}_instFlux"] = term
183 # Accumulate the aperture correction error values.
184 ap_corr_err_field: BoundedField | None
185 if (ap_corr_err_field := ap_corr_map.get(f"{algorithm_name}_instFluxErr")) is None:
186 ap_corr_err_value = np.nan
187 else:
188 ap_corr_err_value = ap_corr_err_field.evaluate(self.evaluation_point)
190 # Calculate the term to accumulate depending on the boolean.
191 if self.do_coadd_inverse_ap_corr:
192 term = (weight * ap_corr_err_value) ** 2 / ap_corr_value**4
193 else:
194 term = (weight * ap_corr_err_value) ** 2
196 temp_ap_corr_map[f"{algorithm_name}_instFluxErr"] = term
198 # Update the intermediate aperture correction map.
199 for key in self._intermediate_ap_corr_map:
200 self._intermediate_ap_corr_map[key] += temp_ap_corr_map[key]
202 # Add the weight to the total weight.
203 self._total_weight += weight
205 @property
206 def final_ap_corr_map(self) -> SingleCellCoaddApCorrMap:
207 """Final coadded aperture correction map.
209 This should be called after all aperture correction maps have been
210 added.
212 Raises
213 ------
214 RuntimeError
215 Raised if the total weight is zero.
216 """
217 if self.total_weight == 0 or not self.ap_corr_names:
218 raise RuntimeError("Cannot get an empty aperture correction map.")
220 final_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0)
222 # The transformation equation is different for the aperture correction
223 # values and their uncertainties and it also depends on whether we
224 # accumulate the aperture corrections or their inverse.
226 if self.do_coadd_inverse_ap_corr:
227 for algorithm_name in self.ap_corr_names:
228 if (
229 inverse_ap_corr_value := self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"]
230 ) > 0:
231 final_ap_corr_map[f"{algorithm_name}_instFlux"] = (
232 self.total_weight / inverse_ap_corr_value
233 )
234 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = (
235 final_ap_corr_map[f"{algorithm_name}_instFlux"] ** 2
236 * np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"])
237 / self.total_weight
238 )
239 else:
240 for algorithm_name in self.ap_corr_names:
241 final_ap_corr_map[f"{algorithm_name}_instFlux"] = (
242 self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"] / self.total_weight
243 )
244 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = (
245 np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"])
246 / self.total_weight
247 )
249 # Return the finalized (immutable) aperture correction map.
250 return frozendict(final_ap_corr_map)