Coverage for python/lsst/images/tests/_checks.py: 10%

429 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 08:10 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "arrays_to_legacy_points", 

16 "assert_cell_coadds_equal", 

17 "assert_close", 

18 "assert_equal_allow_nan", 

19 "assert_images_equal", 

20 "assert_masked_images_equal", 

21 "assert_masks_equal", 

22 "assert_projections_equal", 

23 "assert_psfs_equal", 

24 "assert_visit_images_equal", 

25 "check_archive_tree_class_invariants", 

26 "check_astropy_wcs_interface", 

27 "check_projection", 

28 "check_transform", 

29 "compare_amplifier_to_legacy", 

30 "compare_aperture_corrections_to_legacy", 

31 "compare_cell_coadd_to_legacy", 

32 "compare_detector_to_legacy", 

33 "compare_field_to_legacy", 

34 "compare_image_to_legacy", 

35 "compare_mask_to_legacy", 

36 "compare_masked_image_to_legacy", 

37 "compare_observation_summary_stats_to_legacy", 

38 "compare_photo_calib_to_legacy", 

39 "compare_projection_to_legacy_wcs", 

40 "compare_psf_to_legacy", 

41 "compare_visit_image_to_legacy", 

42 "iter_concrete_archive_tree_subclasses", 

43 "legacy_coords_to_astropy", 

44 "legacy_points_to_xy_array", 

45) 

46 

47import dataclasses 

48import math 

49import unittest 

50from collections.abc import Iterator, Mapping 

51from typing import TYPE_CHECKING, Any, Literal, cast 

52 

53import astropy.units as u 

54import astropy.wcs.wcsapi 

55import numpy as np 

56from astropy.coordinates import SkyCoord 

57 

58from .._geom import XY, YX, Box 

59from .._image import Image 

60from .._mask import Mask, MaskPlane 

61from .._masked_image import MaskedImage 

62from .._observation_summary_stats import ObservationSummaryStats 

63from .._transforms import DetectorFrame, Frame, Projection, SkyFrame, TractFrame, Transform 

64from .._visit_image import VisitImage 

65from ..aperture_corrections import ApertureCorrectionMap 

66from ..cameras import Amplifier, Detector, DetectorType, ReadoutCorner 

67from ..cells import CellCoadd, CellIJ, CoaddProvenance 

68from ..fields import BaseField, ChebyshevField 

69from ..psfs import PointSpreadFunction 

70from ..serialization import ArchiveTree 

71 

72if TYPE_CHECKING: 

73 try: 

74 from lsst.cell_coadds import MultipleCellCoadd 

75 except ImportError: 

76 type MultipleCellCoadd = Any # type: ignore[no-redef] 

77 try: 

78 from lsst.afw.image import PhotoCalib as LegacyPhotoCalib 

79 except ImportError: 

80 type LegacyPhotoCalib = Any # type: ignore[no-redef] 

81 

82 

83def assert_close( 

84 tc: unittest.TestCase, 

85 a: np.ndarray | u.Quantity | float, 

86 b: np.ndarray | u.Quantity | float, 

87 **kwargs: Any, 

88) -> None: 

89 """Test that two arrays, floats, or quantities are almost equal. 

90 

91 Parameters 

92 ---------- 

93 tc 

94 Test case object with assert methods to use. 

95 a 

96 Array, float, or quantity to compare. 

97 b 

98 Array, float, or quantity to compare. 

99 **kwargs 

100 Forwarded to `astropy.units.allclose`. 

101 """ 

102 tc.assertTrue(u.allclose(a, b, **kwargs), msg=f"{a} != {b}") 

103 

104 

105def assert_equal_allow_nan(tc: unittest.TestCase, a: float, b: float) -> None: 

106 """Test that two floating point values are equal, with nan == nan.""" 

107 try: 

108 tc.assertEqual(a, b) 

109 except AssertionError: 

110 if not (math.isnan(a) and math.isnan(b)): 

111 raise 

112 

113 

114def assert_images_equal( 

115 tc: unittest.TestCase, 

116 a: Image, 

117 b: Image, 

118 *, 

119 rtol: float = 0.0, 

120 atol: float = 0.0, 

121 expect_view: bool | Literal["array"] | None = None, 

122) -> None: 

123 """Assert that two images are equal or nearly equal.""" 

124 tc.assertEqual(a.bbox, b.bbox) 

125 tc.assertEqual(a.unit, b.unit) 

126 assert_projections_equal(tc, a.projection, b.projection) 

127 if expect_view is not None: 

128 tc.assertEqual(np.may_share_memory(a.array, b.array), bool(expect_view)) 

129 if expect_view == "array": 

130 tc.assertEqual(a.metadata, b.metadata) 

131 else: 

132 tc.assertEqual(a.metadata is b.metadata, expect_view) 

133 if not expect_view: 

134 assert_close(tc, a.array, b.array, atol=atol, rtol=rtol) 

135 tc.assertEqual(a.metadata, b.metadata) 

136 

137 

138def assert_masks_equal(tc: unittest.TestCase, a: Mask, b: Mask) -> None: 

139 """Assert that two masks are equal or nearly equal.""" 

140 tc.assertEqual(a.bbox, b.bbox) 

141 tc.assertEqual(a.schema, b.schema) 

142 tc.assertEqual(a.metadata, b.metadata) 

143 assert_projections_equal(tc, a.projection, b.projection) 

144 np.testing.assert_array_equal(a.array, b.array) 

145 

146 

147def assert_masked_images_equal( 

148 tc: unittest.TestCase, 

149 a: MaskedImage, 

150 b: MaskedImage, 

151 *, 

152 rtol: float = 0.0, 

153 atol: float = 0.0, 

154 expect_view: bool | None = None, 

155) -> None: 

156 """Assert that two masked images are equal or nearly equal.""" 

157 tc.assertEqual(a.metadata, b.metadata) 

158 assert_projections_equal(tc, a.projection, b.projection) 

159 assert_images_equal(tc, a.image, b.image, rtol=rtol, atol=atol, expect_view=expect_view) 

160 assert_masks_equal(tc, a.mask, b.mask) 

161 assert_images_equal(tc, a.variance, b.variance, rtol=rtol, atol=atol, expect_view=expect_view) 

162 

163 

164def assert_psfs_equal( 

165 tc: unittest.TestCase, 

166 psf1: PointSpreadFunction, 

167 psf2: PointSpreadFunction, 

168 points: YX[np.ndarray] | XY[np.ndarray] | None = None, 

169) -> int: 

170 """Compare two PSF objets. 

171 

172 Parameters 

173 ---------- 

174 tc 

175 Test case object with assert methods to use. 

176 psf1 

177 Point-spread function to test. 

178 psf2 

179 The other point-spread function to test. 

180 points 

181 Points to evaluate the PSFs at. If not provided, the intersection of 

182 the PSF bounding boxes are used to generate points on a grid. 

183 

184 Returns 

185 ------- 

186 `int` 

187 The number of points actually tested. 

188 """ 

189 if points is None: 

190 points = psf1.bounds.bbox.intersection(psf2.bounds.bbox).meshgrid(3).map(np.ravel) 

191 

192 tc.assertEqual(psf1.kernel_bbox, psf2.kernel_bbox) 

193 

194 n_points_tested: int = 0 

195 for x, y in zip(points.x, points.y): 

196 # The two PSFs must agree on which points fall inside their input 

197 # domain. Querying ``.contains`` directly (rather than relying on 

198 # ``compute_kernel_image`` to raise) makes this test tolerant of 

199 # implementations that do not raise on out-of-domain points -- in 

200 # particular ``CellPointSpreadFunction``, where evaluating in a 

201 # missing cell does not always raise ``BoundsError``. 

202 contains1 = psf1.bounds.contains(x=x, y=y) 

203 contains2 = psf2.bounds.contains(x=x, y=y) 

