Coverage for tests / test_ndf_output_archive.py: 16%

377 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:47 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import json 

15import tempfile 

16import unittest 

17 

18import astropy.io.fits 

19import astropy.table 

20import astropy.units as u 

21import numpy as np 

22import pydantic 

23 

24from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema 

25from lsst.images._transforms import FrameLookupError, FrameSet, Transform 

26from lsst.images._transforms._frames import DetectorFrame, Frame 

27from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

28from lsst.images.serialization import ArrayReferenceModel, InlineArrayModel 

29from lsst.images.tests import make_random_projection 

30 

31try: 

32 import h5py 

33 

34 from lsst.images.ndf import ( 

35 NdfInputArchive, 

36 NdfOutputArchive, 

37 _hds, 

38 read, 

39 write, 

40 ) 

41 

42 HAVE_H5PY = True 

43except ImportError: 

44 HAVE_H5PY = False 

45 

46 

47class TinyFrameSet(FrameSet): 

48 """Minimal concrete frame-set for archive bookkeeping tests.""" 

49 

50 def __contains__(self, frame: Frame) -> bool: 

51 return False 

52 

53 def __getitem__[I: Frame, O: Frame](self, key: tuple[I, O]) -> Transform[I, O]: 

54 raise FrameLookupError(key) 

55 

56 

class TinyTree(pydantic.BaseModel):
    """Single-field Pydantic model standing in for a serialized tree."""

    # The only payload; the tests use it to tell serializer outputs apart.
    name: str

61 

