Coverage for tests/test_masked_image.py: 21%

143 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-27 08:25 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import tempfile 

16import unittest 

17 

18import astropy.io.fits 

19import astropy.units as u 

20import numpy as np 

21 

22from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes 

23from lsst.images.fits import FitsCompressionOptions 

24from lsst.images.tests import ( 

25 RoundtripFits, 

26 RoundtripNdf, 

27 assert_masked_images_equal, 

28 compare_masked_image_to_legacy, 

29) 

30 

31try: 

32 import h5py # noqa: F401 

33 

34 HAVE_H5PY = True 

35except ImportError: 

36 HAVE_H5PY = False 

37 

38DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

39 

40 

41class MaskedImageTestCase(unittest.TestCase): 

42 """Tests for the MaskedImage class and the basics of the archive system.""" 

43 

44 def setUp(self) -> None: 

45 self.maxDiff = None 

46 self.rng = np.random.default_rng(500) 

47 self.masked_image = MaskedImage( 

48 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)), 

49 mask_schema=MaskSchema( 

50 [ 

51 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."), 

52 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."), 

53 ] 

54 ), 

55 metadata={"fifty": "5 * 10"}, 

56 ) 

57 self.masked_image.mask.array |= np.multiply.outer( 

58 self.masked_image.image.array < 102.0, 

59 self.masked_image.mask.schema.bitmask("BAD"), 

60 ) 

61 self.masked_image.mask.array |= np.multiply.outer( 

62 self.masked_image.image.array > 98.0, 

63 self.masked_image.mask.schema.bitmask("HUNGRY"), 

64 ) 

65 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape) 

66 

67 def test_construction(self) -> None: 

68 """Test that the MaskedImage construction (in setUp) worked.""" 

69 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259]) 

70 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox) 

71 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox) 

72 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape) 

73 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,)) 

74 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape) 

75 self.assertEqual(self.masked_image.unit, u.nJy) 

76 self.assertEqual(self.masked_image.variance.unit, u.nJy**2) 

77 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"}) 

78 # The checks below are subject to the vagaries of the RNG, but we want 

79 # the seed to be such that they all pass, or other tests will be 

80 # weaker. 

81 self.assertGreater( 

82 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0 

83 ) 

84 self.assertGreater( 

85 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0 

86 ) 

87 self.assertGreater( 

88 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0 

89 ) 

90 

91 self.assertIs(self.masked_image[...], self.masked_image) 

92 self.assertEqual( 

93 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])" 

94 ) 

95 self.assertEqual( 

96 repr(self.masked_image), 

97 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), " 

98 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is " 

99 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had " 

100 "enough to eat today.\")], dtype=dtype('uint8')))", 

101 ) 

102 copy = self.masked_image.copy() 

103 original = self.masked_image.image.array[0, 0] 

104 copy.image.array[0, 0] = 38.0 

105 self.assertEqual(self.masked_image.image.array[0, 0], original) 

106 self.assertEqual(copy.image.array[0, 0], 38.0) 

107 

108 # Test error conditions. 

109 with self.assertRaises(ValueError): 

110 # Disagreement over mask bbox. 

111 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask) 

112 with self.assertRaises(TypeError): 

113 # No mask definition. 

114 MaskedImage(self.masked_image.image, variance=self.masked_image.variance) 

115 with self.assertRaises(TypeError): 

116 # Can not provide mask and mask schema. 

117 MaskedImage( 

118 Image(42.0, shape=(5, 5)), 

119 mask=self.masked_image.mask, 

120 mask_schema=self.masked_image.mask.schema, 

121 ) 

122 with self.assertRaises(ValueError): 

123 # image and variance bbox disagreement. 

124 MaskedImage( 

125 Image(42.0, shape=(5, 5)), 

126 mask_schema=self.masked_image.mask.schema, 

127 variance=self.masked_image.variance, 

128 ) 

129 with self.assertRaises(ValueError): 

130 # no image unit but there is variance unit. 

131 MaskedImage( 

132 Image(42.0, shape=(5, 5)), 

133 mask_schema=self.masked_image.mask.schema, 

134 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

135 ) 

136 with self.assertRaises(ValueError): 

137 # image and variance units disagree. 

138 MaskedImage( 

139 Image(42.0, shape=(5, 5), unit=u.nJy), 

140 mask_schema=self.masked_image.mask.schema, 

141 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

142 ) 

143 

144 def test_subset(self) -> None: 

145 """Test assignment of subset.""" 

146 copy = self.masked_image.copy() 

147 subset = copy.local[0:10, 20:30].copy() 

148 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy) 

149 copy[subset.bbox] = subset 

150 self.assertEqual(copy.image.array[0, 20], 42.0) 

151 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0]) 

152 

153 def test_fits_roundtrip(self) -> None: 

154 """Test that we can round-trip the MaskedImage through FITS, including 

155 subimage reads. 

156 """ 

