Coverage for tests/test_masked_image.py: 21%

148 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 01:09 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import tempfile 

16import unittest 

17 

18import astropy.io.fits 

19import astropy.units as u 

20import numpy as np 

21 

22from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes 

23from lsst.images.fits import FitsCompressionOptions 

24from lsst.images.tests import ( 

25 RoundtripFits, 

26 RoundtripJson, 

27 RoundtripNdf, 

28 assert_masked_images_equal, 

29 compare_masked_image_to_legacy, 

30) 

31 

32try: 

33 import h5py # noqa: F401 

34 

35 HAVE_H5PY = True 

36except ImportError: 

37 HAVE_H5PY = False 

38 

39DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

40 

41 

42class MaskedImageTestCase(unittest.TestCase): 

43 """Tests for the MaskedImage class and the basics of the archive system.""" 

44 

45 def setUp(self) -> None: 

46 self.maxDiff = None 

47 self.rng = np.random.default_rng(500) 

48 self.masked_image = MaskedImage( 

49 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)), 

50 mask_schema=MaskSchema( 

51 [ 

52 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."), 

53 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."), 

54 ] 

55 ), 

56 metadata={"fifty": "5 * 10"}, 

57 ) 

58 self.masked_image.mask.array |= np.multiply.outer( 

59 self.masked_image.image.array < 102.0, 

60 self.masked_image.mask.schema.bitmask("BAD"), 

61 ) 

62 self.masked_image.mask.array |= np.multiply.outer( 

63 self.masked_image.image.array > 98.0, 

64 self.masked_image.mask.schema.bitmask("HUNGRY"), 

65 ) 

66 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape) 

67 

68 def test_construction(self) -> None: 

69 """Test that the MaskedImage construction (in setUp) worked.""" 

70 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259]) 

71 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox) 

72 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox) 

73 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape) 

74 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,)) 

75 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape) 

76 self.assertEqual(self.masked_image.unit, u.nJy) 

77 self.assertEqual(self.masked_image.variance.unit, u.nJy**2) 

78 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"}) 

79 # The checks below are subject to the vagaries of the RNG, but we want 

80 # the seed to be such that they all pass, or other tests will be 

81 # weaker. 

82 self.assertGreater( 

83 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0 

84 ) 

85 self.assertGreater( 

86 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0 

87 ) 

88 self.assertGreater( 

89 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0 

90 ) 

91 

92 self.assertIs(self.masked_image[...], self.masked_image) 

93 self.assertEqual( 

94 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])" 

95 ) 

96 self.assertEqual( 

97 repr(self.masked_image), 

98 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), " 

99 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is " 

100 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had " 

101 "enough to eat today.\")], dtype=dtype('uint8')))", 

102 ) 

103 copy = self.masked_image.copy() 

104 original = self.masked_image.image.array[0, 0] 

105 copy.image.array[0, 0] = 38.0 

106 self.assertEqual(self.masked_image.image.array[0, 0], original) 

107 self.assertEqual(copy.image.array[0, 0], 38.0) 

108 

109 # Test error conditions. 

110 with self.assertRaises(ValueError): 

111 # Disagreement over mask bbox. 

112 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask) 

113 with self.assertRaises(TypeError): 

114 # No mask definition. 

115 MaskedImage(self.masked_image.image, variance=self.masked_image.variance) 

116 with self.assertRaises(TypeError): 

117 # Can not provide mask and mask schema. 

118 MaskedImage( 

119 Image(42.0, shape=(5, 5)), 

120 mask=self.masked_image.mask, 

121 mask_schema=self.masked_image.mask.schema, 

122 ) 

123 with self.assertRaises(ValueError): 

124 # image and variance bbox disagreement. 

125 MaskedImage( 

126 Image(42.0, shape=(5, 5)), 

127 mask_schema=self.masked_image.mask.schema, 

128 variance=self.masked_image.variance, 

129 ) 

130 with self.assertRaises(ValueError): 

131 # no image unit but there is variance unit. 

132 MaskedImage( 

133 Image(42.0, shape=(5, 5)), 

134 mask_schema=self.masked_image.mask.schema, 

135 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

136 ) 

137 with self.assertRaises(ValueError): 

138 # image and variance units disagree. 

139 MaskedImage( 

140 Image(42.0, shape=(5, 5), unit=u.nJy), 

141 mask_schema=self.masked_image.mask.schema, 

142 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

143 ) 

144 

145 def test_subset(self) -> None: 

146 """Test assignment of subset.""" 

147 copy = self.masked_image.copy() 

148 subset = copy.local[0:10, 20:30].copy() 

149 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy) 

150 copy[subset.bbox] = subset 

151 self.assertEqual(copy.image.array[0, 20], 42.0) 

152 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0]) 

153 

154 def test_fits_roundtrip(self) -> None: 

155 """Test that we can round-trip the MaskedImage through FITS, including 

156 subimage reads. 

157 """ 

158 subbox = Box.factory[11:20, 25:30] 

159 subslices = (slice(6, 15), slice(17, 22)) 

160 np.testing.assert_array_equal( 

161 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

162 ) 

163 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip: 

164 subimage = roundtrip.get(bbox=subbox) 

165 # Check that we used lossless compression (the default). 

