Coverage for tests / test_ndf_input_archive.py: 16%

286 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:52 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import tempfile 

16import unittest 

17 

18import astropy.io.fits 

19import astropy.units as u 

20import numpy as np 

21import pydantic 

22 

23from lsst.images import Box, Image, ImageSerializationModel, Mask, MaskedImage 

24from lsst.images._transforms import FrameSet 

25from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

26from lsst.images.serialization import ( 

27 ArchiveReadError, 

28 ArrayReferenceModel, 

29 InlineArrayModel, 

30 NumberType, 

31) 

32 

33try: 

34 import h5py 

35 

36 from lsst.images.ndf import ( 

37 NdfInputArchive, 

38 NdfOutputArchive, 

39 NdfPointerModel, 

40 _hds, 

41 read, 

42 write, 

43 ) 

44 

45 HAVE_H5PY = True 

46except ImportError: 

47 HAVE_H5PY = False 

48 

49 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfInputArchiveOpenTestCase(unittest.TestCase):
    """Tests for `NdfInputArchive.open` and `get_tree`."""

    def test_open_round_trips_image_tree(self):
        """The tree read back from a written file serializes identically."""
        pixels = np.arange(20, dtype=np.float32).reshape(4, 5)
        image = Image(pixels, bbox=Box.factory[10:14, 20:25])
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            # Release our own handle so the file can be reopened by name;
            # with delete_on_close=False it is removed on context exit.
            scratch.close()
            expected_tree = write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                actual_tree = reader.get_tree(type(expected_tree))
                self.assertEqual(
                    actual_tree.model_dump_json(),
                    expected_tree.model_dump_json(),
                )

    def test_get_tree_raises_when_main_json_missing(self):
        """`get_tree` raises `ArchiveReadError` when the JSON tree is absent."""
        # A file with no /MORE/LSST/JSON should raise ArchiveReadError.
        model_type = ImageSerializationModel[NdfPointerModel]
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            # Only the root CLASS attribute is written; nothing else.
            with h5py.File(scratch.name, "w") as hdf:
                hdf["/"].attrs["CLASS"] = "NDF"
            with NdfInputArchive.open(scratch.name) as reader:
                with self.assertRaises(ArchiveReadError):
                    reader.get_tree(model_type)

76 

