Coverage for tests / test_masked_image.py: 22%

135 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 08:29 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import tempfile 

16import unittest 

17 

18import astropy.io.fits 

19import astropy.units as u 

20import numpy as np 

21 

22from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes 

23from lsst.images.fits import FitsCompressionOptions 

24from lsst.images.tests import ( 

25 RoundtripFits, 

26 RoundtripNdf, 

27 assert_masked_images_equal, 

28 compare_masked_image_to_legacy, 

29) 

30 

31try: 

32 import h5py # noqa: F401 

33 

34 HAVE_H5PY = True 

35except ImportError: 

36 HAVE_H5PY = False 

37 

38DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

39 

40 

41class MaskedImageTestCase(unittest.TestCase): 

42 """Tests for the MaskedImage class and the basics of the archive system.""" 

43 

44 def setUp(self) -> None: 

45 self.maxDiff = None 

46 self.rng = np.random.default_rng(500) 

47 self.masked_image = MaskedImage( 

48 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)), 

49 mask_schema=MaskSchema( 

50 [ 

51 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."), 

52 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."), 

53 ] 

54 ), 

55 metadata={"fifty": "5 * 10"}, 

56 ) 

57 self.masked_image.mask.array |= np.multiply.outer( 

58 self.masked_image.image.array < 102.0, 

59 self.masked_image.mask.schema.bitmask("BAD"), 

60 ) 

61 self.masked_image.mask.array |= np.multiply.outer( 

62 self.masked_image.image.array > 98.0, 

63 self.masked_image.mask.schema.bitmask("HUNGRY"), 

64 ) 

65 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape) 

66 

67 def test_construction(self) -> None: 

68 """Test that the MaskedImage construction (in setUp) worked.""" 

69 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259]) 

70 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox) 

71 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox) 

72 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape) 

73 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,)) 

74 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape) 

75 self.assertEqual(self.masked_image.unit, u.nJy) 

76 self.assertEqual(self.masked_image.variance.unit, u.nJy**2) 

77 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"}) 

78 # The checks below are subject to the vagaries of the RNG, but we want 

79 # the seed to be such that they all pass, or other tests will be 

80 # weaker. 

81 self.assertGreater( 

82 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0 

83 ) 

84 self.assertGreater( 

85 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0 

86 ) 

87 self.assertGreater( 

88 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0 

89 ) 

90 

91 self.assertIs(self.masked_image[...], self.masked_image) 

92 self.assertEqual( 

93 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])" 

94 ) 

95 self.assertEqual( 

96 repr(self.masked_image), 

97 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), " 

98 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is " 

99 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had " 

100 "enough to eat today.\")], dtype=dtype('uint8')))", 

101 ) 

102 copy = self.masked_image.copy() 

103 original = self.masked_image.image.array[0, 0] 

104 copy.image.array[0, 0] = 38.0 

105 self.assertEqual(self.masked_image.image.array[0, 0], original) 

106 self.assertEqual(copy.image.array[0, 0], 38.0) 

107 

108 # Test error conditions. 

109 with self.assertRaises(ValueError): 

110 # Disagreement over mask bbox. 

111 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask) 

112 with self.assertRaises(TypeError): 

113 # No mask definition. 

114 MaskedImage(self.masked_image.image, variance=self.masked_image.variance) 

115 with self.assertRaises(TypeError): 

116 # Can not provide mask and mask schema. 

117 MaskedImage( 

118 Image(42.0, shape=(5, 5)), 

119 mask=self.masked_image.mask, 

120 mask_schema=self.masked_image.mask.schema, 

121 ) 

122 with self.assertRaises(ValueError): 

123 # image and variance bbox disagreement. 

124 MaskedImage( 

125 Image(42.0, shape=(5, 5)), 

126 mask_schema=self.masked_image.mask.schema, 

127 variance=self.masked_image.variance, 

128 ) 

129 with self.assertRaises(ValueError): 

130 # no image unit but there is variance unit. 

131 MaskedImage( 

132 Image(42.0, shape=(5, 5)), 

133 mask_schema=self.masked_image.mask.schema, 

134 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

135 ) 

136 with self.assertRaises(ValueError): 

137 # image and variance units disagree. 

138 MaskedImage( 

139 Image(42.0, shape=(5, 5), unit=u.nJy), 

140 mask_schema=self.masked_image.mask.schema, 

141 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

142 ) 

143 

144 def test_subset(self) -> None: 

145 """Test assignment of subset.""" 

146 copy = self.masked_image.copy() 

147 subset = copy.local[0:10, 20:30].copy() 

148 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy) 

149 copy[subset.bbox] = subset 