204 tc.assertEqual( 

205 contains1, 

206 contains2, 

207 f"PSFs disagree on whether ({x}, {y}) is in-bounds: psf1={contains1}, psf2={contains2}", 

208 ) 

209 if not contains1: 

210 continue 

211 tc.assertEqual(psf1.compute_kernel_image(x=x, y=y), psf2.compute_kernel_image(x=x, y=y)) 

212 tc.assertEqual(psf1.compute_stellar_bbox(x=x, y=y), psf2.compute_stellar_bbox(x=x, y=y)) 

213 tc.assertEqual(psf1.compute_stellar_image(x=x, y=y), psf2.compute_stellar_image(x=x, y=y)) 

214 n_points_tested += 1 

215 return n_points_tested 

216 

217 

218def assert_visit_images_equal( 

219 tc: unittest.TestCase, 

220 a: VisitImage, 

221 b: VisitImage, 

222 *, 

223 expect_view: bool | None = None, 

224) -> None: 

225 """Assert that two `.VisitImage` instances carry the same persistent state. 

226 

227 Extends `assert_masked_images_equal` with the VisitImage-specific 

228 attributes (PSF, filter, observation info, detector, aperture 

229 corrections, photometric scaling, backgrounds, polygon bounds, 

230 summary stats) so a round-trip check on a `.VisitImage` does not 

231 silently miss differences in any of them. 

232 """ 

233 assert_masked_images_equal(tc, a, b, expect_view=expect_view) 

234 tc.assertEqual(a.summary_stats, b.summary_stats) 

235 tc.assertEqual(a.physical_filter, b.physical_filter) 

236 tc.assertEqual(a.band, b.band) 

237 tc.assertEqual(a.obs_info, b.obs_info) 

238 tc.assertEqual(a.detector, b.detector) 

239 tc.assertEqual(dict(a.aperture_corrections), dict(b.aperture_corrections)) 

240 tc.assertEqual(a.photometric_scaling, b.photometric_scaling) 

241 tc.assertEqual(dict(a.backgrounds), dict(b.backgrounds)) 

242 tc.assertEqual(a.backgrounds.subtracted, b.backgrounds.subtracted) 

243 tc.assertEqual(a.bounds, b.bounds) 

244 assert_psfs_equal(tc, a.psf, b.psf) 

245 

246 

247def assert_cell_coadds_equal( 

248 tc: unittest.TestCase, 

249 a: CellCoadd, 

250 b: CellCoadd, 

251 *, 

252 expect_view: bool | None = None, 

253) -> None: 

254 """Assert that two `.CellCoadd` instances carry the same persistent state. 

255 

256 Extends the masked-image-style equality check with the 

257 CellCoadd-specific attributes (PSF, cell grid, missing cells, 

258 backgrounds, patch/tract, band) so a round-trip check does not 

259 silently miss differences in any of them. 

260 """ 

261 assert_masked_images_equal(tc, a, b, expect_view=expect_view) 

262 tc.assertEqual(a.band, b.band) 

263 tc.assertEqual(a.patch, b.patch) 

264 tc.assertEqual(a.tract, b.tract) 

265 tc.assertEqual(a.grid, b.grid) 

266 tc.assertEqual(a.bounds.missing, b.bounds.missing) 

267 tc.assertEqual(dict(a.backgrounds), dict(b.backgrounds)) 

268 tc.assertEqual(a.backgrounds.subtracted, b.backgrounds.subtracted) 

269 assert_psfs_equal(tc, a.psf, b.psf) 

270 

271 

272def compare_image_to_legacy( 

273 tc: unittest.TestCase, image: Image, legacy_image: Any, expect_view: bool | None = None 

274) -> None: 

275 """Compare an `.Image` object to a legacy `lsst.afw.image.Image` object.""" 

276 tc.assertEqual(image.bbox, Box.from_legacy(legacy_image.getBBox())) 

277 if expect_view is not None: 

278 tc.assertEqual(np.may_share_memory(image.array, legacy_image.array), expect_view) 

279 if not expect_view: 

280 np.testing.assert_array_equal(image.array, legacy_image.array) 

281 

282 

283def compare_mask_to_legacy( 

284 tc: unittest.TestCase, mask: Mask, legacy_mask: Any, plane_map: Mapping[str, MaskPlane] | None = None 

285) -> None: 

286 """Compare a `.Mask` object to a legacy `lsst.afw.image.Mask` object.""" 

287 tc.assertEqual(mask.bbox, Box.from_legacy(legacy_mask.getBBox())) 

288 if plane_map is None: 

289 plane_map = {plane.name: plane for plane in mask.schema if plane is not None} 

290 for old_name, new_plane in plane_map.items(): 

291 np.testing.assert_array_equal( 

292 (legacy_mask.array & legacy_mask.getPlaneBitMask(old_name)).astype(bool), 

293 mask.get(new_plane.name), 

294 ) 

295 

296 

297def compare_masked_image_to_legacy( 

298 tc: unittest.TestCase, 

299 masked_image: MaskedImage, 

300 legacy_masked_image: Any, 

301 *, 

302 plane_map: Mapping[str, MaskPlane] | None = None, 

303 expect_view: bool | None = None, 

304 alternates: Mapping[str, Any] | None = None, 

305) -> None: 

306 """Compare a `.MaskedImage` object to a legacy `lsst.afw.image.MaskedImage` 

307 object. 

308 

309 Parameters 

310 ---------- 

311 tc 

312 Test case to use for asserts. 

313 masked_image 

314 New image to test. 

315 legacy_masked_image 

316 Legacy image to test against. 

317 plane_map 

318 Mapping between new and legacy mask planes. 

319 expect_view 

320 Whether to test that the image and variance arrays do or do not share 

321 memory. 

322 alternates 

323 A mapping of other versions of one or more (new) components to also 

324 check against the legacy versions of those components. 

325 """ 

326 compare_image_to_legacy(tc, masked_image.image, legacy_masked_image.getImage(), expect_view=expect_view) 

327 compare_mask_to_legacy(tc, masked_image.mask, legacy_masked_image.getMask(), plane_map=plane_map) 

328 compare_image_to_legacy( 

329 tc, masked_image.variance, legacy_masked_image.getVariance(), expect_view=expect_view 

330 ) 

331 if alternates: 

332 if image := alternates.get("image"): 

333 compare_image_to_legacy(tc, image, legacy_masked_image.getImage(), expect_view=expect_view) 

334 if mask := alternates.get("mask"): 

335 compare_mask_to_legacy(tc, mask, legacy_masked_image.getMask(), plane_map=plane_map) 

336 if variance := alternates.get("variance"): 

337 compare_image_to_legacy(tc, variance, legacy_masked_image.getVariance(), expect_view=expect_view) 

338 

339 

340def compare_visit_image_to_legacy( 

341 tc: unittest.TestCase, 

342 visit_image: VisitImage, 

343 legacy_exposure: Any, 

344 *, 

345 plane_map: Mapping[str, MaskPlane] | None = None, 

346 expect_view: bool | None = None, 

347 instrument: str, 

348 visit: int, 

349 detector: int, 

350 applied_legacy_photo_calib: LegacyPhotoCalib | None = None, 

351 alternates: Mapping[str, Any] | None = None, 

352) -> None: 

353 """Compare a `.VisitImage` object to a legacy `lsst.afw.image.Exposure` 

354 object. 

355 

356 Parameters 

357 ---------- 

358 tc 

359 Test case to use for asserts. 

360 visit_image 

361 New image to test. 

362 legacy_exposure 

363 Legacy image to test against. 

364 plane_map 

365 Mapping between new and legacy mask planes. 

366 expect_view 

367 Whether to test that the image and variance arrays do or do not share 

368 memory. 

369 instrument 

370 Expected instrument name. 

371 visit 

372 Expected visit ID. 

373 detector 

374 Expected detector ID. 

375 alternates 

376 A mapping of other versions of one or more (new) components to also 

377 check against the legacy versions of those components. 

378 """ 

