Coverage for python / lsst / images / ndf / _hds.py: 14%

131 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:47 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""HDS-on-HDF5 read/write helpers. 

13 

14These follow the conventions used by the canonical Starlink ``hds-v5`` 

15library (see ``reference/hds-v5/dat1.h`` and ``dat1New.c``): 

16 

17* HDS structures are HDF5 groups with a ``CLASS`` attribute holding the 

18 HDS type string (``"NDF"``, ``"WCS"``, ``"EXT"``, ``"ARRAY"``, ...). 

19* Arrays of structures additionally carry an ``HDS_STRUCTURE_DIMS`` 

20 attribute (deferred from v1; we only handle scalar structures). 

21* The file root group, when it represents a top-level HDS structure, 

22 carries an ``HDS_ROOT_NAME`` attribute giving the HDS object name. 

23* HDS primitives are bare HDF5 datasets with no HDS-specific attributes; 

24 the HDS type is inferred from the HDF5 dtype. 

25""" 

26 

27from __future__ import annotations 

28 

29from collections.abc import Iterator, Sequence 

30from typing import Any 

31 

32import h5py 

33import numpy as np 

34 

35__all__ = ( 

36 "ATTR_CLASS", 

37 "ATTR_ROOT_NAME", 

38 "ATTR_STRUCTURE_DIMS", 

39 "HDS_TO_NUMPY", 

40 "NUMPY_TO_HDS", 

41 "create_structure", 

42 "decode_ndf_ast_data", 

43 "encode_ndf_ast_data", 

44 "hds_type_for_dtype", 

45 "iter_children", 

46 "open_structure", 

47 "read_array", 

48 "read_char_array", 

49 "set_ascii_attr", 

50 "set_root_name", 

51 "write_array", 

52 "write_char_array", 

53) 

54 

55 

56# Canonical attribute names used by hds-v5 (see reference/hds-v5/dat1.h). 

57ATTR_CLASS = "CLASS" 

58ATTR_STRUCTURE_DIMS = "HDS_STRUCTURE_DIMS" 

59ATTR_ROOT_NAME = "HDS_ROOT_NAME" 

60 

61 

62HDS_TO_NUMPY: dict[str, np.dtype] = { 

63 "_LOGICAL": np.dtype(np.bool_), 

64 "_REAL": np.dtype(np.float32), 

65 "_DOUBLE": np.dtype(np.float64), 

66 "_UBYTE": np.dtype(np.uint8), 

67 "_WORD": np.dtype(np.int16), 

68 "_INTEGER": np.dtype(np.int32), 

69 "_INT64": np.dtype(np.int64), 

70} 

71 

72NUMPY_TO_HDS: dict[np.dtype, str] = { 

73 np.dtype(np.bool_): "_LOGICAL", 

74 np.dtype(np.float32): "_REAL", 

75 np.dtype(np.float64): "_DOUBLE", 

76 np.dtype(np.uint8): "_UBYTE", 

77 np.dtype(np.int16): "_WORD", 

78 np.dtype(np.int32): "_INTEGER", 

79 np.dtype(np.int64): "_INT64", 

80} 

81 

82 

83NDF_AST_DATA_WIDTH = 32 

84NDF_AST_DATA_MIN_WIDTH = 16 

85 

86 

87def hds_type_for_dtype(dtype: np.dtype) -> str: 

88 """Return the HDS type string for a numpy dtype. 

89 

90 Parameters 

91 ---------- 

92 dtype 

93 Numpy dtype to map to an HDS primitive type. 

94 

95 Returns 

96 ------- 

97 str 

98 HDS primitive type string. 

99 

100 Raises 

101 ------ 

102 NotImplementedError 

103 Raised if ``dtype`` does not map to a supported HDS primitive type. 

104 

105 Notes 

106 ----- 

107 Fixed-width byte strings ``|S<N>`` map to ``"_CHAR*<N>"``. Numeric 

108 dtypes are looked up in `NUMPY_TO_HDS`. Anything else raises 

