Coverage for tests / test_ndf_hds.py: 18%

206 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 01:32 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import tempfile 

16import unittest 

17 

18import numpy as np 

19 

try:
    # h5py is an optional dependency: the test cases in this module are
    # decorated with ``skipUnless(HAVE_H5PY, ...)`` so they are skipped
    # rather than failing when it cannot be imported.
    import h5py

    # Imported under the same guard, so an ImportError raised while importing
    # _hds also leaves HAVE_H5PY set to False.
    from lsst.images.ndf import _hds

    HAVE_H5PY = True
except ImportError:
    HAVE_H5PY = False

28 

29 

30def _attr_str(value: object) -> str | None: 

31 """Decode an h5py attribute value (bytes or str) to a Python str.""" 

32 if isinstance(value, bytes): 

33 return value.decode("ascii") 

34 if isinstance(value, str): 

35 return value 

36 return None 

37 

38 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class HdsPrimitiveTestCase(unittest.TestCase):
    """Check that HDS primitives are plain HDF5 datasets without attributes."""

    def test_real_array_round_trip(self):
        # A 2-D float32 array must come back with the same dtype, shape and
        # values, and the dataset must carry no attributes.
        expected = np.arange(12, dtype=np.float32).reshape(3, 4)
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_array(out, "DATA", expected)
            with h5py.File(scratch.name, "r") as src:
                dataset = src["DATA"]
                self.assertEqual(dataset.dtype, np.float32)
                self.assertEqual(dataset.shape, (3, 4))
                self.assertEqual(dict(dataset.attrs), {})
                np.testing.assert_array_equal(_hds.read_array(dataset), expected)

    def test_double_array_round_trip(self):
        # Same round trip as above, for float64.
        expected = np.linspace(0, 1, 6, dtype=np.float64).reshape(2, 3)
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_array(out, "DATA", expected)
            with h5py.File(scratch.name, "r") as src:
                self.assertEqual(src["DATA"].dtype, np.float64)
                np.testing.assert_array_equal(_hds.read_array(src["DATA"]), expected)

    def test_ubyte_and_integer(self):
        # Two integer datasets of different widths written to the same file.
        bytes_in = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.uint8)
        ints_in = np.array([10, 20, 30], dtype=np.int32)
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_array(out, "Q", bytes_in)
                _hds.write_array(out, "I", ints_in)
            with h5py.File(scratch.name, "r") as src:
                self.assertEqual(src["Q"].dtype, np.uint8)
                self.assertEqual(src["I"].dtype, np.int32)
                np.testing.assert_array_equal(_hds.read_array(src["Q"]), bytes_in)
                np.testing.assert_array_equal(_hds.read_array(src["I"]), ints_in)

    def test_logical_uses_hdf5_bitfield(self):
        # Boolean data, scalar or array, must be stored as a 1-byte HDF5
        # bitfield type.
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_array(out, "SCALAR", np.array(False, dtype=np.bool_))
                _hds.write_array(out, "ARRAY", np.array([True, False], dtype=np.bool_))
            with h5py.File(scratch.name, "r") as src:
                scalar_type = src["SCALAR"].id.get_type()
                self.assertEqual(scalar_type.get_class(), h5py.h5t.BITFIELD)
                self.assertEqual(scalar_type.get_size(), 1)
                self.assertFalse(_hds.read_array(src["SCALAR"]))

                array_type = src["ARRAY"].id.get_type()
                self.assertEqual(array_type.get_class(), h5py.h5t.BITFIELD)
                self.assertEqual(array_type.get_size(), 1)
                np.testing.assert_array_equal(_hds.read_array(src["ARRAY"]), np.array([True, False]))

    def test_unsupported_dtype_raises_on_write(self):
        unsupported = np.array([1.0], dtype=np.complex128)
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out, self.assertRaises(NotImplementedError):
                _hds.write_array(out, "X", unsupported)

    def test_unsupported_dtype_raises_on_read(self):
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                # Go straight through h5py so that write_array's own dtype
                # check is not what rejects the data.
                out.create_dataset("X", data=np.array([1.0], dtype=np.complex128))
            with h5py.File(scratch.name, "r") as src, self.assertRaises(NotImplementedError):
                _hds.read_array(src["X"])

    def test_read_array_rejects_char_dataset(self):
        # read_array is for numeric data; a character dataset is an error.
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_char_array(out, "WCS", ["hello", "world"], width=16)
            with h5py.File(scratch.name, "r") as src, self.assertRaises(ValueError):
                _hds.read_array(src["WCS"])

    def test_char_array_round_trip(self):
        # Character arrays are fixed-width byte strings, one element per line,
        # again with no attributes on the dataset.
        text_lines = ["Begin FrameSet", "Nframe = 5", "End FrameSet"]
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_char_array(out, "DATA", text_lines, width=80)
            with h5py.File(scratch.name, "r") as src:
                dataset = src["DATA"]
                self.assertEqual(dataset.dtype, np.dtype("|S80"))
                self.assertEqual(dataset.shape, (3,))
                self.assertEqual(dict(dataset.attrs), {})
                self.assertEqual(_hds.read_char_array(dataset), text_lines)

    def test_char_array_pads_and_strips(self):
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_char_array(out, "X", ["short"], width=80)
            with h5py.File(scratch.name, "r") as src:
                # On disk the value is padded with spaces up to the full
                # 80-character width.
                self.assertEqual(src["X"][0], b"short" + b" " * 75)
                # Reading through read_char_array removes the trailing spaces.
                self.assertEqual(_hds.read_char_array(src["X"]), ["short"])

    def test_char_array_rejects_long_lines(self):
        # A line longer than the requested width must be refused.
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out, self.assertRaises(ValueError):
                _hds.write_char_array(out, "X", ["too long"], width=3)

    def test_char_array_rejects_non_ascii(self):
        non_ascii_line = "not ascii: \N{LATIN SMALL LETTER E WITH ACUTE}"
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out, self.assertRaises(ValueError):
                _hds.write_char_array(out, "X", [non_ascii_line], width=80)

    def test_ndf_ast_data_encoding_uses_flagged_fixed_width_records(self):
        source_text = (
            " Begin FrameSet\n"
            '# Title = "demo"\n'
            " VeryLongAttribute = 12345678901234567890\n"
            " End FrameSet\n"
        )
        decoded_text = (
            'Begin FrameSet\n# Title = "demo"\nVeryLongAttribute = 12345678901234567890\nEnd FrameSet\n'
        )

        encoded = _hds.encode_ndf_ast_data(source_text)

        # Every record fits the fixed width and begins with a flag character,
        # either a space or "+".
        self.assertTrue(all(len(rec) <= _hds.NDF_AST_DATA_WIDTH for rec in encoded))
        self.assertTrue(all(rec[0] in {" ", "+"} for rec in encoded))
        self.assertIn(' # Title = "demo"', encoded)
        # At least one record must carry the "+" flag.
        self.assertTrue(any(rec.startswith("+") for rec in encoded))
        self.assertEqual(_hds.decode_ndf_ast_data(encoded), decoded_text)

    def test_read_char_array_rejects_numeric_dataset(self):
        # The mirror of test_read_array_rejects_char_dataset.
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.write_array(out, "DATA", np.zeros((2,), dtype=np.float32))
            with h5py.File(scratch.name, "r") as src, self.assertRaises(ValueError):
                _hds.read_char_array(src["DATA"])

    def test_hds_type_for_dtype(self):
        # Each entry pairs a numpy dtype specifier with the expected HDS type
        # name.
        expected_names = (
            (np.bool_, "_LOGICAL"),
            (np.float32, "_REAL"),
            (np.float64, "_DOUBLE"),
            (np.uint8, "_UBYTE"),
            (np.int32, "_INTEGER"),
            ("|S80", "_CHAR*80"),
        )
        for dtype_spec, hds_name in expected_names:
            self.assertEqual(_hds.hds_type_for_dtype(np.dtype(dtype_spec)), hds_name)
        with self.assertRaises(NotImplementedError):
            _hds.hds_type_for_dtype(np.dtype(np.complex128))