379 compare_masked_image_to_legacy( 

380 tc, 

381 visit_image, 

382 legacy_exposure.getMaskedImage(), 

383 plane_map=plane_map, 

384 expect_view=expect_view, 

385 alternates=alternates, 

386 ) 

387 detector_bbox = Box.from_legacy(legacy_exposure.getDetector().getBBox()) 

388 compare_projection_to_legacy_wcs( 

389 tc, 

390 visit_image.projection, 

391 legacy_exposure.getWcs(), 

392 DetectorFrame(instrument=instrument, visit=visit, detector=detector, bbox=detector_bbox), 

393 visit_image.bbox, 

394 ) 

395 tc.assertIs(visit_image.projection, visit_image.mask.projection) 

396 tc.assertIs(visit_image.projection, visit_image.variance.projection) 

397 compare_psf_to_legacy(tc, visit_image.psf, legacy_exposure.getPsf()) 

398 compare_observation_summary_stats_to_legacy( 

399 tc, visit_image.summary_stats, legacy_exposure.info.getSummaryStats() 

400 ) 

401 compare_detector_to_legacy(tc, visit_image.detector, legacy_exposure.getDetector(), is_raw_assembled=True) 

402 # Make a tiny box for Field comparisons that need to make arrays; that can 

403 # get expensive otherwisre. 

404 tiny_bbox = detector_bbox.local[2:4, 3:6] 

405 compare_aperture_corrections_to_legacy( 

406 tc, visit_image.aperture_corrections, legacy_exposure.info.getApCorrMap(), tiny_bbox 

407 ) 

408 compare_photo_calib_to_legacy( 

409 tc, 

410 visit_image.photometric_scaling, 

411 legacy_exposure.info.getPhotoCalib(), 

412 applied_legacy_photo_calib=applied_legacy_photo_calib, 

413 subimage_bbox=tiny_bbox, 

414 ) 

415 if alternates: 

416 if (bbox := alternates.get("bbox")) is not None: 

417 tc.assertEqual(bbox, visit_image.bbox) 

418 if projection := alternates.get("projection"): 

419 compare_projection_to_legacy_wcs( 

420 tc, 

421 projection, 

422 legacy_exposure.getWcs(), 

423 DetectorFrame(instrument=instrument, visit=visit, detector=detector, bbox=detector_bbox), 

424 visit_image.bbox, 

425 ) 

426 if psf := alternates.get("psf"): 

427 compare_psf_to_legacy(tc, psf, legacy_exposure.getPsf()) 

428 if summary_stats := alternates.get("summary_stats"): 

429 compare_observation_summary_stats_to_legacy( 

430 tc, summary_stats, legacy_exposure.info.getSummaryStats() 

431 ) 

432 if detector_obj := alternates.get("detector"): 

433 compare_detector_to_legacy(tc, detector_obj, legacy_exposure.getDetector(), is_raw_assembled=True) 

434 if obs_info := alternates.get("obs_info"): 

435 visitInfo = legacy_exposure.visitInfo 

436 tc.assertEqual(obs_info.instrument, visitInfo.getInstrumentLabel()) 

437 if aperture_corrections := alternates.get("aperture_corrections"): 

438 compare_aperture_corrections_to_legacy( 

439 tc, aperture_corrections, legacy_exposure.info.getApCorrMap(), tiny_bbox 

440 ) 

441 if (photometric_scaling := alternates.get("photometic_scaling", ...)) is not ...: 

442 compare_photo_calib_to_legacy( 

443 tc, 

444 photometric_scaling, 

445 legacy_exposure.info.getPhotoCalib(), 

446 applied_legacy_photo_calib=applied_legacy_photo_calib, 

447 subimage_bbox=tiny_bbox, 

448 ) 

449 

450 

451def compare_photo_calib_to_legacy( 

452 tc: unittest.TestCase, 

453 photometric_scaling: BaseField | None, 

454 legacy_photo_calib: LegacyPhotoCalib, 

455 *, 

456 applied_legacy_photo_calib: LegacyPhotoCalib | None = None, 

457 subimage_bbox: Box, 

458) -> None: 

459 if legacy_photo_calib._isConstant: 

460 if legacy_photo_calib.getCalibrationMean() == 1.0: 

461 if applied_legacy_photo_calib is None: 

462 tc.assertIsNone(photometric_scaling) 

463 return 

464 else: 

465 legacy_photo_calib = applied_legacy_photo_calib 

466 if legacy_photo_calib._isConstant: 

467 assert isinstance(photometric_scaling, ChebyshevField) 

468 assert_close( 

469 tc, photometric_scaling.coefficients, np.array([[legacy_photo_calib.getCalibrationMean()]]) 

470 ) 

471 else: 

472 assert photometric_scaling is not None 

473 compare_field_to_legacy( 

474 tc, 

475 photometric_scaling / legacy_photo_calib.getCalibrationMean(), 

476 legacy_photo_calib.computeScaledCalibration(), 

477 subimage_bbox, 

478 ) 

479 

480 

481def compare_cell_coadd_to_legacy( 

482 tc: unittest.TestCase, 

483 cell_coadd: CellCoadd, 

484 legacy_cell_coadd: MultipleCellCoadd, 

485 *, 

486 tract_bbox: Box, 

487 plane_map: Mapping[str, MaskPlane] | None = None, 

488 alternates: Mapping[str, Any] | None = None, 

489 psf_points: XY[np.ndarray] | YX[np.ndarray] | None = None, 

490) -> None: 

491 """Compare a `.cells.CellCoadd` object to a legacy 

492 `lsst.cell_coadds.MultipleCellCoadd` object. 

493 

494 Parameters 

495 ---------- 

496 tc 

497 Test case to use for asserts. 

498 cell_coadd 

499 New coadd to test. 

500 legacy_cell_coadd 

501 Legacy coadd to test against. 

502 tract_bbox 

503 Bounding box of the full tract. 

504 psf_points 

505 Points to use to compare the PSFs. 

506 plane_map 

507 Mapping between new and legacy mask planes. 

508 alternates 

509 A mapping of other versions of one or more (new) components to also 

510 check against the legacy versions of those components. 

511 """ 

512 legacy_stitched = legacy_cell_coadd.stitch(cell_coadd.bbox.to_legacy()) 

513 compare_image_to_legacy(tc, cell_coadd.image, legacy_stitched.image, expect_view=False) 

514 compare_mask_to_legacy(tc, cell_coadd.mask, legacy_stitched.mask, plane_map=plane_map) 

515 compare_image_to_legacy(tc, cell_coadd.variance, legacy_stitched.variance, expect_view=False) 

516 if legacy_stitched.mask_fractions is not None: 

517 compare_image_to_legacy( 

518 tc, cell_coadd.mask_fractions["rejected"], legacy_stitched.mask_fractions, expect_view=False 

519 ) 

520 for n in range(legacy_stitched.n_noise_realizations): 

521 compare_image_to_legacy( 

522 tc, cell_coadd.noise_realizations[n], legacy_stitched.noise_realizations[n], expect_view=False 

523 ) 

524 tc.assertEqual(cell_coadd.skymap, legacy_stitched.identifiers.skymap) 

525 tc.assertEqual(cell_coadd.tract, legacy_stitched.identifiers.tract) 

526 tc.assertEqual(cell_coadd.patch.index.x, legacy_stitched.identifiers.patch.x) 

527 tc.assertEqual(cell_coadd.patch.index.y, legacy_stitched.identifiers.patch.y) 

528 tc.assertEqual(cell_coadd.band, legacy_stitched.identifiers.band) 

529 tc.assertTrue(tract_bbox.contains(cell_coadd.patch.outer_bbox)) 

