Coverage for python/lsst/images/tests/_checks.py: 10%
429 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "arrays_to_legacy_points",
16 "assert_cell_coadds_equal",
17 "assert_close",
18 "assert_equal_allow_nan",
19 "assert_images_equal",
20 "assert_masked_images_equal",
21 "assert_masks_equal",
22 "assert_projections_equal",
23 "assert_psfs_equal",
24 "assert_visit_images_equal",
25 "check_archive_tree_class_invariants",
26 "check_astropy_wcs_interface",
27 "check_projection",
28 "check_transform",
29 "compare_amplifier_to_legacy",
30 "compare_aperture_corrections_to_legacy",
31 "compare_cell_coadd_to_legacy",
32 "compare_detector_to_legacy",
33 "compare_field_to_legacy",
34 "compare_image_to_legacy",
35 "compare_mask_to_legacy",
36 "compare_masked_image_to_legacy",
37 "compare_observation_summary_stats_to_legacy",
38 "compare_photo_calib_to_legacy",
39 "compare_projection_to_legacy_wcs",
40 "compare_psf_to_legacy",
41 "compare_visit_image_to_legacy",
42 "iter_concrete_archive_tree_subclasses",
43 "legacy_coords_to_astropy",
44 "legacy_points_to_xy_array",
45)
47import dataclasses
48import math
49import unittest
50from collections.abc import Iterator, Mapping
51from typing import TYPE_CHECKING, Any, Literal, cast
53import astropy.units as u
54import astropy.wcs.wcsapi
55import numpy as np
56from astropy.coordinates import SkyCoord
58from .._geom import XY, YX, Box
59from .._image import Image
60from .._mask import Mask, MaskPlane
61from .._masked_image import MaskedImage
62from .._observation_summary_stats import ObservationSummaryStats
63from .._transforms import DetectorFrame, Frame, Projection, SkyFrame, TractFrame, Transform
64from .._visit_image import VisitImage
65from ..aperture_corrections import ApertureCorrectionMap
66from ..cameras import Amplifier, Detector, DetectorType, ReadoutCorner
67from ..cells import CellCoadd, CellIJ, CoaddProvenance
68from ..fields import BaseField, ChebyshevField
69from ..psfs import PointSpreadFunction
70from ..serialization import ArchiveTree
72if TYPE_CHECKING:
73 try:
74 from lsst.cell_coadds import MultipleCellCoadd
75 except ImportError:
76 type MultipleCellCoadd = Any # type: ignore[no-redef]
77 try:
78 from lsst.afw.image import PhotoCalib as LegacyPhotoCalib
79 except ImportError:
80 type LegacyPhotoCalib = Any # type: ignore[no-redef]
83def assert_close(
84 tc: unittest.TestCase,
85 a: np.ndarray | u.Quantity | float,
86 b: np.ndarray | u.Quantity | float,
87 **kwargs: Any,
88) -> None:
89 """Test that two arrays, floats, or quantities are almost equal.
91 Parameters
92 ----------
93 tc
94 Test case object with assert methods to use.
95 a
96 Array, float, or quantity to compare.
97 b
98 Array, float, or quantity to compare.
99 **kwargs
100 Forwarded to `astropy.units.allclose`.
101 """
102 tc.assertTrue(u.allclose(a, b, **kwargs), msg=f"{a} != {b}")
105def assert_equal_allow_nan(tc: unittest.TestCase, a: float, b: float) -> None:
106 """Test that two floating point values are equal, with nan == nan."""
107 try:
108 tc.assertEqual(a, b)
109 except AssertionError:
110 if not (math.isnan(a) and math.isnan(b)):
111 raise
114def assert_images_equal(
115 tc: unittest.TestCase,
116 a: Image,
117 b: Image,
118 *,
119 rtol: float = 0.0,
120 atol: float = 0.0,
121 expect_view: bool | Literal["array"] | None = None,
122) -> None:
123 """Assert that two images are equal or nearly equal."""
124 tc.assertEqual(a.bbox, b.bbox)
125 tc.assertEqual(a.unit, b.unit)
126 assert_projections_equal(tc, a.projection, b.projection)
127 if expect_view is not None:
128 tc.assertEqual(np.may_share_memory(a.array, b.array), bool(expect_view))
129 if expect_view == "array":
130 tc.assertEqual(a.metadata, b.metadata)
131 else:
132 tc.assertEqual(a.metadata is b.metadata, expect_view)
133 if not expect_view:
134 assert_close(tc, a.array, b.array, atol=atol, rtol=rtol)
135 tc.assertEqual(a.metadata, b.metadata)
138def assert_masks_equal(tc: unittest.TestCase, a: Mask, b: Mask) -> None:
139 """Assert that two masks are equal or nearly equal."""
140 tc.assertEqual(a.bbox, b.bbox)
141 tc.assertEqual(a.schema, b.schema)
142 tc.assertEqual(a.metadata, b.metadata)
143 assert_projections_equal(tc, a.projection, b.projection)
144 np.testing.assert_array_equal(a.array, b.array)
147def assert_masked_images_equal(
148 tc: unittest.TestCase,
149 a: MaskedImage,
150 b: MaskedImage,
151 *,
152 rtol: float = 0.0,
153 atol: float = 0.0,
154 expect_view: bool | None = None,
155) -> None:
156 """Assert that two masked images are equal or nearly equal."""
157 tc.assertEqual(a.metadata, b.metadata)
158 assert_projections_equal(tc, a.projection, b.projection)
159 assert_images_equal(tc, a.image, b.image, rtol=rtol, atol=atol, expect_view=expect_view)
160 assert_masks_equal(tc, a.mask, b.mask)
161 assert_images_equal(tc, a.variance, b.variance, rtol=rtol, atol=atol, expect_view=expect_view)
164def assert_psfs_equal(
165 tc: unittest.TestCase,
166 psf1: PointSpreadFunction,
167 psf2: PointSpreadFunction,
168 points: YX[np.ndarray] | XY[np.ndarray] | None = None,
169) -> int:
170 """Compare two PSF objets.
172 Parameters
173 ----------
174 tc
175 Test case object with assert methods to use.
176 psf1
177 Point-spread function to test.
178 psf2
179 The other point-spread function to test.
180 points
181 Points to evaluate the PSFs at. If not provided, the intersection of
182 the PSF bounding boxes are used to generate points on a grid.
184 Returns
185 -------
186 `int`
187 The number of points actually tested.
188 """
189 if points is None:
190 points = psf1.bounds.bbox.intersection(psf2.bounds.bbox).meshgrid(3).map(np.ravel)
192 tc.assertEqual(psf1.kernel_bbox, psf2.kernel_bbox)
194 n_points_tested: int = 0
195 for x, y in zip(points.x, points.y):
196 # The two PSFs must agree on which points fall inside their input
197 # domain. Querying ``.contains`` directly (rather than relying on
198 # ``compute_kernel_image`` to raise) makes this test tolerant of
199 # implementations that do not raise on out-of-domain points -- in
200 # particular ``CellPointSpreadFunction``, where evaluating in a
201 # missing cell does not always raise ``BoundsError``.
202 contains1 = psf1.bounds.contains(x=x, y=y)
203 contains2 = psf2.bounds.contains(x=x, y=y)
204 tc.assertEqual(
205 contains1,
206 contains2,
207 f"PSFs disagree on whether ({x}, {y}) is in-bounds: psf1={contains1}, psf2={contains2}",
208 )
209 if not contains1:
210 continue
211 tc.assertEqual(psf1.compute_kernel_image(x=x, y=y), psf2.compute_kernel_image(x=x, y=y))
212 tc.assertEqual(psf1.compute_stellar_bbox(x=x, y=y), psf2.compute_stellar_bbox(x=x, y=y))
213 tc.assertEqual(psf1.compute_stellar_image(x=x, y=y), psf2.compute_stellar_image(x=x, y=y))
214 n_points_tested += 1
215 return n_points_tested
218def assert_visit_images_equal(
219 tc: unittest.TestCase,
220 a: VisitImage,
221 b: VisitImage,
222 *,
223 expect_view: bool | None = None,
224) -> None:
225 """Assert that two `.VisitImage` instances carry the same persistent state.
227 Extends `assert_masked_images_equal` with the VisitImage-specific
228 attributes (PSF, filter, observation info, detector, aperture
229 corrections, photometric scaling, backgrounds, polygon bounds,
230 summary stats) so a round-trip check on a `.VisitImage` does not
231 silently miss differences in any of them.
232 """
233 assert_masked_images_equal(tc, a, b, expect_view=expect_view)
234 tc.assertEqual(a.summary_stats, b.summary_stats)
235 tc.assertEqual(a.physical_filter, b.physical_filter)
236 tc.assertEqual(a.band, b.band)
237 tc.assertEqual(a.obs_info, b.obs_info)
238 tc.assertEqual(a.detector, b.detector)
239 tc.assertEqual(dict(a.aperture_corrections), dict(b.aperture_corrections))
240 tc.assertEqual(a.photometric_scaling, b.photometric_scaling)
241 tc.assertEqual(dict(a.backgrounds), dict(b.backgrounds))
242 tc.assertEqual(a.backgrounds.subtracted, b.backgrounds.subtracted)
243 tc.assertEqual(a.bounds, b.bounds)
244 assert_psfs_equal(tc, a.psf, b.psf)
247def assert_cell_coadds_equal(
248 tc: unittest.TestCase,
249 a: CellCoadd,
250 b: CellCoadd,
251 *,
252 expect_view: bool | None = None,
253) -> None:
254 """Assert that two `.CellCoadd` instances carry the same persistent state.
256 Extends the masked-image-style equality check with the
257 CellCoadd-specific attributes (PSF, cell grid, missing cells,
258 backgrounds, patch/tract, band) so a round-trip check does not
259 silently miss differences in any of them.
260 """
261 assert_masked_images_equal(tc, a, b, expect_view=expect_view)
262 tc.assertEqual(a.band, b.band)
263 tc.assertEqual(a.patch, b.patch)
264 tc.assertEqual(a.tract, b.tract)
265 tc.assertEqual(a.grid, b.grid)
266 tc.assertEqual(a.bounds.missing, b.bounds.missing)
267 tc.assertEqual(dict(a.backgrounds), dict(b.backgrounds))
268 tc.assertEqual(a.backgrounds.subtracted, b.backgrounds.subtracted)
269 assert_psfs_equal(tc, a.psf, b.psf)
272def compare_image_to_legacy(
273 tc: unittest.TestCase, image: Image, legacy_image: Any, expect_view: bool | None = None
274) -> None:
275 """Compare an `.Image` object to a legacy `lsst.afw.image.Image` object."""
276 tc.assertEqual(image.bbox, Box.from_legacy(legacy_image.getBBox()))
277 if expect_view is not None:
278 tc.assertEqual(np.may_share_memory(image.array, legacy_image.array), expect_view)
279 if not expect_view:
280 np.testing.assert_array_equal(image.array, legacy_image.array)
283def compare_mask_to_legacy(
284 tc: unittest.TestCase, mask: Mask, legacy_mask: Any, plane_map: Mapping[str, MaskPlane] | None = None
285) -> None:
286 """Compare a `.Mask` object to a legacy `lsst.afw.image.Mask` object."""
287 tc.assertEqual(mask.bbox, Box.from_legacy(legacy_mask.getBBox()))
288 if plane_map is None:
289 plane_map = {plane.name: plane for plane in mask.schema if plane is not None}
290 for old_name, new_plane in plane_map.items():
291 np.testing.assert_array_equal(
292 (legacy_mask.array & legacy_mask.getPlaneBitMask(old_name)).astype(bool),
293 mask.get(new_plane.name),
294 )
297def compare_masked_image_to_legacy(
298 tc: unittest.TestCase,
299 masked_image: MaskedImage,
300 legacy_masked_image: Any,
301 *,
302 plane_map: Mapping[str, MaskPlane] | None = None,
303 expect_view: bool | None = None,
304 alternates: Mapping[str, Any] | None = None,
305) -> None:
306 """Compare a `.MaskedImage` object to a legacy `lsst.afw.image.MaskedImage`
307 object.
309 Parameters
310 ----------
311 tc
312 Test case to use for asserts.
313 masked_image
314 New image to test.
315 legacy_masked_image
316 Legacy image to test against.
317 plane_map
318 Mapping between new and legacy mask planes.
319 expect_view
320 Whether to test that the image and variance arrays do or do not share
321 memory.
322 alternates
323 A mapping of other versions of one or more (new) components to also
324 check against the legacy versions of those components.
325 """
326 compare_image_to_legacy(tc, masked_image.image, legacy_masked_image.getImage(), expect_view=expect_view)
327 compare_mask_to_legacy(tc, masked_image.mask, legacy_masked_image.getMask(), plane_map=plane_map)
328 compare_image_to_legacy(
329 tc, masked_image.variance, legacy_masked_image.getVariance(), expect_view=expect_view
330 )
331 if alternates:
332 if image := alternates.get("image"):
333 compare_image_to_legacy(tc, image, legacy_masked_image.getImage(), expect_view=expect_view)
334 if mask := alternates.get("mask"):
335 compare_mask_to_legacy(tc, mask, legacy_masked_image.getMask(), plane_map=plane_map)
336 if variance := alternates.get("variance"):
337 compare_image_to_legacy(tc, variance, legacy_masked_image.getVariance(), expect_view=expect_view)
340def compare_visit_image_to_legacy(
341 tc: unittest.TestCase,
342 visit_image: VisitImage,
343 legacy_exposure: Any,
344 *,
345 plane_map: Mapping[str, MaskPlane] | None = None,
346 expect_view: bool | None = None,
347 instrument: str,
348 visit: int,
349 detector: int,
350 applied_legacy_photo_calib: LegacyPhotoCalib | None = None,
351 alternates: Mapping[str, Any] | None = None,
352) -> None:
353 """Compare a `.VisitImage` object to a legacy `lsst.afw.image.Exposure`
354 object.
356 Parameters
357 ----------
358 tc
359 Test case to use for asserts.
360 visit_image
361 New image to test.
362 legacy_exposure
363 Legacy image to test against.
364 plane_map
365 Mapping between new and legacy mask planes.
366 expect_view
367 Whether to test that the image and variance arrays do or do not share
368 memory.
369 instrument
370 Expected instrument name.
371 visit
372 Expected visit ID.
373 detector
374 Expected detector ID.
375 alternates
376 A mapping of other versions of one or more (new) components to also
377 check against the legacy versions of those components.
378 """
379 compare_masked_image_to_legacy(
380 tc,
381 visit_image,
382 legacy_exposure.getMaskedImage(),
383 plane_map=plane_map,
384 expect_view=expect_view,
385 alternates=alternates,
386 )
387 detector_bbox = Box.from_legacy(legacy_exposure.getDetector().getBBox())
388 compare_projection_to_legacy_wcs(
389 tc,
390 visit_image.projection,
391 legacy_exposure.getWcs(),
392 DetectorFrame(instrument=instrument, visit=visit, detector=detector, bbox=detector_bbox),
393 visit_image.bbox,
394 )
395 tc.assertIs(visit_image.projection, visit_image.mask.projection)
396 tc.assertIs(visit_image.projection, visit_image.variance.projection)
397 compare_psf_to_legacy(tc, visit_image.psf, legacy_exposure.getPsf())
398 compare_observation_summary_stats_to_legacy(
399 tc, visit_image.summary_stats, legacy_exposure.info.getSummaryStats()
400 )
401 compare_detector_to_legacy(tc, visit_image.detector, legacy_exposure.getDetector(), is_raw_assembled=True)
402 # Make a tiny box for Field comparisons that need to make arrays; that can
403 # get expensive otherwisre.
404 tiny_bbox = detector_bbox.local[2:4, 3:6]
405 compare_aperture_corrections_to_legacy(
406 tc, visit_image.aperture_corrections, legacy_exposure.info.getApCorrMap(), tiny_bbox
407 )
408 compare_photo_calib_to_legacy(
409 tc,
410 visit_image.photometric_scaling,
411 legacy_exposure.info.getPhotoCalib(),
412 applied_legacy_photo_calib=applied_legacy_photo_calib,
413 subimage_bbox=tiny_bbox,
414 )
415 if alternates:
416 if (bbox := alternates.get("bbox")) is not None:
417 tc.assertEqual(bbox, visit_image.bbox)
418 if projection := alternates.get("projection"):
419 compare_projection_to_legacy_wcs(
420 tc,
421 projection,
422 legacy_exposure.getWcs(),
423 DetectorFrame(instrument=instrument, visit=visit, detector=detector, bbox=detector_bbox),
424 visit_image.bbox,
425 )
426 if psf := alternates.get("psf"):
427 compare_psf_to_legacy(tc, psf, legacy_exposure.getPsf())
428 if summary_stats := alternates.get("summary_stats"):
429 compare_observation_summary_stats_to_legacy(
430 tc, summary_stats, legacy_exposure.info.getSummaryStats()
431 )
432 if detector_obj := alternates.get("detector"):
433 compare_detector_to_legacy(tc, detector_obj, legacy_exposure.getDetector(), is_raw_assembled=True)
434 if obs_info := alternates.get("obs_info"):
435 visitInfo = legacy_exposure.visitInfo
436 tc.assertEqual(obs_info.instrument, visitInfo.getInstrumentLabel())
437 if aperture_corrections := alternates.get("aperture_corrections"):
438 compare_aperture_corrections_to_legacy(
439 tc, aperture_corrections, legacy_exposure.info.getApCorrMap(), tiny_bbox
440 )
441 if (photometric_scaling := alternates.get("photometic_scaling", ...)) is not ...:
442 compare_photo_calib_to_legacy(
443 tc,
444 photometric_scaling,
445 legacy_exposure.info.getPhotoCalib(),
446 applied_legacy_photo_calib=applied_legacy_photo_calib,
447 subimage_bbox=tiny_bbox,
448 )
451def compare_photo_calib_to_legacy(
452 tc: unittest.TestCase,
453 photometric_scaling: BaseField | None,
454 legacy_photo_calib: LegacyPhotoCalib,
455 *,
456 applied_legacy_photo_calib: LegacyPhotoCalib | None = None,
457 subimage_bbox: Box,
458) -> None:
459 if legacy_photo_calib._isConstant:
460 if legacy_photo_calib.getCalibrationMean() == 1.0:
461 if applied_legacy_photo_calib is None:
462 tc.assertIsNone(photometric_scaling)
463 return
464 else:
465 legacy_photo_calib = applied_legacy_photo_calib
466 if legacy_photo_calib._isConstant:
467 assert isinstance(photometric_scaling, ChebyshevField)
468 assert_close(
469 tc, photometric_scaling.coefficients, np.array([[legacy_photo_calib.getCalibrationMean()]])
470 )
471 else:
472 assert photometric_scaling is not None
473 compare_field_to_legacy(
474 tc,
475 photometric_scaling / legacy_photo_calib.getCalibrationMean(),
476 legacy_photo_calib.computeScaledCalibration(),
477 subimage_bbox,
478 )
481def compare_cell_coadd_to_legacy(
482 tc: unittest.TestCase,
483 cell_coadd: CellCoadd,
484 legacy_cell_coadd: MultipleCellCoadd,
485 *,
486 tract_bbox: Box,
487 plane_map: Mapping[str, MaskPlane] | None = None,
488 alternates: Mapping[str, Any] | None = None,
489 psf_points: XY[np.ndarray] | YX[np.ndarray] | None = None,
490) -> None:
491 """Compare a `.cells.CellCoadd` object to a legacy
492 `lsst.cell_coadds.MultipleCellCoadd` object.
494 Parameters
495 ----------
496 tc
497 Test case to use for asserts.
498 cell_coadd
499 New coadd to test.
500 legacy_cell_coadd
501 Legacy coadd to test against.
502 tract_bbox
503 Bounding box of the full tract.
504 psf_points
505 Points to use to compare the PSFs.
506 plane_map
507 Mapping between new and legacy mask planes.
508 alternates
509 A mapping of other versions of one or more (new) components to also
510 check against the legacy versions of those components.
511 """
512 legacy_stitched = legacy_cell_coadd.stitch(cell_coadd.bbox.to_legacy())
513 compare_image_to_legacy(tc, cell_coadd.image, legacy_stitched.image, expect_view=False)
514 compare_mask_to_legacy(tc, cell_coadd.mask, legacy_stitched.mask, plane_map=plane_map)
515 compare_image_to_legacy(tc, cell_coadd.variance, legacy_stitched.variance, expect_view=False)
516 if legacy_stitched.mask_fractions is not None:
517 compare_image_to_legacy(
518 tc, cell_coadd.mask_fractions["rejected"], legacy_stitched.mask_fractions, expect_view=False
519 )
520 for n in range(legacy_stitched.n_noise_realizations):
521 compare_image_to_legacy(
522 tc, cell_coadd.noise_realizations[n], legacy_stitched.noise_realizations[n], expect_view=False
523 )
524 tc.assertEqual(cell_coadd.skymap, legacy_stitched.identifiers.skymap)
525 tc.assertEqual(cell_coadd.tract, legacy_stitched.identifiers.tract)
526 tc.assertEqual(cell_coadd.patch.index.x, legacy_stitched.identifiers.patch.x)
527 tc.assertEqual(cell_coadd.patch.index.y, legacy_stitched.identifiers.patch.y)
528 tc.assertEqual(cell_coadd.band, legacy_stitched.identifiers.band)
529 tc.assertTrue(tract_bbox.contains(cell_coadd.patch.outer_bbox))
530 tc.assertTrue(cell_coadd.patch.outer_bbox.contains(cell_coadd.patch.inner_bbox))
531 tc.assertTrue(cell_coadd.patch.outer_bbox.contains(cell_coadd.bbox))
532 tc.assertEqual(cell_coadd.unit, u.Unit(legacy_cell_coadd.common.units.value))
533 tc.assertTrue(cell_coadd.bounds.bbox.contains(cell_coadd.bbox))
534 tc.assertTrue(cell_coadd.grid.bbox.contains(cell_coadd.bbox))
535 compare_projection_to_legacy_wcs(
536 tc,
537 cell_coadd.projection,
538 legacy_cell_coadd.wcs,
539 TractFrame(
540 skymap=legacy_cell_coadd.identifiers.skymap,
541 tract=legacy_cell_coadd.identifiers.tract,
542 bbox=tract_bbox,
543 ),
544 cell_coadd.bbox,
545 is_fits=True,
546 )
547 tc.assertIs(cell_coadd.projection, cell_coadd.mask.projection)
548 tc.assertIs(cell_coadd.projection, cell_coadd.variance.projection)
549 compare_psf_to_legacy(
550 tc, cell_coadd.psf, legacy_stitched.psf, expect_legacy_raise_on_out_of_bounds=True, points=psf_points
551 )
552 compare_cell_coadd_provenance_to_legacy(tc, cell_coadd.provenance, legacy_cell_coadd)
553 if alternates:
554 if projection := alternates.get("projection"):
555 compare_projection_to_legacy_wcs(
556 tc,
557 projection,
558 legacy_stitched.wcs,
559 TractFrame(
560 skymap=legacy_cell_coadd.identifiers.skymap,
561 tract=legacy_cell_coadd.identifiers.tract,
562 bbox=tract_bbox,
563 ),
564 cell_coadd.bbox,
565 is_fits=True,
566 )
567 if psf := alternates.get("psf"):
568 compare_psf_to_legacy(tc, psf, legacy_stitched.psf, points=psf_points)
569 if provenance := alternates.get("provenance"):
570 compare_cell_coadd_provenance_to_legacy(tc, provenance, legacy_cell_coadd)
573def compare_cell_coadd_provenance_to_legacy(
574 tc: unittest.TestCase, provenance: CoaddProvenance, legacy_cell_coadd: MultipleCellCoadd
575) -> None:
576 """Compare a `.cells.CoaddProvenance` object to a legacy
577 `lsst.cell_coadds.MultipleCellCoadd` object.
579 Parameters
580 ----------
581 tc
582 Test case to use for asserts.
583 provenance
584 New provenance object to test.
585 legacy_cell_coadd
586 Legacy coadd to test against.
587 """
588 from lsst.cell_coadds import ObservationIdentifiers
590 for legacy_cell in legacy_cell_coadd.cells.values():
591 cell_index = CellIJ.from_legacy(legacy_cell.identifiers.cell)
592 prov = provenance[cell_index]
593 legacy_table = astropy.table.Table(
594 rows=[
595 [
596 ids.instrument,
597 ids.visit,
598 ids.detector,
599 ids.day_obs,
600 ids.physical_filter,
601 legacy_input.overlaps_center,
602 legacy_input.overlap_fraction,
603 legacy_input.weight,
604 legacy_input.psf_shape.getIxx(),
605 legacy_input.psf_shape.getIyy(),
606 legacy_input.psf_shape.getIxy(),
607 legacy_input.psf_shape_flag,
608 ]
609 for ids, legacy_input in legacy_cell.inputs.items()
610 ],
611 dtype=[
612 np.object_,
613 np.uint64,
614 np.uint16,
615 np.uint32,
616 np.object_,
617 np.bool_,
618 np.float64,
619 np.float64,
620 np.float64,
621 np.float64,
622 np.float64,
623 np.bool_,
624 ],
625 names=[
626 "instrument",
627 "visit",
628 "detector",
629 "day_obs",
630 "physical_filter",
631 "overlaps_center",
632 "overlap_fraction",
633 "weight",
634 "psf_shape_xx",
635 "psf_shape_yy",
636 "psf_shape_xy",
637 "psf_shape_flag",
638 ],
639 )
640 # For a single cell all 'inputs' are also 'contributions'.
641 tc.assertEqual(len(legacy_cell.inputs), len(prov.inputs))
642 tc.assertEqual(len(legacy_cell.inputs), len(prov.contributions))
643 prov.inputs.sort(["instrument", "visit", "detector"])
644 prov.contributions.sort(["instrument", "visit", "detector"])
645 legacy_table.sort(["instrument", "visit", "detector"])
646 np.testing.assert_array_equal(prov.inputs["instrument"], prov.contributions["instrument"])
647 np.testing.assert_array_equal(prov.inputs["visit"], prov.contributions["visit"])
648 np.testing.assert_array_equal(prov.inputs["detector"], prov.contributions["detector"])
649 np.testing.assert_array_equal(prov.inputs["instrument"], legacy_table["instrument"])
650 np.testing.assert_array_equal(prov.inputs["visit"], legacy_table["visit"])
651 np.testing.assert_array_equal(prov.inputs["detector"], legacy_table["detector"])
652 np.testing.assert_array_equal(prov.inputs["physical_filter"], legacy_table["physical_filter"])
653 np.testing.assert_array_equal(prov.inputs["day_obs"], legacy_table["day_obs"])
654 np.testing.assert_array_equal(prov.contributions["overlaps_center"], legacy_table["overlaps_center"])
655 np.testing.assert_array_equal(
656 prov.contributions["overlap_fraction"], legacy_table["overlap_fraction"]
657 )
658 np.testing.assert_array_equal(prov.contributions["weight"], legacy_table["weight"])
659 np.testing.assert_array_equal(prov.contributions["psf_shape_xx"], legacy_table["psf_shape_xx"])
660 np.testing.assert_array_equal(prov.contributions["psf_shape_yy"], legacy_table["psf_shape_yy"])
661 np.testing.assert_array_equal(prov.contributions["psf_shape_xy"], legacy_table["psf_shape_xy"])
662 np.testing.assert_array_equal(prov.contributions["psf_shape_flag"], legacy_table["psf_shape_flag"])
663 for row in prov.inputs:
664 polygon_key = ObservationIdentifiers(**{k: row[k] for k in row.keys() if k != "polygon"})
665 legacy_polygon = legacy_cell_coadd.common.visit_polygons[polygon_key]
666 tc.assertEqual(legacy_polygon, row["polygon"].to_legacy())
669def compare_psf_to_legacy(
670 tc: unittest.TestCase,
671 psf: PointSpreadFunction,
672 legacy_psf: Any,
673 points: YX[np.ndarray] | XY[np.ndarray] | None = None,
674 expect_legacy_raise_on_out_of_bounds: bool = False,
675) -> int:
676 """Compare a PSF model object to its legacy interface.
678 Parameters
679 ----------
680 tc
681 Test case object with assert methods to use.
682 psf
683 Point-spread function to test.
684 legacy_psf
685 Legacy `lsst.afw.detection.Psf` instance to compare with.
686 points
687 Points to evaluate the PSFs at. If not provided, the intersection of
688 the PSF bounding boxes are used to generate points on a grid.
689 expect_legacy_raise_on_out_of_bounds
690 If `True`, expect ``legacy_psf`` to raise
691 `lsst.afw.detection.InvalidPsfError` when evaluated at a position
692 considered out-of-bounds by ``psf``.
694 Returns
695 -------
696 `int`
697 The number of points actually tested.
698 """
699 from lsst.afw.detection import InvalidPsfError
701 if points is None:
702 points = psf.bounds.bbox.meshgrid(n=3).map(np.ravel)
703 legacy_points = arrays_to_legacy_points(points.x, points.y)
704 n_points_tested: int = 0
705 for p in legacy_points:
706 if not psf.bounds.contains(x=p.x, y=p.y):
707 if expect_legacy_raise_on_out_of_bounds:
708 with tc.assertRaises(InvalidPsfError):
709 legacy_psf.computeKernelImage(p)
710 continue
711 tc.assertEqual(psf.kernel_bbox, Box.from_legacy(legacy_psf.computeKernelBBox(p)))
712 tc.assertEqual(
713 psf.compute_kernel_image(x=p.x, y=p.y), Image.from_legacy(legacy_psf.computeKernelImage(p))
714 )
715 tc.assertEqual(
716 psf.compute_stellar_bbox(x=p.x, y=p.y), Box.from_legacy(legacy_psf.computeImageBBox(p))
717 )
718 tc.assertEqual(psf.compute_stellar_image(x=p.x, y=p.y), Image.from_legacy(legacy_psf.computeImage(p)))
719 n_points_tested += 1
720 return n_points_tested
723def compare_field_to_legacy(
724 tc: unittest.TestCase,
725 field: BaseField,
726 legacy_field: Any,
727 subimage_bbox: Box,
728) -> None:
729 """Test a Field object by comparing it to an equivalent
730 `lsst.afw.math.BoundedField`.
732 Parameters
733 ----------
734 tc
735 Test case object with assert methods to use.
736 field
737 Field to test.
738 legacy_field : ``lsst.afw.math.BoundedField``
739 Equivalent legacy bounded field.
740 subimage_bbox
741 Bounding box for full-image tests.
742 """
743 tc.assertEqual(field.bounds.bbox, Box.from_legacy(legacy_field.getBBox()))
744 # Pixel coordinates to test the numpy array interface with.
745 pixel_xy = field.bounds.bbox.meshgrid(n=5).map(np.ravel)
746 assert_close(tc, field(x=pixel_xy.x, y=pixel_xy.y), legacy_field.evaluate(pixel_xy.x, pixel_xy.y))
747 legacy_image_1 = Image(0, bbox=subimage_bbox, dtype=np.float64).to_legacy()
748 legacy_field.addToImage(legacy_image_1, overlapOnly=True)
749 assert_images_equal(
750 tc, field.render(subimage_bbox), Image.from_legacy(legacy_image_1, unit=field.unit), rtol=1e-13
751 )
754def compare_aperture_corrections_to_legacy(
755 tc: unittest.TestCase,
756 aperture_corrections: ApertureCorrectionMap,
757 legacy_ap_corr_map: Any,
758 subimage_bbox: Box,
759) -> None:
760 """Test an aperture correction `dict` by comparing it to an equivalent
761 `lsst.afw.image.ApCorrMap`.
763 Parameters
764 ----------
765 tc
766 Test case object with assert methods to use.
767 aperture_corrections
768 Dictionary to test.
769 legacy_ap_corr_map : ``lsst.afw.image.ApCorrMap``
770 Equivalent legacy aperture correction map.
771 subimage_bbox
772 Bounding box for full-image tests.
773 """
774 tc.assertEqual(aperture_corrections.keys(), set(legacy_ap_corr_map.keys()))
775 for name, field in aperture_corrections.items():
776 compare_field_to_legacy(tc, field, legacy_ap_corr_map[name], subimage_bbox)
779def compare_observation_summary_stats_to_legacy(
780 tc: unittest.TestCase,
781 summary_stats: ObservationSummaryStats,
782 legacy_summary_stats: Any,
783) -> None:
784 """Test an ObservationSummaryStats object by comparing it to an equivalent
785 `lsst.afw.image.ExposureSummaryStats`.
787 Parameters
788 ----------
789 tc
790 Test case object with assert methods to use.
791 summary_stats
792 Struct to test.
793 legacy : ``lsst.afw.image.ExposureSummaryStats``
794 Equivalent legacy struct.
795 """
796 for field in dataclasses.fields(legacy_summary_stats):
797 a = getattr(legacy_summary_stats, field.name)
798 b = getattr(summary_stats, field.name)
799 if isinstance(b, tuple):
800 for ai, bi in zip(a, b):
801 tc.assertTrue(ai == bi or (math.isnan(ai) and math.isnan(bi)), f"{field.name}: {a} != {b}")
802 else:
803 tc.assertTrue(a == b or (math.isnan(a) and math.isnan(b)), f"{field.name}: {a} != {b}")
806def compare_projection_to_legacy_wcs[F: Frame](
807 tc: unittest.TestCase,
808 projection: Projection[F],
809 legacy_wcs: Any,
810 pixel_frame: F,
811 subimage_bbox: Box,
812 is_fits: bool = False,
813) -> None:
814 """Test a Projection object by comparing it to an equivalent
815 `lsst.afw.geom.SkyWcs`.
817 Parameters
818 ----------
819 tc
820 Test case object with assert methods to use.
821 projection
822 Projection to test.
823 legacy_wcs : ``lsst.afw.geom.SkyWcs``
824 Equivalent legacy WCS.
825 pixel_frame
826 Expected pixel frame for the projection.
827 subimage_bbox
828 Bounding box of points to generate for tests.
829 is_fits
830 Whether this projection is expected to be exactly representable as a
831 FIT WCS. If `False` it is assumed to have a FITS approximation
832 attached instead.
833 """
834 # Pixel coordinates to test on over the subimage region of interest:
835 pixel_xy = subimage_bbox.meshgrid(step=50).map(np.ravel)
836 # Array indices of those pixel values (subtract off bbox starts):
837 subimage_array_xy = XY(x=pixel_xy.x - subimage_bbox.x.start, y=pixel_xy.y - subimage_bbox.y.start)
838 sky_coords = legacy_coords_to_astropy(
839 legacy_wcs.pixelToSky(arrays_to_legacy_points(pixel_xy.x, pixel_xy.y))
840 )
841 # Test transforming with the Projection itself, which also tests its
842 # nested Transform and an Astropy High-Level WCS view with no origin
843 # change.
844 check_projection(tc, projection, pixel_xy, sky_coords, pixel_frame)
845 # Also test the Astropy High-Level WCS view with an origin change to
846 # array indices.
847 check_astropy_wcs_interface(
848 tc, projection.as_astropy(subimage_bbox), subimage_array_xy, sky_coords, pixel_atol=1e-5
849 )
850 if is_fits:
851 fits_wcs = projection.as_fits_wcs(subimage_bbox, allow_approximation=True)
852 assert fits_wcs is not None
853 check_astropy_wcs_interface(tc, fits_wcs, subimage_array_xy, sky_coords, pixel_atol=1e-5)
854 # Use that FITS approximation to check that we can make a
855 # Projection from a FITS WCS, too.
856 fits_projection = Projection.from_fits_wcs(fits_wcs, pixel_frame)
857 check_projection(
858 tc,
859 fits_projection,
860 subimage_array_xy,
861 sky_coords,
862 pixel_frame,
863 pixel_atol=1e-5,
864 )
865 # We want Projections we create from a FITS WCS to be backed by an
866 # AST FrameSet so we can convert them into legacy
867 # `lsst.afw.geom.SkyWcs` objects if desired.
868 tc.assertIn("Begin FrameSet", fits_projection.show())
869 else:
870 tc.assertIsNone(projection.as_fits_wcs(subimage_bbox, allow_approximation=False))
871 # The legacy SkyWcs should instead have a FITS approximation
872 # attached; run the same tests on that.
873 assert projection.fits_approximation is not None
874 compare_projection_to_legacy_wcs(
875 tc,
876 projection.fits_approximation,
877 legacy_wcs.getFitsApproximation(),
878 pixel_frame,
879 subimage_bbox,
880 is_fits=True,
881 )
884def check_transform[I: Frame, O: Frame](
885 tc: unittest.TestCase,
886 transform: Transform[I, O],
887 input_xy: XY[np.ndarray],
888 output_xy: XY[np.ndarray],
889 in_frame: Frame,
890 out_frame: Frame,
891 *,
892 check_inverted: bool = True,
893 in_atol: u.Quantity | None = None,
894 out_atol: u.Quantity | None = None,
895) -> None:
896 """Test Transform against known arrays of input and output points.
898 Parameters
899 ----------
900 tc
901 Test case object with assert methods to use.
902 transform
903 Transform to test.
904 input_xy
905 Arrays of input points.
906 output_xy
907 Arrays of output points.
908 in_frame
909 Expected input frame.
910 out_frame
911 Expected output frame.
912 check_inverted
913 If `True`, recurse (once) to test the inverse transform.
914 in_atol
915 Expected absolute precision of input points.
916 out_atol
917 Expected absolute precision of output points.
918 """
919 tc.assertEqual(transform.in_frame, in_frame)
920 tc.assertEqual(transform.out_frame, out_frame)
921 in_atol_v = in_atol.to_value(in_frame.unit) if in_atol is not None else None
922 out_atol_v = out_atol.to_value(out_frame.unit) if out_atol is not None else None
923 # Test array interfaces.
924 test_output_xy = transform.apply_forward(x=input_xy.x, y=input_xy.y)
925 assert_close(tc, test_output_xy.x, output_xy.x, atol=out_atol_v)
926 assert_close(tc, test_output_xy.y, output_xy.y, atol=out_atol_v)
927 test_input_xy = transform.apply_inverse(x=output_xy.x, y=output_xy.y)
928 assert_close(tc, test_input_xy.x, input_xy.x, atol=in_atol_v)
929 assert_close(tc, test_input_xy.y, input_xy.y, atol=in_atol_v)
930 # Test scalar interfaces with numpy scalars.
931 for input_x, input_y, output_x, output_y in zip(input_xy.x, input_xy.y, output_xy.x, output_xy.y):
932 assert_close(tc, transform.apply_forward(x=input_x, y=input_y).x, output_x, atol=out_atol_v)
933 assert_close(tc, transform.apply_forward(x=input_x, y=input_y).y, output_y, atol=out_atol_v)
934 assert_close(tc, transform.apply_inverse(x=output_x, y=output_y).x, input_x, atol=in_atol_v)
935 assert_close(tc, transform.apply_inverse(x=output_x, y=output_y).y, input_y, atol=in_atol_v)
936 # Test quantity array interfaces.
937 input_q_xy = XY(x=input_xy.x * transform.in_frame.unit, y=input_xy.y * transform.in_frame.unit)
938 output_q_xy = XY(x=output_xy.x * transform.out_frame.unit, y=output_xy.y * transform.out_frame.unit)
939 test_output_q_xy = transform.apply_forward_q(x=input_q_xy.x, y=input_q_xy.y)
940 assert_close(tc, test_output_q_xy.x, output_q_xy.x, atol=out_atol)
941 assert_close(tc, test_output_q_xy.y, output_q_xy.y, atol=out_atol)
942 test_input_q_xy = transform.apply_inverse_q(x=output_q_xy.x, y=output_q_xy.y)
943 assert_close(tc, test_input_q_xy.x, input_q_xy.x, atol=in_atol)
944 assert_close(tc, test_input_q_xy.y, input_q_xy.y, atol=in_atol)
945 # Test quantity scalar interfaces.
946 for input_q_x, input_q_y, output_q_x, output_q_y in zip(
947 input_q_xy.x, input_q_xy.y, output_q_xy.x, output_q_xy.y
948 ):
949 assert_close(tc, transform.apply_forward_q(x=input_q_x, y=input_q_y).x, output_q_x, atol=out_atol)
950 assert_close(tc, transform.apply_forward_q(x=input_q_x, y=input_q_y).y, output_q_y, atol=out_atol)
951 assert_close(tc, transform.apply_inverse_q(x=output_q_x, y=output_q_y).x, input_q_x, atol=in_atol)
952 assert_close(tc, transform.apply_inverse_q(x=output_q_x, y=output_q_y).y, input_q_y, atol=in_atol)
953 if check_inverted:
954 # Test the inverse transform.
955 check_transform(
956 tc,
957 transform.inverted(),
958 output_xy,
959 input_xy,
960 out_frame,
961 in_frame,
962 check_inverted=False,
963 out_atol=in_atol,
964 in_atol=out_atol,
965 )
968def check_projection[P: Frame](
969 tc: unittest.TestCase,
970 projection: Projection[P],
971 pixel_xy: XY[np.ndarray],
972 sky_coords: SkyCoord,
973 pixel_frame: Frame,
974 *,
975 pixel_atol: float | None = None,
976 sky_atol: u.Quantity | None = None,
977) -> None:
978 """Test a `.Projection` instance against known arrays of pixel and sky
979 coordinates.
981 Parameters
982 ----------
983 tc
984 Test case object with assert methods to use.
985 projection
986 Projection to test.
987 pixel_xy
988 Arrays of pixel coordinates.
989 sky_coords
990 Corresponding sky coordinates.
991 pixel_frame
992 Expected pixel frame.
993 pixel_atol
994 Expected absolute precision of pixel points.
995 sky_atol
996 Expected absolute precision of sky coordinates.
997 """
998 tc.assertEqual(projection.pixel_frame, pixel_frame)
999 tc.assertEqual(projection.sky_frame, SkyFrame.ICRS)
1000 sky_atol_v = sky_atol.to_value(SkyFrame.ICRS.unit) if sky_atol is not None else None
1001 pixel_atol_q = pixel_atol * u.pix if pixel_atol is not None else None
1002 # Test array interfaces.
1003 test_pixel_xy = cast(XY[np.ndarray], projection.sky_to_pixel(sky_coords))
1004 assert_close(tc, test_pixel_xy.x, pixel_xy.x, atol=pixel_atol)
1005 assert_close(tc, test_pixel_xy.y, pixel_xy.y, atol=pixel_atol)
1006 test_sky_astropy = projection.pixel_to_sky(x=pixel_xy.x, y=pixel_xy.y)
1007 assert_close(tc, test_sky_astropy.ra, sky_coords.ra, atol=sky_atol_v)
1008 assert_close(tc, test_sky_astropy.dec, sky_coords.dec, atol=sky_atol_v)
1009 # Test scalar interfaces.
1010 for pixel_x, pixel_y, sky_single in zip(pixel_xy.x, pixel_xy.y, sky_coords):
1011 assert_close(tc, projection.sky_to_pixel(sky_single).x, pixel_x, atol=pixel_atol)
1012 assert_close(tc, projection.sky_to_pixel(sky_single).y, pixel_y, atol=pixel_atol)
1013 assert_close(tc, projection.pixel_to_sky(x=pixel_x, y=pixel_y).ra, sky_single.ra, atol=sky_atol_v)
1014 assert_close(tc, projection.pixel_to_sky(x=pixel_x, y=pixel_y).dec, sky_single.dec, atol=sky_atol_v)
1015 # Test the underlying Transform object.
1016 sky_xy = XY(x=sky_coords.ra.to_value(u.rad), y=sky_coords.dec.to_value(u.rad))
1017 check_transform(
1018 tc,
1019 projection.pixel_to_sky_transform,
1020 pixel_xy,
1021 sky_xy,
1022 pixel_frame,
1023 SkyFrame.ICRS,
1024 check_inverted=False,
1025 in_atol=pixel_atol_q,
1026 out_atol=sky_atol,
1027 )
1028 check_transform(
1029 tc,
1030 projection.sky_to_pixel_transform,
1031 sky_xy,
1032 pixel_xy,
1033 SkyFrame.ICRS,
1034 pixel_frame,
1035 check_inverted=False,
1036 in_atol=sky_atol,
1037 out_atol=pixel_atol_q,
1038 )
1039 # Test the Astropy interface adapter.
1040 check_astropy_wcs_interface(
1041 tc, projection.as_astropy(), pixel_xy, sky_coords, pixel_atol=pixel_atol, sky_atol=sky_atol
1042 )
1045def assert_projections_equal(
1046 tc: unittest.TestCase,
1047 a: Projection[Any] | None,
1048 b: Projection[Any] | None,
1049 expect_identity: bool | None = None,
1050) -> None:
1051 """Test that two `.Projection` instances are equivalent."""
1052 if a is None and b is None:
1053 return
1054 assert a is not None and b is not None
1055 match expect_identity:
1056 case True:
1057 tc.assertIs(a, b)
1058 return
1059 case False:
1060 tc.assertIsNot(a, b)
1061 case None if a is b:
1062 return
1063 tc.assertEqual(a.pixel_frame, b.pixel_frame)
1064 tc.assertEqual(a.show(simplified=True), b.show(simplified=True))
1065 assert_projections_equal(
1066 tc, a.fits_approximation, cast(Projection[Any], b.fits_approximation), expect_identity=False
1067 )
1070def check_astropy_wcs_interface(
1071 tc: unittest.TestCase,
1072 wcs: astropy.wcs.wcsapi.BaseHighLevelWCS,
1073 pixel_xy: XY[np.ndarray],
1074 sky_coords: SkyCoord,
1075 *,
1076 pixel_atol: float | None = None,
1077 sky_atol: u.Quantity | None = None,
1078) -> None:
1079 """Test an Astropy WCS instance against known arrays of pixel and
1080 sky coordinates.
1082 Parameters
1083 ----------
1084 tc
1085 Test case object with assert methods to use.
1086 wcs
1087 Astropy WCS object to test.
1088 pixel_xy
1089 Arrays of pixel coordinates.
1090 sky_coords
1091 Corresponding sky coordinates.
1092 pixel_atol
1093 Expected absolute precision of pixel points.
1094 sky_atol
1095 Expected absolute precision of sky coordinates.
1096 """
1097 test_x, test_y = wcs.world_to_pixel(sky_coords)
1098 assert_close(tc, test_x, pixel_xy.x, atol=pixel_atol)
1099 assert_close(tc, test_y, pixel_xy.y, atol=pixel_atol)
1100 test_sky_coords = wcs.pixel_to_world(pixel_xy.x, pixel_xy.y)
1101 assert_close(tc, test_sky_coords.ra, sky_coords.ra, atol=sky_atol)
1102 assert_close(tc, test_sky_coords.dec, sky_coords.dec, atol=sky_atol)
1105def legacy_points_to_xy_array(legacy_points: list[Any]) -> XY[np.ndarray]:
1106 """Convert a list of ``lsst.geom.Point2D`` objects to an `.XY` array."""
1107 return XY(x=np.array([p.x for p in legacy_points]), y=np.array([p.y for p in legacy_points]))
1110def legacy_coords_to_astropy(legacy_coords: list[Any]) -> SkyCoord:
1111 """Convert a list of ``lsst.geom.SpherePoint`` objects to an Astropy
1112 coordinate object.
1113 """
1114 return SkyCoord(
1115 ra=np.array([p.getRa().asRadians() for p in legacy_coords]) * u.rad,
1116 dec=np.array([p.getDec().asRadians() for p in legacy_coords]) * u.rad,
1117 )
1120def arrays_to_legacy_points(x: np.ndarray, y: np.ndarray) -> list[Any]:
1121 """Convert arrays of ``x`` and ``y`` to a list of ``lsst.geom.Point2D``."""
1122 from lsst.geom import Point2D
1124 return [Point2D(x=xv, y=yv) for xv, yv in zip(x, y)]
1127def compare_amplifier_to_legacy(
1128 tc: unittest.TestCase,
1129 amplifier: Amplifier,
1130 legacy_amplifier: Any,
1131 *,
1132 is_raw_assembled: bool,
1133 expect_nominal_calibrations: bool = True,
1134) -> None:
1135 """Compare an `~.cameras.Amplifier` to a legacy
1136 `lsst.afw.cameraGeom.Amplifier`.
1137 """
1138 tc.assertEqual(legacy_amplifier.getName(), amplifier.name)
1139 tc.assertEqual(Box.from_legacy(legacy_amplifier.getBBox()), amplifier.bbox)
1140 if is_raw_assembled:
1141 raw_geom = amplifier.assembled_raw_geometry
1142 else:
1143 raw_geom = amplifier.unassembled_raw_geometry
1144 assert raw_geom is not None
1145 tc.assertEqual(ReadoutCorner.from_legacy(legacy_amplifier.getReadoutCorner()), raw_geom.readout_corner)
1146 tc.assertEqual(Box.from_legacy(legacy_amplifier.getRawBBox()), raw_geom.bbox)
1147 tc.assertEqual(Box.from_legacy(legacy_amplifier.getRawDataBBox()), raw_geom.data_bbox)
1148 tc.assertEqual(legacy_amplifier.getRawFlipX(), raw_geom.flip_x)
1149 tc.assertEqual(legacy_amplifier.getRawFlipY(), raw_geom.flip_y)
1150 tc.assertEqual(legacy_amplifier.getRawXYOffset().getX(), raw_geom.x_offset)
1151 tc.assertEqual(legacy_amplifier.getRawXYOffset().getY(), raw_geom.y_offset)
1152 tc.assertEqual(
1153 Box.from_legacy(legacy_amplifier.getRawHorizontalOverscanBBox()), raw_geom.horizontal_overscan_bbox
1154 )
1155 tc.assertEqual(
1156 Box.from_legacy(legacy_amplifier.getRawVerticalOverscanBBox()), raw_geom.vertical_overscan_bbox
1157 )
1158 tc.assertEqual(Box.from_legacy(legacy_amplifier.getRawPrescanBBox()), raw_geom.horizontal_prescan_bbox)
1159 if expect_nominal_calibrations:
1160 assert amplifier.nominal_calibrations is not None
1161 assert_equal_allow_nan(tc, legacy_amplifier.getGain(), amplifier.nominal_calibrations.gain)
1162 assert_equal_allow_nan(tc, legacy_amplifier.getReadNoise(), amplifier.nominal_calibrations.read_noise)
1163 assert_equal_allow_nan(
1164 tc, legacy_amplifier.getSaturation(), amplifier.nominal_calibrations.saturation
1165 )
1166 assert_equal_allow_nan(
1167 tc, legacy_amplifier.getSuspectLevel(), amplifier.nominal_calibrations.suspect_level
1168 )
1169 np.testing.assert_array_equal(
1170 legacy_amplifier.getLinearityCoeffs(), amplifier.nominal_calibrations.linearity_coefficients
1171 )
1172 tc.assertEqual(legacy_amplifier.getLinearityType(), amplifier.nominal_calibrations.linearity_type)
1175def compare_detector_to_legacy(
1176 tc: unittest.TestCase,
1177 detector: Detector,
1178 legacy_detector: Any,
1179 *,
1180 is_raw_assembled: bool,
1181 expect_nominal_calibrations: bool = True,
1182) -> None:
1183 """Compare a `~.cameras.Detector` to a `lsst.afw.cameraGeom.Detector`."""
1184 from lsst.afw.cameraGeom import FIELD_ANGLE, FOCAL_PLANE, PIXELS
1186 tc.assertEqual(legacy_detector.getName(), detector.name)
1187 tc.assertEqual(legacy_detector.getId(), detector.id)
1188 tc.assertEqual(DetectorType.from_legacy(legacy_detector.getType()), detector.type)
1189 tc.assertEqual(Box.from_legacy(legacy_detector.getBBox()), detector.bbox)
1190 tc.assertEqual(legacy_detector.getSerial(), detector.serial)
1191 legacy_orientation = legacy_detector.getOrientation()
1192 tc.assertEqual(legacy_orientation.getFpPosition3().getX(), detector.orientation.focal_plane_x)
1193 tc.assertEqual(legacy_orientation.getFpPosition3().getY(), detector.orientation.focal_plane_y)
1194 tc.assertEqual(legacy_orientation.getFpPosition3().getZ(), detector.orientation.focal_plane_z)
1195 tc.assertEqual(legacy_orientation.getReferencePoint().getX(), detector.orientation.pixel_reference_x)
1196 tc.assertEqual(legacy_orientation.getReferencePoint().getY(), detector.orientation.pixel_reference_y)
1197 tc.assertEqual(legacy_orientation.getYaw().asRadians(), detector.orientation.yaw.to_value(u.rad))
1198 tc.assertEqual(legacy_orientation.getPitch().asRadians(), detector.orientation.pitch.to_value(u.rad))
1199 tc.assertEqual(legacy_orientation.getRoll().asRadians(), detector.orientation.roll.to_value(u.rad))
1200 tc.assertEqual(legacy_detector.getPixelSize().getX(), detector.pixel_size)
1201 tc.assertEqual(legacy_detector.getPhysicalType(), detector.physical_type)
1202 for amplifier, legacy_amplifier in zip(detector.amplifiers, legacy_detector.getAmplifiers(), strict=True):
1203 compare_amplifier_to_legacy(
1204 tc,
1205 amplifier,
1206 legacy_amplifier,
1207 is_raw_assembled=is_raw_assembled,
1208 expect_nominal_calibrations=expect_nominal_calibrations,
1209 )
1210 pixel_xy = detector.bbox.meshgrid(n=3).map(lambda z: z.ravel().astype(np.float64))
1211 pixel_legacy_points = arrays_to_legacy_points(y=pixel_xy.y, x=pixel_xy.x)
1212 fp_legacy_points = legacy_detector.transform(pixel_legacy_points, PIXELS, FOCAL_PLANE)
1213 check_transform(
1214 tc,
1215 detector.to_focal_plane,
1216 pixel_xy,
1217 legacy_points_to_xy_array(fp_legacy_points),
1218 detector.frame,
1219 detector.to_focal_plane.out_frame,
1220 in_atol=1e-9 * u.pix,
1221 out_atol=1e-7 * detector.to_focal_plane.out_frame.unit,
1222 )
1223 fa_legacy_points = legacy_detector.transform(pixel_legacy_points, PIXELS, FIELD_ANGLE)
1224 check_transform(
1225 tc,
1226 detector.to_field_angle,
1227 pixel_xy,
1228 legacy_points_to_xy_array(fa_legacy_points),
1229 detector.frame,
1230 detector.to_field_angle.out_frame,
1231 in_atol=1e-9 * u.pix,
1232 out_atol=1e-7 * u.arcsec,
1233 )
1236def iter_concrete_archive_tree_subclasses() -> Iterator[type[ArchiveTree]]:
1237 """Yield every importable concrete `.serialization.ArchiveTree` subclass.
1239 Walks the ``ArchiveTree.__subclasses__()`` tree, skipping abstract
1240 classes. Importing this module already imports every ``lsst.images``
1241 module that defines a subclass, so the tree is fully populated by the time
1242 this is called.
1244 This discovery is deliberately separate from
1245 `check_archive_tree_class_invariants` so that the per-class check stays
1246 usable on a single class even if this metaprogramming is removed later.
1247 """
1248 seen: set[type] = set()
1249 stack: list[type] = [ArchiveTree]
1250 while stack:
1251 kls = stack.pop()
1252 for sub in kls.__subclasses__():
1253 if sub in seen:
1254 continue
1255 seen.add(sub)
1256 stack.append(sub)
1257 if not getattr(sub, "__abstractmethods__", None):
1258 yield sub
1261def check_archive_tree_class_invariants(tc: unittest.TestCase, cls: type[ArchiveTree]) -> None:
1262 """Assert that one concrete `.serialization.ArchiveTree` subclass declares
1263 well-formed schema-version constants.
1265 Checks that ``SCHEMA_NAME``, ``SCHEMA_VERSION`` and ``MIN_READ_VERSION``
1266 are present and well-typed, that the version is ``major.minor.patch``, and
1267 that ``MIN_READ_VERSION`` does not exceed the schema major.
1269 Parameters
1270 ----------
1271 tc
1272 Test case object with assert methods to use.
1273 cls
1274 The concrete `.serialization.ArchiveTree` subclass to check.
1275 """
1276 tc.assertTrue(hasattr(cls, "SCHEMA_NAME"), f"{cls.__name__} lacks SCHEMA_NAME")
1277 tc.assertTrue(hasattr(cls, "SCHEMA_VERSION"), f"{cls.__name__} lacks SCHEMA_VERSION")
1278 tc.assertTrue(hasattr(cls, "MIN_READ_VERSION"), f"{cls.__name__} lacks MIN_READ_VERSION")
1279 tc.assertIsInstance(cls.SCHEMA_NAME, str)
1280 tc.assertGreater(len(cls.SCHEMA_NAME), 0)
1281 tc.assertRegex(cls.SCHEMA_VERSION, r"^\d+\.\d+\.\d+$")
1282 tc.assertIsInstance(cls.MIN_READ_VERSION, int)
1283 tc.assertGreaterEqual(cls.MIN_READ_VERSION, 1)
1284 major = int(cls.SCHEMA_VERSION.split(".")[0])
1285 tc.assertLessEqual(cls.MIN_READ_VERSION, major)