181 

182 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class HdsStructureTestCase(unittest.TestCase):
    """Check that HDS structures are HDF5 groups tagged with a CLASS attribute."""

    def test_create_open_structure(self):
        # Build ROOT (type NDF) containing DATA_ARRAY (type ARRAY), then read
        # both back through open_structure / iter_children.
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                parent = _hds.create_structure(out, "ROOT", "NDF")
                _hds.create_structure(parent, "DATA_ARRAY", "ARRAY")
            with h5py.File(scratch.name, "r") as src:
                raw_root = src["ROOT"]
                self.assertEqual(_attr_str(raw_root.attrs["CLASS"]), "NDF")
                root, root_type = _hds.open_structure(src, "ROOT")
                self.assertEqual(root_type, "NDF")
                names = [name for name, _ in _hds.iter_children(root)]
                names.sort()
                self.assertEqual(names, ["DATA_ARRAY"])
                _, child_type = _hds.open_structure(root, "DATA_ARRAY")
                self.assertEqual(child_type, "ARRAY")

    def test_open_structure_missing_class_raises(self):
        # A bare group with no type attribute is not a valid structure.
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                out.create_group("BAD")
            with h5py.File(scratch.name, "r") as src, self.assertRaises(ValueError):
                _hds.open_structure(src, "BAD")

    def test_open_structure_accepts_legacy_hdstype(self):
        """Files from older HDS variants used HDSTYPE rather than CLASS."""
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                legacy_group = out.create_group("LEGACY")
                legacy_group.attrs["HDSTYPE"] = b"NDF"
            with h5py.File(scratch.name, "r") as src:
                _, legacy_type = _hds.open_structure(src, "LEGACY")
                self.assertEqual(legacy_type, "NDF")

    def test_set_root_name(self):
        # set_root_name must record both the name and the type on the root
        # group of the file.
        with tempfile.NamedTemporaryFile(suffix=".sdf") as scratch:
            with h5py.File(scratch.name, "w") as out:
                _hds.set_root_name(out, "MYNDF", "NDF")
            with h5py.File(scratch.name, "r") as src:
                self.assertEqual(_attr_str(src["/"].attrs["HDS_ROOT_NAME"]), "MYNDF")
                self.assertEqual(_attr_str(src["/"].attrs["CLASS"]), "NDF")