109 ``NotImplementedError``. 

110 """ 

111 if dtype.kind == "S": 

112 return f"_CHAR*{dtype.itemsize}" 

113 try: 

114 return NUMPY_TO_HDS[dtype] 

115 except KeyError: 

116 raise NotImplementedError(f"No HDS type mapping for dtype {dtype!r}.") from None 

117 

118 

119def write_array( 

120 parent: h5py.Group, 

121 name: str, 

122 data: np.ndarray, 

123 *, 

124 compression: str | None = None, 

125 compression_opts: int | None = None, 

126) -> h5py.Dataset: 

127 """Write a numpy C-order array as an HDS primitive. 

128 

129 Parameters 

130 ---------- 

131 parent 

132 HDF5 group to receive the dataset. 

133 name 

134 Name of the primitive dataset to create. 

135 data 

136 Array to write. 

137 compression 

138 Optional compression algorithm accepted by `h5py.Group.create_dataset`. 

139 compression_opts 

140 Optional compression options accepted by `h5py.Group.create_dataset`. 

141 

142 Returns 

143 ------- 

144 h5py.Dataset 

145 Newly-created dataset. 

146 

147 Notes 

148 ----- 

149 The HDF5 dataset carries no HDS-specific attributes; the HDS type 

150 is inferred on read from the HDF5 dtype. Refuses dtypes that don't 

151 map to a supported HDS primitive type. 

152 

153 The HDF5 dataset has the array's natural shape (C-order). Combined 

154 with HDF5's native byte ordering, this matches the Fortran-on-disk 

155 layout required by HDS for an NDF whose Fortran-order shape is the 

156 reverse of ``data.shape``. 

157 """ 

158 # Validate the dtype is supported up front so callers get a clear error. 

159 hds_type_for_dtype(data.dtype) 

160 if data.dtype == np.dtype(np.bool_): 

161 return _write_logical_array(parent, name, data, compression=compression) 

162 # h5py rejects compression options if no compression algorithm is set. 

163 kwargs: dict[str, Any] = {} 

164 if compression is not None: 

165 kwargs["compression"] = compression 

166 if compression_opts is not None: 

167 kwargs["compression_opts"] = compression_opts 

168 return parent.create_dataset(name, data=data, **kwargs) 

169 

170 

171def _write_logical_array( 

172 parent: h5py.Group, 

173 name: str, 

174 data: np.ndarray, 

175 *, 

176 compression: str | None = None, 

177) -> h5py.Dataset: 

178 """Write an HDS ``_LOGICAL`` primitive using the HDF5 bitfield type. 

179 

180 Parameters 

181 ---------- 

182 parent 

183 HDF5 group to receive the dataset. 

184 name 

185 Name of the primitive dataset to create. 

186 data 

187 Boolean array to write. 

188 compression 

189 Compression is not supported for this low-level bitfield writer. 

190 

191 Returns 

192 ------- 

193 h5py.Dataset 

194 Newly-created dataset. 

195 

196 Notes 

197 ----- 

198 High-level h5py writes numpy bool data as an HDF5 enum, but hds-v5 

199 identifies ``_LOGICAL`` primitives by the HDF5 bitfield class. 

200 """ 

201 if compression is not None: 

202 raise NotImplementedError("Compression is not implemented for HDS _LOGICAL arrays.") 

203 logical_data = np.asarray(data, dtype=np.uint8) 

204 if logical_data.shape: 

205 space = h5py.h5s.create_simple(logical_data.shape) 

206 else: 

207 space = h5py.h5s.create(h5py.h5s.SCALAR) 

208 dataset_id = h5py.h5d.create( 

209 parent.id, 

210 name.encode("ascii"), 

211 h5py.h5t.STD_B8LE, 

212 space, 

213 ) 

214 dataset_id.write( 

215 h5py.h5s.ALL, 

216 h5py.h5s.ALL, 

217 logical_data, 

218 mtype=h5py.h5t.NATIVE_B8, 

219 ) 

220 dataset_id.close() 

221 return parent[name] 

222 

223 

224def read_array(dataset: h5py.Dataset) -> np.ndarray: 

225 """Read an HDS primitive into a C-order numpy array. 