530 tc.assertTrue(cell_coadd.patch.outer_bbox.contains(cell_coadd.patch.inner_bbox)) 

531 tc.assertTrue(cell_coadd.patch.outer_bbox.contains(cell_coadd.bbox)) 

532 tc.assertEqual(cell_coadd.unit, u.Unit(legacy_cell_coadd.common.units.value)) 

533 tc.assertTrue(cell_coadd.bounds.bbox.contains(cell_coadd.bbox)) 

534 tc.assertTrue(cell_coadd.grid.bbox.contains(cell_coadd.bbox)) 

535 compare_projection_to_legacy_wcs( 

536 tc, 

537 cell_coadd.projection, 

538 legacy_cell_coadd.wcs, 

539 TractFrame( 

540 skymap=legacy_cell_coadd.identifiers.skymap, 

541 tract=legacy_cell_coadd.identifiers.tract, 

542 bbox=tract_bbox, 

543 ), 

544 cell_coadd.bbox, 

545 is_fits=True, 

546 ) 

547 tc.assertIs(cell_coadd.projection, cell_coadd.mask.projection) 

548 tc.assertIs(cell_coadd.projection, cell_coadd.variance.projection) 

549 compare_psf_to_legacy( 

550 tc, cell_coadd.psf, legacy_stitched.psf, expect_legacy_raise_on_out_of_bounds=True, points=psf_points 

551 ) 

552 compare_cell_coadd_provenance_to_legacy(tc, cell_coadd.provenance, legacy_cell_coadd) 

553 if alternates: 

554 if projection := alternates.get("projection"): 

555 compare_projection_to_legacy_wcs( 

556 tc, 

557 projection, 

558 legacy_stitched.wcs, 

559 TractFrame( 

560 skymap=legacy_cell_coadd.identifiers.skymap, 

561 tract=legacy_cell_coadd.identifiers.tract, 

562 bbox=tract_bbox, 

563 ), 

564 cell_coadd.bbox, 

565 is_fits=True, 

566 ) 

567 if psf := alternates.get("psf"): 

568 compare_psf_to_legacy(tc, psf, legacy_stitched.psf, points=psf_points) 

569 if provenance := alternates.get("provenance"): 

570 compare_cell_coadd_provenance_to_legacy(tc, provenance, legacy_cell_coadd) 

571 

572 

573def compare_cell_coadd_provenance_to_legacy( 

574 tc: unittest.TestCase, provenance: CoaddProvenance, legacy_cell_coadd: MultipleCellCoadd 

575) -> None: 

576 """Compare a `.cells.CoaddProvenance` object to a legacy 

577 `lsst.cell_coadds.MultipleCellCoadd` object. 

578 

579 Parameters 

580 ---------- 

581 tc 

582 Test case to use for asserts. 

583 provenance 

584 New provenance object to test. 

585 legacy_cell_coadd 

586 Legacy coadd to test against. 

587 """ 

588 from lsst.cell_coadds import ObservationIdentifiers 

589 

590 for legacy_cell in legacy_cell_coadd.cells.values(): 

591 cell_index = CellIJ.from_legacy(legacy_cell.identifiers.cell) 

592 prov = provenance[cell_index] 

593 legacy_table = astropy.table.Table( 

594 rows=[ 

595 [ 

596 ids.instrument, 

597 ids.visit, 

598 ids.detector, 

599 ids.day_obs, 

600 ids.physical_filter, 

601 legacy_input.overlaps_center, 

602 legacy_input.overlap_fraction, 

603 legacy_input.weight, 

604 legacy_input.psf_shape.getIxx(), 

605 legacy_input.psf_shape.getIyy(), 

606 legacy_input.psf_shape.getIxy(), 

607 legacy_input.psf_shape_flag, 

608 ] 

609 for ids, legacy_input in legacy_cell.inputs.items() 

610 ], 

611 dtype=[ 

612 np.object_, 

613 np.uint64, 

614 np.uint16, 

615 np.uint32, 

616 np.object_, 

617 np.bool_, 

618 np.float64, 

619 np.float64, 

620 np.float64, 

621 np.float64, 

622 np.float64, 

623 np.bool_, 

624 ], 

625 names=[ 

626 "instrument", 

627 "visit", 

628 "detector", 

629 "day_obs", 

630 "physical_filter", 

631 "overlaps_center", 

632 "overlap_fraction", 

633 "weight", 

634 "psf_shape_xx", 

635 "psf_shape_yy", 

636 "psf_shape_xy", 

637 "psf_shape_flag", 

638 ], 

639 ) 

640 # For a single cell all 'inputs' are also 'contributions'. 

641 tc.assertEqual(len(legacy_cell.inputs), len(prov.inputs)) 

642 tc.assertEqual(len(legacy_cell.inputs), len(prov.contributions)) 

643 prov.inputs.sort(["instrument", "visit", "detector"]) 

644 prov.contributions.sort(["instrument", "visit", "detector"]) 

645 legacy_table.sort(["instrument", "visit", "detector"]) 

646 np.testing.assert_array_equal(prov.inputs["instrument"], prov.contributions["instrument"]) 

647 np.testing.assert_array_equal(prov.inputs["visit"], prov.contributions["visit"]) 

648 np.testing.assert_array_equal(prov.inputs["detector"], prov.contributions["detector"]) 

649 np.testing.assert_array_equal(prov.inputs["instrument"], legacy_table["instrument"]) 

650 np.testing.assert_array_equal(prov.inputs["visit"], legacy_table["visit"]) 

651 np.testing.assert_array_equal(prov.inputs["detector"], legacy_table["detector"]) 

652 np.testing.assert_array_equal(prov.inputs["physical_filter"], legacy_table["physical_filter"]) 

653 np.testing.assert_array_equal(prov.inputs["day_obs"], legacy_table["day_obs"]) 

654 np.testing.assert_array_equal(prov.contributions["overlaps_center"], legacy_table["overlaps_center"]) 

655 np.testing.assert_array_equal( 

656 prov.contributions["overlap_fraction"], legacy_table["overlap_fraction"] 

657 ) 

658 np.testing.assert_array_equal(prov.contributions["weight"], legacy_table["weight"]) 

659 np.testing.assert_array_equal(prov.contributions["psf_shape_xx"], legacy_table["psf_shape_xx"]) 

660 np.testing.assert_array_equal(prov.contributions["psf_shape_yy"], legacy_table["psf_shape_yy"]) 

661 np.testing.assert_array_equal(prov.contributions["psf_shape_xy"], legacy_table["psf_shape_xy"]) 

662 np.testing.assert_array_equal(prov.contributions["psf_shape_flag"], legacy_table["psf_shape_flag"]) 

663 for row in prov.inputs: 

664 polygon_key = ObservationIdentifiers(**{k: row[k] for k in row.keys() if k != "polygon"}) 

665 legacy_polygon = legacy_cell_coadd.common.visit_polygons[polygon_key] 

666 tc.assertEqual(legacy_polygon, row["polygon"].to_legacy()) 

667 

668 

669def compare_psf_to_legacy( 

670 tc: unittest.TestCase, 

671 psf: PointSpreadFunction, 

672 legacy_psf: Any, 

673 points: YX[np.ndarray] | XY[np.ndarray] | None = None, 

674 expect_legacy_raise_on_out_of_bounds: bool = False, 

675) -> int: 

676 """Compare a PSF model object to its legacy interface. 

677 

678 Parameters 

679 ---------- 

680 tc 

681 Test case object with assert methods to use. 

682 psf 

683 Point-spread function to test. 

684 legacy_psf 

685 Legacy `lsst.afw.detection.Psf` instance to compare with. 

686 points 

687 Points to evaluate the PSFs at. If not provided, the intersection of 

688 the PSF bounding boxes are used to generate points on a grid. 

689 expect_legacy_raise_on_out_of_bounds 

690 If `True`, expect ``legacy_psf`` to raise 

691 `lsst.afw.detection.InvalidPsfError` when evaluated at a position 

692 considered out-of-bounds by ``psf``. 

693 

694 Returns 

695 ------- 

696 `int` 

697 The number of points actually tested. 

698 """ 