227 

228 

@unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
class HdsCanonicalExampleTestCase(unittest.TestCase):
    """Exercise _hds on a canonical-format NDF produced by Starlink.

    The example file holds an M57 image in the modern hds-v5 layout.  Its
    root group has CLASS="NDF" and an HDS_ROOT_NAME; DATA_ARRAY is an ARRAY
    structure holding DATA (int16) and ORIGIN (int64); WCS is a structure
    whose DATA primitive is an AST text dump; and MORE.FITS is an array of
    80-character cards.
    """

    EXAMPLE = os.path.join(
        os.path.dirname(__file__),
        "data",
        "example-ndf.sdf",
    )

    def test_root_is_ndf_with_root_name(self):
        with h5py.File(self.EXAMPLE, "r") as src:
            root_attrs = src["/"].attrs
            self.assertEqual(_attr_str(root_attrs["CLASS"]), "NDF")
            self.assertEqual(_attr_str(src["/"].attrs["HDS_ROOT_NAME"]), "M57")

    def test_data_array_is_array_structure(self):
        with h5py.File(self.EXAMPLE, "r") as src:
            array_struct, struct_type = _hds.open_structure(src, "DATA_ARRAY")
            self.assertEqual(struct_type, "ARRAY")

            # The pixel data: checked on the raw dataset first, then again
            # after reading through read_array.
            pixels = array_struct["DATA"]
            self.assertEqual(pixels.dtype, np.int16)
            self.assertEqual(pixels.shape, (611, 609))
            self.assertEqual(_hds.hds_type_for_dtype(pixels.dtype), "_WORD")
            loaded = _hds.read_array(pixels)
            self.assertEqual(loaded.shape, (611, 609))

            # ORIGIN has one int64 entry per pixel axis.
            origin_values = _hds.read_array(array_struct["ORIGIN"])
            self.assertEqual(origin_values.dtype, np.int64)
            self.assertEqual(origin_values.shape, (2,))

    def test_wcs_is_structure_with_ast_text(self):
        with h5py.File(self.EXAMPLE, "r") as src:
            wcs_struct, struct_type = _hds.open_structure(src, "WCS")
            self.assertEqual(struct_type, "WCS")
            records = _hds.read_char_array(wcs_struct["DATA"])
            ast_text = _hds.decode_ndf_ast_data(records)
            # Leading whitespace is dropped before matching the markers.
            unindented = [row.lstrip() for row in ast_text.splitlines()]
            self.assertTrue(any(row.startswith("Begin FrameSet") for row in unindented))
            self.assertTrue(any(row.startswith("End FrameSet") for row in unindented))

    def test_more_fits_present(self):
        with h5py.File(self.EXAMPLE, "r") as src:
            extension, struct_type = _hds.open_structure(src, "MORE")
            self.assertEqual(struct_type, "EXT")
            fits_cards = _hds.read_char_array(extension["FITS"])
            self.assertGreater(len(fits_cards), 0)
            self.assertTrue(any(card.startswith("NAXIS") for card in fits_cards))