226 

227 Parameters 

228 ---------- 

229 dataset 

230 HDF5 dataset to read. 

231 

232 Returns 

233 ------- 

234 numpy.ndarray 

235 Data read from ``dataset``. 

236 

237 Notes 

238 ----- 

239 The HDS type is inferred from the HDF5 dtype. Raises 

240 ``NotImplementedError`` if the dtype is not a supported numeric HDS 

241 primitive type. Use `read_char_array` for ``_CHAR*N`` datasets. 

242 """ 

243 if dataset.dtype.kind == "S": 

244 raise ValueError(f"Dataset {dataset.name!r} is _CHAR*N; use read_char_array instead.") 

245 dataset_type = dataset.id.get_type() 

246 if dataset_type.get_class() == h5py.h5t.BITFIELD: 

247 if dataset_type.get_size() not in {1, 4}: 

248 raise NotImplementedError( 

249 f"Dataset {dataset.name!r} has bitfield size {dataset_type.get_size()} " 

250 "which does not map to HDS _LOGICAL." 

251 ) 

252 data = dataset[()] != 0 

253 if isinstance(data, np.ndarray): 

254 return data.astype(np.bool_) 

255 return np.atleast_1d(np.bool_(data)) 

256 if dataset.dtype not in NUMPY_TO_HDS: 

257 raise NotImplementedError( 

258 f"Dataset {dataset.name!r} has dtype {dataset.dtype} which does not " 

259 f"map to a supported HDS primitive type." 

260 ) 

261 return dataset[()] 

262 

263 

264def write_char_array( 

265 parent: h5py.Group, 

266 name: str, 

267 lines: Sequence[str], 

268 *, 

269 width: int = 80, 

270) -> h5py.Dataset: 

271 """Write a sequence of strings as a 1D HDS ``_CHAR*N`` primitive. 

272 

273 Parameters 

274 ---------- 

275 parent 

276 HDF5 group to receive the dataset. 

277 name 

278 Name of the primitive dataset to create. 

279 lines 

280 Strings to write. Each string must be ASCII and no longer than 

281 ``width`` characters. 

282 width 

283 Fixed width of each HDS character-array element. 

284 

285 Returns 

286 ------- 

287 h5py.Dataset 

288 Newly-created dataset. 

289 

290 Raises 

291 ------ 

292 ValueError 

293 Raised if any string is not ASCII or is longer than ``width``. 

294 

295 Notes 

296 ----- 

297 Each string is padded to ``width`` with trailing spaces (HDS 

298 convention). The HDF5 dataset has dtype ``|S<width>``; no HDS-specific 

299 attributes are written. 

300 """ 

301 encoded_lines: list[bytes] = [] 

302 for n, line in enumerate(lines): 

303 try: 

304 encoded_line = line.encode("ascii") 

305 except UnicodeEncodeError as err: 

306 raise ValueError(f"Line {n} for {name!r} is not ASCII.") from err 

307 if len(encoded_line) > width: 

308 raise ValueError( 

309 f"Line {n} for {name!r} is {len(encoded_line)} bytes, longer than width={width}." 

310 ) 

311 encoded_lines.append(encoded_line.ljust(width)) 

312 encoded = np.array( 

313 encoded_lines, 

314 dtype=f"|S{width}", 

315 ) 

316 return parent.create_dataset(name, data=encoded) 

317 

318 

319def encode_ndf_ast_data(text: str, *, width: int = NDF_AST_DATA_WIDTH) -> list[str]: 

320 """Encode AST Channel text for an NDF ``WCS.DATA`` component. 

321 

322 Parameters 

323 ---------- 

324 text 

325 Text emitted by an AST Channel. 

326 width 

327 Fixed width of the target NDF ``WCS.DATA`` records. 

328 

329 Returns 

330 ------- 

331 list [str] 

332 HDS character-array records. 

