Coverage for tests/test_masked_image.py: 21%
148 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import tempfile
16import unittest
18import astropy.io.fits
19import astropy.units as u
20import numpy as np
22from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes
23from lsst.images.fits import FitsCompressionOptions
24from lsst.images.tests import (
25 RoundtripFits,
26 RoundtripJson,
27 RoundtripNdf,
28 assert_masked_images_equal,
29 compare_masked_image_to_legacy,
30)
32try:
33 import h5py # noqa: F401
35 HAVE_H5PY = True
36except ImportError:
37 HAVE_H5PY = False
39DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
42class MaskedImageTestCase(unittest.TestCase):
43 """Tests for the MaskedImage class and the basics of the archive system."""
45 def setUp(self) -> None:
46 self.maxDiff = None
47 self.rng = np.random.default_rng(500)
48 self.masked_image = MaskedImage(
49 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)),
50 mask_schema=MaskSchema(
51 [
52 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."),
53 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."),
54 ]
55 ),
56 metadata={"fifty": "5 * 10"},
57 )
58 self.masked_image.mask.array |= np.multiply.outer(
59 self.masked_image.image.array < 102.0,
60 self.masked_image.mask.schema.bitmask("BAD"),
61 )
62 self.masked_image.mask.array |= np.multiply.outer(
63 self.masked_image.image.array > 98.0,
64 self.masked_image.mask.schema.bitmask("HUNGRY"),
65 )
66 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape)
68 def test_construction(self) -> None:
69 """Test that the MaskedImage construction (in setUp) worked."""
70 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259])
71 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox)
72 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox)
73 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape)
74 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,))
75 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape)
76 self.assertEqual(self.masked_image.unit, u.nJy)
77 self.assertEqual(self.masked_image.variance.unit, u.nJy**2)
78 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"})
79 # The checks below are subject to the vagaries of the RNG, but we want
80 # the seed to be such that they all pass, or other tests will be
81 # weaker.
82 self.assertGreater(
83 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0
84 )
85 self.assertGreater(
86 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0
87 )
88 self.assertGreater(
89 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0
90 )
92 self.assertIs(self.masked_image[...], self.masked_image)
93 self.assertEqual(
94 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])"
95 )
96 self.assertEqual(
97 repr(self.masked_image),
98 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), "
99 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is "
100 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had "
101 "enough to eat today.\")], dtype=dtype('uint8')))",
102 )
103 copy = self.masked_image.copy()
104 original = self.masked_image.image.array[0, 0]
105 copy.image.array[0, 0] = 38.0
106 self.assertEqual(self.masked_image.image.array[0, 0], original)
107 self.assertEqual(copy.image.array[0, 0], 38.0)
109 # Test error conditions.
110 with self.assertRaises(ValueError):
111 # Disagreement over mask bbox.
112 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask)
113 with self.assertRaises(TypeError):
114 # No mask definition.
115 MaskedImage(self.masked_image.image, variance=self.masked_image.variance)
116 with self.assertRaises(TypeError):
117 # Can not provide mask and mask schema.
118 MaskedImage(
119 Image(42.0, shape=(5, 5)),
120 mask=self.masked_image.mask,
121 mask_schema=self.masked_image.mask.schema,
122 )
123 with self.assertRaises(ValueError):
124 # image and variance bbox disagreement.
125 MaskedImage(
126 Image(42.0, shape=(5, 5)),
127 mask_schema=self.masked_image.mask.schema,
128 variance=self.masked_image.variance,
129 )
130 with self.assertRaises(ValueError):
131 # no image unit but there is variance unit.
132 MaskedImage(
133 Image(42.0, shape=(5, 5)),
134 mask_schema=self.masked_image.mask.schema,
135 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
136 )
137 with self.assertRaises(ValueError):
138 # image and variance units disagree.
139 MaskedImage(
140 Image(42.0, shape=(5, 5), unit=u.nJy),
141 mask_schema=self.masked_image.mask.schema,
142 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
143 )
145 def test_subset(self) -> None:
146 """Test assignment of subset."""
147 copy = self.masked_image.copy()
148 subset = copy.local[0:10, 20:30].copy()
149 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy)
150 copy[subset.bbox] = subset
151 self.assertEqual(copy.image.array[0, 20], 42.0)
152 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0])
154 def test_fits_roundtrip(self) -> None:
155 """Test that we can round-trip the MaskedImage through FITS, including
156 subimage reads.
157 """
158 subbox = Box.factory[11:20, 25:30]
159 subslices = (slice(6, 15), slice(17, 22))
160 np.testing.assert_array_equal(
161 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
162 )
163 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip:
164 subimage = roundtrip.get(bbox=subbox)
165 # Check that we used lossless compression (the default).
166 fits = roundtrip.inspect()
167 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
168 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
169 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
170 # Test reading back in as a legacy MaskedImage.
171 with self.subTest():
172 try:
173 import lsst.afw.image
174 except ImportError:
175 raise unittest.SkipTest("afw could not be imported") from None
176 legacy_masked_image = roundtrip.get(storageClass="MaskedImage")
177 self.assertIsInstance(legacy_masked_image, lsst.afw.image.MaskedImage)
178 compare_masked_image_to_legacy(
179 self, self.masked_image, legacy_masked_image, expect_view=False
180 )
181 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
182 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False)
184 def test_fits_roundtrip_lossy(self) -> None:
185 """Test that we can round-trip the MaskedImage through FITS, including
186 subimage reads, with lossy compression.
187 """
188 subbox = Box.factory[11:20, 25:30]
189 subslices = (slice(6, 15), slice(17, 22))
190 np.testing.assert_array_equal(
191 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
192 )
193 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp:
194 tmp.close()
195 self.masked_image.write_fits(
196 tmp.name,
197 image_compression=FitsCompressionOptions.LOSSY,
198 variance_compression=FitsCompressionOptions.LOSSY,
199 compression_seed=50,
200 )
201 roundtripped = MaskedImage.read_fits(tmp.name)
202 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox)
203 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits:
204 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
205 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
206 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
207 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01)
208 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False)
210 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
211 def test_round_trip_ndf_compatible_mask(self):
212 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8)."""
213 with RoundtripNdf(self, self.masked_image, "MaskedImageV2") as roundtrip:
214 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
216 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
217 def test_round_trip_ndf_incompatible_mask(self):
218 """NDF round-trip for a >8-plane mask (uses native 3D mask array,
219 to MORE/LSST/MASK).
220 """
221 rng = np.random.default_rng(7)
222 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)]
223 wide = MaskedImage(
224 Image(
225 rng.normal(100.0, 8.0, size=(50, 60)),
226 dtype=np.float64,
227 unit=u.nJy,
228 start=(0, 0),
229 ),
230 mask_schema=MaskSchema(planes),
231 )
232 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
233 with RoundtripNdf(self, wide, "MaskedImageV2") as roundtrip:
234 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
236 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
237 def test_round_trip_ndf_many_plane_mask(self):
238 """NDF round-trip for a mask that needs more than one int32 chunk."""
239 rng = np.random.default_rng(11)
240 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)]
241 wide = MaskedImage(
242 Image(
243 rng.normal(100.0, 8.0, size=(10, 12)),
244 dtype=np.float64,
245 unit=u.nJy,
246 start=(0, 0),
247 ),
248 mask_schema=MaskSchema(planes),
249 )
250 wide.mask.set("P0", wide.image.array > 100.0)
251 wide.mask.set("P17", wide.image.array < 95.0)
252 wide.mask.set("P39", wide.image.array > 110.0)
253 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
254 with RoundtripNdf(self, wide, "MaskedImageV2") as roundtrip:
255 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
257 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
258 def test_fits_ndf_consistency(self):
259 """FITS and NDF backends produce equal MaskedImages on round-trip."""
260 with (
261 RoundtripFits(self, self.masked_image) as fits_rt,
262 RoundtripNdf(self, self.masked_image) as ndf_rt,
263 ):
264 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False)
265 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False)
266 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
268 def test_fits_json_consistency(self):
269 """FITS and JSON backends produce equal MaskedImages on round-trip."""
270 with (
271 RoundtripFits(self, self.masked_image) as fits_rt,
272 RoundtripJson(self, self.masked_image) as json_rt,
273 ):
274 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False)
275 assert_masked_images_equal(self, self.masked_image, json_rt.result, expect_view=False)
276 assert_masked_images_equal(self, fits_rt.result, json_rt.result, expect_view=False)
278 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
279 def test_legacy(self) -> None:
280 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and
281 MaskedImage.from_legacy.
282 """
283 assert DATA_DIR is not None, "Guaranteed by decorator."
284 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
285 plane_map = get_legacy_visit_image_mask_planes()
286 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map)
287 try:
288 from lsst.afw.image import MaskedImageFitsReader
289 except ImportError:
290 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
291 reader = MaskedImageFitsReader(filename)
292 legacy_masked_image = reader.read()
293 compare_masked_image_to_legacy(
294 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False
295 )
296 compare_masked_image_to_legacy(
297 self,
298 masked_image,
299 masked_image.to_legacy(plane_map=plane_map),
300 plane_map=plane_map,
301 expect_view=True,
302 )
303 compare_masked_image_to_legacy(
304 self,
305 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map),
306 legacy_masked_image,
307 expect_view=True,
308 plane_map=plane_map,
309 )
312if __name__ == "__main__":
313 unittest.main()