Coverage for tests / test_masked_image.py: 22%
135 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import tempfile
16import unittest
18import astropy.io.fits
19import astropy.units as u
20import numpy as np
22from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes
23from lsst.images.fits import FitsCompressionOptions
24from lsst.images.tests import (
25 RoundtripFits,
26 RoundtripNdf,
27 assert_masked_images_equal,
28 compare_masked_image_to_legacy,
29)
31try:
32 import h5py # noqa: F401
34 HAVE_H5PY = True
35except ImportError:
36 HAVE_H5PY = False
38DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
41class MaskedImageTestCase(unittest.TestCase):
42 """Tests for the MaskedImage class and the basics of the archive system."""
44 def setUp(self) -> None:
45 self.maxDiff = None
46 self.rng = np.random.default_rng(500)
47 self.masked_image = MaskedImage(
48 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)),
49 mask_schema=MaskSchema(
50 [
51 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."),
52 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."),
53 ]
54 ),
55 metadata={"fifty": "5 * 10"},
56 )
57 self.masked_image.mask.array |= np.multiply.outer(
58 self.masked_image.image.array < 102.0,
59 self.masked_image.mask.schema.bitmask("BAD"),
60 )
61 self.masked_image.mask.array |= np.multiply.outer(
62 self.masked_image.image.array > 98.0,
63 self.masked_image.mask.schema.bitmask("HUNGRY"),
64 )
65 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape)
67 def test_construction(self) -> None:
68 """Test that the MaskedImage construction (in setUp) worked."""
69 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259])
70 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox)
71 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox)
72 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape)
73 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,))
74 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape)
75 self.assertEqual(self.masked_image.unit, u.nJy)
76 self.assertEqual(self.masked_image.variance.unit, u.nJy**2)
77 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"})
78 # The checks below are subject to the vagaries of the RNG, but we want
79 # the seed to be such that they all pass, or other tests will be
80 # weaker.
81 self.assertGreater(
82 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0
83 )
84 self.assertGreater(
85 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0
86 )
87 self.assertGreater(
88 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0
89 )
91 self.assertIs(self.masked_image[...], self.masked_image)
92 self.assertEqual(
93 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])"
94 )
95 self.assertEqual(
96 repr(self.masked_image),
97 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), "
98 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is "
99 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had "
100 "enough to eat today.\")], dtype=dtype('uint8')))",
101 )
102 copy = self.masked_image.copy()
103 original = self.masked_image.image.array[0, 0]
104 copy.image.array[0, 0] = 38.0
105 self.assertEqual(self.masked_image.image.array[0, 0], original)
106 self.assertEqual(copy.image.array[0, 0], 38.0)
108 # Test error conditions.
109 with self.assertRaises(ValueError):
110 # Disagreement over mask bbox.
111 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask)
112 with self.assertRaises(TypeError):
113 # No mask definition.
114 MaskedImage(self.masked_image.image, variance=self.masked_image.variance)
115 with self.assertRaises(TypeError):
116 # Can not provide mask and mask schema.
117 MaskedImage(
118 Image(42.0, shape=(5, 5)),
119 mask=self.masked_image.mask,
120 mask_schema=self.masked_image.mask.schema,
121 )
122 with self.assertRaises(ValueError):
123 # image and variance bbox disagreement.
124 MaskedImage(
125 Image(42.0, shape=(5, 5)),
126 mask_schema=self.masked_image.mask.schema,
127 variance=self.masked_image.variance,
128 )
129 with self.assertRaises(ValueError):
130 # no image unit but there is variance unit.
131 MaskedImage(
132 Image(42.0, shape=(5, 5)),
133 mask_schema=self.masked_image.mask.schema,
134 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
135 )
136 with self.assertRaises(ValueError):
137 # image and variance units disagree.
138 MaskedImage(
139 Image(42.0, shape=(5, 5), unit=u.nJy),
140 mask_schema=self.masked_image.mask.schema,
141 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
142 )
144 def test_subset(self) -> None:
145 """Test assignment of subset."""
146 copy = self.masked_image.copy()
147 subset = copy.local[0:10, 20:30].copy()
148 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy)
149 copy[subset.bbox] = subset
150 self.assertEqual(copy.image.array[0, 20], 42.0)
151 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0])
153 def test_fits_roundtrip(self) -> None:
154 """Test that we can round-trip the MaskedImage through FITS, including
155 subimage reads.
156 """
157 subbox = Box.factory[11:20, 25:30]
158 subslices = (slice(6, 15), slice(17, 22))
159 np.testing.assert_array_equal(
160 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
161 )
162 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip:
163 subimage = roundtrip.get(bbox=subbox)
164 # Check that we used lossless compression (the default).
165 fits = roundtrip.inspect()
166 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
167 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
168 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
169 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
170 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False)
172 def test_fits_roundtrip_lossy(self) -> None:
173 """Test that we can round-trip the MaskedImage through FITS, including
174 subimage reads, with lossy compression.
175 """
176 subbox = Box.factory[11:20, 25:30]
177 subslices = (slice(6, 15), slice(17, 22))
178 np.testing.assert_array_equal(
179 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
180 )
181 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp:
182 tmp.close()
183 self.masked_image.write_fits(
184 tmp.name,
185 image_compression=FitsCompressionOptions.LOSSY,
186 variance_compression=FitsCompressionOptions.LOSSY,
187 compression_seed=50,
188 )
189 roundtripped = MaskedImage.read_fits(tmp.name)
190 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox)
191 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits:
192 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
193 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
194 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
195 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01)
196 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False)
198 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
199 def test_round_trip_ndf_compatible_mask(self):
200 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8)."""
201 with RoundtripNdf(self, self.masked_image) as roundtrip:
202 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
204 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
205 def test_round_trip_ndf_incompatible_mask(self):
206 """NDF round-trip for a >8-plane mask (uses native 3D mask array,
207 to MORE/LSST/MASK).
208 """
209 rng = np.random.default_rng(7)
210 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)]
211 wide = MaskedImage(
212 Image(
213 rng.normal(100.0, 8.0, size=(50, 60)),
214 dtype=np.float64,
215 unit=u.nJy,
216 start=(0, 0),
217 ),
218 mask_schema=MaskSchema(planes),
219 )
220 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
221 with RoundtripNdf(self, wide) as roundtrip:
222 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
224 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
225 def test_round_trip_ndf_many_plane_mask(self):
226 """NDF round-trip for a mask that needs more than one int32 chunk."""
227 rng = np.random.default_rng(11)
228 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)]
229 wide = MaskedImage(
230 Image(
231 rng.normal(100.0, 8.0, size=(10, 12)),
232 dtype=np.float64,
233 unit=u.nJy,
234 start=(0, 0),
235 ),
236 mask_schema=MaskSchema(planes),
237 )
238 wide.mask.set("P0", wide.image.array > 100.0)
239 wide.mask.set("P17", wide.image.array < 95.0)
240 wide.mask.set("P39", wide.image.array > 110.0)
241 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
242 with RoundtripNdf(self, wide) as roundtrip:
243 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
245 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
246 def test_fits_ndf_consistency(self):
247 """FITS and NDF backends produce equal MaskedImages on round-trip."""
248 with (
249 RoundtripFits(self, self.masked_image) as fits_rt,
250 RoundtripNdf(self, self.masked_image) as ndf_rt,
251 ):
252 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False)
253 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False)
254 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
256 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
257 def test_legacy(self) -> None:
258 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and
259 MaskedImage.from_legacy.
260 """
261 assert DATA_DIR is not None, "Guaranteed by decorator."
262 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
263 plane_map = get_legacy_visit_image_mask_planes()
264 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map)
265 try:
266 from lsst.afw.image import MaskedImageFitsReader
267 except ImportError:
268 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
269 reader = MaskedImageFitsReader(filename)
270 legacy_masked_image = reader.read()
271 compare_masked_image_to_legacy(
272 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False
273 )
274 compare_masked_image_to_legacy(
275 self,
276 masked_image,
277 masked_image.to_legacy(plane_map=plane_map),
278 plane_map=plane_map,
279 expect_view=True,
280 )
281 compare_masked_image_to_legacy(
282 self,
283 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map),
284 legacy_masked_image,
285 expect_view=True,
286 plane_map=plane_map,
287 )
290if __name__ == "__main__":
291 unittest.main()