157 subbox = Box.factory[11:20, 25:30] 

158 subslices = (slice(6, 15), slice(17, 22)) 

159 np.testing.assert_array_equal( 

160 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

161 ) 

162 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip: 

163 subimage = roundtrip.get(bbox=subbox) 

164 # Check that we used lossless compression (the default). 

165 fits = roundtrip.inspect() 

166 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

167 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

168 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

169 # Test reading back in as a legacy MaskedImage. 

170 with self.subTest(): 

171 try: 

172 import lsst.afw.image 

173 except ImportError: 

174 raise unittest.SkipTest("afw could not be imported") from None 

175 legacy_masked_image = roundtrip.get(storageClass="MaskedImage") 

176 self.assertIsInstance(legacy_masked_image, lsst.afw.image.MaskedImage) 

177 compare_masked_image_to_legacy( 

178 self, self.masked_image, legacy_masked_image, expect_view=False 

179 ) 

180 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

181 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False) 

182 

183 def test_fits_roundtrip_lossy(self) -> None: 

184 """Test that we can round-trip the MaskedImage through FITS, including 

185 subimage reads, with lossy compression. 

186 """ 

187 subbox = Box.factory[11:20, 25:30] 

188 subslices = (slice(6, 15), slice(17, 22)) 

189 np.testing.assert_array_equal( 

190 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

191 ) 

192 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp: 

193 tmp.close() 

194 self.masked_image.write_fits( 

195 tmp.name, 

196 image_compression=FitsCompressionOptions.LOSSY, 

197 variance_compression=FitsCompressionOptions.LOSSY, 

198 compression_seed=50, 

199 ) 

200 roundtripped = MaskedImage.read_fits(tmp.name) 

201 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox) 

202 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits: 

203 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

204 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

205 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

206 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01) 

207 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False) 

208 

209 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

210 def test_round_trip_ndf_compatible_mask(self): 

211 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8).""" 

212 with RoundtripNdf(self, self.masked_image) as roundtrip: 

213 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

214 

215 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

216 def test_round_trip_ndf_incompatible_mask(self): 

217 """NDF round-trip for a >8-plane mask (uses native 3D mask array, 

218 to MORE/LSST/MASK). 

219 """ 

220 rng = np.random.default_rng(7) 

221 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)] 

222 wide = MaskedImage( 

223 Image( 

224 rng.normal(100.0, 8.0, size=(50, 60)), 

225 dtype=np.float64, 

226 unit=u.nJy, 

227 start=(0, 0), 

228 ), 

229 mask_schema=MaskSchema(planes), 

230 ) 

231 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

232 with RoundtripNdf(self, wide) as roundtrip: 

233 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

234 

235 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

236 def test_round_trip_ndf_many_plane_mask(self): 

237 """NDF round-trip for a mask that needs more than one int32 chunk.""" 

238 rng = np.random.default_rng(11) 

239 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)] 

240 wide = MaskedImage( 

241 Image( 

242 rng.normal(100.0, 8.0, size=(10, 12)), 

243 dtype=np.float64, 

244 unit=u.nJy, 

245 start=(0, 0), 

246 ), 

247 mask_schema=MaskSchema(planes), 

248 ) 

249 wide.mask.set("P0", wide.image.array > 100.0) 

250 wide.mask.set("P17", wide.image.array < 95.0) 

251 wide.mask.set("P39", wide.image.array > 110.0) 

252 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

253 with RoundtripNdf(self, wide) as roundtrip: 

254 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

255 

256 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

257 def test_fits_ndf_consistency(self): 

258 """FITS and NDF backends produce equal MaskedImages on round-trip.""" 

259 with ( 

260 RoundtripFits(self, self.masked_image) as fits_rt, 

261 RoundtripNdf(self, self.masked_image) as ndf_rt, 

262 ): 

263 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False) 

264 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False) 

265 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

266 

267 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

268 def test_legacy(self) -> None: 

269 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and 

270 MaskedImage.from_legacy. 

271 """ 

272 assert DATA_DIR is not None, "Guaranteed by decorator." 

273 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits") 

274 plane_map = get_legacy_visit_image_mask_planes() 

275 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map) 

276 try: 

277 from lsst.afw.image import MaskedImageFitsReader 

278 except ImportError: 

279 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None 

280 reader = MaskedImageFitsReader(filename) 

281 legacy_masked_image = reader.read() 

282 compare_masked_image_to_legacy( 

283 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False 

284 ) 

285 compare_masked_image_to_legacy( 

286 self, 

287 masked_image, 

288 masked_image.to_legacy(plane_map=plane_map), 

289 plane_map=plane_map, 

290 expect_view=True, 

291 ) 

292 compare_masked_image_to_legacy( 

293 self, 

294 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map), 

295 legacy_masked_image, 

296 expect_view=True, 

297 plane_map=plane_map, 

298 ) 

299 

300 

301if __name__ == "__main__": 

302 unittest.main()