Coverage for tests / test_masked_image.py: 22%
138 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 07:54 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 07:54 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import tempfile
16import unittest
18import astropy.io.fits
19import astropy.units as u
20import numpy as np
21from astro_metadata_translator import ObservationInfo
23from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes
24from lsst.images.fits import FitsCompressionOptions
25from lsst.images.tests import (
26 RoundtripFits,
27 RoundtripNdf,
28 assert_masked_images_equal,
29 compare_masked_image_to_legacy,
30)
32try:
33 import h5py # noqa: F401
35 HAVE_H5PY = True
36except ImportError:
37 HAVE_H5PY = False
39DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
42class MaskedImageTestCase(unittest.TestCase):
43 """Tests for the MaskedImage class and the basics of the archive system."""
45 def setUp(self) -> None:
46 self.maxDiff = None
47 self.rng = np.random.default_rng(500)
48 self.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4)
49 self.masked_image = MaskedImage(
50 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)),
51 mask_schema=MaskSchema(
52 [
53 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."),
54 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."),
55 ]
56 ),
57 metadata={"fifty": "5 * 10"},
58 obs_info=self.obs_info,
59 )
60 self.masked_image.mask.array |= np.multiply.outer(
61 self.masked_image.image.array < 102.0,
62 self.masked_image.mask.schema.bitmask("BAD"),
63 )
64 self.masked_image.mask.array |= np.multiply.outer(
65 self.masked_image.image.array > 98.0,
66 self.masked_image.mask.schema.bitmask("HUNGRY"),
67 )
68 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape)
70 def test_construction(self) -> None:
71 """Test that the MaskedImage construction (in setUp) worked."""
72 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259])
73 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox)
74 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox)
75 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape)
76 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,))
77 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape)
78 self.assertEqual(self.masked_image.unit, u.nJy)
79 self.assertEqual(self.masked_image.variance.unit, u.nJy**2)
80 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"})
81 self.assertEqual(self.masked_image.obs_info.instrument, "LSSTCam")
82 # The checks below are subject to the vagaries of the RNG, but we want
83 # the seed to be such that they all pass, or other tests will be
84 # weaker.
85 self.assertGreater(
86 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0
87 )
88 self.assertGreater(
89 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0
90 )
91 self.assertGreater(
92 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0
93 )
95 self.assertIs(self.masked_image[...], self.masked_image)
96 self.assertEqual(
97 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])"
98 )
99 self.assertEqual(
100 repr(self.masked_image),
101 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), "
102 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is "
103 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had "
104 "enough to eat today.\")], dtype=dtype('uint8')))",
105 )
106 copy = self.masked_image.copy()
107 original = self.masked_image.image.array[0, 0]
108 copy.image.array[0, 0] = 38.0
109 self.assertEqual(self.masked_image.image.array[0, 0], original)
110 self.assertEqual(copy.image.array[0, 0], 38.0)
112 # Test error conditions.
113 with self.assertRaises(ValueError):
114 # Disagreement over mask bbox.
115 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask)
116 with self.assertRaises(TypeError):
117 # No mask definition.
118 MaskedImage(self.masked_image.image, variance=self.masked_image.variance)
119 with self.assertRaises(TypeError):
120 # Can not provide mask and mask schema.
121 MaskedImage(
122 Image(42.0, shape=(5, 5)),
123 mask=self.masked_image.mask,
124 mask_schema=self.masked_image.mask.schema,
125 )
126 with self.assertRaises(ValueError):
127 # image and variance bbox disagreement.
128 MaskedImage(
129 Image(42.0, shape=(5, 5)),
130 mask_schema=self.masked_image.mask.schema,
131 variance=self.masked_image.variance,
132 )
133 with self.assertRaises(ValueError):
134 # no image unit but there is variance unit.
135 MaskedImage(
136 Image(42.0, shape=(5, 5)),
137 mask_schema=self.masked_image.mask.schema,
138 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
139 )
140 with self.assertRaises(ValueError):
141 # image and variance units disagree.
142 MaskedImage(
143 Image(42.0, shape=(5, 5), unit=u.nJy),
144 mask_schema=self.masked_image.mask.schema,
145 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
146 )
148 def test_subset(self) -> None:
149 """Test assignment of subset."""
150 copy = self.masked_image.copy()
151 subset = copy.local[0:10, 20:30].copy()
152 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy)
153 copy[subset.bbox] = subset
154 self.assertEqual(copy.image.array[0, 20], 42.0)
155 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0])
157 def test_fits_roundtrip(self) -> None:
158 """Test that we can round-trip the MaskedImage through FITS, including
159 subimage reads.
160 """
161 subbox = Box.factory[11:20, 25:30]
162 subslices = (slice(6, 15), slice(17, 22))
163 np.testing.assert_array_equal(
164 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
165 )
166 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip:
167 subimage = roundtrip.get(bbox=subbox)
168 # Check that we used lossless compression (the default).
169 fits = roundtrip.inspect()
170 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
171 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
172 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
173 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
174 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False)
176 def test_fits_roundtrip_lossy(self) -> None:
177 """Test that we can round-trip the MaskedImage through FITS, including
178 subimage reads, with lossy compression.
179 """
180 subbox = Box.factory[11:20, 25:30]
181 subslices = (slice(6, 15), slice(17, 22))
182 np.testing.assert_array_equal(
183 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
184 )
185 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp:
186 tmp.close()
187 self.masked_image.write_fits(
188 tmp.name,
189 image_compression=FitsCompressionOptions.LOSSY,
190 variance_compression=FitsCompressionOptions.LOSSY,
191 compression_seed=50,
192 )
193 roundtripped = MaskedImage.read_fits(tmp.name)
194 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox)
195 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits:
196 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
197 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
198 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
199 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01)
200 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False)
202 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
203 def test_round_trip_ndf_compatible_mask(self):
204 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8)."""
205 with RoundtripNdf(self, self.masked_image) as roundtrip:
206 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
208 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
209 def test_round_trip_ndf_incompatible_mask(self):
210 """NDF round-trip for a >8-plane mask (uses native 3D mask array,
211 to MORE/LSST/MASK).
212 """
213 rng = np.random.default_rng(7)
214 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)]
215 wide = MaskedImage(
216 Image(
217 rng.normal(100.0, 8.0, size=(50, 60)),
218 dtype=np.float64,
219 unit=u.nJy,
220 start=(0, 0),
221 ),
222 mask_schema=MaskSchema(planes),
223 obs_info=self.obs_info,
224 )
225 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
226 with RoundtripNdf(self, wide) as roundtrip:
227 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
229 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
230 def test_round_trip_ndf_many_plane_mask(self):
231 """NDF round-trip for a mask that needs more than one int32 chunk."""
232 rng = np.random.default_rng(11)
233 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)]
234 wide = MaskedImage(
235 Image(
236 rng.normal(100.0, 8.0, size=(10, 12)),
237 dtype=np.float64,
238 unit=u.nJy,
239 start=(0, 0),
240 ),
241 mask_schema=MaskSchema(planes),
242 obs_info=self.obs_info,
243 )
244 wide.mask.set("P0", wide.image.array > 100.0)
245 wide.mask.set("P17", wide.image.array < 95.0)
246 wide.mask.set("P39", wide.image.array > 110.0)
247 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape)
248 with RoundtripNdf(self, wide) as roundtrip:
249 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False)
251 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
252 def test_fits_ndf_consistency(self):
253 """FITS and NDF backends produce equal MaskedImages on round-trip."""
254 with (
255 RoundtripFits(self, self.masked_image) as fits_rt,
256 RoundtripNdf(self, self.masked_image) as ndf_rt,
257 ):
258 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False)
259 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False)
260 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
262 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
263 def test_legacy(self) -> None:
264 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and
265 MaskedImage.from_legacy.
266 """
267 assert DATA_DIR is not None, "Guaranteed by decorator."
268 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
269 plane_map = get_legacy_visit_image_mask_planes()
270 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map)
271 try:
272 from lsst.afw.image import MaskedImageFitsReader
273 except ImportError:
274 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
275 reader = MaskedImageFitsReader(filename)
276 legacy_masked_image = reader.read()
277 compare_masked_image_to_legacy(
278 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False
279 )
280 compare_masked_image_to_legacy(
281 self,
282 masked_image,
283 masked_image.to_legacy(plane_map=plane_map),
284 plane_map=plane_map,
285 expect_view=True,
286 )
287 compare_masked_image_to_legacy(
288 self,
289 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map),
290 legacy_masked_image,
291 expect_view=True,
292 plane_map=plane_map,
293 )
296if __name__ == "__main__":
297 unittest.main()