Coverage for python/lsst/images/cells/_provenance.py: 25%

145 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 08:08 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("CoaddProvenance", "CoaddProvenanceSerializationModel") 

15 

16from collections.abc import Iterable 

17from typing import TYPE_CHECKING, Any, ClassVar 

18 

19import astropy.table 

20import astropy.units as u 

21import numpy as np 

22import pydantic 

23 

24from .._cell_grid import CellIJ 

25from .._polygon import Polygon 

26from ..serialization import ArchiveTree, InputArchive, InvalidParameterError, OutputArchive, TableModel 

27 

28if TYPE_CHECKING: 

29 try: 

30 from lsst.cell_coadds import CoaddInputs as LegacyCoaddInputs 

31 from lsst.cell_coadds import MultipleCellCoadd, ObservationIdentifiers 

32 from lsst.skymap import Index2D 

33 except ImportError: 

34 type Index2D = Any # type: ignore[no-redef] 

35 type LegacyCoaddInputs = Any # type: ignore[no-redef] 

36 type MultipleCellCoadd = Any # type: ignore[no-redef] 

37 type ObservationIdentifiers = Any # type: ignore[no-redef] 

38 

39 

40class CoaddProvenance: 

41 """A pair of tables that record the inputs to a cell-based coadd. 

42 

43 Parameters 

44 ---------- 

45 inputs 

46 A table of {visit, detector} combinations that contribute to any cell 

47 in the coadd. 

48 contributions 

49 A table of {visit, detector, cell} combinations that describe how an 

50 observation contributed to a cell. 

51 

52 Notes 

53 ----- 

54 This object can represent the provenance of a whole patch, a single cell, 

55 or anything in between. In the single-cell case, the ``inputs`` and 

56 ``contributions`` tables have the same number of rows (but may not be 

57 ordered the same way!). 

58 """ 

59 

60 def __init__(self, inputs: astropy.table.Table, contributions: astropy.table.Table): 

61 self._inputs = inputs 

62 self._contributions = contributions 

63 

64 _INPUT_TABLE_COLUMNS: ClassVar[list[tuple[str, type, str]]] = [ 

65 ("instrument", np.object_, "Name of the instrument."), 

66 ("visit", np.uint64, "ID of the visit."), 

67 ("detector", np.uint16, "ID of the detector."), 

68 ("physical_filter", np.object_, "Full name of the bandpass filter."), 

69 ("day_obs", np.uint32, "Observation night as a YYYYMMDD integer."), 

70 ( 

71 "polygon", 

72 np.object_, 

73 ( 

74 "Polygon that approximates the overlap of the observation and the coadd patch, " 

75 "in coadd coordinates." 

76 ), 

77 ), 

78 ] 

79 

80 _CONTRIBUTION_TABLE_COLUMNS: ClassVar[list[tuple[str, type, str, u.UnitBase | None]]] = [ 

81 ("cell_i", np.uint16, "Y-axis index of the cell within the patch.", None), 

82 ("cell_j", np.uint16, "X-axis index of the cell within the patch.", None), 

83 ("instrument", np.object_, "Name of the instrument.", None), 

84 ("visit", np.uint64, "ID of the visit.", None), 

85 ("detector", np.uint16, "ID of the detector.", None), 

86 ("overlaps_center", np.bool_, "Whether a this observation overlaps the center of the cell.", None), 

87 ("overlap_fraction", np.float64, "Fraction of the cell that is covered by the overlap region.", None), 

88 ("weight", np.float64, "Weight to be used for this input in this cell.", None), 

89 ("psf_shape_xx", np.float64, "Second order moments of the PSF.", u.pix**2), 

90 ("psf_shape_yy", np.float64, "Second order moments of the PSF.", u.pix**2), 

91 ("psf_shape_xy", np.float64, "Second order moments of the PSF.", u.pix**2), 

92 ( 

93 "psf_shape_flag", 

94 np.bool_, 

95 "Flag indicating whether the PSF shape measurement was successful.", 

96 None, 

97 ), 

98 ] 

99 

100 @classmethod 

101 def make_empty_input_table(cls, n_rows: int) -> astropy.table.Table: 

102 """Make an empty `inputs` table with a set number of rows.""" 

103 return astropy.table.Table( 

104 [ 

105 astropy.table.Column(name=name, length=n_rows, dtype=dtype, description=description) 

106 for name, dtype, description in cls._INPUT_TABLE_COLUMNS 

107 ] 

108 ) 

109 

110 @classmethod 

111 def make_empty_contribution_table(cls, n_rows: int) -> astropy.table.Table: 

112 """Make an empty `contributions` table with a set number of rows.""" 

