Coverage for tests / test_masked_image.py: 22%

138 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-16 00:52 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import tempfile 

16import unittest 

17 

18import astropy.io.fits 

19import astropy.units as u 

20import numpy as np 

21from astro_metadata_translator import ObservationInfo 

22 

23from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes 

24from lsst.images.fits import FitsCompressionOptions 

25from lsst.images.tests import ( 

26 RoundtripFits, 

27 RoundtripNdf, 

28 assert_masked_images_equal, 

29 compare_masked_image_to_legacy, 

30) 

31 

32try: 

33 import h5py # noqa: F401 

34 

35 HAVE_H5PY = True 

36except ImportError: 

37 HAVE_H5PY = False 

38 

39DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

40 

41 

42class MaskedImageTestCase(unittest.TestCase): 

43 """Tests for the MaskedImage class and the basics of the archive system.""" 

44 

45 def setUp(self) -> None: 

46 self.maxDiff = None 

47 self.rng = np.random.default_rng(500) 

48 self.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4) 

49 self.masked_image = MaskedImage( 

50 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)), 

51 mask_schema=MaskSchema( 

52 [ 

53 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."), 

54 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."), 

55 ] 

56 ), 

57 metadata={"fifty": "5 * 10"}, 

58 obs_info=self.obs_info, 

59 ) 

60 self.masked_image.mask.array |= np.multiply.outer( 

61 self.masked_image.image.array < 102.0, 

62 self.masked_image.mask.schema.bitmask("BAD"), 

63 ) 

64 self.masked_image.mask.array |= np.multiply.outer( 

65 self.masked_image.image.array > 98.0, 

66 self.masked_image.mask.schema.bitmask("HUNGRY"), 

67 ) 

68 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape) 

69 

70 def test_construction(self) -> None: 

71 """Test that the MaskedImage construction (in setUp) worked.""" 

72 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259]) 

73 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox) 

74 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox) 

75 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape) 

76 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,)) 

77 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape) 

78 self.assertEqual(self.masked_image.unit, u.nJy) 

79 self.assertEqual(self.masked_image.variance.unit, u.nJy**2) 

80 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"}) 

81 self.assertEqual(self.masked_image.obs_info.instrument, "LSSTCam") 

82 # The checks below are subject to the vagaries of the RNG, but we want 

83 # the seed to be such that they all pass, or other tests will be 

84 # weaker. 

85 self.assertGreater( 

86 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0 

87 ) 

88 self.assertGreater( 

89 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0 

90 ) 

91 self.assertGreater( 

92 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0 

93 ) 

94 

95 self.assertIs(self.masked_image[...], self.masked_image) 

96 self.assertEqual( 

97 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])" 

98 ) 

99 self.assertEqual( 

100 repr(self.masked_image), 

101 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), " 

102 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is " 

103 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had " 

104 "enough to eat today.\")], dtype=dtype('uint8')))", 

105 ) 

106 copy = self.masked_image.copy() 

107 original = self.masked_image.image.array[0, 0] 

108 copy.image.array[0, 0] = 38.0 

109 self.assertEqual(self.masked_image.image.array[0, 0], original) 

110 self.assertEqual(copy.image.array[0, 0], 38.0) 

111 

112 # Test error conditions. 

113 with self.assertRaises(ValueError): 

114 # Disagreement over mask bbox. 

115 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask) 

116 with self.assertRaises(TypeError): 

117 # No mask definition. 

118 MaskedImage(self.masked_image.image, variance=self.masked_image.variance) 

119 with self.assertRaises(TypeError): 

120 # Can not provide mask and mask schema. 

121 MaskedImage( 

122 Image(42.0, shape=(5, 5)), 

123 mask=self.masked_image.mask, 

124 mask_schema=self.masked_image.mask.schema, 

125 ) 

126 with self.assertRaises(ValueError): 

127 # image and variance bbox disagreement. 

128 MaskedImage( 

129 Image(42.0, shape=(5, 5)), 

130 mask_schema=self.masked_image.mask.schema, 

131 variance=self.masked_image.variance, 

132 ) 

133 with self.assertRaises(ValueError): 

134 # no image unit but there is variance unit. 

135 MaskedImage( 

136 Image(42.0, shape=(5, 5)), 

137 mask_schema=self.masked_image.mask.schema, 

138 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

139 ) 

140 with self.assertRaises(ValueError): 

141 # image and variance units disagree. 

142 MaskedImage( 

143 Image(42.0, shape=(5, 5), unit=u.nJy), 

144 mask_schema=self.masked_image.mask.schema, 

145 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

146 ) 

147 

148 def test_subset(self) -> None: 

149 """Test assignment of subset.""" 

150 copy = self.masked_image.copy() 

151 subset = copy.local[0:10, 20:30].copy() 

152 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy) 

153 copy[subset.bbox] = subset 

154 self.assertEqual(copy.image.array[0, 20], 42.0) 

155 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0]) 

156 

157 def test_fits_roundtrip(self) -> None: 