699 from lsst.afw.detection import InvalidPsfError 

700 

701 if points is None: 

702 points = psf.bounds.bbox.meshgrid(n=3).map(np.ravel) 

703 legacy_points = arrays_to_legacy_points(points.x, points.y) 

704 n_points_tested: int = 0 

705 for p in legacy_points: 

706 if not psf.bounds.contains(x=p.x, y=p.y): 

707 if expect_legacy_raise_on_out_of_bounds: 

708 with tc.assertRaises(InvalidPsfError): 

709 legacy_psf.computeKernelImage(p) 

710 continue 

711 tc.assertEqual(psf.kernel_bbox, Box.from_legacy(legacy_psf.computeKernelBBox(p))) 

712 tc.assertEqual( 

713 psf.compute_kernel_image(x=p.x, y=p.y), Image.from_legacy(legacy_psf.computeKernelImage(p)) 

714 ) 

715 tc.assertEqual( 

716 psf.compute_stellar_bbox(x=p.x, y=p.y), Box.from_legacy(legacy_psf.computeImageBBox(p)) 

717 ) 

718 tc.assertEqual(psf.compute_stellar_image(x=p.x, y=p.y), Image.from_legacy(legacy_psf.computeImage(p))) 

719 n_points_tested += 1 

720 return n_points_tested 

721 

722 

723def compare_field_to_legacy( 

724 tc: unittest.TestCase, 

725 field: BaseField, 

726 legacy_field: Any, 

727 subimage_bbox: Box, 

728) -> None: 

729 """Test a Field object by comparing it to an equivalent 

730 `lsst.afw.math.BoundedField`. 

731 

732 Parameters 

733 ---------- 

734 tc 

735 Test case object with assert methods to use. 

736 field 

737 Field to test. 

738 legacy_field : ``lsst.afw.math.BoundedField`` 

739 Equivalent legacy bounded field. 

740 subimage_bbox 

741 Bounding box for full-image tests. 

742 """ 

743 tc.assertEqual(field.bounds.bbox, Box.from_legacy(legacy_field.getBBox())) 

744 # Pixel coordinates to test the numpy array interface with. 

745 pixel_xy = field.bounds.bbox.meshgrid(n=5).map(np.ravel) 

746 assert_close(tc, field(x=pixel_xy.x, y=pixel_xy.y), legacy_field.evaluate(pixel_xy.x, pixel_xy.y)) 

747 legacy_image_1 = Image(0, bbox=subimage_bbox, dtype=np.float64).to_legacy() 

748 legacy_field.addToImage(legacy_image_1, overlapOnly=True) 

749 assert_images_equal( 

750 tc, field.render(subimage_bbox), Image.from_legacy(legacy_image_1, unit=field.unit), rtol=1e-13 

751 ) 

752 

753 

754def compare_aperture_corrections_to_legacy( 

755 tc: unittest.TestCase, 

756 aperture_corrections: ApertureCorrectionMap, 

757 legacy_ap_corr_map: Any, 

758 subimage_bbox: Box, 

759) -> None: 

760 """Test an aperture correction `dict` by comparing it to an equivalent 

761 `lsst.afw.image.ApCorrMap`. 

762 

763 Parameters 

764 ---------- 

765 tc 

766 Test case object with assert methods to use. 

767 aperture_corrections 

768 Dictionary to test. 

769 legacy_ap_corr_map : ``lsst.afw.image.ApCorrMap`` 

770 Equivalent legacy aperture correction map. 

771 subimage_bbox 

772 Bounding box for full-image tests. 

773 """ 

774 tc.assertEqual(aperture_corrections.keys(), set(legacy_ap_corr_map.keys())) 

775 for name, field in aperture_corrections.items(): 

776 compare_field_to_legacy(tc, field, legacy_ap_corr_map[name], subimage_bbox) 

777 

778 

779def compare_observation_summary_stats_to_legacy( 

780 tc: unittest.TestCase, 

781 summary_stats: ObservationSummaryStats, 

782 legacy_summary_stats: Any, 

783) -> None: 

784 """Test an ObservationSummaryStats object by comparing it to an equivalent 

785 `lsst.afw.image.ExposureSummaryStats`. 

786 

787 Parameters 

788 ---------- 

789 tc 

790 Test case object with assert methods to use. 

791 summary_stats 

792 Struct to test. 

793 legacy : ``lsst.afw.image.ExposureSummaryStats`` 

794 Equivalent legacy struct. 

795 """ 

796 for field in dataclasses.fields(legacy_summary_stats): 

797 a = getattr(legacy_summary_stats, field.name) 

798 b = getattr(summary_stats, field.name) 

799 if isinstance(b, tuple): 

800 for ai, bi in zip(a, b): 

801 tc.assertTrue(ai == bi or (math.isnan(ai) and math.isnan(bi)), f"{field.name}: {a} != {b}") 

802 else: 

803 tc.assertTrue(a == b or (math.isnan(a) and math.isnan(b)), f"{field.name}: {a} != {b}") 

804 

805 

806def compare_projection_to_legacy_wcs[F: Frame]( 

807 tc: unittest.TestCase, 

808 projection: Projection[F], 

809 legacy_wcs: Any, 

810 pixel_frame: F, 

811 subimage_bbox: Box, 

812 is_fits: bool = False, 

813) -> None: 

814 """Test a Projection object by comparing it to an equivalent 

815 `lsst.afw.geom.SkyWcs`. 

816 

817 Parameters 

818 ---------- 

819 tc 

820 Test case object with assert methods to use. 

821 projection 

822 Projection to test. 

823 legacy_wcs : ``lsst.afw.geom.SkyWcs`` 

824 Equivalent legacy WCS. 

825 pixel_frame 

826 Expected pixel frame for the projection. 

827 subimage_bbox 

828 Bounding box of points to generate for tests. 

829 is_fits 

830 Whether this projection is expected to be exactly representable as a 

831 FIT WCS. If `False` it is assumed to have a FITS approximation 

832 attached instead. 

833 """ 

834 # Pixel coordinates to test on over the subimage region of interest: 

835 pixel_xy = subimage_bbox.meshgrid(step=50).map(np.ravel) 

836 # Array indices of those pixel values (subtract off bbox starts): 

837 subimage_array_xy = XY(x=pixel_xy.x - subimage_bbox.x.start, y=pixel_xy.y - subimage_bbox.y.start) 

838 sky_coords = legacy_coords_to_astropy( 

839 legacy_wcs.pixelToSky(arrays_to_legacy_points(pixel_xy.x, pixel_xy.y)) 

840 ) 

841 # Test transforming with the Projection itself, which also tests its 

842 # nested Transform and an Astropy High-Level WCS view with no origin 

843 # change. 

844 check_projection(tc, projection, pixel_xy, sky_coords, pixel_frame) 

845 # Also test the Astropy High-Level WCS view with an origin change to 

846 # array indices. 

847 check_astropy_wcs_interface( 

848 tc, projection.as_astropy(subimage_bbox), subimage_array_xy, sky_coords, pixel_atol=1e-5 

849 ) 

850 if is_fits: 

851 fits_wcs = projection.as_fits_wcs(subimage_bbox, allow_approximation=True) 

852 assert fits_wcs is not None 

853 check_astropy_wcs_interface(tc, fits_wcs, subimage_array_xy, sky_coords, pixel_atol=1e-5) 

854 # Use that FITS approximation to check that we can make a 

855 # Projection from a FITS WCS, too. 

856 fits_projection = Projection.from_fits_wcs(fits_wcs, pixel_frame) 