113 return astropy.table.Table( 

114 [ 

115 astropy.table.Column( 

116 name=name, length=n_rows, dtype=dtype, description=description, unit=unit 

117 ) 

118 for name, dtype, description, unit in cls._CONTRIBUTION_TABLE_COLUMNS 

119 ] 

120 ) 

121 

122 @property 

123 def inputs(self) -> astropy.table.Table: 

124 """A table of {visit, detector} combinations that contribute to any 

125 cell in the coadd. 

126 """ 

127 return self._inputs 

128 

129 @property 

130 def contributions(self) -> astropy.table.Table: 

131 """A table of {visit, detector, cell} combinations that describe how an 

132 observation contributed to a cell. 

133 """ 

134 return self._contributions 

135 

136 def __getitem__(self, cell: CellIJ) -> CoaddProvenance: 

137 return self.subset([cell]) 

138 

139 def subset(self, cells: Iterable[CellIJ]) -> CoaddProvenance: 

140 """Return a new provenance object with just the given cells.""" 

141 cells_to_keep = astropy.table.Table( 

142 rows=[(index.i, index.j) for index in cells], 

143 names=["cell_i", "cell_j"], 

144 dtype=[np.uint16, np.uint16], 

145 ) 

146 contributions = astropy.table.join(self._contributions, cells_to_keep) 

147 assert contributions.columns.keys() == {name for name, _, _, _ in self._CONTRIBUTION_TABLE_COLUMNS} 

148 inputs = astropy.table.join(contributions["instrument", "visit", "detector"], self._inputs) 

149 assert inputs.columns.keys() == {name for name, _, _ in self._INPUT_TABLE_COLUMNS} 

150 return CoaddProvenance(inputs=inputs, contributions=contributions) 

151 

152 def serialize(self, archive: OutputArchive[Any]) -> CoaddProvenanceSerializationModel: 

153 """Serialize the provenance to an output archive. 

154 

155 Parameters 

156 ---------- 

157 archive 

158 Archive to write to. 

159 """ 

160 inputs = self._inputs.copy(copy_data=False) 

161 contributions = self._contributions.copy(copy_data=False) 

162 instrument = CoaddProvenanceSerializationModel._fix_str_for_serialization( 

163 "instrument", inputs, contributions 

164 ) 

165 physical_filter = CoaddProvenanceSerializationModel._fix_str_for_serialization( 

166 "physical_filter", inputs 

167 ) 

168 CoaddProvenanceSerializationModel._fix_polygon_for_serialization(inputs) 

169 inputs_model = archive.add_table(inputs, name="inputs") 

170 contributions_model = archive.add_table(contributions, name="contributions") 

171 return CoaddProvenanceSerializationModel( 

172 instrument=instrument, 

173 physical_filter=physical_filter, 

174 inputs=inputs_model, 

175 contributions=contributions_model, 

176 ) 

177 

178 @staticmethod 

179 def from_legacy(legacy_cell_coadd: MultipleCellCoadd) -> CoaddProvenance: 

180 """Extract provenance from a legacy 

181 `lsst.cell_coadds.MultipleCellCoadd` object. 

182 """ 

183 inputs = CoaddProvenance.make_empty_input_table(len(legacy_cell_coadd.common.visit_polygons)) 

184 for n, (legacy_identifiers, legacy_polygon) in enumerate( 

185 legacy_cell_coadd.common.visit_polygons.items() 

186 ): 

187 inputs["instrument"][n] = legacy_identifiers.instrument 

188 inputs["visit"][n] = legacy_identifiers.visit 

189 inputs["detector"][n] = legacy_identifiers.detector 

190 inputs["physical_filter"][n] = legacy_identifiers.physical_filter 

191 inputs["day_obs"][n] = legacy_identifiers.day_obs 

192 inputs["polygon"][n] = Polygon.from_legacy(legacy_polygon) 

193 n_contributions = 0 

194 for legacy_cell in legacy_cell_coadd.cells.values(): 

195 n_contributions += len(legacy_cell.inputs) 

196 contributions = CoaddProvenance.make_empty_contribution_table(n_contributions) 

197 n = 0 

198 for legacy_cell in legacy_cell_coadd.cells.values(): 

199 for legacy_identifiers, legacy_inputs in legacy_cell.inputs.items(): 

200 contributions["cell_i"][n] = legacy_cell.identifiers.cell.y 

201 contributions["cell_j"][n] = legacy_cell.identifiers.cell.x 

202 contributions["instrument"][n] = legacy_identifiers.instrument 

203 contributions["visit"][n] = legacy_identifiers.visit 

