Coverage for tests / test_ndf_hds.py: 18%
206 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import tempfile
16import unittest
18import numpy as np
20try:
21 import h5py
23 from lsst.images.ndf import _hds
25 HAVE_H5PY = True
26except ImportError:
27 HAVE_H5PY = False
30def _attr_str(value: object) -> str | None:
31 """Decode an h5py attribute value (bytes or str) to a Python str."""
32 if isinstance(value, bytes):
33 return value.decode("ascii")
34 if isinstance(value, str):
35 return value
36 return None
39@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
40class HdsPrimitiveTestCase(unittest.TestCase):
41 """Primitives are bare HDF5 datasets with no HDS-specific attributes."""
43 def test_real_array_round_trip(self):
44 data = np.arange(12, dtype=np.float32).reshape(3, 4)
45 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
46 with h5py.File(tmp.name, "w") as f:
47 _hds.write_array(f, "DATA", data)
48 with h5py.File(tmp.name, "r") as f:
49 ds = f["DATA"]
50 self.assertEqual(ds.dtype, np.float32)
51 self.assertEqual(ds.shape, (3, 4))
52 self.assertEqual(dict(ds.attrs), {})
53 np.testing.assert_array_equal(_hds.read_array(ds), data)
55 def test_double_array_round_trip(self):
56 data = np.linspace(0, 1, 6, dtype=np.float64).reshape(2, 3)
57 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
58 with h5py.File(tmp.name, "w") as f:
59 _hds.write_array(f, "DATA", data)
60 with h5py.File(tmp.name, "r") as f:
61 self.assertEqual(f["DATA"].dtype, np.float64)
62 np.testing.assert_array_equal(_hds.read_array(f["DATA"]), data)
64 def test_ubyte_and_integer(self):
65 data_u = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.uint8)
66 data_i = np.array([10, 20, 30], dtype=np.int32)
67 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
68 with h5py.File(tmp.name, "w") as f:
69 _hds.write_array(f, "Q", data_u)
70 _hds.write_array(f, "I", data_i)
71 with h5py.File(tmp.name, "r") as f:
72 self.assertEqual(f["Q"].dtype, np.uint8)
73 self.assertEqual(f["I"].dtype, np.int32)
74 np.testing.assert_array_equal(_hds.read_array(f["Q"]), data_u)
75 np.testing.assert_array_equal(_hds.read_array(f["I"]), data_i)
77 def test_logical_uses_hdf5_bitfield(self):
78 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
79 with h5py.File(tmp.name, "w") as f:
80 _hds.write_array(f, "SCALAR", np.array(False, dtype=np.bool_))
81 _hds.write_array(f, "ARRAY", np.array([True, False], dtype=np.bool_))
82 with h5py.File(tmp.name, "r") as f:
83 self.assertEqual(f["SCALAR"].id.get_type().get_class(), h5py.h5t.BITFIELD)
84 self.assertEqual(f["SCALAR"].id.get_type().get_size(), 1)
85 self.assertFalse(_hds.read_array(f["SCALAR"]))
86 self.assertEqual(f["ARRAY"].id.get_type().get_class(), h5py.h5t.BITFIELD)
87 self.assertEqual(f["ARRAY"].id.get_type().get_size(), 1)
88 np.testing.assert_array_equal(_hds.read_array(f["ARRAY"]), np.array([True, False]))
90 def test_unsupported_dtype_raises_on_write(self):
91 data = np.array([1.0], dtype=np.complex128)
92 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp, h5py.File(tmp.name, "w") as f:
93 with self.assertRaises(NotImplementedError):
94 _hds.write_array(f, "X", data)
96 def test_unsupported_dtype_raises_on_read(self):
97 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
98 with h5py.File(tmp.name, "w") as f:
99 # Write directly with h5py, bypassing write_array's check.
100 f.create_dataset("X", data=np.array([1.0], dtype=np.complex128))
101 with h5py.File(tmp.name, "r") as f:
102 with self.assertRaises(NotImplementedError):
103 _hds.read_array(f["X"])
105 def test_read_array_rejects_char_dataset(self):
106 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
107 with h5py.File(tmp.name, "w") as f:
108 _hds.write_char_array(f, "WCS", ["hello", "world"], width=16)
109 with h5py.File(tmp.name, "r") as f:
110 with self.assertRaises(ValueError):
111 _hds.read_array(f["WCS"])
113 def test_char_array_round_trip(self):
114 lines = ["Begin FrameSet", "Nframe = 5", "End FrameSet"]
115 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
116 with h5py.File(tmp.name, "w") as f:
117 _hds.write_char_array(f, "DATA", lines, width=80)
118 with h5py.File(tmp.name, "r") as f:
119 ds = f["DATA"]
120 self.assertEqual(ds.dtype, np.dtype("|S80"))
121 self.assertEqual(ds.shape, (3,))
122 self.assertEqual(dict(ds.attrs), {})
123 self.assertEqual(_hds.read_char_array(ds), lines)
125 def test_char_array_pads_and_strips(self):
126 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
127 with h5py.File(tmp.name, "w") as f:
128 _hds.write_char_array(f, "X", ["short"], width=80)
129 with h5py.File(tmp.name, "r") as f:
130 # Raw data should be space-padded to 80 characters.
131 self.assertEqual(f["X"][0], b"short" + b" " * 75)
132 # read_char_array strips trailing spaces.
133 self.assertEqual(_hds.read_char_array(f["X"]), ["short"])
135 def test_char_array_rejects_long_lines(self):
136 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp, h5py.File(tmp.name, "w") as f:
137 with self.assertRaises(ValueError):
138 _hds.write_char_array(f, "X", ["too long"], width=3)
140 def test_char_array_rejects_non_ascii(self):
141 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp, h5py.File(tmp.name, "w") as f:
142 with self.assertRaises(ValueError):
143 _hds.write_char_array(f, "X", ["not ascii: \N{LATIN SMALL LETTER E WITH ACUTE}"], width=80)
145 def test_ndf_ast_data_encoding_uses_flagged_fixed_width_records(self):
146 text = (
147 " Begin FrameSet\n"
148 '# Title = "demo"\n'
149 " VeryLongAttribute = 12345678901234567890\n"
150 " End FrameSet\n"
151 )
152 expected = (
153 'Begin FrameSet\n# Title = "demo"\nVeryLongAttribute = 12345678901234567890\nEnd FrameSet\n'
154 )
156 records = _hds.encode_ndf_ast_data(text)
158 self.assertTrue(all(len(record) <= _hds.NDF_AST_DATA_WIDTH for record in records))
159 self.assertTrue(all(record[0] in {" ", "+"} for record in records))
160 self.assertIn(' # Title = "demo"', records)
161 self.assertTrue(any(record.startswith("+") for record in records))
162 self.assertEqual(_hds.decode_ndf_ast_data(records), expected)
164 def test_read_char_array_rejects_numeric_dataset(self):
165 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
166 with h5py.File(tmp.name, "w") as f:
167 _hds.write_array(f, "DATA", np.zeros((2,), dtype=np.float32))
168 with h5py.File(tmp.name, "r") as f:
169 with self.assertRaises(ValueError):
170 _hds.read_char_array(f["DATA"])
172 def test_hds_type_for_dtype(self):
173 self.assertEqual(_hds.hds_type_for_dtype(np.dtype(np.bool_)), "_LOGICAL")
174 self.assertEqual(_hds.hds_type_for_dtype(np.dtype(np.float32)), "_REAL")
175 self.assertEqual(_hds.hds_type_for_dtype(np.dtype(np.float64)), "_DOUBLE")
176 self.assertEqual(_hds.hds_type_for_dtype(np.dtype(np.uint8)), "_UBYTE")
177 self.assertEqual(_hds.hds_type_for_dtype(np.dtype(np.int32)), "_INTEGER")
178 self.assertEqual(_hds.hds_type_for_dtype(np.dtype("|S80")), "_CHAR*80")
179 with self.assertRaises(NotImplementedError):
180 _hds.hds_type_for_dtype(np.dtype(np.complex128))
183@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
184class HdsStructureTestCase(unittest.TestCase):
185 """Structures are HDF5 groups with a CLASS attribute."""
187 def test_create_open_structure(self):
188 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
189 with h5py.File(tmp.name, "w") as f:
190 ndf = _hds.create_structure(f, "ROOT", "NDF")
191 _hds.create_structure(ndf, "DATA_ARRAY", "ARRAY")
192 with h5py.File(tmp.name, "r") as f:
193 root_obj = f["ROOT"]
194 self.assertEqual(_attr_str(root_obj.attrs["CLASS"]), "NDF")
195 root, root_type = _hds.open_structure(f, "ROOT")
196 self.assertEqual(root_type, "NDF")
197 child_names = sorted(name for name, _ in _hds.iter_children(root))
198 self.assertEqual(child_names, ["DATA_ARRAY"])
199 _, child_type = _hds.open_structure(root, "DATA_ARRAY")
200 self.assertEqual(child_type, "ARRAY")
202 def test_open_structure_missing_class_raises(self):
203 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
204 with h5py.File(tmp.name, "w") as f:
205 f.create_group("BAD")
206 with h5py.File(tmp.name, "r") as f:
207 with self.assertRaises(ValueError):
208 _hds.open_structure(f, "BAD")
210 def test_open_structure_accepts_legacy_hdstype(self):
211 """Files from older HDS variants used HDSTYPE rather than CLASS."""
212 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
213 with h5py.File(tmp.name, "w") as f:
214 g = f.create_group("LEGACY")
215 g.attrs["HDSTYPE"] = b"NDF"
216 with h5py.File(tmp.name, "r") as f:
217 _, t = _hds.open_structure(f, "LEGACY")
218 self.assertEqual(t, "NDF")
220 def test_set_root_name(self):
221 with tempfile.NamedTemporaryFile(suffix=".sdf") as tmp:
222 with h5py.File(tmp.name, "w") as f:
223 _hds.set_root_name(f, "MYNDF", "NDF")
224 with h5py.File(tmp.name, "r") as f:
225 self.assertEqual(_attr_str(f["/"].attrs["HDS_ROOT_NAME"]), "MYNDF")
226 self.assertEqual(_attr_str(f["/"].attrs["CLASS"]), "NDF")
229@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
230class HdsCanonicalExampleTestCase(unittest.TestCase):
231 """Validate _hds against a canonical-format Starlink-generated NDF.
233 The example file is an M57 image with the modern hds-v5 layout:
234 root group with CLASS="NDF" and HDS_ROOT_NAME, DATA_ARRAY as an
235 ARRAY structure containing DATA (int16) and ORIGIN (int64), WCS as
236 a structure with an AST text-dump DATA primitive, and MORE.FITS as
237 an 80-character card array.
238 """
240 EXAMPLE = os.path.join(os.path.dirname(__file__), "data", "example-ndf.sdf")
242 def test_root_is_ndf_with_root_name(self):
243 with h5py.File(self.EXAMPLE, "r") as f:
244 self.assertEqual(_attr_str(f["/"].attrs["CLASS"]), "NDF")
245 self.assertEqual(_attr_str(f["/"].attrs["HDS_ROOT_NAME"]), "M57")
247 def test_data_array_is_array_structure(self):
248 with h5py.File(self.EXAMPLE, "r") as f:
249 data_array, hds_type = _hds.open_structure(f, "DATA_ARRAY")
250 self.assertEqual(hds_type, "ARRAY")
251 data = data_array["DATA"]
252 self.assertEqual(data.dtype, np.int16)
253 self.assertEqual(data.shape, (611, 609))
254 self.assertEqual(_hds.hds_type_for_dtype(data.dtype), "_WORD")
255 arr = _hds.read_array(data)
256 self.assertEqual(arr.shape, (611, 609))
257 origin = _hds.read_array(data_array["ORIGIN"])
258 self.assertEqual(origin.dtype, np.int64)
259 self.assertEqual(origin.shape, (2,))
261 def test_wcs_is_structure_with_ast_text(self):
262 with h5py.File(self.EXAMPLE, "r") as f:
263 wcs, hds_type = _hds.open_structure(f, "WCS")
264 self.assertEqual(hds_type, "WCS")
265 lines = _hds.read_char_array(wcs["DATA"])
266 text = _hds.decode_ndf_ast_data(lines)
267 stripped = [line.lstrip() for line in text.splitlines()]
268 self.assertTrue(any(s.startswith("Begin FrameSet") for s in stripped))
269 self.assertTrue(any(s.startswith("End FrameSet") for s in stripped))
271 def test_more_fits_present(self):
272 with h5py.File(self.EXAMPLE, "r") as f:
273 more, hds_type = _hds.open_structure(f, "MORE")
274 self.assertEqual(hds_type, "EXT")
275 cards = _hds.read_char_array(more["FITS"])
276 self.assertGreater(len(cards), 0)
277 self.assertTrue(any(c.startswith("NAXIS") for c in cards))