Coverage for python/lsst/cell_coadds/_coadd_ap_corr_map.py: 23%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 01:00 -0700

1# This file is part of cell_coadds. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "CoaddApCorrMapStacker", 

26 "EMPTY_AP_CORR_MAP", 

27) 

28 

29 

30from collections.abc import Iterable 

31 

32import numpy as np 

33from frozendict import frozendict 

34 

35from lsst.afw.image import ApCorrMap 

36from lsst.afw.math import BoundedField 

37from lsst.geom import Point2D 

38 

39from .typing_helpers import SingleCellCoaddApCorrMap 

40 

41EMPTY_AP_CORR_MAP: SingleCellCoaddApCorrMap = frozendict() 

42"""Default empty aperture correction map for a single cell coadd.""" 

43 

44 

45class CoaddApCorrMapStacker: 

46 """Online aperture correction map for a cell-based coadd. 

47 

48 This class is responsible for implementing the logic to coadd the 

49 aperture correction values and their uncertainties. 

50 

51 Parameters 

52 ---------- 

53 evaluation_point : `~lsst.geom.Point2D` 

54 The point at which the input aperture correction is evaluated. 

55 do_coadd_inverse_ap_corr : `bool`, optional 

56 If True, the inverse aperture correction is applied to the coadd. 

57 

58 Notes 

59 ----- 

60 At least one of class variables are set dynamically the first time the 

61 ``add`` method is called on any instance of this class. This behavior 

62 is based on the practical assumption that all ``ApCorrMap`` instances will 

63 have the same set of field names during the entire processing. A schema 

64 is therefore not expected at the time of initialization. 

65 """ 

66 

67 def __init__(self, evaluation_point: Point2D, do_coadd_inverse_ap_corr: bool = True) -> None: 

68 # Initialize frozen attributes. 

69 self._evaluation_point = evaluation_point 

70 self._do_coadd_inverse_ap_corr = do_coadd_inverse_ap_corr 

71 

72 # Initialize mutable attributes. 

73 self._total_weight = 0.0 

74 self._intermediate_ap_corr_map: dict[str, float] = {} 

75 self._ap_corr_names: Iterable[str] = () 

76 # An iterable of algorithm names that have aperture correction values. 

77 # This is set when the first time the add method is called on any 

78 # instance. 

79 

80 def _setup_ap_corr_names(self, ap_corr_map: ApCorrMap) -> None: 

81 """Set up the aperture correction name set. 

82 

83 Parameters 

84 ---------- 

85 ap_corr_map : `~lsst.meas.base.ApCorrMap` 

86 The aperture correction map to add. 

87 

88 Raises 

89 ------ 

90 RuntimeError 

91 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or 

92 "_instFluxErr". 

93 """ 

94 ap_corr_name_set = set() 

95 for field_name in ap_corr_map: 

96 algorithm_name, suffix = field_name.split("_instFlux") 

97 if suffix not in ("", "Err"): 

98 raise RuntimeError(f"Invalid field name {field_name} in aperture correction map.") 

99 

100 ap_corr_name_set.add(algorithm_name) 

101 

102 self._ap_corr_names = tuple(sorted(ap_corr_name_set)) 

103 

104 def reset(self) -> None: 

105 """Reset to the post-initialization state.""" 

106 self._total_weight = 0.0 

107 self._intermediate_ap_corr_map = {} 

108 self._ap_corr_names = () 

109 

110 @property 

111 def evaluation_point(self) -> Point2D: 

112 """The point at which the aperture correction is evaluated.""" 

113 return self._evaluation_point 

114 

115 @property 

116 def do_coadd_inverse_ap_corr(self) -> bool: 

117 """If True, the inverse aperture correction is applied to the coadd.""" 

118 return self._do_coadd_inverse_ap_corr 

119 

120 @property 

121 def ap_corr_names(self) -> Iterable[str]: 

122 """Iterable of algorithm names that have aperture correction values.""" 

123 return self._ap_corr_names 

124 

125 @property 

126 def total_weight(self) -> float: 

127 """The total weight of the aperture correction map.""" 

128 return self._total_weight 

129 

130 def add(self, ap_corr_map: ApCorrMap, weight: float) -> None: 

131 """Add an aperture correction map to the coadd. 

132 

133 Parameters 

134 ---------- 

135 ap_corr_map : `~lsst.meas.base.ApCorrMap` 

136 The aperture correction map to add. 

137 weight : `float` 

138 The weight to apply to the aperture correction map. 

139 

140 Raises 

141 ------ 

142 RuntimeError 

143 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or 

144 "_instFluxErr". 

145 ValueError 

146 Raised if the aperture correction value or its error is missing. 

147 """ 