166 fits = roundtrip.inspect() 

167 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

168 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

169 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

170 # Test reading back in as a legacy MaskedImage. 

171 with self.subTest(): 

172 try: 

173 import lsst.afw.image 

174 except ImportError: 

175 raise unittest.SkipTest("afw could not be imported") from None 

176 legacy_masked_image = roundtrip.get(storageClass="MaskedImage") 

177 self.assertIsInstance(legacy_masked_image, lsst.afw.image.MaskedImage) 

178 compare_masked_image_to_legacy( 

179 self, self.masked_image, legacy_masked_image, expect_view=False 

180 ) 

181 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

182 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False) 

183 

184 def test_fits_roundtrip_lossy(self) -> None: 

185 """Test that we can round-trip the MaskedImage through FITS, including 

186 subimage reads, with lossy compression. 

187 """ 

188 subbox = Box.factory[11:20, 25:30] 

189 subslices = (slice(6, 15), slice(17, 22)) 

190 np.testing.assert_array_equal( 

191 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

192 ) 

193 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp: 

194 tmp.close() 

195 self.masked_image.write_fits( 

196 tmp.name, 

197 image_compression=FitsCompressionOptions.LOSSY, 

198 variance_compression=FitsCompressionOptions.LOSSY, 

199 compression_seed=50, 

200 ) 

201 roundtripped = MaskedImage.read_fits(tmp.name) 

202 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox) 

203 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits: 

204 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

205 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

206 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

207 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01) 

208 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False) 

209 

210 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

211 def test_round_trip_ndf_compatible_mask(self): 

212 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8).""" 

213 with RoundtripNdf(self, self.masked_image, "MaskedImageV2") as roundtrip: 

214 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

215 

216 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

217 def test_round_trip_ndf_incompatible_mask(self): 

218 """NDF round-trip for a >8-plane mask (uses native 3D mask array, 

219 to MORE/LSST/MASK). 

220 """ 

221 rng = np.random.default_rng(7) 

222 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)] 

223 wide = MaskedImage( 

224 Image( 

225 rng.normal(100.0, 8.0, size=(50, 60)), 

226 dtype=np.float64, 

227 unit=u.nJy, 

228 start=(0, 0), 

229 ), 

230 mask_schema=MaskSchema(planes), 

231 ) 

232 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

233 with RoundtripNdf(self, wide, "MaskedImageV2") as roundtrip: 

234 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

235 

236 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

237 def test_round_trip_ndf_many_plane_mask(self): 

238 """NDF round-trip for a mask that needs more than one int32 chunk.""" 

239 rng = np.random.default_rng(11) 

240 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)] 

241 wide = MaskedImage( 

242 Image( 

243 rng.normal(100.0, 8.0, size=(10, 12)), 

244 dtype=np.float64, 

245 unit=u.nJy, 

246 start=(0, 0), 

247 ), 

248 mask_schema=MaskSchema(planes), 

249 ) 

250 wide.mask.set("P0", wide.image.array > 100.0) 

251 wide.mask.set("P17", wide.image.array < 95.0) 

252 wide.mask.set("P39", wide.image.array > 110.0) 

253 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

254 with RoundtripNdf(self, wide, "MaskedImageV2") as roundtrip: 

255 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

256 

257 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

258 def test_fits_ndf_consistency(self): 

259 """FITS and NDF backends produce equal MaskedImages on round-trip.""" 

260 with ( 

261 RoundtripFits(self, self.masked_image) as fits_rt, 

262 RoundtripNdf(self, self.masked_image) as ndf_rt, 

263 ): 

264 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False) 

265 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False) 

266 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

267 

268 def test_fits_json_consistency(self): 

269 """FITS and JSON backends produce equal MaskedImages on round-trip.""" 

270 with ( 

271 RoundtripFits(self, self.masked_image) as fits_rt, 

272 RoundtripJson(self, self.masked_image) as json_rt, 

273 ): 

274 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False) 

275 assert_masked_images_equal(self, self.masked_image, json_rt.result, expect_view=False) 

276 assert_masked_images_equal(self, fits_rt.result, json_rt.result, expect_view=False) 

277 

278 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

279 def test_legacy(self) -> None: 

280 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and 

281 MaskedImage.from_legacy. 

282 """ 

283 assert DATA_DIR is not None, "Guaranteed by decorator." 

284 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits") 

285 plane_map = get_legacy_visit_image_mask_planes() 

286 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map) 

287 try: 

288 from lsst.afw.image import MaskedImageFitsReader 

289 except ImportError: 

290 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None 

291 reader = MaskedImageFitsReader(filename) 

292 legacy_masked_image = reader.read() 

293 compare_masked_image_to_legacy( 

294 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False 

295 ) 

296 compare_masked_image_to_legacy( 

297 self, 

298 masked_image, 

299 masked_image.to_legacy(plane_map=plane_map), 

300 plane_map=plane_map, 

301 expect_view=True, 

302 ) 

303 compare_masked_image_to_legacy( 

304 self, 

305 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map), 

306 legacy_masked_image, 

307 expect_view=True, 

308 plane_map=plane_map, 

309 ) 

310 

311 

312if __name__ == "__main__": 

313 unittest.main()