204 contributions["detector"][n] = legacy_identifiers.detector 

205 contributions["overlaps_center"][n] = legacy_inputs.overlaps_center 

206 contributions["overlap_fraction"][n] = legacy_inputs.overlap_fraction 

207 contributions["weight"][n] = legacy_inputs.weight 

208 contributions["psf_shape_xx"][n] = legacy_inputs.psf_shape.getIxx() 

209 contributions["psf_shape_yy"][n] = legacy_inputs.psf_shape.getIyy() 

210 contributions["psf_shape_xy"][n] = legacy_inputs.psf_shape.getIxy() 

211 contributions["psf_shape_flag"][n] = legacy_inputs.psf_shape_flag 

212 n += 1 

213 return CoaddProvenance(inputs=inputs, contributions=contributions) 

214 

215 

216class CoaddProvenanceSerializationModel(ArchiveTree): 

217 """A Pydantic model used to represent a serialized `CoaddProvenance`. 

218 

219 Notes 

220 ----- 

221 We can't rewrite the Astropy tables directly into the archive (e.g. as 

222 FITS binary tables for a FITS archive), because: 

223 

224 - `str` columns are a huge pain in both Numpy and FITS; 

225 - the polygon columns need to be rewritten as array-valued columns. 

226 

227 To deal with the string columns (``instrument`` and ``physical_filter``) 

228 we do dictionary compression: we map each distinct value of those columns 

229 to an integer, and then we save that mapping to the model while saving 

230 an integer version of that column in the table. But if there is actually 

231 only one value in that column (the most common case by far) we just drop 

232 the column and store that value directly in the model. 

233 """ 

234 

235 SCHEMA_NAME: ClassVar[str] = "coadd_provenance" 

236 SCHEMA_VERSION: ClassVar[str] = "1.0.0" 

237 MIN_READ_VERSION: ClassVar[int] = 1 

238 

239 instrument: str | dict[str, int] = pydantic.Field( 

240 description=( 

241 "Instrument name for all inputs to this coadd, or a mapping from " 

242 "instrument name to the integer used in its place in the tables." 

243 ) 

244 ) 

245 physical_filter: str | dict[str, int] = pydantic.Field( 

246 description="Physical filter name for all inputs to this coadd." 

247 ) 

248 inputs: TableModel = pydantic.Field(description="Table of all inputs to the coadd.") 

249 contributions: TableModel = pydantic.Field(description="Table of per-cell contributions to the coadd.") 

250 

251 def deserialize(self, archive: InputArchive[Any], **kwargs: Any) -> CoaddProvenance: 

252 """Deserialize a provenance from an input archive. 

253 

254 Parameters 

255 ---------- 

256 archive 

257 Archive to read from. 

258 

259 Notes 

260 ----- 

261 While `CoaddProvenance.subset` can be used to filter provenance 

262 information down to just certain cells, there is no advantage to be 

263 had from doing this during deserialization (the table data is not 

264 ordered by cell, and hence there's read-slicing we can do). 

265 """ 

266 if kwargs: 

267 raise InvalidParameterError(f"Unrecognized parameters for CoaddProvenance: {set(kwargs.keys())}.") 

268 inputs = archive.get_table(self.inputs) 

269 contributions = archive.get_table(self.contributions) 

270 CoaddProvenanceSerializationModel._fix_str_for_deserialization( 

271 "instrument", self.instrument, inputs, contributions 

272 ) 

273 CoaddProvenanceSerializationModel._fix_str_for_deserialization( 

274 "physical_filter", self.physical_filter, inputs 

275 ) 

276 CoaddProvenanceSerializationModel._fix_polygon_for_deserialization(inputs) 

277 for name, _, description in CoaddProvenance._INPUT_TABLE_COLUMNS: 

278 inputs.columns[name].description = description 

279 for name, _, description, unit in CoaddProvenance._CONTRIBUTION_TABLE_COLUMNS: 

280 contributions.columns[name].description = description 

281 contributions.columns[name].unit = unit 

282 return CoaddProvenance(inputs=inputs, contributions=contributions) 

283 

284 @staticmethod 

285 def _fix_str_for_serialization(column: str, *tables: astropy.table.Table) -> str | dict[str, int]: 

286 """Rewrite a string column as an integer column or drop it. 

287 

288 Parameters 

289 ---------- 

290 column 

291 Name of the column to rewrite. 

292 *tables 

293 One or more astropy tables to rewrite. The first table is assumed 

294 to have all values for this column that might appear in any other 

295 tables. 

296 

297 Returns 

298 ------- 

299 `str` | `dict` [`str`, `int`] 

300 If there is only one unique value for this column in the first 

301 table, that value (and the column will have been dropped from 

302 all givne tables). If the tables are empty, the column is 

303 dropped and an empty `dict` is returned. In all other cases the 

304 given column is replaced with an integer column in all given 

305 tables and the mapping from strings to integers is returned. 

306 """ 