857 check_projection( 

858 tc, 

859 fits_projection, 

860 subimage_array_xy, 

861 sky_coords, 

862 pixel_frame, 

863 pixel_atol=1e-5, 

864 ) 

865 # We want Projections we create from a FITS WCS to be backed by an 

866 # AST FrameSet so we can convert them into legacy 

867 # `lsst.afw.geom.SkyWcs` objects if desired. 

868 tc.assertIn("Begin FrameSet", fits_projection.show()) 

869 else: 

870 tc.assertIsNone(projection.as_fits_wcs(subimage_bbox, allow_approximation=False)) 

871 # The legacy SkyWcs should instead have a FITS approximation 

872 # attached; run the same tests on that. 

873 assert projection.fits_approximation is not None 

874 compare_projection_to_legacy_wcs( 

875 tc, 

876 projection.fits_approximation, 

877 legacy_wcs.getFitsApproximation(), 

878 pixel_frame, 

879 subimage_bbox, 

880 is_fits=True, 

881 ) 

882 

883 

884def check_transform[I: Frame, O: Frame]( 

885 tc: unittest.TestCase, 

886 transform: Transform[I, O], 

887 input_xy: XY[np.ndarray], 

888 output_xy: XY[np.ndarray], 

889 in_frame: Frame, 

890 out_frame: Frame, 

891 *, 

892 check_inverted: bool = True, 

893 in_atol: u.Quantity | None = None, 

894 out_atol: u.Quantity | None = None, 

895) -> None: 

896 """Test Transform against known arrays of input and output points. 

897 

898 Parameters 

899 ---------- 

900 tc 

901 Test case object with assert methods to use. 

902 transform 

903 Transform to test. 

904 input_xy 

905 Arrays of input points. 

906 output_xy 

907 Arrays of output points. 

908 in_frame 

909 Expected input frame. 

910 out_frame 

911 Expected output frame. 

912 check_inverted 

913 If `True`, recurse (once) to test the inverse transform. 

914 in_atol 

915 Expected absolute precision of input points. 

916 out_atol 

917 Expected absolute precision of output points. 

918 """ 

919 tc.assertEqual(transform.in_frame, in_frame) 

920 tc.assertEqual(transform.out_frame, out_frame) 

921 in_atol_v = in_atol.to_value(in_frame.unit) if in_atol is not None else None 

922 out_atol_v = out_atol.to_value(out_frame.unit) if out_atol is not None else None 

923 # Test array interfaces. 

924 test_output_xy = transform.apply_forward(x=input_xy.x, y=input_xy.y) 

925 assert_close(tc, test_output_xy.x, output_xy.x, atol=out_atol_v) 

926 assert_close(tc, test_output_xy.y, output_xy.y, atol=out_atol_v) 

927 test_input_xy = transform.apply_inverse(x=output_xy.x, y=output_xy.y) 

928 assert_close(tc, test_input_xy.x, input_xy.x, atol=in_atol_v) 

929 assert_close(tc, test_input_xy.y, input_xy.y, atol=in_atol_v) 

930 # Test scalar interfaces with numpy scalars. 

931 for input_x, input_y, output_x, output_y in zip(input_xy.x, input_xy.y, output_xy.x, output_xy.y): 

932 assert_close(tc, transform.apply_forward(x=input_x, y=input_y).x, output_x, atol=out_atol_v) 

933 assert_close(tc, transform.apply_forward(x=input_x, y=input_y).y, output_y, atol=out_atol_v) 

934 assert_close(tc, transform.apply_inverse(x=output_x, y=output_y).x, input_x, atol=in_atol_v) 

935 assert_close(tc, transform.apply_inverse(x=output_x, y=output_y).y, input_y, atol=in_atol_v) 

936 # Test quantity array interfaces. 

937 input_q_xy = XY(x=input_xy.x * transform.in_frame.unit, y=input_xy.y * transform.in_frame.unit) 

938 output_q_xy = XY(x=output_xy.x * transform.out_frame.unit, y=output_xy.y * transform.out_frame.unit) 

939 test_output_q_xy = transform.apply_forward_q(x=input_q_xy.x, y=input_q_xy.y) 

940 assert_close(tc, test_output_q_xy.x, output_q_xy.x, atol=out_atol) 

941 assert_close(tc, test_output_q_xy.y, output_q_xy.y, atol=out_atol) 

942 test_input_q_xy = transform.apply_inverse_q(x=output_q_xy.x, y=output_q_xy.y) 

943 assert_close(tc, test_input_q_xy.x, input_q_xy.x, atol=in_atol) 

944 assert_close(tc, test_input_q_xy.y, input_q_xy.y, atol=in_atol) 

945 # Test quantity scalar interfaces. 

946 for input_q_x, input_q_y, output_q_x, output_q_y in zip( 

947 input_q_xy.x, input_q_xy.y, output_q_xy.x, output_q_xy.y 

948 ): 

949 assert_close(tc, transform.apply_forward_q(x=input_q_x, y=input_q_y).x, output_q_x, atol=out_atol) 

950 assert_close(tc, transform.apply_forward_q(x=input_q_x, y=input_q_y).y, output_q_y, atol=out_atol) 

951 assert_close(tc, transform.apply_inverse_q(x=output_q_x, y=output_q_y).x, input_q_x, atol=in_atol) 

952 assert_close(tc, transform.apply_inverse_q(x=output_q_x, y=output_q_y).y, input_q_y, atol=in_atol) 

953 if check_inverted: 

954 # Test the inverse transform. 

955 check_transform( 

956 tc, 

957 transform.inverted(), 

958 output_xy, 

959 input_xy, 

960 out_frame, 

961 in_frame, 

962 check_inverted=False, 

963 out_atol=in_atol, 

964 in_atol=out_atol, 

965 ) 

966 

967 

968def check_projection[P: Frame]( 

969 tc: unittest.TestCase, 

970 projection: Projection[P], 

971 pixel_xy: XY[np.ndarray], 

972 sky_coords: SkyCoord, 

973 pixel_frame: Frame, 

974 *, 

975 pixel_atol: float | None = None, 

976 sky_atol: u.Quantity | None = None, 

977) -> None: 

978 """Test a `.Projection` instance against known arrays of pixel and sky 

979 coordinates. 

980 

981 Parameters 

982 ---------- 

983 tc 

984 Test case object with assert methods to use. 

985 projection 

986 Projection to test. 

987 pixel_xy 

988 Arrays of pixel coordinates. 

989 sky_coords 

990 Corresponding sky coordinates. 

991 pixel_frame 

992 Expected pixel frame. 

993 pixel_atol 

994 Expected absolute precision of pixel points. 

995 sky_atol 

996 Expected absolute precision of sky coordinates. 

997 """ 

998 tc.assertEqual(projection.pixel_frame, pixel_frame) 

999 tc.assertEqual(projection.sky_frame, SkyFrame.ICRS) 

1000 sky_atol_v = sky_atol.to_value(SkyFrame.ICRS.unit) if sky_atol is not None else None 

1001 pixel_atol_q = pixel_atol * u.pix if pixel_atol is not None else None 

1002 # Test array interfaces. 

1003 test_pixel_xy = cast(XY[np.ndarray], projection.sky_to_pixel(sky_coords)) 

1004 assert_close(tc, test_pixel_xy.x, pixel_xy.x, atol=pixel_atol) 

1005 assert_close(tc, test_pixel_xy.y, pixel_xy.y, atol=pixel_atol) 

1006 test_sky_astropy = projection.pixel_to_sky(x=pixel_xy.x, y=pixel_xy.y) 

1007 assert_close(tc, test_sky_astropy.ra, sky_coords.ra, atol=sky_atol_v) 

1008 assert_close(tc, test_sky_astropy.dec, sky_coords.dec, atol=sky_atol_v) 

1009 # Test scalar interfaces. 

1010 for pixel_x, pixel_y, sky_single in zip(pixel_xy.x, pixel_xy.y, sky_coords): 

