Coverage for tests / test_ndf_input_archive.py: 16%
286 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 07:54 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 07:54 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import tempfile
16import unittest
18import astropy.io.fits
19import astropy.units as u
20import numpy as np
21import pydantic
23from lsst.images import Box, Image, ImageSerializationModel, Mask, MaskedImage
24from lsst.images._transforms import FrameSet
25from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata
26from lsst.images.serialization import (
27 ArchiveReadError,
28 ArrayReferenceModel,
29 InlineArrayModel,
30 NumberType,
31)
33try:
34 import h5py
36 from lsst.images.ndf import (
37 NdfInputArchive,
38 NdfOutputArchive,
39 NdfPointerModel,
40 _hds,
41 read,
42 write,
43 )
45 HAVE_H5PY = True
46except ImportError:
47 HAVE_H5PY = False
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfInputArchiveOpenTestCase(unittest.TestCase):
    """Exercise `NdfInputArchive.open` together with `get_tree`."""

    @staticmethod
    def _scratch_sdf():
        """Return a named ``.sdf`` temporary file that survives ``close()``.

        The object is meant to be used as a context manager; with
        ``delete_on_close=False`` the file is only removed when that
        context exits.
        """
        return tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False)

    def test_open_round_trips_image_tree(self):
        # Write an image, reopen the file through the input archive, and
        # check that the tree read back serializes to the same JSON as the
        # tree returned by `write`.
        pixels = np.arange(20, dtype=np.float32).reshape(4, 5)
        image = Image(pixels, bbox=Box.factory[10:14, 20:25])
        with self._scratch_sdf() as scratch:
            # Close our own handle; the file is re-opened by name below.
            scratch.close()
            expected_tree = write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                recovered_tree = reader.get_tree(type(expected_tree))
                self.assertEqual(
                    recovered_tree.model_dump_json(),
                    expected_tree.model_dump_json(),
                )

    def test_get_tree_raises_when_main_json_missing(self):
        # The file created here carries only a root CLASS attribute, so it
        # has no /MORE/LSST/JSON; `get_tree` must raise ArchiveReadError.
        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                hdf["/"].attrs["CLASS"] = "NDF"
            with NdfInputArchive.open(scratch.name) as reader:
                tree_type = ImageSerializationModel[NdfPointerModel]
                with self.assertRaises(ArchiveReadError):
                    reader.get_tree(tree_type)
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfInputArchiveDataTestCase(unittest.TestCase):
    """Exercise `get_array`, `deserialize_pointer`, and `get_frame_set`."""

    @staticmethod
    def _scratch_sdf():
        """Return a named ``.sdf`` temporary file that survives ``close()``.

        The object is meant to be used as a context manager; with
        ``delete_on_close=False`` the file is only removed when that
        context exits.
        """
        return tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False)

    def test_get_array_reads_image_array(self):
        image = Image(np.arange(20, dtype=np.float32).reshape(4, 5))
        with self._scratch_sdf() as scratch:
            scratch.close()
            tree = write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                # `tree.data` on an Image tree is an ArrayReferenceModel
                # that points at /DATA_ARRAY/DATA.
                loaded = reader.get_array(tree.data)
                np.testing.assert_array_equal(loaded, image.array)

    def test_get_array_supports_slicing(self):
        image = Image(np.arange(20, dtype=np.float32).reshape(4, 5))
        # Same tuple of slice objects as (slice(0, 2), slice(1, 4)).
        window = np.s_[0:2, 1:4]
        with self._scratch_sdf() as scratch:
            scratch.close()
            tree = write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                loaded = reader.get_array(tree.data, slices=window)
                np.testing.assert_array_equal(loaded, image.array[:2, 1:4])

    def test_get_array_handles_inline_array(self):
        # The inline model carries its own values; the file written here
        # only provides an archive to call `get_array` on.
        inline = InlineArrayModel(data=[1.0, 2.0, 3.0], datatype=NumberType.float64)
        image = Image(np.zeros((2, 2), dtype=np.float32))
        with self._scratch_sdf() as scratch:
            scratch.close()
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                loaded = reader.get_array(inline)
                np.testing.assert_array_equal(loaded, np.array([1.0, 2.0, 3.0]))

    def test_get_array_unrecognised_source_raises(self):
        # A reference whose source uses the "fits:" form must be rejected
        # by the NDF archive with ArchiveReadError.
        image = Image(np.zeros((2, 2), dtype=np.float32))
        bogus = ArrayReferenceModel(source="fits:NOTUS", shape=[2, 2], datatype=NumberType.float32)
        with self._scratch_sdf() as scratch:
            scratch.close()
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                with self.assertRaises(ArchiveReadError):
                    reader.get_array(bogus)

    def test_deserialize_pointer_round_trips_subtree(self):
        # Create a file holding one hoisted sub-tree and read it back.
        # Driving the output archive directly avoids pulling in the full
        # Image stack.
        class TinyTree(pydantic.BaseModel):
            name: str

        def build_tree(nested):
            return TinyTree(name="hello")

        def passthrough(m, _a):
            # Deserializer that hands the model back unchanged.
            return m

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                writer = NdfOutputArchive(hdf)
                pointer = writer.serialize_pointer("psf", build_tree, key=("psf", 1))
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.deserialize_pointer(pointer, TinyTree, passthrough)
                self.assertEqual(recovered.name, "hello")

    def test_deserialize_pointer_caches_by_ref(self):
        class TinyTree(pydantic.BaseModel):
            name: str

        # Records every model handed to the deserializer so the number of
        # invocations can be checked afterwards.
        calls = []

        def deserializer(model, _archive):
            calls.append(model)
            return model

        def build_tree(nested):
            return TinyTree(name="x")

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                writer = NdfOutputArchive(hdf)
                pointer = writer.serialize_pointer("psf", build_tree, key=("psf", 1))
            with NdfInputArchive.open(scratch.name) as reader:
                first = reader.deserialize_pointer(pointer, TinyTree, deserializer)
                second = reader.deserialize_pointer(pointer, TinyTree, deserializer)
                # Same object both times, and the deserializer ran once.
                self.assertIs(first, second)
                self.assertEqual(len(calls), 1)

    def test_deserialize_pointer_caches_frame_set_for_get_frame_set(self):
        class TinyTree(pydantic.BaseModel):
            name: str

        class DummyFrameSet(FrameSet):
            def __contains__(self, frame):
                return False

            def __getitem__(self, key):
                raise AssertionError("DummyFrameSet should not be indexed in this test.")

        sentinel = DummyFrameSet()

        def build_tree(nested):
            return TinyTree(name="frames")

        def return_sentinel(_m, _a):
            return sentinel

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                writer = NdfOutputArchive(hdf)
                pointer = writer.serialize_frame_set(
                    "frames",
                    sentinel,
                    build_tree,
                    key=("frames", 1),
                )
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.deserialize_pointer(pointer, TinyTree, return_sentinel)
                self.assertIs(recovered, sentinel)
                # The frame set produced above must now be retrievable
                # through `get_frame_set` with the same pointer.
                self.assertIs(reader.get_frame_set(pointer), sentinel)

    def test_get_frame_set_returns_cached_value(self):
        # Check the cache mechanism using a plain object standing in for a
        # FrameSet. Real FrameSet plumbing comes when the AST text dump
        # for /WCS/DATA lands in a follow-up task.
        sentinel = object()
        with self._scratch_sdf() as scratch:
            scratch.close()
            write(Image(np.zeros((2, 2), dtype=np.float32)), scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                # Fill the cache by hand, the way deserialize_pointer
                # would if a FrameSet deserializer ran.
                reader._frame_set_cache["/MORE/LSST/PIXEL_TO_SKY"] = sentinel
                pointer = NdfPointerModel(path="/MORE/LSST/PIXEL_TO_SKY")
                self.assertIs(reader.get_frame_set(pointer), sentinel)

    def test_get_frame_set_raises_if_not_cached(self):
        # Nothing was cached for this path, so the lookup must raise.
        with self._scratch_sdf() as scratch:
            scratch.close()
            write(Image(np.zeros((2, 2), dtype=np.float32)), scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                pointer = NdfPointerModel(path="/MORE/LSST/UNKNOWN")
                with self.assertRaises(AssertionError):
                    reader.get_frame_set(pointer)
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfInputArchiveOpaqueMetadataTestCase(unittest.TestCase):
    """Exercise `NdfInputArchive.get_opaque_metadata`."""

    @staticmethod
    def _scratch_sdf():
        """Return a named ``.sdf`` temporary file that survives ``close()``.

        The object is meant to be used as a context manager; with
        ``delete_on_close=False`` the file is only removed when that
        context exits.
        """
        return tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False)

    def test_more_fits_round_trips_via_opaque_metadata(self):
        # Attach a one-card primary header to the image as opaque FITS
        # metadata, write it, and check the card is recovered on read.
        image = Image(np.zeros((2, 2), dtype=np.float32))
        header = astropy.io.fits.Header()
        header["FOO"] = ("bar", "test card")
        attached = FitsOpaqueMetadata()
        attached.add_header(header, name="", ver=1)
        image._opaque_metadata = attached
        with self._scratch_sdf() as scratch:
            scratch.close()
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.get_opaque_metadata()
                self.assertIn(ExtensionKey(), recovered.headers)
                self.assertEqual(recovered.headers[ExtensionKey()]["FOO"], "bar")

    def test_get_opaque_metadata_empty_when_no_more_fits(self):
        # An image without opaque metadata leaves /MORE/FITS out of the
        # file entirely.
        image = Image(np.zeros((2, 2), dtype=np.float32))
        with self._scratch_sdf() as scratch:
            scratch.close()
            write(image, scratch.name)
            with NdfInputArchive.open(scratch.name) as reader:
                recovered = reader.get_opaque_metadata()
                self.assertIsInstance(recovered, FitsOpaqueMetadata)
                # /MORE/FITS was never written, so there must be no
                # primary header (or any other header) populated.
                self.assertFalse(recovered.headers)
@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class NdfReadFunctionTestCase(unittest.TestCase):
    """Exercise the module-level `ndf.read()` function."""

    @staticmethod
    def _scratch_sdf():
        """Return a named ``.sdf`` temporary file that survives ``close()``.

        The object is meant to be used as a context manager; with
        ``delete_on_close=False`` the file is only removed when that
        context exits.
        """
        return tempfile.NamedTemporaryFile(suffix=".sdf", delete_on_close=False)

    @staticmethod
    def _example_path():
        """Return the path of ``data/example-ndf.sdf`` beside this module."""
        return os.path.join(os.path.dirname(__file__), "data", "example-ndf.sdf")

    @staticmethod
    def _begin_ndf(root, pixels, origin=None):
        """Name the root "TEST"/"NDF" and write ``DATA_ARRAY/DATA``.

        When ``origin`` is given it is also written, as an int32 array, to
        ``DATA_ARRAY/ORIGIN``.
        """
        _hds.set_root_name(root, "TEST", "NDF")
        data_struct = _hds.create_structure(root, "DATA_ARRAY", "ARRAY")
        _hds.write_array(data_struct, "DATA", pixels)
        if origin is not None:
            _hds.write_array(data_struct, "ORIGIN", np.array(origin, dtype=np.int32))

    @staticmethod
    def _add_quality(root, bits, badbits):
        """Write a QUALITY structure containing a nested QUALITY array.

        The nested array receives ``DATA`` (``bits``), a zero ``ORIGIN``
        and a false ``BAD_PIXEL``; ``BADBITS`` is written on the outer
        structure as a uint8 scalar.
        """
        outer = _hds.create_structure(root, "QUALITY", "QUALITY")
        inner = _hds.create_structure(outer, "QUALITY", "ARRAY")
        _hds.write_array(inner, "DATA", bits)
        _hds.write_array(inner, "ORIGIN", np.array([0, 0], dtype=np.int32))
        _hds.write_array(inner, "BAD_PIXEL", np.array(False, dtype=np.bool_))
        _hds.write_array(outer, "BADBITS", np.array(badbits, dtype=np.uint8))

    def test_read_round_trips_image(self):
        pixels = np.arange(20, dtype=np.float32).reshape(4, 5)
        image = Image(pixels, bbox=Box.factory[10:14, 20:25])
        with self._scratch_sdf() as scratch:
            scratch.close()
            write(image, scratch.name)
            result = read(Image, scratch.name)
            recovered = result.deserialized
            self.assertIsInstance(recovered, Image)
            np.testing.assert_array_equal(recovered.array, image.array)
            self.assertEqual(recovered.bbox, image.bbox)

    def test_read_starlink_file_auto_detects_image(self):
        # The canonical fixture has no /MORE/LSST/JSON, no QUALITY and no
        # VARIANCE, so auto-detection should yield an Image whose array
        # matches the file: 611x609, int16.
        result = read(Image, self._example_path())
        recovered = result.deserialized
        self.assertIsInstance(recovered, Image)
        self.assertEqual(recovered.array.shape, (611, 609))
        self.assertEqual(recovered.array.dtype, np.int16)
        self.assertIsNotNone(recovered.projection)

    def test_read_starlink_file_recovers_opaque_fits_metadata(self):
        result = read(Image, self._example_path())
        opaque = result.deserialized._opaque_metadata
        self.assertIn(ExtensionKey(), opaque.headers)
        # The fixture is a real Starlink M57 image; check one card known
        # to be present (NAXIS).
        primary = opaque.headers[ExtensionKey()]
        self.assertIn("NAXIS", primary)

    def test_read_auto_detects_nested_quality_array(self):
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)
        quality_array = np.array([[0, 1, 0], [1, 0, 1]], dtype=np.uint8)

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                self._begin_ndf(hdf, image_array)
                self._add_quality(hdf, quality_array, 1)
            # Reading as MaskedImage picks up the nested quality array.
            result = read(MaskedImage, scratch.name)
            masked = result.deserialized
            self.assertIsInstance(masked, MaskedImage)
            np.testing.assert_array_equal(masked.mask.array[:, :, 0], quality_array)
            self.assertEqual(set(masked.mask.schema.names), {f"MASK{i}" for i in range(8)})
            # The same file can still be read as a plain Image.
            image_result = read(Image, scratch.name)
            self.assertIsInstance(image_result.deserialized, Image)
            np.testing.assert_array_equal(image_result.deserialized.array, image_array)

    def test_read_auto_detect_preserves_quality_bits(self):
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)
        quality_array = np.array([[0, 2, 4], [2, 0, 6]], dtype=np.uint8)
        # Per-pixel truth tables for bit value 2 (MASK1) and 4 (MASK2).
        expected_mask1 = np.array([[0, 1, 0], [1, 0, 1]], dtype=bool)
        expected_mask2 = np.array([[0, 0, 1], [0, 0, 1]], dtype=bool)

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                self._begin_ndf(hdf, image_array)
                self._add_quality(hdf, quality_array, 2)
            result = read(MaskedImage, scratch.name)
            self.assertIsInstance(result.deserialized, MaskedImage)
            mask = result.deserialized.mask
            np.testing.assert_array_equal(mask.array[:, :, 0], quality_array)
            np.testing.assert_array_equal(mask.get("MASK1"), expected_mask1)
            np.testing.assert_array_equal(mask.get("MASK2"), expected_mask2)
            # Only the plane selected by BADBITS (2 -> MASK1) is expected
            # to mention that in its description.
            self.assertIn("Selected by BADBITS", mask.schema.descriptions["MASK1"])
            self.assertNotIn("Selected by BADBITS", mask.schema.descriptions["MASK2"])

    def test_read_auto_detected_data_only_as_masked_image_uses_defaults(self):
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                self._begin_ndf(hdf, image_array, origin=[5, 4])
            result = read(MaskedImage, scratch.name)
            masked = result.deserialized
            self.assertIsInstance(masked, MaskedImage)
            self.assertEqual(masked.bbox, Box.factory[4:6, 5:8])
            np.testing.assert_array_equal(masked.image.array, image_array)
            # With no QUALITY or VARIANCE in the file, the mask is all
            # zeros and the variance is all ones.
            np.testing.assert_array_equal(
                masked.mask.array,
                np.zeros((2, 3, 1), dtype=np.uint8),
            )
            np.testing.assert_array_equal(
                masked.variance.array,
                np.ones((2, 3), dtype=np.float32),
            )

    def test_read_auto_detected_variance_as_masked_image_keeps_variance(self):
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)
        variance_array = np.full((2, 3), 2.5, dtype=np.float32)

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                self._begin_ndf(hdf, image_array, origin=[5, 4])
                variance_struct = _hds.create_structure(hdf, "VARIANCE", "ARRAY")
                _hds.write_array(variance_struct, "DATA", variance_array)
                _hds.write_array(variance_struct, "ORIGIN", np.array([5, 4], dtype=np.int32))
            result = read(MaskedImage, scratch.name)
            masked = result.deserialized
            self.assertIsInstance(masked, MaskedImage)
            np.testing.assert_array_equal(masked.variance.array, variance_array)
            np.testing.assert_array_equal(
                masked.mask.array,
                np.zeros((2, 3, 1), dtype=np.uint8),
            )

    def test_read_auto_detected_units_component(self):
        image_array = np.arange(6, dtype=np.float32).reshape(2, 3)

        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                self._begin_ndf(hdf, image_array)
                hdf.create_dataset("UNITS", data=np.bytes_("count"))
            result = read(Image, scratch.name)
            self.assertEqual(result.deserialized.unit, u.ct)

    def test_read_missing_data_array_raises(self):
        # A file with only /MORE/LSST/JSON is fine for the symmetric path.
        # A file with NEITHER /MORE/LSST/JSON NOR DATA_ARRAY is a
        # malformed NDF, and auto-detection must fail clearly.
        with self._scratch_sdf() as scratch:
            scratch.close()
            with h5py.File(scratch.name, "w") as hdf:
                # Root CLASS attribute only: no DATA_ARRAY, no
                # /MORE/LSST/JSON.
                hdf["/"].attrs["CLASS"] = "NDF"
            with self.assertRaises(ArchiveReadError):
                read(Image, scratch.name)

    def test_read_auto_detect_wrong_target_type_raises(self):
        # Auto-detection only knows how to build Image-like objects from
        # NDF components; an unrelated target class must fail clearly.
        with self.assertRaises(ArchiveReadError):
            read(Mask, self._example_path())