150 self.assertEqual(copy.image.array[0, 20], 42.0) 

151 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0]) 

152 

153 def test_fits_roundtrip(self) -> None: 

154 """Test that we can round-trip the MaskedImage through FITS, including 

155 subimage reads. 

156 """ 

157 subbox = Box.factory[11:20, 25:30] 

158 subslices = (slice(6, 15), slice(17, 22)) 

159 np.testing.assert_array_equal( 

160 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

161 ) 

162 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip: 

163 subimage = roundtrip.get(bbox=subbox) 

164 # Check that we used lossless compression (the default). 

165 fits = roundtrip.inspect() 

166 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

167 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

168 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

169 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

170 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False) 

171 

172 def test_fits_roundtrip_lossy(self) -> None: 

173 """Test that we can round-trip the MaskedImage through FITS, including 

174 subimage reads, with lossy compression. 

175 """ 

176 subbox = Box.factory[11:20, 25:30] 

177 subslices = (slice(6, 15), slice(17, 22)) 

178 np.testing.assert_array_equal( 

179 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

180 ) 

181 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp: 

182 tmp.close() 

183 self.masked_image.write_fits( 

184 tmp.name, 

185 image_compression=FitsCompressionOptions.LOSSY, 

186 variance_compression=FitsCompressionOptions.LOSSY, 

187 compression_seed=50, 

188 ) 

189 roundtripped = MaskedImage.read_fits(tmp.name) 

190 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox) 

191 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits: 

192 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

193 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

194 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

195 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01) 

196 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False) 

197 

198 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

199 def test_round_trip_ndf_compatible_mask(self): 

200 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8).""" 

201 with RoundtripNdf(self, self.masked_image) as roundtrip: 

202 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

203 

204 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

205 def test_round_trip_ndf_incompatible_mask(self): 

206 """NDF round-trip for a >8-plane mask (uses native 3D mask array, 

207 to MORE/LSST/MASK). 

208 """ 

209 rng = np.random.default_rng(7) 

210 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)] 

211 wide = MaskedImage( 

212 Image( 

213 rng.normal(100.0, 8.0, size=(50, 60)), 

214 dtype=np.float64, 

215 unit=u.nJy, 

216 start=(0, 0), 

217 ), 

218 mask_schema=MaskSchema(planes), 

219 ) 

220 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

221 with RoundtripNdf(self, wide) as roundtrip: 

222 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

223 

224 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

225 def test_round_trip_ndf_many_plane_mask(self): 

226 """NDF round-trip for a mask that needs more than one int32 chunk.""" 

227 rng = np.random.default_rng(11) 

228 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)] 

229 wide = MaskedImage( 

230 Image( 

231 rng.normal(100.0, 8.0, size=(10, 12)), 

232 dtype=np.float64, 

233 unit=u.nJy, 

234 start=(0, 0), 

235 ), 

236 mask_schema=MaskSchema(planes), 

237 ) 

238 wide.mask.set("P0", wide.image.array > 100.0) 

239 wide.mask.set("P17", wide.image.array < 95.0) 

240 wide.mask.set("P39", wide.image.array > 110.0) 

241 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

242 with RoundtripNdf(self, wide) as roundtrip: 

243 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

244 

245 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

246 def test_fits_ndf_consistency(self): 

247 """FITS and NDF backends produce equal MaskedImages on round-trip.""" 

248 with ( 

249 RoundtripFits(self, self.masked_image) as fits_rt, 

250 RoundtripNdf(self, self.masked_image) as ndf_rt, 

251 ): 

252 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False) 

253 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False) 

254 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

255 

256 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

257 def test_legacy(self) -> None: 

258 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and 

259 MaskedImage.from_legacy. 

260 """ 

261 assert DATA_DIR is not None, "Guaranteed by decorator." 

262 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits") 

263 plane_map = get_legacy_visit_image_mask_planes() 

264 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map) 

265 try: 

266 from lsst.afw.image import MaskedImageFitsReader 

267 except ImportError: 

268 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None 

269 reader = MaskedImageFitsReader(filename) 

270 legacy_masked_image = reader.read() 

271 compare_masked_image_to_legacy( 

272 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False 

273 ) 

274 compare_masked_image_to_legacy( 

275 self, 

276 masked_image, 

277 masked_image.to_legacy(plane_map=plane_map), 

278 plane_map=plane_map, 

279 expect_view=True, 

280 ) 

281 compare_masked_image_to_legacy( 

282 self, 

283 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map), 

284 legacy_masked_image, 

285 expect_view=True, 

286 plane_map=plane_map, 

287 ) 

288 

289 

290if __name__ == "__main__": 

291 unittest.main()