62 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchiveBasicsTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive` constructor and `serialize_direct`.

    Each test closes the temporary-file handle before h5py opens the path
    by name; ``delete_on_close=False`` keeps the file on disk until the
    enclosing ``with`` block exits. Re-opening a still-open named temporary
    file is platform-dependent, so this matches the pattern used by the
    ``write()`` tests in this module.
    """

    def test_serialize_direct_calls_serializer_with_nested_archive(self):
        """`serialize_direct` returns the tree built by the serializer."""
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                tree = arch.serialize_direct("top", lambda nested: TinyTree(name="hello"))
            self.assertEqual(tree.name, "hello")

    def test_constructor_marks_root_as_ndf(self):
        """The constructor should set CLASS=NDF on the root group so that
        Starlink tools recognise the file as an NDF.
        """
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                NdfOutputArchive(f)
            # Re-open read-only to check what actually reached the file.
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/"].attrs["CLASS"], b"NDF")

83 

84 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchiveAddArrayTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive.add_array` routing.

    Each test closes the temporary-file handle before h5py opens the path
    by name; ``delete_on_close=False`` keeps the file on disk until the
    enclosing ``with`` block exits. Re-opening a still-open named temporary
    file is platform-dependent, so this matches the pattern used by the
    ``write()`` tests in this module.
    """

    def test_top_level_image_routes_to_data_array(self):
        """An array named ``image`` is written under ``/DATA_ARRAY``."""
        data = np.arange(20, dtype=np.float32).reshape(4, 5)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="image")
                self.assertEqual(ref.source, "ndf:/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                ds = f["/DATA_ARRAY/DATA"]
                self.assertEqual(ds.dtype, np.float32)
                np.testing.assert_array_equal(ds[()], data)
                self.assertEqual(f["/DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                origin = f["/DATA_ARRAY/ORIGIN"]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(origin.shape, (2,))

    def test_top_level_variance_routes_to_variance(self):
        """An array named ``variance`` is written under ``/VARIANCE``."""
        data = np.full((3, 3), 0.5, dtype=np.float64)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="variance")
                self.assertEqual(ref.source, "ndf:/VARIANCE/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/VARIANCE"].attrs["CLASS"], b"ARRAY")
                self.assertEqual(f["/VARIANCE/DATA"].dtype, np.float64)

    def test_top_level_compatible_mask_routes_to_quality(self):
        """A 2D uint8 array named ``mask`` is written under ``/QUALITY``."""
        data = np.array([[0, 1, 2], [3, 4, 5]], dtype=np.uint8)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="mask")
                self.assertEqual(ref.source, "ndf:/QUALITY/QUALITY/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/QUALITY"].attrs["CLASS"], b"QUALITY")
                self.assertEqual(f["/QUALITY/QUALITY"].attrs["CLASS"], b"ARRAY")
                self.assertEqual(f["/QUALITY/QUALITY/DATA"].dtype, np.uint8)
                np.testing.assert_array_equal(f["/QUALITY/QUALITY/DATA"][()], data)
                self.assertEqual(f["/QUALITY/QUALITY/ORIGIN"].dtype, np.int32)
                self.assertEqual(f["/QUALITY/QUALITY/ORIGIN"].shape, (2,))
                bad_pixel = f["/QUALITY/QUALITY/BAD_PIXEL"]
                self.assertEqual(bad_pixel.id.get_type().get_class(), h5py.h5t.BITFIELD)
                self.assertFalse(_hds.read_array(bad_pixel))
                self.assertEqual(f["/QUALITY/BADBITS"][()], 255)

    def test_top_level_incompatible_mask_routes_to_more_lsst(self):
        """A 3D ``mask`` array is hoisted to ``/MORE/LSST/MASK``."""
        # 3D mask array in NDF storage order (mask-byte, y, x) is hoisted
        # as a sub-NDF inside /MORE/LSST/MASK, with a compressed 2D view
        # exposed as /QUALITY/QUALITY for standard NDF applications.
        data = np.zeros((2, 3, 4), dtype=np.uint8)
        data[0, 1, 2] = 4
        data[1, 2, 3] = 8
        expected_quality = np.any(data != 0, axis=0).astype(np.uint8)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="mask")
                self.assertEqual(ref.source, "ndf:/MORE/LSST/MASK/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                # /MORE/LSST/MASK is a real NDF: top-level CLASS="NDF"
                # containing a DATA_ARRAY structure with DATA + ORIGIN.
                self.assertEqual(f["/MORE/LSST/MASK"].attrs["CLASS"], b"NDF")
                self.assertEqual(f["/MORE/LSST/MASK/DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                self.assertEqual(f["/MORE/LSST/MASK/DATA_ARRAY/DATA"].shape, data.shape)
                self.assertEqual(f["/QUALITY/QUALITY"].attrs["CLASS"], b"ARRAY")
                np.testing.assert_array_equal(f["/QUALITY/QUALITY/DATA"][()], expected_quality)
                self.assertEqual(f["/QUALITY/BADBITS"][()], 255)
                origin = f["/MORE/LSST/MASK/DATA_ARRAY/ORIGIN"]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(origin.shape, (3,))

    def test_nested_array_hoists_as_sub_ndf(self):
        """A nested array name is hoisted to a sub-NDF under ``/MORE/LSST``."""
        # Hoisted numeric arrays land under /MORE/LSST as hierarchical
        # sub-NDFs (CLASS="NDF" with DATA_ARRAY/DATA + ORIGIN inside) so
        # Starlink tools can inspect them as ordinary NDFs while each HDS
        # component stays short.
        data = np.array([[1.0, 2.0]], dtype=np.float32)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="psf/coefficients")
                self.assertEqual(ref.source, "ndf:/MORE/LSST/PSF/COEFFICIENTS/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertIn("MORE", f)
                self.assertIn("LSST", f["/MORE"])
                self.assertIn("PSF", f["/MORE/LSST"])
                self.assertIn("COEFFICIENTS", f["/MORE/LSST/PSF"])
                sub = f["/MORE/LSST/PSF/COEFFICIENTS"]
                self.assertEqual(sub.attrs["CLASS"], b"NDF")
                self.assertEqual(sub["DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                np.testing.assert_array_equal(sub["DATA_ARRAY/DATA"][()], data)
                origin = sub["DATA_ARRAY/ORIGIN"]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(origin.shape, (data.ndim,))

183 

184 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchivePointerTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive.serialize_pointer` and
    `serialize_frame_set`.

    Each test closes the temporary-file handle before h5py opens the path
    by name; ``delete_on_close=False`` keeps the file on disk until the
    enclosing ``with`` block exits. Re-opening a still-open named temporary
    file is platform-dependent, so this matches the pattern used by the
    ``write()`` tests in this module.
    """

    def test_serialize_pointer_writes_subtree_and_returns_pointer(self):
        """The pointer target's JSON is written at ``<path>/JSON``."""
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr = arch.serialize_pointer(
                    "psf",
                    lambda nested: TinyTree(name="gaussian"),
                    key=("psf", 1),
                )
                self.assertEqual(ptr.path, "/MORE/LSST/PSF/JSON")
            with h5py.File(tmp.name, "r") as f:
                # The hoisted sub-tree is stored as a "JSON" _CHAR*N
                # child of the target structure.
                raw = f["/MORE/LSST/PSF/JSON"][()]
                joined = b"".join(raw).decode("ascii").rstrip(" ")
                self.assertIn('"name":"gaussian"', joined.replace(" ", ""))

    def test_serialize_pointer_caches_by_key(self):
        """A repeated key returns the first pointer and keeps its content."""
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr1 = arch.serialize_pointer(
                    "psf",
                    lambda nested: TinyTree(name="first"),
                    key=("psf", 1),
                )
                # Same key -> returns cached pointer; serializer not re-run
                # (we'd otherwise overwrite the file content with "second").
                ptr2 = arch.serialize_pointer(
                    "psf",
                    lambda nested: TinyTree(name="second"),
                    key=("psf", 1),
                )
                self.assertEqual(ptr1, ptr2)
            with h5py.File(tmp.name, "r") as f:
                raw = f["/MORE/LSST/PSF/JSON"][()]
                joined = b"".join(raw).decode("ascii").rstrip(" ")
                self.assertIn("first", joined)
                self.assertNotIn("second", joined)

    def test_serialize_pointer_preserves_nested_arrays(self):
        """Arrays written by the pointer's serializer survive in the file."""

        # Regression test: a pointer target that writes a nested array via
        # the nested archive must round-trip with that array still in the
        # file. Previously the pointer JSON was written at the target path
        # itself, clobbering any nested data the serializer produced.
        class TreeWithArray(pydantic.BaseModel):
            name: str
            data: ArrayReferenceModel

        payload = np.arange(6, dtype=np.float32).reshape(2, 3)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr = arch.serialize_pointer(
                    "psf",
                    lambda nested: TreeWithArray(
                        name="gaussian",
                        data=nested.add_array(payload, name="parameters"),
                    ),
                    key=("psf", 1),
                )
                self.assertEqual(ptr.path, "/MORE/LSST/PSF/JSON")
            with h5py.File(tmp.name, "r") as f:
                # JSON stayed at <path>/JSON; the nested array is still
                # accessible at the sub-NDF path the serializer wrote.
                self.assertIn("/MORE/LSST/PSF/JSON", f)
                self.assertIn("/MORE/LSST/PSF/PARAMETERS/DATA_ARRAY/DATA", f)
                np.testing.assert_array_equal(f["/MORE/LSST/PSF/PARAMETERS/DATA_ARRAY/DATA"][()], payload)
                # The pointer-target structure is typed after its leaf
                # name, not the generic EXT.
                self.assertEqual(f["/MORE/LSST/PSF"].attrs["CLASS"], b"PSF")
                self.assertEqual(f["/MORE/LSST"].attrs["CLASS"], b"LSST")

    def test_serialize_frame_set_records_for_iter(self):
        """A serialized frame set is reported by ``iter_frame_sets``."""
        # serialize_frame_set is delegated to serialize_pointer plus
        # recording the (FrameSet, pointer) pair for iter_frame_sets,
        # mirroring how FITS and JSON archives behave.
        frame_set = TinyFrameSet()
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr = arch.serialize_frame_set(
                    "wcs/pixel_to_sky",
                    frame_set,
                    lambda nested: TinyTree(name="proj"),
                    key=("frame_set", 1),
                )
                self.assertEqual(ptr.path, "/MORE/LSST/WCS/PIXEL_TO_SKY/JSON")
                recorded = list(arch.iter_frame_sets())
                self.assertEqual(len(recorded), 1)
                self.assertIs(recorded[0][0], frame_set)
                self.assertEqual(recorded[0][1].path, "/MORE/LSST/WCS/PIXEL_TO_SKY/JSON")

