Coverage for tests / test_masked_image.py: 21%
143 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-27 01:31 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-27 01:31 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import tempfile
16import unittest
18import astropy.io.fits
19import astropy.units as u
20import numpy as np
22from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes
23from lsst.images.fits import FitsCompressionOptions
24from lsst.images.tests import (
25 RoundtripFits,
26 RoundtripNdf,
27 assert_masked_images_equal,
28 compare_masked_image_to_legacy,
29)
31try:
32 import h5py # noqa: F401
34 HAVE_H5PY = True
35except ImportError:
36 HAVE_H5PY = False
38DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
41class MaskedImageTestCase(unittest.TestCase):
42 """Tests for the MaskedImage class and the basics of the archive system."""
44 def setUp(self) -> None:
45 self.maxDiff = None
46 self.rng = np.random.default_rng(500)
47 self.masked_image = MaskedImage(
48 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)),
49 mask_schema=MaskSchema(
50 [
51 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."),
52 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."),
53 ]
54 ),
55 metadata={"fifty": "5 * 10"},
56 )
57 self.masked_image.mask.array |= np.multiply.outer(
58 self.masked_image.image.array < 102.0,
59 self.masked_image.mask.schema.bitmask("BAD"),
60 )
61 self.masked_image.mask.array |= np.multiply.outer(
62 self.masked_image.image.array > 98.0,
63 self.masked_image.mask.schema.bitmask("HUNGRY"),
64 )
65 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape)
67 def test_construction(self) -> None:
68 """Test that the MaskedImage construction (in setUp) worked."""
69 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259])
70 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox)
71 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox)
72 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape)
73 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,))
74 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape)
75 self.assertEqual(self.masked_image.unit, u.nJy)
76 self.assertEqual(self.masked_image.variance.unit, u.nJy**2)
77 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"})
78 # The checks below are subject to the vagaries of the RNG, but we want
79 # the seed to be such that they all pass, or other tests will be
80 # weaker.
81 self.assertGreater(
82 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0
83 )
84 self.assertGreater(
85 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0
86 )
87 self.assertGreater(
88 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0
89 )
91 self.assertIs(self.masked_image[...], self.masked_image)
92 self.assertEqual(
93 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])"
94 )
95 self.assertEqual(
96 repr(self.masked_image),
97 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), "
98 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is "
99 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had "
100 "enough to eat today.\")], dtype=dtype('uint8')))",
101 )
102 copy = self.masked_image.copy()
103 original = self.masked_image.image.array[0, 0]
104 copy.image.array[0, 0] = 38.0
105 self.assertEqual(self.masked_image.image.array[0, 0], original)
106 self.assertEqual(copy.image.array[0, 0], 38.0)
108 # Test error conditions.
109 with self.assertRaises(ValueError):
110 # Disagreement over mask bbox.
111 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask)
112 with self.assertRaises(TypeError):
113 # No mask definition.
114 MaskedImage(self.masked_image.image, variance=self.masked_image.variance)
115 with self.assertRaises(TypeError):
116 # Can not provide mask and mask schema.
117 MaskedImage(
118 Image(42.0, shape=(5, 5)),
119 mask=self.masked_image.mask,
120 mask_schema=self.masked_image.mask.schema,
121 )
122 with self.assertRaises(ValueError):
123 # image and variance bbox disagreement.
124 MaskedImage(
125 Image(42.0, shape=(5, 5)),
126 mask_schema=self.masked_image.mask.schema,
127 variance=self.masked_image.variance,
128 )
129 with self.assertRaises(ValueError):
130 # no image unit but there is variance unit.
131 MaskedImage(
132 Image(42.0, shape=(5, 5)),
133 mask_schema=self.masked_image.mask.schema,
134 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
135 )
136 with self.assertRaises(ValueError):
137 # image and variance units disagree.
138 MaskedImage(
139 Image(42.0, shape=(5, 5), unit=u.nJy),
140 mask_schema=self.masked_image.mask.schema,
141 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
142 )
144 def test_subset(self) -> None:
145 """Test assignment of subset."""
146 copy = self.masked_image.copy()
147 subset = copy.local[0:10, 20:30].copy()
148 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy)
149 copy[subset.bbox] = subset
150 self.assertEqual(copy.image.array[0, 20], 42.0)
151 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0])
153 def test_fits_roundtrip(self) -> None:
154 """Test that we can round-trip the MaskedImage through FITS, including
155 subimage reads.
156 """
157 subbox = Box.factory[11:20, 25:30]
158 subslices = (slice(6, 15), slice(17, 22))
159 np.testing.assert_array_equal(
160 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
161 )
162 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip:
163 subimage = roundtrip.get(bbox=subbox)
164 # Check that we used lossless compression (the default).
165 fits = roundtrip.inspect()
166 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
167 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
168 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
169 # Test reading back in as a legacy MaskedImage.
170 with self.subTest():
171 try:
172 import lsst.afw.image
173 except ImportError:
174 raise unittest.SkipTest("afw could not be imported") from None
175 legacy_masked_image = roundtrip.get(storageClass="MaskedImage")
176 self.assertIsInstance(legacy_masked_image, lsst.afw.image.MaskedImage)
177 compare_masked_image_to_legacy(
178 self, self.masked_image, legacy_masked_image, expect_view=False
179 )
180 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
181 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False)
183 def test_fits_roundtrip_lossy(self) -> None:
184 """Test that we can round-trip the MaskedImage through FITS, including
185 subimage reads, with lossy compression.
186 """
187 subbox = Box.factory[11:20, 25:30]
188 subslices = (slice(6, 15), slice(17, 22))
189 np.testing.assert_array_equal(
190 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
191 )
192 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp:
193 tmp.close()
194 self.masked_image.write_fits(
195 tmp.name,
196 image_compression=FitsCompressionOptions.LOSSY,
197 variance_compression=FitsCompressionOptions.LOSSY,
198 compression_seed=50,
199 )
200 roundtripped = MaskedImage.read_fits(tmp.name)
201 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox)
202 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits:
203 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
204 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
205 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
206 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01)
207 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False)
209 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
210 def test_round_trip_ndf_compatible_mask(self):
211 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8)."""
212 with RoundtripNdf(self, self.masked_image) as roundtrip:
213 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
215 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
216 def test_round_trip_ndf_incompatible_mask(self):
217 """NDF round-trip for a >8-plane mask (uses native 3D mask array,
218 to MORE/LSST/MASK).
219 """
220 rng = np.random.default_rng(7)
221 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)]
222 wide = MaskedImage(
223 Image(
224 rng.normal(100.0, 8.0, size=(50, 60)),
225 dtype=np.float64,
226 unit=u.nJy,
227 start=(0, 0),
228 ),
229 mask_schema=MaskSchema(planes),
230 )
231 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
232 with RoundtripNdf(self, wide) as roundtrip:
233 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
235 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
236 def test_round_trip_ndf_many_plane_mask(self):
237 """NDF round-trip for a mask that needs more than one int32 chunk."""
238 rng = np.random.default_rng(11)
239 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)]
240 wide = MaskedImage(
241 Image(
242 rng.normal(100.0, 8.0, size=(10, 12)),
243 dtype=np.float64,
244 unit=u.nJy,
245 start=(0, 0),
246 ),
247 mask_schema=MaskSchema(planes),
248 )
249 wide.mask.set("P0", wide.image.array > 100.0)
250 wide.mask.set("P17", wide.image.array < 95.0)
251 wide.mask.set("P39", wide.image.array > 110.0)
252 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
253 with RoundtripNdf(self, wide) as roundtrip:
254 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
256 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
257 def test_fits_ndf_consistency(self):
258 """FITS and NDF backends produce equal MaskedImages on round-trip."""
259 with (
260 RoundtripFits(self, self.masked_image) as fits_rt,
261 RoundtripNdf(self, self.masked_image) as ndf_rt,
262 ):
263 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False)
264 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False)
265 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
267 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
268 def test_legacy(self) -> None:
269 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and
270 MaskedImage.from_legacy.
271 """
272 assert DATA_DIR is not None, "Guaranteed by decorator."
273 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
274 plane_map = get_legacy_visit_image_mask_planes()
275 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map)
276 try:
277 from lsst.afw.image import MaskedImageFitsReader
278 except ImportError:
279 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
280 reader = MaskedImageFitsReader(filename)
281 legacy_masked_image = reader.read()
282 compare_masked_image_to_legacy(
283 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False
284 )
285 compare_masked_image_to_legacy(
286 self,
287 masked_image,
288 masked_image.to_legacy(plane_map=plane_map),
289 plane_map=plane_map,
290 expect_view=True,
291 )
292 compare_masked_image_to_legacy(
293 self,
294 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map),
295 legacy_masked_image,
296 expect_view=True,
297 plane_map=plane_map,
298 )
301if __name__ == "__main__":
302 unittest.main()