307 result: str | dict[str, int] = {name: n for n, name in enumerate(sorted(set(tables[0][column])))} 

308 match len(result): 

309 case 0: 

310 pass 

311 case 1: 

312 (result,) = result.keys() # type: ignore[union-attr] 

313 case _: 

314 for table in tables: 

315 table.columns[column] = astropy.table.Column( 

316 data=[result[k] for k in table.columns[column]], 

317 name=column, 

318 dtype=np.uint8, 

319 description=f"Integer mapped to {column} name.", 

320 ) 

321 return result 

322 # If we didn't remap to an integer (case 0 and 1 above), delete the 

323 # column. 

324 for table in tables: 

325 del table.columns[column] 

326 return result 

327 

328 @staticmethod 

329 def _fix_str_for_deserialization( 

330 column: str, value: str | dict[str, int], *tables: astropy.table.Table 

331 ) -> None: 

332 """Rewrite an integer column back to a string one. 

333 

334 Parameters 

335 ---------- 

336 column 

337 Name of the column to rewrite. 

338 value 

339 Value or mapping of values returned by 

340 `_fix_str_for_serialization`. 

341 tables 

342 Tables to rewrite this column in. 

343 """ 

344 match value: 

345 case str(): 

346 for table in tables: 

347 table.columns[column] = astropy.table.Column([value] * len(table), dtype=object) 

348 case dict(): 

349 mapping = {v: k for k, v in value.items()} 

350 for table in tables: 

351 table.columns[column] = astropy.table.Column( 

352 [mapping[k] for k in table[column]], dtype=object 

353 ) 

354 

355 @staticmethod 

356 def _fix_polygon_for_serialization(inputs: astropy.table.Table) -> None: 

357 """Rewrite a polygon `object` column as a pair of array-valued columns 

358 and an array-size column. 

359 

360 Parameters 

361 ---------- 

362 inputs 

363 A copy of the in-memory coadd inputs table to modify in-place into 

364 its serialization form. 

365 """ 

366 max_n_vertices = max(p.n_vertices for p in inputs["polygon"]) 

367 inputs["n_vertices"] = astropy.table.Column( 

368 [p.n_vertices for p in inputs["polygon"]], 

369 name="n_vertices", 

370 dtype=np.uint8, 

371 description="Number of polygon vertices.", 

372 ) 

373 inputs["x_vertices"] = astropy.table.Column( 

374 name="x_vertices", 

375 dtype=np.float64, 

376 length=len(inputs), 

377 shape=(max_n_vertices,), 

378 description="X coordinates of polygon vertices, in tract coordinates.", 

379 ) 

380 inputs["x_vertices"][:, :] = np.nan 

381 inputs["y_vertices"] = astropy.table.Column( 

382 name="y_vertices", 

383 dtype=np.float64, 

384 length=len(inputs), 

385 shape=(max_n_vertices,), 

386 description="Y coordinates of polygon vertices, in tract coordinates.", 

387 ) 

388 inputs["y_vertices"][:, :] = np.nan 

389 for i, polygon in enumerate(inputs["polygon"]): 

390 inputs["n_vertices"][i] = polygon.n_vertices 

391 inputs["x_vertices"][i][: polygon.n_vertices] = polygon.x_vertices 

392 inputs["y_vertices"][i][: polygon.n_vertices] = polygon.y_vertices 

393 del inputs["polygon"] 

394 

395 @staticmethod 

396 def _fix_polygon_for_deserialization(inputs: astropy.table.Table) -> None: 

397 """Rewrite a a pair of array-valued columns and an array-size column 

398 into a polygon `object` column. 

399 

400 Parameters 

401 ---------- 

402 inputs 

403 The serialized version of the coadd inputs table, to be modified 

404 in-place into its in-memory form. 

405 """ 

406 polygons = [ 

407 Polygon(x_vertices=x_vertices[:n_vertices], y_vertices=y_vertices[:n_vertices]) 

408 for n_vertices, x_vertices, y_vertices in zip( 

409 inputs["n_vertices"], inputs["x_vertices"], inputs["y_vertices"] 

410 ) 

411 ] 

412 del inputs["n_vertices"] 

413 del inputs["x_vertices"] 

414 del inputs["y_vertices"] 

415 inputs["polygon"] = astropy.table.Column(polygons, name="polygon", dtype=np.object_)