333 

334 Notes 

335 ----- 

336 Starlink NDF stores each AST text line in one or more fixed-width 

337 ``_CHAR*32`` records. The first character of each record is a flag: 

338 a space starts a new AST line and ``+`` continues the previous one. 

339 The payload is the AST text line with leading indentation removed. 

340 """ 

341 if width < NDF_AST_DATA_MIN_WIDTH: 

342 raise ValueError( 

343 f"NDF AST DATA record width {width} is too short; minimum is {NDF_AST_DATA_MIN_WIDTH}." 

344 ) 

345 

346 records: list[str] = [] 

347 payload_width = width - 1 

348 for raw_line in text.splitlines(): 

349 line = raw_line.lstrip(" ").rstrip(" ") 

350 if not line: 

351 continue 

352 for start in range(0, len(line), payload_width): 

353 flag = " " if start == 0 else "+" 

354 records.append(f"{flag}{line[start : start + payload_width]}") 

355 return records 

356 

357 

358def decode_ndf_ast_data(records: Sequence[str]) -> str: 

359 """Decode an NDF ``WCS.DATA`` component into AST Channel text. 

360 

361 Parameters 

362 ---------- 

363 records 

364 HDS character-array records from ``WCS.DATA``. 

365 

366 Returns 

367 ------- 

368 str 

369 AST Channel text. 

370 

371 Notes 

372 ----- 

373 This reverses `encode_ndf_ast_data`. If the input does not look like 

374 NDF AST records, it is treated as plain AST Channel text for backward 

375 compatibility with earlier non-canonical files. 

376 """ 

377 if not records: 

378 return "" 

379 if any(record and record[0] not in {" ", "+"} for record in records): 

380 return "\n".join(records) + "\n" 

381 

382 lines: list[str] = [] 

383 current: list[str] = [] 

384 for record in records: 

385 if not record: 

386 continue 

387 flag = record[0] 

388 payload = record[1:] 

389 if flag == "+": 

390 if current: 

391 current.append(payload) 

392 else: 

393 current = [payload] 

394 else: 

395 if current: 

396 lines.append("".join(current).rstrip(" ")) 

397 current = [payload] 

398 if current: 

399 lines.append("".join(current).rstrip(" ")) 

400 return "\n".join(lines) + ("\n" if lines else "") 

401 

402 

403def read_char_array(dataset: h5py.Dataset) -> list[str]: 

404 """Read an HDS ``_CHAR*N`` 1D primitive as a list of stripped strings. 

405 

406 Parameters 

407 ---------- 

408 dataset 

409 HDF5 dataset to read. 

410 

411 Returns 

412 ------- 

413 list [str] 

414 Dataset elements decoded from ASCII with trailing spaces stripped. 

415 

416 Notes 

417 ----- 

418 Validates the dataset has a fixed-width byte-string dtype (``|S<N>``). 

419 """ 

420 if dataset.dtype.kind != "S": 

421 raise ValueError(f"Dataset {dataset.name!r} is not _CHAR*N (dtype {dataset.dtype}).") 

422 if dataset.ndim == 0: 

423 raise ValueError(f"Dataset {dataset.name!r} is a scalar _CHAR*N; only 1-D arrays are supported.") 

424 raw = dataset[()] 

425 return [item.decode("ascii").rstrip(" ") for item in raw] 

426 

427 

428def set_ascii_attr(target: h5py.Group | h5py.Dataset, name: str, value: str) -> None: 

429 """Write a fixed-length ASCII byte attribute. 

430 

431 Parameters 

432 ---------- 

433 target 

434 HDF5 object whose attribute should be set. 

435 name 

436 Attribute name. 

437 value 

438 ASCII value to write. 

439 

440 Notes 

441 ----- 

442 Canonical ``hds-v5`` stores ``CLASS`` and ``HDS_ROOT_NAME`` as 

443 fixed-length ASCII byte strings (e.g. ``|S5`` for ``"ARRAY"``). 