158 """Test that we can round-trip the MaskedImage through FITS, including 

159 subimage reads. 

160 """ 

161 subbox = Box.factory[11:20, 25:30] 

162 subslices = (slice(6, 15), slice(17, 22)) 

163 np.testing.assert_array_equal( 

164 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

165 ) 

166 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip: 

167 subimage = roundtrip.get(bbox=subbox) 

168 # Check that we used lossless compression (the default). 

169 fits = roundtrip.inspect() 

170 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

171 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

172 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

173 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

174 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False) 

175 

176 def test_fits_roundtrip_lossy(self) -> None: 

177 """Test that we can round-trip the MaskedImage through FITS, including 

178 subimage reads, with lossy compression. 

179 """ 

180 subbox = Box.factory[11:20, 25:30] 

181 subslices = (slice(6, 15), slice(17, 22)) 

182 np.testing.assert_array_equal( 

183 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

184 ) 

185 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp: 

186 tmp.close() 

187 self.masked_image.write_fits( 

188 tmp.name, 

189 image_compression=FitsCompressionOptions.LOSSY, 

190 variance_compression=FitsCompressionOptions.LOSSY, 

191 compression_seed=50, 

192 ) 

193 roundtripped = MaskedImage.read_fits(tmp.name) 

194 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox) 

195 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits: 

196 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

197 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

198 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

199 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01) 

200 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False) 

201 

202 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

203 def test_round_trip_ndf_compatible_mask(self): 

204 """NDF round-trip for the default-setup MaskedImage (2 planes ≤ 8).""" 

205 with RoundtripNdf(self, self.masked_image) as roundtrip: 

206 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

207 

208 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

209 def test_round_trip_ndf_incompatible_mask(self): 

210 """NDF round-trip for a >8-plane mask (uses native 3D mask array, 

211 to MORE/LSST/MASK). 

212 """ 

213 rng = np.random.default_rng(7) 

214 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(12)] 

215 wide = MaskedImage( 

216 Image( 

217 rng.normal(100.0, 8.0, size=(50, 60)), 

218 dtype=np.float64, 

219 unit=u.nJy, 

220 start=(0, 0), 

221 ), 

222 mask_schema=MaskSchema(planes), 

223 obs_info=self.obs_info, 

224 ) 

225 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

226 with RoundtripNdf(self, wide) as roundtrip: 

227 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

228 

229 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

230 def test_round_trip_ndf_many_plane_mask(self): 

231 """NDF round-trip for a mask that needs more than one int32 chunk.""" 

232 rng = np.random.default_rng(11) 

233 planes = [MaskPlane(f"P{i}", f"plane {i}") for i in range(40)] 

234 wide = MaskedImage( 

235 Image( 

236 rng.normal(100.0, 8.0, size=(10, 12)), 

237 dtype=np.float64, 

238 unit=u.nJy, 

239 start=(0, 0), 

240 ), 

241 mask_schema=MaskSchema(planes), 

242 obs_info=self.obs_info, 

243 ) 

244 wide.mask.set("P0", wide.image.array > 100.0) 

245 wide.mask.set("P17", wide.image.array < 95.0) 

246 wide.mask.set("P39", wide.image.array > 110.0) 

247 wide.variance.array = rng.normal(64.0, 0.5, size=wide.bbox.shape) 

248 with RoundtripNdf(self, wide) as roundtrip: 

249 assert_masked_images_equal(self, roundtrip.result, wide, expect_view=False) 

250 

251 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

252 def test_fits_ndf_consistency(self): 

253 """FITS and NDF backends produce equal MaskedImages on round-trip.""" 

254 with ( 

255 RoundtripFits(self, self.masked_image) as fits_rt, 

256 RoundtripNdf(self, self.masked_image) as ndf_rt, 

257 ): 

258 assert_masked_images_equal(self, self.masked_image, fits_rt.result, expect_view=False) 

259 assert_masked_images_equal(self, self.masked_image, ndf_rt.result, expect_view=False) 

260 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

261 

262 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

263 def test_legacy(self) -> None: 

264 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and 

265 MaskedImage.from_legacy. 

266 """ 

267 assert DATA_DIR is not None, "Guaranteed by decorator." 

268 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits") 

269 plane_map = get_legacy_visit_image_mask_planes() 

270 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map) 

271 try: 

272 from lsst.afw.image import MaskedImageFitsReader 

273 except ImportError: 

274 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None 

275 reader = MaskedImageFitsReader(filename) 

276 legacy_masked_image = reader.read() 

277 compare_masked_image_to_legacy( 

278 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False 

279 ) 

280 compare_masked_image_to_legacy( 

281 self, 

282 masked_image, 

283 masked_image.to_legacy(plane_map=plane_map), 

284 plane_map=plane_map, 

285 expect_view=True, 

286 ) 

287 compare_masked_image_to_legacy( 

288 self, 

289 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map), 

290 legacy_masked_image, 

291 expect_view=True, 

292 plane_map=plane_map, 

293 ) 

294 

295 

296if __name__ == "__main__": 

297 unittest.main()