1011 assert_close(tc, projection.sky_to_pixel(sky_single).x, pixel_x, atol=pixel_atol) 

1012 assert_close(tc, projection.sky_to_pixel(sky_single).y, pixel_y, atol=pixel_atol) 

1013 assert_close(tc, projection.pixel_to_sky(x=pixel_x, y=pixel_y).ra, sky_single.ra, atol=sky_atol_v) 

1014 assert_close(tc, projection.pixel_to_sky(x=pixel_x, y=pixel_y).dec, sky_single.dec, atol=sky_atol_v) 

1015 # Test the underlying Transform object. 

1016 sky_xy = XY(x=sky_coords.ra.to_value(u.rad), y=sky_coords.dec.to_value(u.rad)) 

1017 check_transform( 

1018 tc, 

1019 projection.pixel_to_sky_transform, 

1020 pixel_xy, 

1021 sky_xy, 

1022 pixel_frame, 

1023 SkyFrame.ICRS, 

1024 check_inverted=False, 

1025 in_atol=pixel_atol_q, 

1026 out_atol=sky_atol, 

1027 ) 

1028 check_transform( 

1029 tc, 

1030 projection.sky_to_pixel_transform, 

1031 sky_xy, 

1032 pixel_xy, 

1033 SkyFrame.ICRS, 

1034 pixel_frame, 

1035 check_inverted=False, 

1036 in_atol=sky_atol, 

1037 out_atol=pixel_atol_q, 

1038 ) 

1039 # Test the Astropy interface adapter. 

1040 check_astropy_wcs_interface( 

1041 tc, projection.as_astropy(), pixel_xy, sky_coords, pixel_atol=pixel_atol, sky_atol=sky_atol 

1042 ) 

1043 

1044 

1045def assert_projections_equal( 

1046 tc: unittest.TestCase, 

1047 a: Projection[Any] | None, 

1048 b: Projection[Any] | None, 

1049 expect_identity: bool | None = None, 

1050) -> None: 

1051 """Test that two `.Projection` instances are equivalent.""" 

1052 if a is None and b is None: 

1053 return 

1054 assert a is not None and b is not None 

1055 match expect_identity: 

1056 case True: 

1057 tc.assertIs(a, b) 

1058 return 

1059 case False: 

1060 tc.assertIsNot(a, b) 

1061 case None if a is b: 

1062 return 

1063 tc.assertEqual(a.pixel_frame, b.pixel_frame) 

1064 tc.assertEqual(a.show(simplified=True), b.show(simplified=True)) 

1065 assert_projections_equal( 

1066 tc, a.fits_approximation, cast(Projection[Any], b.fits_approximation), expect_identity=False 

1067 ) 

1068 

1069 

1070def check_astropy_wcs_interface( 

1071 tc: unittest.TestCase, 

1072 wcs: astropy.wcs.wcsapi.BaseHighLevelWCS, 

1073 pixel_xy: XY[np.ndarray], 

1074 sky_coords: SkyCoord, 

1075 *, 

1076 pixel_atol: float | None = None, 

1077 sky_atol: u.Quantity | None = None, 

1078) -> None: 

1079 """Test an Astropy WCS instance against known arrays of pixel and 

1080 sky coordinates. 

1081 

1082 Parameters 

1083 ---------- 

1084 tc 

1085 Test case object with assert methods to use. 

1086 wcs 

1087 Astropy WCS object to test. 

1088 pixel_xy 

1089 Arrays of pixel coordinates. 

1090 sky_coords 

1091 Corresponding sky coordinates. 

1092 pixel_atol 

1093 Expected absolute precision of pixel points. 

1094 sky_atol 

1095 Expected absolute precision of sky coordinates. 

1096 """ 

1097 test_x, test_y = wcs.world_to_pixel(sky_coords) 

1098 assert_close(tc, test_x, pixel_xy.x, atol=pixel_atol) 

1099 assert_close(tc, test_y, pixel_xy.y, atol=pixel_atol) 

1100 test_sky_coords = wcs.pixel_to_world(pixel_xy.x, pixel_xy.y) 

1101 assert_close(tc, test_sky_coords.ra, sky_coords.ra, atol=sky_atol) 

1102 assert_close(tc, test_sky_coords.dec, sky_coords.dec, atol=sky_atol) 

1103 

1104 

1105def legacy_points_to_xy_array(legacy_points: list[Any]) -> XY[np.ndarray]: 

1106 """Convert a list of ``lsst.geom.Point2D`` objects to an `.XY` array.""" 

1107 return XY(x=np.array([p.x for p in legacy_points]), y=np.array([p.y for p in legacy_points])) 

1108 

1109 

1110def legacy_coords_to_astropy(legacy_coords: list[Any]) -> SkyCoord: 

1111 """Convert a list of ``lsst.geom.SpherePoint`` objects to an Astropy 

1112 coordinate object. 

1113 """ 

1114 return SkyCoord( 

1115 ra=np.array([p.getRa().asRadians() for p in legacy_coords]) * u.rad, 

1116 dec=np.array([p.getDec().asRadians() for p in legacy_coords]) * u.rad, 

1117 ) 

1118 

1119 

1120def arrays_to_legacy_points(x: np.ndarray, y: np.ndarray) -> list[Any]: 

1121 """Convert arrays of ``x`` and ``y`` to a list of ``lsst.geom.Point2D``.""" 

1122 from lsst.geom import Point2D 

1123 

1124 return [Point2D(x=xv, y=yv) for xv, yv in zip(x, y)] 

1125 

1126 

1127def compare_amplifier_to_legacy( 

1128 tc: unittest.TestCase, 

1129 amplifier: Amplifier, 

1130 legacy_amplifier: Any, 

1131 *, 

1132 is_raw_assembled: bool, 

1133 expect_nominal_calibrations: bool = True, 

1134) -> None: 

1135 """Compare an `~.cameras.Amplifier` to a legacy 

1136 `lsst.afw.cameraGeom.Amplifier`. 

1137 """ 

1138 tc.assertEqual(legacy_amplifier.getName(), amplifier.name) 

1139 tc.assertEqual(Box.from_legacy(legacy_amplifier.getBBox()), amplifier.bbox) 

1140 if is_raw_assembled: 

1141 raw_geom = amplifier.assembled_raw_geometry 

1142 else: 

1143 raw_geom = amplifier.unassembled_raw_geometry 

1144 assert raw_geom is not None 

1145 tc.assertEqual(ReadoutCorner.from_legacy(legacy_amplifier.getReadoutCorner()), raw_geom.readout_corner) 

1146 tc.assertEqual(Box.from_legacy(legacy_amplifier.getRawBBox()), raw_geom.bbox) 

1147 tc.assertEqual(Box.from_legacy(legacy_amplifier.getRawDataBBox()), raw_geom.data_bbox) 

1148 tc.assertEqual(legacy_amplifier.getRawFlipX(), raw_geom.flip_x) 

1149 tc.assertEqual(legacy_amplifier.getRawFlipY(), raw_geom.flip_y) 

1150 tc.assertEqual(legacy_amplifier.getRawXYOffset().getX(), raw_geom.x_offset) 

1151 tc.assertEqual(legacy_amplifier.getRawXYOffset().getY(), raw_geom.y_offset) 

1152 tc.assertEqual( 

1153 Box.from_legacy(legacy_amplifier.getRawHorizontalOverscanBBox()), raw_geom.horizontal_overscan_bbox 

1154 ) 

1155 tc.assertEqual( 

1156 Box.from_legacy(legacy_amplifier.getRawVerticalOverscanBBox()), raw_geom.vertical_overscan_bbox 

1157 ) 

1158 tc.assertEqual(Box.from_legacy(legacy_amplifier.getRawPrescanBBox()), raw_geom.horizontal_prescan_bbox) 

1159 if expect_nominal_calibrations: 

