Coverage for tests/test_ndf_output_archive.py: 16%
377 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 08:25 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 08:25 +0000
# This file is part of lsst-images.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.
12from __future__ import annotations
14import json
15import tempfile
16import unittest
18import astropy.io.fits
19import astropy.table
20import astropy.units as u
21import numpy as np
22import pydantic
24from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema
25from lsst.images._transforms import FrameLookupError, FrameSet, Transform
26from lsst.images._transforms._frames import DetectorFrame, Frame
27from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata
28from lsst.images.serialization import ArrayReferenceModel, InlineArrayModel
29from lsst.images.tests import make_random_projection
# Optional imports: when h5py or any of the lsst.images.ndf names cannot be
# imported, HAVE_H5PY is False so that the test cases decorated with
# ``skipUnless(HAVE_H5PY, ...)`` are skipped instead of erroring.
try:
    import h5py

    from lsst.images.ndf import (
        NdfInputArchive,
        NdfOutputArchive,
        _hds,
        read,
        write,
    )
except ImportError:
    HAVE_H5PY = False
else:
    HAVE_H5PY = True
47class TinyFrameSet(FrameSet):
48 """Minimal concrete frame-set for archive bookkeeping tests."""
50 def __contains__(self, frame: Frame) -> bool:
51 return False
53 def __getitem__[I: Frame, O: Frame](self, key: tuple[I, O]) -> Transform[I, O]:
54 raise FrameLookupError(key)
class TinyTree(pydantic.BaseModel):
    """Single-field Pydantic model returned by the test serializers."""

    # The only payload; the tests assert on this value after serialization.
    name: str
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchiveBasicsTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive` constructor and `serialize_direct`.

    Each test creates its temporary file with ``delete_on_close=False`` and
    closes the handle before h5py opens the path. With the default
    ``delete_on_close=True`` a still-open `tempfile.NamedTemporaryFile`
    cannot be reopened by name on Windows.
    """

    def test_serialize_direct_calls_serializer_with_nested_archive(self):
        # The tree returned by serialize_direct must carry the value the
        # serializer callback produced.
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                tree = arch.serialize_direct("top", lambda nested: TinyTree(name="hello"))
            self.assertEqual(tree.name, "hello")

    def test_constructor_marks_root_as_ndf(self):
        """The constructor should set CLASS=NDF on the root group so that
        Starlink tools recognise the file as an NDF.
        """
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                NdfOutputArchive(f)
            # Reopen read-only to check what was actually persisted.
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/"].attrs["CLASS"], b"NDF")
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchiveAddArrayTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive.add_array` routing.

    Each test creates its temporary file with ``delete_on_close=False`` and
    closes the handle before h5py opens the path. With the default
    ``delete_on_close=True`` a still-open `tempfile.NamedTemporaryFile`
    cannot be reopened by name on Windows.
    """

    def test_top_level_image_routes_to_data_array(self):
        data = np.arange(20, dtype=np.float32).reshape(4, 5)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="image")
            self.assertEqual(ref.source, "ndf:/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                ds = f["/DATA_ARRAY/DATA"]
                self.assertEqual(ds.dtype, np.float32)
                np.testing.assert_array_equal(ds[()], data)
                self.assertEqual(f["/DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                origin = f["/DATA_ARRAY/ORIGIN"]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(origin.shape, (2,))

    def test_top_level_variance_routes_to_variance(self):
        data = np.full((3, 3), 0.5, dtype=np.float64)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="variance")
            self.assertEqual(ref.source, "ndf:/VARIANCE/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/VARIANCE"].attrs["CLASS"], b"ARRAY")
                self.assertEqual(f["/VARIANCE/DATA"].dtype, np.float64)

    def test_top_level_compatible_mask_routes_to_quality(self):
        data = np.array([[0, 1, 2], [3, 4, 5]], dtype=np.uint8)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="mask")
            self.assertEqual(ref.source, "ndf:/QUALITY/QUALITY/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/QUALITY"].attrs["CLASS"], b"QUALITY")
                self.assertEqual(f["/QUALITY/QUALITY"].attrs["CLASS"], b"ARRAY")
                self.assertEqual(f["/QUALITY/QUALITY/DATA"].dtype, np.uint8)
                np.testing.assert_array_equal(f["/QUALITY/QUALITY/DATA"][()], data)
                self.assertEqual(f["/QUALITY/QUALITY/ORIGIN"].dtype, np.int32)
                self.assertEqual(f["/QUALITY/QUALITY/ORIGIN"].shape, (2,))
                # BAD_PIXEL must be stored with an HDF5 bitfield type and
                # must read back as a falsy value.
                bad_pixel = f["/QUALITY/QUALITY/BAD_PIXEL"]
                self.assertEqual(bad_pixel.id.get_type().get_class(), h5py.h5t.BITFIELD)
                self.assertFalse(_hds.read_array(bad_pixel))
                self.assertEqual(f["/QUALITY/BADBITS"][()], 255)

    def test_top_level_incompatible_mask_routes_to_more_lsst(self):
        # 3D mask array in NDF storage order (mask-byte, y, x) is hoisted
        # as a sub-NDF inside /MORE/LSST/MASK, with a compressed 2D view
        # exposed as /QUALITY/QUALITY for standard NDF applications.
        data = np.zeros((2, 3, 4), dtype=np.uint8)
        data[0, 1, 2] = 4
        data[1, 2, 3] = 8
        # The 2D view is expected to flag a pixel when any mask byte is
        # non-zero at that position.
        expected_quality = np.any(data != 0, axis=0).astype(np.uint8)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="mask")
            self.assertEqual(ref.source, "ndf:/MORE/LSST/MASK/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                # /MORE/LSST/MASK is a real NDF: top-level CLASS="NDF"
                # containing a DATA_ARRAY structure with DATA + ORIGIN.
                self.assertEqual(f["/MORE/LSST/MASK"].attrs["CLASS"], b"NDF")
                self.assertEqual(f["/MORE/LSST/MASK/DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                self.assertEqual(f["/MORE/LSST/MASK/DATA_ARRAY/DATA"].shape, data.shape)
                self.assertEqual(f["/QUALITY/QUALITY"].attrs["CLASS"], b"ARRAY")
                np.testing.assert_array_equal(f["/QUALITY/QUALITY/DATA"][()], expected_quality)
                self.assertEqual(f["/QUALITY/BADBITS"][()], 255)
                origin = f["/MORE/LSST/MASK/DATA_ARRAY/ORIGIN"]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(origin.shape, (3,))

    def test_nested_array_hoists_as_sub_ndf(self):
        # Hoisted numeric arrays land under /MORE/LSST as hierarchical
        # sub-NDFs (CLASS="NDF" with DATA_ARRAY/DATA + ORIGIN inside) so
        # Starlink tools can inspect them as ordinary NDFs while each HDS
        # component stays short.
        data = np.array([[1.0, 2.0]], dtype=np.float32)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ref = arch.add_array(data, name="psf/coefficients")
            self.assertEqual(ref.source, "ndf:/MORE/LSST/PSF/COEFFICIENTS/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertIn("MORE", f)
                self.assertIn("LSST", f["/MORE"])
                self.assertIn("PSF", f["/MORE/LSST"])
                self.assertIn("COEFFICIENTS", f["/MORE/LSST/PSF"])
                sub = f["/MORE/LSST/PSF/COEFFICIENTS"]
                self.assertEqual(sub.attrs["CLASS"], b"NDF")
                self.assertEqual(sub["DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                np.testing.assert_array_equal(sub["DATA_ARRAY/DATA"][()], data)
                origin = sub["DATA_ARRAY/ORIGIN"]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(origin.shape, (data.ndim,))
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchivePointerTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive.serialize_pointer` and
    `serialize_frame_set`.

    Each test creates its temporary file with ``delete_on_close=False`` and
    closes the handle before h5py opens the path. With the default
    ``delete_on_close=True`` a still-open `tempfile.NamedTemporaryFile`
    cannot be reopened by name on Windows.
    """

    def test_serialize_pointer_writes_subtree_and_returns_pointer(self):
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr = arch.serialize_pointer(
                    "psf",
                    lambda nested: TinyTree(name="gaussian"),
                    key=("psf", 1),
                )
            self.assertEqual(ptr.path, "/MORE/LSST/PSF/JSON")
            with h5py.File(tmp.name, "r") as f:
                # The hoisted sub-tree is stored as a "JSON" _CHAR*N
                # child of the target structure.
                raw = f["/MORE/LSST/PSF/JSON"][()]
                joined = b"".join(raw).decode("ascii").rstrip(" ")
                # Compare with spaces removed so the check does not depend
                # on how the stored JSON is spaced.
                self.assertIn('"name":"gaussian"', joined.replace(" ", ""))

    def test_serialize_pointer_caches_by_key(self):
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr1 = arch.serialize_pointer(
                    "psf",
                    lambda nested: TinyTree(name="first"),
                    key=("psf", 1),
                )
                # Same key -> returns cached pointer; serializer not re-run
                # (we'd otherwise overwrite the file content with "second").
                ptr2 = arch.serialize_pointer(
                    "psf",
                    lambda nested: TinyTree(name="second"),
                    key=("psf", 1),
                )
            self.assertEqual(ptr1, ptr2)
            with h5py.File(tmp.name, "r") as f:
                raw = f["/MORE/LSST/PSF/JSON"][()]
                joined = b"".join(raw).decode("ascii").rstrip(" ")
                self.assertIn("first", joined)
                self.assertNotIn("second", joined)

    def test_serialize_pointer_preserves_nested_arrays(self):
        # Regression test: a pointer target that writes a nested array via
        # the nested archive must round-trip with that array still in the
        # file. Previously the pointer JSON was written at the target path
        # itself, clobbering any nested data the serializer produced.
        class TreeWithArray(pydantic.BaseModel):
            name: str
            data: ArrayReferenceModel

        payload = np.arange(6, dtype=np.float32).reshape(2, 3)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr = arch.serialize_pointer(
                    "psf",
                    lambda nested: TreeWithArray(
                        name="gaussian",
                        data=nested.add_array(payload, name="parameters"),
                    ),
                    key=("psf", 1),
                )
            self.assertEqual(ptr.path, "/MORE/LSST/PSF/JSON")
            with h5py.File(tmp.name, "r") as f:
                # JSON stayed at <path>/JSON; the nested array is still
                # accessible at the sub-NDF path the serializer wrote.
                self.assertIn("/MORE/LSST/PSF/JSON", f)
                self.assertIn("/MORE/LSST/PSF/PARAMETERS/DATA_ARRAY/DATA", f)
                np.testing.assert_array_equal(f["/MORE/LSST/PSF/PARAMETERS/DATA_ARRAY/DATA"][()], payload)
                # The pointer-target structure is typed after its leaf
                # name, not the generic EXT.
                self.assertEqual(f["/MORE/LSST/PSF"].attrs["CLASS"], b"PSF")
                self.assertEqual(f["/MORE/LSST"].attrs["CLASS"], b"LSST")

    def test_serialize_frame_set_records_for_iter(self):
        # serialize_frame_set is delegated to serialize_pointer plus
        # recording the (FrameSet, pointer) pair for iter_frame_sets,
        # mirroring how FITS and JSON archives behave.
        frame_set = TinyFrameSet()
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                ptr = arch.serialize_frame_set(
                    "wcs/pixel_to_sky",
                    frame_set,
                    lambda nested: TinyTree(name="proj"),
                    key=("frame_set", 1),
                )
                # Collect the bookkeeping while the archive's file is open.
                recorded = list(arch.iter_frame_sets())
            self.assertEqual(ptr.path, "/MORE/LSST/WCS/PIXEL_TO_SKY/JSON")
            self.assertEqual(len(recorded), 1)
            self.assertIs(recorded[0][0], frame_set)
            self.assertEqual(recorded[0][1].path, "/MORE/LSST/WCS/PIXEL_TO_SKY/JSON")
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfOutputArchiveAddTableTestCase(unittest.TestCase):
    """Tests for `NdfOutputArchive.add_table` and `add_structured_array`.

    Each test creates its temporary file with ``delete_on_close=False`` and
    closes the handle before h5py opens the path. With the default
    ``delete_on_close=True`` a still-open `tempfile.NamedTemporaryFile`
    cannot be reopened by name on Windows.
    """

    def test_add_table_returns_inline_table_model(self):
        t = astropy.table.Table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                model = arch.add_table(t, name="some_table")
            self.assertEqual(len(model.columns), 2)
            # v1 stores tables inline in the JSON tree.
            self.assertIsInstance(model.columns[0].data, InlineArrayModel)

    def test_add_structured_array_writes_column_ndfs_with_units(self):
        rec = np.zeros(3, dtype=[("x", np.float64), ("y", np.int32)])
        rec["x"] = [1.0, 2.0, 3.0]
        rec["y"] = [10, 20, 30]
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                model = arch.add_structured_array(
                    rec,
                    name="rec",
                    units={"x": u.m},
                    descriptions={"y": "the y values"},
                )
            self.assertEqual(len(model.columns), 2)
            self.assertIsInstance(model.columns[0].data, ArrayReferenceModel)
            # Confirm units/descriptions were applied.
            col_x = next(c for c in model.columns if c.name == "x")
            col_y = next(c for c in model.columns if c.name == "y")
            self.assertEqual(col_x.unit, u.m)
            self.assertEqual(col_y.description, "the y values")
            self.assertEqual(col_x.data.source, "ndf:/MORE/LSST/REC/X/DATA_ARRAY/DATA")
            self.assertEqual(col_y.data.source, "ndf:/MORE/LSST/REC/Y/DATA_ARRAY/DATA")
            with h5py.File(tmp.name, "r") as f:
                self.assertEqual(f["/MORE/LSST/REC/X"].attrs["CLASS"], b"NDF")
                np.testing.assert_array_equal(f["/MORE/LSST/REC/X/DATA_ARRAY/DATA"][()], rec["x"])
                self.assertEqual(f["/MORE/LSST/REC/Y"].attrs["CLASS"], b"NDF")
                np.testing.assert_array_equal(f["/MORE/LSST/REC/Y/DATA_ARRAY/DATA"][()], rec["y"])
            # Round-trip through the input archive to confirm the columns
            # reassemble into the original structured array.
            with NdfInputArchive.open(tmp.name) as archive:
                recovered = archive.get_structured_array(model)
            np.testing.assert_array_equal(recovered, rec)

    def test_add_single_column_structured_array_uses_table_name(self):
        rec = np.zeros(1, dtype=[("solution", np.float64, (4,))])
        rec["solution"] = [[1.0, 2.0, 3.0, 4.0]]
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            with h5py.File(tmp.name, "w") as f:
                arch = NdfOutputArchive(f)
                model = arch.add_structured_array(rec, name="psf/piff/interp/solution")
            self.assertEqual(len(model.columns), 1)
            column = model.columns[0]
            self.assertIsInstance(column.data, ArrayReferenceModel)
            self.assertEqual(
                column.data.source,
                "ndf:/MORE/LSST/PSF/PIFF/INTERP/SOLUTION/DATA_ARRAY/DATA",
            )
            self.assertEqual(column.data.shape, [4])
            with h5py.File(tmp.name, "r") as f:
                self.assertIn("PSF", f["/MORE/LSST"])
                self.assertIn("PIFF", f["/MORE/LSST/PSF"])
                self.assertIn("INTERP", f["/MORE/LSST/PSF/PIFF"])
                self.assertIn("SOLUTION", f["/MORE/LSST/PSF/PIFF/INTERP"])
                np.testing.assert_array_equal(
                    f["/MORE/LSST/PSF/PIFF/INTERP/SOLUTION/DATA_ARRAY/DATA"][()],
                    rec["solution"],
                )
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfWriteWcsTestCase(unittest.TestCase):
    """Checks on the /WCS/DATA component produced by ndf.write()."""

    @staticmethod
    def _projected_image():
        """Return the 4x5 float32 test image with a seeded random projection.

        The generator is seeded with 42 so every caller gets the same
        projection.
        """
        rng = np.random.default_rng(42)
        det_frame = DetectorFrame(instrument="TestInst", detector=4, bbox=Box.factory[1:4096, 1:4096])
        projection = make_random_projection(rng, det_frame, Box.factory[1:4096, 1:4096])
        return Image(
            np.arange(20, dtype=np.float32).reshape(4, 5),
            bbox=Box.factory[10:14, 20:25],
            projection=projection,
        )

    def test_write_with_projection_creates_wcs_component(self):
        image = self._projected_image()
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            write(image, tmp.name)
            with h5py.File(tmp.name, "r") as f:
                self.assertIn("WCS", f)
                self.assertEqual(f["/WCS"].attrs["CLASS"], b"WCS")
                wcs_data = f["/WCS/DATA"]
                self.assertEqual(wcs_data.dtype, np.dtype("|S32"))
                records = [raw.decode("ascii").rstrip(" ") for raw in wcs_data[()]]
            # Every stored record starts with a blank or a "+" marker, and
            # none of them is a "#" comment line.
            self.assertTrue(all(record[0] in {" ", "+"} for record in records))
            self.assertFalse(any(record.startswith("#") for record in records))
            text = _hds.decode_ndf_ast_data(records)
            stripped = [line.lstrip() for line in text.splitlines()]
            self.assertTrue(any(entry.startswith("Begin FrameSet") for entry in stripped))
            self.assertTrue(any(entry.startswith("End FrameSet") for entry in stripped))
            for expected in (
                'Domain = "GRID"',
                'Domain = "PIXEL"',
                "Sft1 = -19",
                "Sft2 = -9",
            ):
                self.assertIn(expected, stripped)

    def test_write_without_projection_omits_wcs_component(self):
        # Image with no projection -> no /WCS in the file.
        plain = Image(np.zeros((2, 2), dtype=np.float32))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            write(plain, tmp.name)
            with h5py.File(tmp.name, "r") as f:
                self.assertNotIn("WCS", f)

    def test_mask_sub_ndf_gets_3d_wcs(self):
        # When an incompatible mask is hoisted to /MORE/LSST/MASK as a
        # sub-NDF, it should carry its own 3D /WCS. The first two axes
        # retain the parent image sky projection while the third axis is
        # a generic mask-byte coordinate.
        # 12-plane schema -> native 3D uint8 mask, hoisted to /MORE/LSST/MASK.
        planes = [MaskPlane(f"P{i}", f"Plane {i}") for i in range(12)]
        masked = MaskedImage(self._projected_image(), mask_schema=MaskSchema(planes))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            write(masked, tmp.name)
            with h5py.File(tmp.name, "r") as f:
                # Top-level WCS is present (existing behaviour).
                self.assertIn("WCS", f)
                top_lines = [raw.decode("ascii") for raw in f["/WCS/DATA"][()]]
                self.assertIn("MASK", f["/MORE/LSST"])
                self.assertIn("WCS", f["/MORE/LSST/MASK"])
                self.assertEqual(f["/MORE/LSST/MASK/WCS"].attrs["CLASS"], b"WCS")
                mask_lines = [raw.decode("ascii") for raw in f["/MORE/LSST/MASK/WCS/DATA"][()]]
            # The mask's WCS must not simply be a copy of the parent's.
            self.assertNotEqual(top_lines, mask_lines)
            mask_text = _hds.decode_ndf_ast_data(mask_lines)
            stripped = [line.lstrip() for line in mask_text.splitlines()]
            for expected in (
                "Naxes = 3",
                'Domain = "GRID"',
                'Domain = "PIXEL"',
                "Sft1 = -19",
                "Sft2 = -9",
                "Sft3 = 1",
                "Begin CmpFrame",
                "Begin SkyFrame",
                'Domain = "MASK"',
                "Begin CmpMap",
                "Series = 0",
            ):
                self.assertIn(expected, stripped)

    def test_mask_sub_ndf_no_wcs_when_image_has_no_projection(self):
        # Without a projection neither the root nor the hoisted mask
        # sub-NDF should gain a WCS component.
        planes = [MaskPlane(f"P{i}", f"Plane {i}") for i in range(12)]
        base = Image(np.zeros((4, 5), dtype=np.float32))
        masked = MaskedImage(base, mask_schema=MaskSchema(planes))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            write(masked, tmp.name)
            with h5py.File(tmp.name, "r") as f:
                self.assertNotIn("WCS", f)
                self.assertIn("MASK", f["/MORE/LSST"])
                self.assertNotIn("WCS", f["/MORE/LSST/MASK"])
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfWriteFunctionTestCase(unittest.TestCase):
    """Whole-file tests that go through the module-level `write()`."""

    def test_write_image_produces_valid_layout(self):
        pixels = np.arange(20, dtype=np.float32).reshape(4, 5)
        image = Image(pixels, bbox=Box.factory[10:14, 20:25])
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            tree = write(image, tmp.name)
            self.assertIsNotNone(tree)
            with h5py.File(tmp.name, "r") as f:
                # Root is an NDF with a name.
                root_attrs = f["/"].attrs
                self.assertEqual(root_attrs["CLASS"], b"NDF")
                self.assertIn("HDS_ROOT_NAME", root_attrs)
                # DATA_ARRAY uses the complex form (DATA + ORIGIN).
                self.assertEqual(f["/DATA_ARRAY"].attrs["CLASS"], b"ARRAY")
                np.testing.assert_array_equal(f["/DATA_ARRAY/DATA"][()], image.array)
                origin = f["/DATA_ARRAY/ORIGIN"][()]
                self.assertEqual(origin.dtype, np.int64)
                self.assertEqual(len(origin), 2)
                # ORIGIN encodes bbox lower bounds in Fortran order. The exact
                # values depend on Box's API; just verify it isn't the
                # all-zeros placeholder when the bbox is non-trivial.
                self.assertFalse((origin == 0).all())
                # Main JSON tree at /MORE/LSST/JSON.
                self.assertIn("MORE", f)
                self.assertIn("LSST", f["/MORE"])
                self.assertIn("JSON", f["/MORE/LSST"])

    def test_write_image_preserves_opaque_fits_metadata(self):
        image = Image(np.zeros((2, 2), dtype=np.float32))
        # Attach an opaque-metadata primary header to the image.
        long_value = "x" * 100
        header = astropy.io.fits.Header()
        header["FOO"] = ("bar", "test card")
        header["LONGSTR"] = (long_value, "long string value")
        opaque = FitsOpaqueMetadata()
        opaque.add_header(header, name="", ver=1)
        image._opaque_metadata = opaque
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            write(image, tmp.name)
            with h5py.File(tmp.name, "r") as f:
                self.assertIn("FITS", f["/MORE"])
                cards = [card.decode("ascii").rstrip(" ") for card in f["/MORE/FITS"][()]]
            # The 100-character value has to be split over CONTINUE cards,
            # and no stored card may exceed 80 bytes.
            self.assertTrue(any(card.startswith("FOO") for card in cards))
            self.assertTrue(any(card.startswith("CONTINUE") for card in cards))
            self.assertTrue(all(len(card.encode("ascii")) <= 80 for card in cards))
            result = read(Image, tmp.name)
            recovered = result.deserialized._opaque_metadata.headers[ExtensionKey()]
            self.assertEqual(recovered["LONGSTR"], long_value)

    def test_write_image_main_json_round_trips_back(self):
        # Sanity: the main JSON tree at /MORE/LSST/JSON should parse as the
        # in-memory ArchiveTree and contain the array reference for DATA_ARRAY.
        image = Image(np.arange(6, dtype=np.float32).reshape(2, 3))
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            tree = write(image, tmp.name)
            with h5py.File(tmp.name, "r") as f:
                raw = f["/MORE/LSST/JSON"][()]
            stored_text = b"".join(raw).decode("ascii").rstrip(" ")
            recovered = json.loads(stored_text)
            # The exact structure depends on Image's serialization model; we
            # just check the JSON is parseable and the ArchiveTree object the
            # write() function returned dumps to the same JSON.
            self.assertEqual(json.loads(tree.model_dump_json()), recovered)

    def test_write_image_with_unit_creates_units_component(self):
        image = Image(np.arange(6, dtype=np.float32).reshape(2, 3), unit=u.ct)
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            write(image, tmp.name)
            with h5py.File(tmp.name, "r") as f:
                self.assertIn("UNITS", f)
                units = f["/UNITS"]
                # UNITS is stored as a scalar; strip the space padding
                # before comparing the text.
                self.assertEqual(units.shape, ())
                self.assertEqual(units[()].decode("ascii").rstrip(" "), "count")
            result = read(Image, tmp.name)
            self.assertEqual(result.deserialized.unit, u.ct)

    def test_write_propagates_metadata(self):
        image = Image(np.arange(6, dtype=np.float32).reshape(2, 3))
        extra = {"test_key": 42, "another": "hello"}
        with tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False) as tmp:
            tmp.close()
            tree = write(image, tmp.name, metadata=extra)
            # The metadata must be on the returned tree...
            self.assertEqual(tree.metadata["test_key"], 42)
            self.assertEqual(tree.metadata["another"], "hello")
            # ...and must survive a read back from disk.
            result = read(Image, tmp.name)
            self.assertEqual(result.metadata["test_key"], 42)
            self.assertEqual(result.metadata["another"], "hello")