283 

284 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchiveAddTableTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive.add_table` and `add_structured_array`.

    Each test closes the temporary-file handle before the path is opened by
    name; ``delete_on_close=False`` keeps the file on disk until the
    enclosing ``with`` block exits. Re-opening a still-open named temporary
    file is platform-dependent, so this matches the pattern used by the
    ``write()`` tests in this module.
    """

    def test_add_table_returns_inline_table_model(self):
        """`add_table` returns a model whose columns hold inline arrays."""
        t = astropy.table.Table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                model = arch.add_table(t, name="some_table")
                self.assertEqual(len(model.columns), 2)
                # v1 stores tables inline in the JSON tree.
                self.assertIsInstance(model.columns[0].data, InlineArrayModel)

    def test_add_structured_array_writes_column_ndfs_with_units(self):
        """Each structured-array column is written as its own sub-NDF."""
        rec = np.zeros(3, dtype=[("x", np.float64), ("y", np.int32)])
        rec["x"] = [1.0, 2.0, 3.0]
        rec["y"] = [10, 20, 30]
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                model = arch.add_structured_array(
                    rec,
                    name="rec",
                    units={"x": u.m},
                    descriptions={"y": "the y values"},
                )
                self.assertEqual(len(model.columns), 2)
                self.assertIsInstance(model.columns[0].data, ArrayReferenceModel)
                # Confirm units/descriptions were applied.
                col_x = next(c for c in model.columns if c.name == "x")
                col_y = next(c for c in model.columns if c.name == "y")
                self.assertEqual(col_x.unit, u.m)
                self.assertEqual(col_y.description, "the y values")
                self.assertEqual(col_x.data.source, "ndf:/MORE/LSST/REC/X/DATA_ARRAY/DATA")
                self.assertEqual(col_y.data.source, "ndf:/MORE/LSST/REC/Y/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/MORE/LSST/REC/X"].attrs["CLASS"], b"NDF")
                np.testing.assert_array_equal(f["/MORE/LSST/REC/X/DATA_ARRAY/DATA"][()], rec["x"])
                self.assertEqual(f["/MORE/LSST/REC/Y"].attrs["CLASS"], b"NDF")
                np.testing.assert_array_equal(f["/MORE/LSST/REC/Y/DATA_ARRAY/DATA"][()], rec["y"])
            # Read the columns back through the input archive as well.
            with NdfInputArchive.open(tmp.name) as archive:
                recovered = archive.get_structured_array(model)
                np.testing.assert_array_equal(recovered, rec)

    def test_add_single_column_structured_array_uses_table_name(self):
        """A one-column structured array is stored at the table's own path."""
        rec = np.zeros(1, dtype=[("solution", np.float64, (4,))])
        rec["solution"] = [[1.0, 2.0, 3.0, 4.0]]
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                model = arch.add_structured_array(rec, name="psf/piff/interp/solution")
                self.assertEqual(len(model.columns), 1)
                column = model.columns[0]
                self.assertIsInstance(column.data, ArrayReferenceModel)
                self.assertEqual(
                    column.data.source,
                    "ndf:/MORE/LSST/PSF/PIFF/INTERP/SOLUTION/DATA_ARRAY/DATA",
                )
                self.assertEqual(column.data.shape, [4])
            with h5py.File(tmp.name, "r") as f:
                self.assertIn("PSF", f["/MORE/LSST"])
                self.assertIn("PIFF", f["/MORE/LSST/PSF"])
                self.assertIn("INTERP", f["/MORE/LSST/PSF/PIFF"])
                self.assertIn("SOLUTION", f["/MORE/LSST/PSF/PIFF/INTERP"])
                np.testing.assert_array_equal(
                    f["/MORE/LSST/PSF/PIFF/INTERP/SOLUTION/DATA_ARRAY/DATA"][()],
                    rec["solution"],
                )

354 

355 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfWriteWcsTestCase(unittest.TestCase):
    """Checks on the /WCS/DATA component produced by ``ndf.write()``."""

    @staticmethod
    def _make_projected_image():
        """Return a 4x5 float32 `Image` carrying a seeded random projection."""
        rng = np.random.default_rng(42)
        det_frame = DetectorFrame(instrument="TestInst", detector=4, bbox=Box.factory[1:4096, 1:4096])
        projection = make_random_projection(rng, det_frame, Box.factory[1:4096, 1:4096])
        return Image(
            np.arange(20, dtype=np.float32).reshape(4, 5),
            bbox=Box.factory[10:14, 20:25],
            projection=projection,
        )

    @staticmethod
    def _twelve_plane_schema():
        """Return a `MaskSchema` with twelve planes named P0..P11."""
        return MaskSchema([MaskPlane(f"P{i}", f"Plane {i}") for i in range(12)])

    def test_write_with_projection_creates_wcs_component(self):
        """An image with a projection gets a /WCS structure holding AST text."""
        image = self._make_projected_image()
        allowed_leads = {" ", "+"}
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(image, scratch.name)
            with h5py.File(scratch.name, "r") as hdf:
                self.assertIn("WCS", hdf)
                self.assertEqual(hdf["/WCS"].attrs["CLASS"], b"WCS")
                wcs_data = hdf["/WCS/DATA"]
                self.assertEqual(wcs_data.dtype, np.dtype("|S32"))
                records = [raw.decode("ascii").rstrip(" ") for raw in wcs_data[()]]
                self.assertTrue(all(record[0] in allowed_leads for record in records))
                self.assertFalse(any(record.startswith("#") for record in records))
                text = _hds.decode_ndf_ast_data(records)
                stripped = [line.lstrip() for line in text.splitlines()]
                for prefix in ("Begin FrameSet", "End FrameSet"):
                    self.assertTrue(any(s.startswith(prefix) for s in stripped))
                for expected in ('Domain = "GRID"', 'Domain = "PIXEL"', "Sft1 = -19", "Sft2 = -9"):
                    self.assertIn(expected, stripped)

    def test_write_without_projection_omits_wcs_component(self):
        """An image without a projection is written with no /WCS at all."""
        plain = Image(np.zeros((2, 2), dtype=np.float32))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(plain, scratch.name)
            with h5py.File(scratch.name, "r") as hdf:
                self.assertNotIn("WCS", hdf)

    def test_mask_sub_ndf_gets_3d_wcs(self):
        """The hoisted mask sub-NDF carries its own three-axis WCS."""
        # When an incompatible mask is hoisted to /MORE/LSST/MASK as a
        # sub-NDF, it should carry its own 3D /WCS. The first two axes
        # retain the parent image sky projection while the third axis is
        # a generic mask-byte coordinate.
        image = self._make_projected_image()
        # 12-plane schema -> native 3D uint8 mask, hoisted to /MORE/LSST/MASK.
        masked = MaskedImage(image, mask_schema=self._twelve_plane_schema())
        expected_lines = (
            "Naxes = 3",
            'Domain = "GRID"',
            'Domain = "PIXEL"',
            "Sft1 = -19",
            "Sft2 = -9",
            "Sft3 = 1",
            "Begin CmpFrame",
            "Begin SkyFrame",
            'Domain = "MASK"',
            "Begin CmpMap",
            "Series = 0",
        )
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(masked, scratch.name)
            with h5py.File(scratch.name, "r") as hdf:
                # Top-level WCS is present (existing behaviour).
                self.assertIn("WCS", hdf)
                top_lines = [raw.decode("ascii") for raw in hdf["/WCS/DATA"][()]]
                self.assertIn("MASK", hdf["/MORE/LSST"])
                self.assertIn("WCS", hdf["/MORE/LSST/MASK"])
                self.assertEqual(hdf["/MORE/LSST/MASK/WCS"].attrs["CLASS"], b"WCS")
                mask_lines = [raw.decode("ascii") for raw in hdf["/MORE/LSST/MASK/WCS/DATA"][()]]
                self.assertNotEqual(top_lines, mask_lines)
                mask_text = _hds.decode_ndf_ast_data(mask_lines)
                stripped = [line.lstrip() for line in mask_text.splitlines()]
                for expected in expected_lines:
                    self.assertIn(expected, stripped)

    def test_mask_sub_ndf_no_wcs_when_image_has_no_projection(self):
        """Without a projection neither the root nor the mask gets a /WCS."""
        masked = MaskedImage(
            Image(np.zeros((4, 5), dtype=np.float32)),
            mask_schema=self._twelve_plane_schema(),
        )
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(masked, scratch.name)
            with h5py.File(scratch.name, "r") as hdf:
                self.assertNotIn("WCS", hdf)
                self.assertIn("MASK", hdf["/MORE/LSST"])
                self.assertNotIn("WCS", hdf["/MORE/LSST/MASK"])

455 

456 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfWriteFunctionTestCase(unittest.TestCase):
    """Whole-file checks driven through the module-level ``write()``."""

    def test_write_image_produces_valid_layout(self):
        """A plain image produces the expected root, DATA_ARRAY and JSON."""
        image = Image(
            np.arange(20, dtype=np.float32).reshape(4, 5),
            bbox=Box.factory[10:14, 20:25],
        )
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            tree = write(image, scratch.name)
            self.assertIsNotNone(tree)
            with h5py.File(scratch.name, "r") as hdf:
                # Root is an NDF with a name.
                root_attrs = hdf["/"].attrs
                self.assertEqual(root_attrs["CLASS"], b"NDF")
                self.assertIn("HDS_ROOT_NAME", root_attrs)
                # DATA_ARRAY uses the complex form (DATA + ORIGIN).
                self.assertEqual(hdf["/DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                np.testing.assert_array_equal(hdf["/DATA_ARRAY/DATA"][()], image.array)
                origin = hdf["/DATA_ARRAY/ORIGIN"][()]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(len(origin), 2)
                # ORIGIN encodes bbox lower bounds in Fortran order. The exact
                # values depend on Box's API; just verify it isn't the
                # all-zeros placeholder when the bbox is non-trivial.
                self.assertTrue((origin != 0).any())
                # Main JSON tree at /MORE/LSST/JSON.
                self.assertIn("MORE", hdf)
                self.assertIn("LSST", hdf["/MORE"])
                self.assertIn("JSON", hdf["/MORE/LSST"])

    def test_write_image_preserves_opaque_fits_metadata(self):
        """Opaque FITS header cards land in /MORE/FITS and read back intact."""
        long_value = "x" * 100
        # Attach an opaque-metadata primary header to the image.
        primary = astropy.io.fits.Header()
        primary["FOO"] = ("bar", "test card")
        primary["LONGSTR"] = (long_value, "long string value")
        opaque = FitsOpaqueMetadata()
        opaque.add_header(primary, name="", ver=1)
        image = Image(np.zeros((2, 2), dtype=np.float32))
        image._opaque_metadata = opaque
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(image, scratch.name)
            with h5py.File(scratch.name, "r") as hdf:
                self.assertIn("FITS", hdf["/MORE"])
                cards = [card.decode("ascii").rstrip(" ") for card in hdf["/MORE/FITS"][()]]
                for keyword in ("FOO", "CONTINUE"):
                    self.assertTrue(any(card.startswith(keyword) for card in cards))
                self.assertTrue(all(len(card.encode("ascii")) <= 80 for card in cards))
            result = read(Image, scratch.name)
            recovered = result.deserialized._opaque_metadata.headers[ExtensionKey()]
            self.assertEqual(recovered["LONGSTR"], long_value)

    def test_write_image_main_json_round_trips_back(self):
        """The JSON stored in the file matches the tree ``write()`` returned."""
        # Sanity: the main JSON tree at /MORE/LSST/JSON should parse as the
        # in-memory ArchiveTree and contain the array reference for DATA_ARRAY.
        image = Image(np.arange(6, dtype=np.float32).reshape(2, 3))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            tree = write(image, scratch.name)
            with h5py.File(scratch.name, "r") as hdf:
                chunks = hdf["/MORE/LSST/JSON"][()]
                stored_text = b"".join(chunks).decode("ascii").rstrip(" ")
                recovered = json.loads(stored_text)
            # The exact structure depends on Image's serialization model; we
            # just check the JSON is parseable and the ArchiveTree object the
            # write() function returned dumps to the same JSON.
            self.assertEqual(json.loads(tree.model_dump_json()), recovered)

    def test_write_image_with_unit_creates_units_component(self):
        """An image unit is stored as a scalar /UNITS string and read back."""
        image = Image(np.arange(6, dtype=np.float32).reshape(2, 3), unit=u.ct)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            write(image, scratch.name)
            with h5py.File(scratch.name, "r") as hdf:
                self.assertIn("UNITS", hdf)
                units = hdf["/UNITS"]
                self.assertEqual(units.shape, ())
                self.assertEqual(units[()].decode("ascii").rstrip(" "), "count")
            result = read(Image, scratch.name)
            self.assertEqual(result.deserialized.unit, u.ct)

    def test_write_propagates_metadata(self):
        """Extra metadata appears on the returned tree and on the read result."""
        image = Image(np.arange(6, dtype=np.float32).reshape(2, 3))
        extra = {"test_key": 42, "another": "hello"}
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as scratch:
            scratch.close()
            tree = write(image, scratch.name, metadata=extra)
            for key, value in extra.items():
                self.assertEqual(tree.metadata[key], value)
            result = read(Image, scratch.name)
            for key, value in extra.items():
                self.assertEqual(result.metadata[key], value)