444 h5py's default for Python ``str`` is variable-length UTF-8, which 

445 Starlink tools (KAPPA, ``hdstrace``) can't decode — they show 

446 garbage in the type-tag column. Writing as fixed-length bytes 

447 matches the canonical layout. 

448 """ 

449 encoded = value.encode("ascii") 

450 if name in target.attrs: 

451 del target.attrs[name] 

452 target.attrs.create(name, encoded, dtype=f"|S{len(encoded)}") 

453 

454 

455def create_structure(parent: h5py.Group, name: str, hds_type: str) -> h5py.Group: 

456 """Create a named HDS structure (h5py group with ``CLASS`` attribute). 

457 

458 Parameters 

459 ---------- 

460 parent 

461 Group to create the new structure under. 

462 name 

463 Component name (HDS rules apply: uppercase letters/digits/underscores, 

464 max 15 characters; not enforced here). 

465 hds_type 

466 HDS type string for the new structure (e.g. ``"NDF"``, ``"WCS"``, 

467 ``"ARRAY"``, ``"EXT"``). 

468 """ 

469 group = parent.create_group(name) 

470 set_ascii_attr(group, ATTR_CLASS, hds_type) 

471 return group 

472 

473 

474def set_root_name(file: h5py.File, hds_name: str, hds_type: str) -> None: 

475 """Mark a file's root group as a top-level HDS structure. 

476 

477 Parameters 

478 ---------- 

479 file 

480 HDF5 file whose root group represents the HDS root object. 

481 hds_name 

482 HDS root object name. 

483 hds_type 

484 HDS type string for the root object. 

485 

486 Notes 

487 ----- 

488 Sets ``HDS_ROOT_NAME`` (the HDS object name) and ``CLASS`` (the HDS 

489 type) on the root group, matching what ``hds-v5`` writes for a root 

490 structure created via :c:func:`dat1New`. 

491 """ 

492 set_ascii_attr(file["/"], ATTR_ROOT_NAME, hds_name) 

493 set_ascii_attr(file["/"], ATTR_CLASS, hds_type) 

494 

495 

496def open_structure(parent: h5py.Group, name: str) -> tuple[h5py.Group, str]: 

497 """Open a child structure by name. Returns ``(group, hds_type)``. 

498 

499 Parameters 

500 ---------- 

501 parent 

502 HDF5 group containing the structure. 

503 name 

504 Name of the child structure. 

505 

506 Returns 

507 ------- 

508 group : h5py.Group 

509 Opened HDF5 group. 

510 hds_type : str 

511 HDS type string from the structure's ``CLASS`` attribute. 

512 

513 Notes 

514 ----- 

515 Raises ``ValueError`` if the child is not a group, or has no 

516 ``CLASS`` attribute. Accepts the legacy ``HDSTYPE`` attribute name 

517 as a fallback so files written by older HDS variants can still be 

518 inspected. 

519 """ 

520 obj = parent[name] 

521 if not isinstance(obj, h5py.Group): 

522 raise ValueError(f"{parent.name}/{name} is a dataset, not a structure.") 

523 hds_type = obj.attrs.get(ATTR_CLASS) 

524 if hds_type is None: 

525 # Legacy fallback for older HDS-on-HDF5 variants. 

526 hds_type = obj.attrs.get("HDSTYPE") 

527 if isinstance(hds_type, bytes): 

528 hds_type = hds_type.decode("ascii") 

529 if not isinstance(hds_type, str): 

530 raise ValueError(f"Group {obj.name!r} has no {ATTR_CLASS!r} (or legacy HDSTYPE) attribute.") 

531 return obj, hds_type 

532 

533 

534def iter_children(group: h5py.Group) -> Iterator[tuple[str, h5py.Group | h5py.Dataset]]: 

535 """Iterate over a structure's direct children. 

536 

537 Parameters 

538 ---------- 

539 group 

540 HDF5 group to inspect. 

541 

542 Yields ``(name, child)`` pairs where ``child`` is a group or dataset. 

543 """ 

544 yield from group.items()