1160 assert amplifier.nominal_calibrations is not None 

1161 assert_equal_allow_nan(tc, legacy_amplifier.getGain(), amplifier.nominal_calibrations.gain) 

1162 assert_equal_allow_nan(tc, legacy_amplifier.getReadNoise(), amplifier.nominal_calibrations.read_noise) 

1163 assert_equal_allow_nan( 

1164 tc, legacy_amplifier.getSaturation(), amplifier.nominal_calibrations.saturation 

1165 ) 

1166 assert_equal_allow_nan( 

1167 tc, legacy_amplifier.getSuspectLevel(), amplifier.nominal_calibrations.suspect_level 

1168 ) 

1169 np.testing.assert_array_equal( 

1170 legacy_amplifier.getLinearityCoeffs(), amplifier.nominal_calibrations.linearity_coefficients 

1171 ) 

1172 tc.assertEqual(legacy_amplifier.getLinearityType(), amplifier.nominal_calibrations.linearity_type) 

1173 

1174 

1175def compare_detector_to_legacy( 

1176 tc: unittest.TestCase, 

1177 detector: Detector, 

1178 legacy_detector: Any, 

1179 *, 

1180 is_raw_assembled: bool, 

1181 expect_nominal_calibrations: bool = True, 

1182) -> None: 

1183 """Compare a `~.cameras.Detector` to a `lsst.afw.cameraGeom.Detector`.""" 

1184 from lsst.afw.cameraGeom import FIELD_ANGLE, FOCAL_PLANE, PIXELS 

1185 

1186 tc.assertEqual(legacy_detector.getName(), detector.name) 

1187 tc.assertEqual(legacy_detector.getId(), detector.id) 

1188 tc.assertEqual(DetectorType.from_legacy(legacy_detector.getType()), detector.type) 

1189 tc.assertEqual(Box.from_legacy(legacy_detector.getBBox()), detector.bbox) 

1190 tc.assertEqual(legacy_detector.getSerial(), detector.serial) 

1191 legacy_orientation = legacy_detector.getOrientation() 

1192 tc.assertEqual(legacy_orientation.getFpPosition3().getX(), detector.orientation.focal_plane_x) 

1193 tc.assertEqual(legacy_orientation.getFpPosition3().getY(), detector.orientation.focal_plane_y) 

1194 tc.assertEqual(legacy_orientation.getFpPosition3().getZ(), detector.orientation.focal_plane_z) 

1195 tc.assertEqual(legacy_orientation.getReferencePoint().getX(), detector.orientation.pixel_reference_x) 

1196 tc.assertEqual(legacy_orientation.getReferencePoint().getY(), detector.orientation.pixel_reference_y) 

1197 tc.assertEqual(legacy_orientation.getYaw().asRadians(), detector.orientation.yaw.to_value(u.rad)) 

1198 tc.assertEqual(legacy_orientation.getPitch().asRadians(), detector.orientation.pitch.to_value(u.rad)) 

1199 tc.assertEqual(legacy_orientation.getRoll().asRadians(), detector.orientation.roll.to_value(u.rad)) 

1200 tc.assertEqual(legacy_detector.getPixelSize().getX(), detector.pixel_size) 

1201 tc.assertEqual(legacy_detector.getPhysicalType(), detector.physical_type) 

1202 for amplifier, legacy_amplifier in zip(detector.amplifiers, legacy_detector.getAmplifiers(), strict=True): 

1203 compare_amplifier_to_legacy( 

1204 tc, 

1205 amplifier, 

1206 legacy_amplifier, 

1207 is_raw_assembled=is_raw_assembled, 

1208 expect_nominal_calibrations=expect_nominal_calibrations, 

1209 ) 

1210 pixel_xy = detector.bbox.meshgrid(n=3).map(lambda z: z.ravel().astype(np.float64)) 

1211 pixel_legacy_points = arrays_to_legacy_points(y=pixel_xy.y, x=pixel_xy.x) 

1212 fp_legacy_points = legacy_detector.transform(pixel_legacy_points, PIXELS, FOCAL_PLANE) 

1213 check_transform( 

1214 tc, 

1215 detector.to_focal_plane, 

1216 pixel_xy, 

1217 legacy_points_to_xy_array(fp_legacy_points), 

1218 detector.frame, 

1219 detector.to_focal_plane.out_frame, 

1220 in_atol=1e-9 * u.pix, 

1221 out_atol=1e-7 * detector.to_focal_plane.out_frame.unit, 

1222 ) 

1223 fa_legacy_points = legacy_detector.transform(pixel_legacy_points, PIXELS, FIELD_ANGLE) 

1224 check_transform( 

1225 tc, 

1226 detector.to_field_angle, 

1227 pixel_xy, 

1228 legacy_points_to_xy_array(fa_legacy_points), 

1229 detector.frame, 

1230 detector.to_field_angle.out_frame, 

1231 in_atol=1e-9 * u.pix, 

1232 out_atol=1e-7 * u.arcsec, 

1233 ) 

1234 

1235 

1236def iter_concrete_archive_tree_subclasses() -> Iterator[type[ArchiveTree]]: 

1237 """Yield every importable concrete `.serialization.ArchiveTree` subclass. 

1238 

1239 Walks the ``ArchiveTree.__subclasses__()`` tree, skipping abstract 

1240 classes. Importing this module already imports every ``lsst.images`` 

1241 module that defines a subclass, so the tree is fully populated by the time 

1242 this is called. 

1243 

1244 This discovery is deliberately separate from 

1245 `check_archive_tree_class_invariants` so that the per-class check stays 

1246 usable on a single class even if this metaprogramming is removed later. 

1247 """ 

1248 seen: set[type] = set() 

1249 stack: list[type] = [ArchiveTree] 

1250 while stack: 

1251 kls = stack.pop() 

1252 for sub in kls.__subclasses__(): 

1253 if sub in seen: 

1254 continue 

1255 seen.add(sub) 

1256 stack.append(sub) 

1257 if not getattr(sub, "__abstractmethods__", None): 

1258 yield sub 

1259 

1260 

1261def check_archive_tree_class_invariants(tc: unittest.TestCase, cls: type[ArchiveTree]) -> None: 

1262 """Assert that one concrete `.serialization.ArchiveTree` subclass declares 

1263 well-formed schema-version constants. 

1264 

1265 Checks that ``SCHEMA_NAME``, ``SCHEMA_VERSION`` and ``MIN_READ_VERSION`` 

1266 are present and well-typed, that the version is ``major.minor.patch``, and 

1267 that ``MIN_READ_VERSION`` does not exceed the schema major. 

1268 

1269 Parameters 

1270 ---------- 

1271 tc 

1272 Test case object with assert methods to use. 

1273 cls 

1274 The concrete `.serialization.ArchiveTree` subclass to check. 

1275 """ 

1276 tc.assertTrue(hasattr(cls, "SCHEMA_NAME"), f"{cls.__name__} lacks SCHEMA_NAME") 

1277 tc.assertTrue(hasattr(cls, "SCHEMA_VERSION"), f"{cls.__name__} lacks SCHEMA_VERSION") 

1278 tc.assertTrue(hasattr(cls, "MIN_READ_VERSION"), f"{cls.__name__} lacks MIN_READ_VERSION") 

1279 tc.assertIsInstance(cls.SCHEMA_NAME, str) 

1280 tc.assertGreater(len(cls.SCHEMA_NAME), 0) 

1281 tc.assertRegex(cls.SCHEMA_VERSION, r"^\d+\.\d+\.\d+$") 

1282 tc.assertIsInstance(cls.MIN_READ_VERSION, int) 

1283 tc.assertGreaterEqual(cls.MIN_READ_VERSION, 1) 

1284 major = int(cls.SCHEMA_VERSION.split(".")[0]) 

1285 tc.assertLessEqual(cls.MIN_READ_VERSION, major)