77 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfInputArchiveDataTestCase(unittest.TestCase):
    """Tests for `get_array`, `deserialize_pointer`, and `get_frame_set`."""

    def test_get_array_reads_image_array(self):
        """`get_array` returns the whole array referenced by the tree."""
        image = Image(np.arange(20, dtype=np.float32).reshape(4, 5))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            written = write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                # The Image tree's `data` attribute is an
                # ArrayReferenceModel pointing at /DATA_ARRAY/DATA.
                pixels = reader.get_array(written.data)
                np.testing.assert_array_equal(pixels, image.array)

    def test_get_array_supports_slicing(self):
        """`get_array` honours the ``slices`` argument."""
        image = Image(np.arange(20, dtype=np.float32).reshape(4, 5))
        window = (slice(0, 2), slice(1, 4))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            written = write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                pixels = reader.get_array(written.data, slices=window)
                np.testing.assert_array_equal(pixels, image.array[:2, 1:4])

    def test_get_array_handles_inline_array(self):
        """`get_array` accepts an `InlineArrayModel` and returns its values."""
        inline = InlineArrayModel(data=[1.0, 2.0, 3.0], datatype=NumberType.float64)
        image = Image(np.zeros((2, 2), dtype=np.float32))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            # Any valid file will do; the inline model carries its own data.
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                values = reader.get_array(inline)
                np.testing.assert_array_equal(values, np.array([1.0, 2.0, 3.0]))

    def test_get_array_unrecognised_source_raises(self):
        """`get_array` raises `ArchiveReadError` for a ``fits:`` source."""
        image = Image(np.zeros((2, 2), dtype=np.float32))
        bogus = ArrayReferenceModel(source="fits:NOTUS", shape=[2, 2], datatype=NumberType.float32)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                with self.assertRaises(ArchiveReadError):
                    reader.get_array(bogus)

    def test_deserialize_pointer_round_trips_subtree(self):
        """A sub-tree written via `serialize_pointer` can be read back."""

        # Build a file with a hoisted sub-tree we can read back.  Use the
        # output archive directly to avoid pulling in the full Image stack.
        class TinyTree(pydantic.BaseModel):
            name: str

        def build_tree(nested):
            return TinyTree(name="hello")

        def passthrough(model, _archive):
            # Deserializer just returns the model unchanged.
            return model

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                writer = NdfOutputArchive(hdf)
                pointer = writer.serialize_pointer("psf", build_tree, key=("psf", 1))
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.deserialize_pointer(pointer, TinyTree, passthrough)
                self.assertEqual(recovered.name, "hello")

    def test_deserialize_pointer_caches_by_ref(self):
        """Deserializing the same pointer twice runs the deserializer once."""

        class TinyTree(pydantic.BaseModel):
            name: str

        seen = []

        def build_tree(nested):
            return TinyTree(name="x")

        def deserializer(model, _archive):
            # Record every invocation so the test can count them.
            seen.append(model)
            return model

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                writer = NdfOutputArchive(hdf)
                pointer = writer.serialize_pointer("psf", build_tree, key=("psf", 1))
            with NdfInputArchive.open(scratch.name) as reader:
                first = reader.deserialize_pointer(pointer, TinyTree, deserializer)
                second = reader.deserialize_pointer(pointer, TinyTree, deserializer)
                self.assertIs(first, second)
                self.assertEqual(len(seen), 1)

    def test_deserialize_pointer_caches_frame_set_for_get_frame_set(self):
        """A frame set returned by a deserializer is served by `get_frame_set`."""

        class TinyTree(pydantic.BaseModel):
            name: str

        class DummyFrameSet(FrameSet):
            def __contains__(self, frame):
                return False

            def __getitem__(self, key):
                raise AssertionError("DummyFrameSet should not be indexed in this test.")

        sentinel = DummyFrameSet()

        def build_tree(nested):
            return TinyTree(name="frames")

        def return_sentinel(_model, _archive):
            return sentinel

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                writer = NdfOutputArchive(hdf)
                pointer = writer.serialize_frame_set(
                    "frames",
                    sentinel,
                    build_tree,
                    key=("frames", 1),
                )
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.deserialize_pointer(pointer, TinyTree, return_sentinel)
                self.assertIs(recovered, sentinel)
                self.assertIs(reader.get_frame_set(pointer), sentinel)

    def test_get_frame_set_returns_cached_value(self):
        """`get_frame_set` returns whatever is cached under the pointer path."""
        # Exercise the cache mechanism with a sentinel object pretending
        # to be a FrameSet.  Real FrameSet plumbing comes when the AST
        # text dump for /WCS/DATA lands in a follow-up task.
        sentinel = object()
        cache_path = "/MORE/LSST/PIXEL_TO_SKY"
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(Image(np.zeros((2, 2), dtype=np.float32)), scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                # Manually populate the cache as deserialize_pointer would
                # if a FrameSet deserializer ran.
                reader._frame_set_cache[cache_path] = sentinel
                pointer = NdfPointerModel(path=cache_path)
                self.assertIs(reader.get_frame_set(pointer), sentinel)

    def test_get_frame_set_raises_if_not_cached(self):
        """`get_frame_set` raises `AssertionError` for an uncached pointer."""
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(Image(np.zeros((2, 2), dtype=np.float32)), scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                pointer = NdfPointerModel(path="/MORE/LSST/UNKNOWN")
                with self.assertRaises(AssertionError):
                    reader.get_frame_set(pointer)

210 

211 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfInputArchiveOpaqueMetadataTestCase(unittest.TestCase):
    """Tests for `NdfInputArchive.get_opaque_metadata`."""

    def test_more_fits_round_trips_via_opaque_metadata(self):
        """A primary FITS header attached to an image survives write + read."""
        header = astropy.io.fits.Header()
        header["FOO"] = ("bar", "test card")
        metadata = FitsOpaqueMetadata()
        metadata.add_header(header, name="", ver=1)
        image = Image(np.zeros((2, 2), dtype=np.float32))
        image._opaque_metadata = metadata
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.get_opaque_metadata()
                primary_key = ExtensionKey()
                self.assertIn(primary_key, recovered.headers)
                self.assertEqual(recovered.headers[ExtensionKey()]["FOO"], "bar")

    def test_get_opaque_metadata_empty_when_no_more_fits(self):
        """Without stored FITS headers the recovered metadata has no headers."""
        # Image with no opaque metadata -> /MORE/FITS is absent in the file.
        image = Image(np.zeros((2, 2), dtype=np.float32))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.get_opaque_metadata()
                self.assertIsInstance(recovered, FitsOpaqueMetadata)
                # No primary header should be populated since /MORE/FITS
                # was never written.
                self.assertFalse(recovered.headers)

243 

244 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfReadFunctionTestCase(unittest.TestCase):
    """Tests for the module-level `ndf.read()` function."""

    @staticmethod
    def _example_path():
        """Return the path of the ``example-ndf.sdf`` fixture beside this file."""
        return os.path.join(os.path.dirname(__file__), "data", "example-ndf.sdf")

    @staticmethod
    def _write_data_array(f, image_array, origin=None):
        """Name the root ``TEST``/``NDF`` and write ``DATA_ARRAY`` into ``f``.

        Parameters
        ----------
        f
            Open, writeable `h5py.File`.
        image_array
            Array stored as ``DATA_ARRAY/DATA``.
        origin
            Optional sequence stored as an int32 ``DATA_ARRAY/ORIGIN``;
            nothing is written when `None`.
        """
        _hds.set_root_name(f, "TEST", "NDF")
        data_array = _hds.create_structure(f, "DATA_ARRAY", "ARRAY")
        _hds.write_array(data_array, "DATA", image_array)
        if origin is not None:
            _hds.write_array(data_array, "ORIGIN", np.array(origin, dtype=np.int32))

    @staticmethod
    def _write_quality(f, quality_array, badbits):
        """Write a nested ``QUALITY/QUALITY`` array plus ``BADBITS`` into ``f``.

        Parameters
        ----------
        f
            Open, writeable `h5py.File`.
        quality_array
            Array stored as ``QUALITY/QUALITY/DATA``.
        badbits
            Scalar stored as a uint8 ``QUALITY/BADBITS``.
        """
        quality = _hds.create_structure(f, "QUALITY", "QUALITY")
        quality_array_struct = _hds.create_structure(quality, "QUALITY", "ARRAY")
        _hds.write_array(quality_array_struct, "DATA", quality_array)
        _hds.write_array(quality_array_struct, "ORIGIN", np.array([0, 0], dtype=np.int32))
        _hds.write_array(quality_array_struct, "BAD_PIXEL", np.array(False, dtype=np.bool_))
        _hds.write_array(quality, "BADBITS", np.array(badbits, dtype=np.uint8))

    def test_read_round_trips_image(self):
        """`read` recovers the array and bbox of an image written by `write`."""
        image = Image(
            np.arange(20, dtype=np.float32).reshape(4, 5),
            bbox=Box.factory[10:14, 20:25],
        )
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            # Release our own handle so the file can be reopened by name.
            tmp.close()
            write(image, tmp.name)
            result = read(Image, tmp.name)
            self.assertIsInstance(result.deserialized, Image)
            np.testing.assert_array_equal(result.deserialized.array, image.array)
            self.assertEqual(result.deserialized.bbox, image.bbox)

    def test_read_starlink_file_auto_detects_image(self):
        """Reading the fixture as `Image` yields the expected shape and dtype."""
        # The canonical fixture has no /MORE/LSST/JSON, no QUALITY,
        # no VARIANCE -- auto-detect should return an Image whose array
        # shape matches the file (611x609 int16).
        result = read(Image, self._example_path())
        self.assertIsInstance(result.deserialized, Image)
        self.assertEqual(result.deserialized.array.shape, (611, 609))
        self.assertEqual(result.deserialized.array.dtype, np.int16)
        self.assertIsNotNone(result.deserialized.projection)

    def test_read_starlink_file_recovers_opaque_fits_metadata(self):
        """Reading the fixture exposes its primary FITS header as opaque metadata."""
        result = read(Image, self._example_path())
        opaque = result.deserialized._opaque_metadata
        self.assertIn(ExtensionKey(), opaque.headers)
        # The fixture is a real Starlink M57 image; sample one card we know
        # is present (NAXIS).
        primary = opaque.headers[ExtensionKey()]
        self.assertIn("NAXIS", primary)

    def test_read_auto_detects_nested_quality_array(self):
        """A nested QUALITY array is read as the mask of a `MaskedImage`."""
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)
        quality_array = np.array([[0, 1, 0], [1, 0, 1]], dtype=np.uint8)

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                self._write_data_array(f, image_array)
                self._write_quality(f, quality_array, badbits=1)
            result = read(MaskedImage, tmp.name)
            self.assertIsInstance(result.deserialized, MaskedImage)
            np.testing.assert_array_equal(result.deserialized.mask.array[:, :, 0], quality_array)
            self.assertEqual(set(result.deserialized.mask.schema.names), {f"MASK{i}" for i in range(8)})
            # The same file must also be readable as a plain Image.
            image_result = read(Image, tmp.name)
            self.assertIsInstance(image_result.deserialized, Image)
            np.testing.assert_array_equal(image_result.deserialized.array, image_array)

    def test_read_auto_detect_preserves_quality_bits(self):
        """Each QUALITY bit maps to its own mask plane; BADBITS is described."""
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)
        quality_array = np.array([[0, 2, 4], [2, 0, 6]], dtype=np.uint8)
        # Pixels with value-2 and value-4 bits set, respectively.
        expected_mask1 = np.array([[0, 1, 0], [1, 0, 1]], dtype=bool)
        expected_mask2 = np.array([[0, 0, 1], [0, 0, 1]], dtype=bool)

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                self._write_data_array(f, image_array)
                self._write_quality(f, quality_array, badbits=2)
            result = read(MaskedImage, tmp.name)
            self.assertIsInstance(result.deserialized, MaskedImage)
            mask = result.deserialized.mask
            np.testing.assert_array_equal(mask.array[:, :, 0], quality_array)
            np.testing.assert_array_equal(mask.get("MASK1"), expected_mask1)
            np.testing.assert_array_equal(mask.get("MASK2"), expected_mask2)
            self.assertIn("Selected by BADBITS", mask.schema.descriptions["MASK1"])
            self.assertNotIn("Selected by BADBITS", mask.schema.descriptions["MASK2"])

    def test_read_auto_detected_data_only_as_masked_image_uses_defaults(self):
        """A data-only NDF read as `MaskedImage` gets a zero mask and unit variance."""
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                self._write_data_array(f, image_array, origin=[5, 4])
            result = read(MaskedImage, tmp.name)
            self.assertIsInstance(result.deserialized, MaskedImage)
            self.assertEqual(result.deserialized.bbox, Box.factory[4:6, 5:8])
            np.testing.assert_array_equal(result.deserialized.image.array, image_array)
            np.testing.assert_array_equal(
                result.deserialized.mask.array,
                np.zeros((2, 3, 1), dtype=np.uint8),
            )
            np.testing.assert_array_equal(
                result.deserialized.variance.array,
                np.ones((2, 3), dtype=np.float32),
            )

    def test_read_auto_detected_variance_as_masked_image_keeps_variance(self):
        """A VARIANCE component in the file is used instead of the default."""
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)
        variance_array = np.full((2, 3), 2.5, dtype=np.float32)

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                self._write_data_array(f, image_array, origin=[5, 4])
                variance = _hds.create_structure(f, "VARIANCE", "ARRAY")
                _hds.write_array(variance, "DATA", variance_array)
                _hds.write_array(variance, "ORIGIN", np.array([5, 4], dtype=np.int32))
            result = read(MaskedImage, tmp.name)
            self.assertIsInstance(result.deserialized, MaskedImage)
            np.testing.assert_array_equal(result.deserialized.variance.array, variance_array)
            np.testing.assert_array_equal(
                result.deserialized.mask.array,
                np.zeros((2, 3, 1), dtype=np.uint8),
            )

    def test_read_auto_detected_units_component(self):
        """A top-level UNITS dataset of ``count`` is read as `astropy.units.ct`."""
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)

        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                self._write_data_array(f, image_array)
                f.create_dataset("UNITS", data=np.bytes_("count"))
            result = read(Image, tmp.name)
            self.assertEqual(result.deserialized.unit, u.ct)

    def test_read_missing_data_array_raises(self):
        """`read` raises `ArchiveReadError` when no DATA_ARRAY or JSON tree exists."""
        # A file with only /MORE/LSST/JSON is fine for the symmetric
        # path.  A file with NEITHER /MORE/LSST/JSON NOR DATA_ARRAY is a
        # malformed NDF -- auto-detect must fail clearly.
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                f["/"].attrs["CLASS"] = "NDF"
                # Note: no DATA_ARRAY, no /MORE/LSST/JSON.
            with self.assertRaises(ArchiveReadError):
                read(Image, tmp.name)

    def test_read_auto_detect_wrong_target_type_raises(self):
        """`read` raises `ArchiveReadError` when asked to auto-detect a `Mask`."""
        # Auto-detect only knows how to produce Image-like objects from NDF
        # components; unrelated target classes should fail clearly.
        with self.assertRaises(ArchiveReadError):
            read(Mask, self._example_path())