148 if not self.ap_corr_names: 

149 # Lazily initialize the aperture correction name set. 

150 self._setup_ap_corr_names(ap_corr_map) 

151 

152 if not self._intermediate_ap_corr_map: 

153 self._intermediate_ap_corr_map = dict.fromkeys( 

154 [f"{algorithm_name}_instFlux" for algorithm_name in self.ap_corr_names] 

155 + [f"{algorithm_name}_instFluxErr" for algorithm_name in self.ap_corr_names], 

156 0.0, 

157 ) 

158 

159 # Accumulate the aperture correction values in a temporary dict. 

160 # This is so that if we error out in the middle, we don't leave the 

161 # aperture correction map in an inconsistent state. 

162 temp_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0) 

163 

164 for algorithm_name in self.ap_corr_names: 

165 # Accumulate the aperture correction values. 

166 ap_corr_field: BoundedField | None 

167 if (ap_corr_field := ap_corr_map.get(f"{algorithm_name}_instFlux")) is None: 

168 ap_corr_value = np.nan 

169 else: 

170 ap_corr_value = ap_corr_field.evaluate(self.evaluation_point) 

171 

172 # Calculate the term to accumulate depending on the boolean. 

173 if self.do_coadd_inverse_ap_corr: 

174 if ap_corr_value == 0: 

175 raise ValueError("This should not have happened. ap_corr_value is zero.") 

176 else: 

177 term = weight / ap_corr_value 

178 else: 

179 term = weight * ap_corr_value 

180 

181 temp_ap_corr_map[f"{algorithm_name}_instFlux"] = term 

182 

183 # Accumulate the aperture correction error values. 

184 ap_corr_err_field: BoundedField | None 

185 if (ap_corr_err_field := ap_corr_map.get(f"{algorithm_name}_instFluxErr")) is None: 

186 ap_corr_err_value = np.nan 

187 else: 

188 ap_corr_err_value = ap_corr_err_field.evaluate(self.evaluation_point) 

189 

190 # Calculate the term to accumulate depending on the boolean. 

191 if self.do_coadd_inverse_ap_corr: 

192 term = (weight * ap_corr_err_value) ** 2 / ap_corr_value**4 

193 else: 

194 term = (weight * ap_corr_err_value) ** 2 

195 

196 temp_ap_corr_map[f"{algorithm_name}_instFluxErr"] = term 

197 

198 # Update the intermediate aperture correction map. 

199 for key in self._intermediate_ap_corr_map: 

200 self._intermediate_ap_corr_map[key] += temp_ap_corr_map[key] 

201 

202 # Add the weight to the total weight. 

203 self._total_weight += weight 

204 

205 @property 

206 def final_ap_corr_map(self) -> SingleCellCoaddApCorrMap: 

207 """Final coadded aperture correction map. 

208 

209 This should be called after all aperture correction maps have been 

210 added. 

211 

212 Raises 

213 ------ 

214 RuntimeError 

215 Raised if the total weight is zero. 

216 """ 

217 if self.total_weight == 0 or not self.ap_corr_names: 

218 raise RuntimeError("Cannot get an empty aperture correction map.") 

219 

220 final_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0) 

221 

222 # The transformation equation is different for the aperture correction 

223 # values and their uncertainties and it also depends on whether we 

224 # accumulate the aperture corrections or their inverse. 

225 

226 if self.do_coadd_inverse_ap_corr: 

227 for algorithm_name in self.ap_corr_names: 

228 if ( 

229 inverse_ap_corr_value := self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"] 

230 ) > 0: 

231 final_ap_corr_map[f"{algorithm_name}_instFlux"] = ( 

232 self.total_weight / inverse_ap_corr_value 

233 ) 

234 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = ( 

235 final_ap_corr_map[f"{algorithm_name}_instFlux"] ** 2 

236 * np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"]) 

237 / self.total_weight 

238 ) 

239 else: 

240 for algorithm_name in self.ap_corr_names: 

241 final_ap_corr_map[f"{algorithm_name}_instFlux"] = ( 

242 self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"] / self.total_weight 

243 ) 

244 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = ( 

245 np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"]) 

246 / self.total_weight 

247 ) 

248 

249 # Return the finalized (immutable) aperture correction map. 

250 return frozendict(final_ap_corr_map)