Coverage for tests/test_cell_coadd.py: 30%
61 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 09:08 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import pickle
16import unittest
17from typing import Any
19import numpy as np
21from lsst.images import YX, Box, Interval, get_legacy_deep_coadd_mask_planes
22from lsst.images.cells import CellCoadd, CellIJ
23from lsst.images.tests import (
24 DP2_COADD_DATA_ID,
25 DP2_COADD_MISSING_CELL,
26 RoundtripFits,
27 RoundtripJson,
28 assert_cell_coadds_equal,
29 assert_masked_images_equal,
30 assert_psfs_equal,
31 compare_cell_coadd_to_legacy,
32)
# Root directory of the external test data, or None when the
# TESTDATA_IMAGES_DIR environment variable is not set.
DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR")
@unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
class CellCoaddTestCase(unittest.TestCase):
    """Tests for the CellCoadd class and its many component classes.

    The whole case is skipped when ``TESTDATA_IMAGES_DIR`` is not set, and
    `setUpClass` additionally skips it when ``lsst.cell_coadds`` cannot be
    imported.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Load the legacy cell coadd and skymap once and convert the coadd.

        Raises
        ------
        unittest.SkipTest
            Raised if ``lsst.cell_coadds`` cannot be imported.
        """
        assert DATA_DIR is not None, "Guaranteed by decorator."
        cls.filename = os.path.join(DATA_DIR, "dp2", "legacy", "deep_coadd_cell_predetection.fits")
        cls.plane_map = get_legacy_deep_coadd_mask_planes()
        cls.missing_cell = CellIJ(**DP2_COADD_MISSING_CELL)
        # Guard only the import: an ImportError raised while actually reading
        # the file is a real failure and must not be reported as a skip with
        # a misleading "could not be imported" message.
        try:
            from lsst.cell_coadds import MultipleCellCoadd
        except ImportError:
            raise unittest.SkipTest("lsst.cell_coadds could not be imported.") from None
        cls.legacy_cell_coadd = MultipleCellCoadd.read_fits(cls.filename)
        # NOTE: unpickling executes arbitrary code; this is only acceptable
        # because the file comes from the local test-data directory.
        with open(os.path.join(DATA_DIR, "dp2", "legacy", "skyMap.pickle"), "rb") as stream:
            cls.skymap = pickle.load(stream)
        cls.cell_coadd = CellCoadd.from_legacy(
            cls.legacy_cell_coadd,
            plane_map=cls.plane_map,
            tract_info=cls.skymap[DP2_COADD_DATA_ID["tract"]],
        )

    def make_psf_points(self, bbox: Box) -> YX[np.ndarray]:
        """Make arrays of points to test PSFs at, given a bbox that is assumed
        to be snapped to the cell_coadd grid.

        Points start half a cell in from the start of ``bbox`` in each
        dimension and advance one cell at a time; each is then jittered by a
        uniform offset in [-0.4, 0.4) drawn from ``self.rng``.

        Parameters
        ----------
        bbox
            Bounding box to generate points in.

        Returns
        -------
        YX
            Flattened ``y`` and ``x`` coordinate arrays of equal size.
        """
        cell_shape = self.cell_coadd.grid.cell_shape
        xc, yc = np.meshgrid(
            np.arange(bbox.x.start + cell_shape.x * 0.5, bbox.x.stop, cell_shape.x),
            np.arange(bbox.y.start + cell_shape.y * 0.5, bbox.y.stop, cell_shape.y),
        )
        # The y jitter is drawn before the x jitter; keep that order so the
        # seeded random sequence stays reproducible.
        return YX(
            y=yc.ravel() + self.rng.uniform(-0.4, 0.4, size=yc.size),
            x=xc.ravel() + self.rng.uniform(-0.4, 0.4, size=xc.size),
        )

    def setUp(self) -> None:
        """Reseed the random generator and build the default PSF test points
        covering the full coadd bbox.
        """
        self.rng = np.random.default_rng(44)
        self.psf_points = self.make_psf_points(self.cell_coadd.bbox)

    def test_from_legacy(self) -> None:
        """Test constructing a CellCoadd by converting a legacy
        lsst.cell_coadds.MultipleCellCoadd.
        """
        self.assertEqual(self.cell_coadd.bounds.missing, {self.missing_cell})
        self.assertEqual(self.cell_coadd.bbox, Box.factory[12900:13500, 9600:10050])
        compare_cell_coadd_to_legacy(
            self,
            self.cell_coadd,
            self.legacy_cell_coadd,
            tract_bbox=Box.from_legacy(self.skymap[DP2_COADD_DATA_ID["tract"]].getBBox()),
            plane_map=self.plane_map,
            psf_points=self.psf_points,
        )

    def test_roundtrip(self) -> None:
        """Test serializing a CellCoadd and reading it back in, including
        subimage and component reads.
        """
        with RoundtripFits(self, self.cell_coadd, "CellCoadd") as roundtrip:
            # Check a subimage read.  The subbox only overlaps (but does not
            # fully cover) the middle 2 (of 4) cells in y, while covering
            # exactly the last column of cells in x.  It does not cover the
            # missing cell.
            full_bbox = self.cell_coadd.bbox
            subbox = Box.factory[
                full_bbox.y.start + 252 : full_bbox.y.stop - 175,
                full_bbox.x.stop - 150 : full_bbox.x.stop,
            ]
            subimage = roundtrip.get(bbox=subbox)
            assert_masked_images_equal(self, subimage, self.cell_coadd[subbox], expect_view=False)
            # Default to no alternates so the final legacy comparison can
            # still run if the component reads below fail.
            alternates: dict[str, Any] = {}
            with self.subTest():
                subpsf = roundtrip.get("psf", bbox=subbox)
                # The PSF bounds are expected to span the two whole middle
                # cells in y that the subbox overlaps.
                self.assertEqual(
                    subpsf.bounds.bbox,
                    Box(
                        y=Interval.factory[
                            self.cell_coadd.bbox.y.start + 150 : self.cell_coadd.bbox.y.stop - 150
                        ],
                        x=subbox.x,
                    ),
                )
                assert_psfs_equal(self, subpsf, self.cell_coadd.psf, points=self.make_psf_points(subbox))
                self.assertEqual(roundtrip.get("bbox"), self.cell_coadd.bbox)
                alternates = {
                    k: roundtrip.get(k)
                    for k in ["projection", "image", "mask", "variance", "psf", "provenance"]
                }
            with self.subTest():
                backgrounds = roundtrip.get("backgrounds")
                self.assertEqual(backgrounds.keys(), set())
                self.assertIsNone(backgrounds.subtracted)
            # Fixture self-consistency: the shared fixture built in
            # setUpClass has the expected bbox and missing-cell set (checked
            # here after the round trip).
            self.assertEqual(self.cell_coadd.bounds.missing, {self.missing_cell})
            self.assertEqual(self.cell_coadd.bbox, Box.factory[12900:13500, 9600:10050])
            # Full round-trip fidelity, including background contents.
            assert_cell_coadds_equal(self, roundtrip.result, self.cell_coadd, expect_view=False)
            compare_cell_coadd_to_legacy(
                self,
                roundtrip.result,
                self.legacy_cell_coadd,
                tract_bbox=Box.from_legacy(self.skymap[DP2_COADD_DATA_ID["tract"]].getBBox()),
                plane_map=self.plane_map,
                alternates=alternates,
                psf_points=self.psf_points,
            )

    def test_fits_json_consistency(self) -> None:
        """FITS and JSON backends produce equal CellCoadds on round-trip."""
        with (
            RoundtripFits(self, self.cell_coadd) as fits_rt,
            RoundtripJson(self, self.cell_coadd) as json_rt,
        ):
            # Compare each backend against the original, then against each
            # other.
            assert_cell_coadds_equal(self, self.cell_coadd, fits_rt.result, expect_view=False)
            assert_cell_coadds_equal(self, self.cell_coadd, json_rt.result, expect_view=False)
            assert_cell_coadds_equal(self, fits_rt.result, json_rt.result, expect_view=False)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()