Coverage for python/lsst/images/ndf/_hds.py: 14%
131 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 08:25 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 08:25 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""HDS-on-HDF5 read/write helpers.
14These follow the conventions used by the canonical Starlink ``hds-v5``
15library (see ``reference/hds-v5/dat1.h`` and ``dat1New.c``):
17* HDS structures are HDF5 groups with a ``CLASS`` attribute holding the
18 HDS type string (``"NDF"``, ``"WCS"``, ``"EXT"``, ``"ARRAY"``, ...).
19* Arrays of structures additionally carry an ``HDS_STRUCTURE_DIMS``
20 attribute (deferred from v1; we only handle scalar structures).
21* The file root group, when it represents a top-level HDS structure,
22 carries an ``HDS_ROOT_NAME`` attribute giving the HDS object name.
23* HDS primitives are bare HDF5 datasets with no HDS-specific attributes;
24 the HDS type is inferred from the HDF5 dtype.
25"""
27from __future__ import annotations
29from collections.abc import Iterator, Sequence
30from typing import Any
32import h5py
33import numpy as np
35__all__ = (
36 "ATTR_CLASS",
37 "ATTR_ROOT_NAME",
38 "ATTR_STRUCTURE_DIMS",
39 "HDS_TO_NUMPY",
40 "NUMPY_TO_HDS",
41 "create_structure",
42 "decode_ndf_ast_data",
43 "encode_ndf_ast_data",
44 "hds_type_for_dtype",
45 "iter_children",
46 "open_structure",
47 "read_array",
48 "read_char_array",
49 "set_ascii_attr",
50 "set_root_name",
51 "write_array",
52 "write_char_array",
53)
56# Canonical attribute names used by hds-v5 (see reference/hds-v5/dat1.h).
57ATTR_CLASS = "CLASS"
58ATTR_STRUCTURE_DIMS = "HDS_STRUCTURE_DIMS"
59ATTR_ROOT_NAME = "HDS_ROOT_NAME"
62HDS_TO_NUMPY: dict[str, np.dtype] = {
63 "_LOGICAL": np.dtype(np.bool_),
64 "_REAL": np.dtype(np.float32),
65 "_DOUBLE": np.dtype(np.float64),
66 "_UBYTE": np.dtype(np.uint8),
67 "_WORD": np.dtype(np.int16),
68 "_INTEGER": np.dtype(np.int32),
69 "_INT64": np.dtype(np.int64),
70}
72NUMPY_TO_HDS: dict[np.dtype, str] = {
73 np.dtype(np.bool_): "_LOGICAL",
74 np.dtype(np.float32): "_REAL",
75 np.dtype(np.float64): "_DOUBLE",
76 np.dtype(np.uint8): "_UBYTE",
77 np.dtype(np.int16): "_WORD",
78 np.dtype(np.int32): "_INTEGER",
79 np.dtype(np.int64): "_INT64",
80}
83NDF_AST_DATA_WIDTH = 32
84NDF_AST_DATA_MIN_WIDTH = 16
87def hds_type_for_dtype(dtype: np.dtype) -> str:
88 """Return the HDS type string for a numpy dtype.
90 Parameters
91 ----------
92 dtype
93 Numpy dtype to map to an HDS primitive type.
95 Returns
96 -------
97 str
98 HDS primitive type string.
100 Raises
101 ------
102 NotImplementedError
103 Raised if ``dtype`` does not map to a supported HDS primitive type.
105 Notes
106 -----
107 Fixed-width byte strings ``|S<N>`` map to ``"_CHAR*<N>"``. Numeric
108 dtypes are looked up in `NUMPY_TO_HDS`. Anything else raises
109 ``NotImplementedError``.
110 """
111 if dtype.kind == "S":
112 return f"_CHAR*{dtype.itemsize}"
113 try:
114 return NUMPY_TO_HDS[dtype]
115 except KeyError:
116 raise NotImplementedError(f"No HDS type mapping for dtype {dtype!r}.") from None
119def write_array(
120 parent: h5py.Group,
121 name: str,
122 data: np.ndarray,
123 *,
124 compression: str | None = None,
125 compression_opts: int | None = None,
126) -> h5py.Dataset:
127 """Write a numpy C-order array as an HDS primitive.
129 Parameters
130 ----------
131 parent
132 HDF5 group to receive the dataset.
133 name
134 Name of the primitive dataset to create.
135 data
136 Array to write.
137 compression
138 Optional compression algorithm accepted by `h5py.Group.create_dataset`.
139 compression_opts
140 Optional compression options accepted by `h5py.Group.create_dataset`.
142 Returns
143 -------
144 h5py.Dataset
145 Newly-created dataset.
147 Notes
148 -----
149 The HDF5 dataset carries no HDS-specific attributes; the HDS type
150 is inferred on read from the HDF5 dtype. Refuses dtypes that don't
151 map to a supported HDS primitive type.
153 The HDF5 dataset has the array's natural shape (C-order). Combined
154 with HDF5's native byte ordering, this matches the Fortran-on-disk
155 layout required by HDS for an NDF whose Fortran-order shape is the
156 reverse of ``data.shape``.
157 """
158 # Validate the dtype is supported up front so callers get a clear error.
159 hds_type_for_dtype(data.dtype)
160 if data.dtype == np.dtype(np.bool_):
161 return _write_logical_array(parent, name, data, compression=compression)
162 # h5py rejects compression options if no compression algorithm is set.
163 kwargs: dict[str, Any] = {}
164 if compression is not None:
165 kwargs["compression"] = compression
166 if compression_opts is not None:
167 kwargs["compression_opts"] = compression_opts
168 return parent.create_dataset(name, data=data, **kwargs)
171def _write_logical_array(
172 parent: h5py.Group,
173 name: str,
174 data: np.ndarray,
175 *,
176 compression: str | None = None,
177) -> h5py.Dataset:
178 """Write an HDS ``_LOGICAL`` primitive using the HDF5 bitfield type.
180 Parameters
181 ----------
182 parent
183 HDF5 group to receive the dataset.
184 name
185 Name of the primitive dataset to create.
186 data
187 Boolean array to write.
188 compression
189 Compression is not supported for this low-level bitfield writer.
191 Returns
192 -------
193 h5py.Dataset
194 Newly-created dataset.
196 Notes
197 -----
198 High-level h5py writes numpy bool data as an HDF5 enum, but hds-v5
199 identifies ``_LOGICAL`` primitives by the HDF5 bitfield class.
200 """
201 if compression is not None:
202 raise NotImplementedError("Compression is not implemented for HDS _LOGICAL arrays.")
203 logical_data = np.asarray(data, dtype=np.uint8)
204 if logical_data.shape:
205 space = h5py.h5s.create_simple(logical_data.shape)
206 else:
207 space = h5py.h5s.create(h5py.h5s.SCALAR)
208 dataset_id = h5py.h5d.create(
209 parent.id,
210 name.encode("ascii"),
211 h5py.h5t.STD_B8LE,
212 space,
213 )
214 dataset_id.write(
215 h5py.h5s.ALL,
216 h5py.h5s.ALL,
217 logical_data,
218 mtype=h5py.h5t.NATIVE_B8,
219 )
220 dataset_id.close()
221 return parent[name]
224def read_array(dataset: h5py.Dataset) -> np.ndarray:
225 """Read an HDS primitive into a C-order numpy array.
227 Parameters
228 ----------
229 dataset
230 HDF5 dataset to read.
232 Returns
233 -------
234 numpy.ndarray
235 Data read from ``dataset``.
237 Notes
238 -----
239 The HDS type is inferred from the HDF5 dtype. Raises
240 ``NotImplementedError`` if the dtype is not a supported numeric HDS
241 primitive type. Use `read_char_array` for ``_CHAR*N`` datasets.
242 """
243 if dataset.dtype.kind == "S":
244 raise ValueError(f"Dataset {dataset.name!r} is _CHAR*N; use read_char_array instead.")
245 dataset_type = dataset.id.get_type()
246 if dataset_type.get_class() == h5py.h5t.BITFIELD:
247 if dataset_type.get_size() not in {1, 4}:
248 raise NotImplementedError(
249 f"Dataset {dataset.name!r} has bitfield size {dataset_type.get_size()} "
250 "which does not map to HDS _LOGICAL."
251 )
252 data = dataset[()] != 0
253 if isinstance(data, np.ndarray):
254 return data.astype(np.bool_)
255 return np.atleast_1d(np.bool_(data))
256 if dataset.dtype not in NUMPY_TO_HDS:
257 raise NotImplementedError(
258 f"Dataset {dataset.name!r} has dtype {dataset.dtype} which does not "
259 f"map to a supported HDS primitive type."
260 )
261 return dataset[()]
264def write_char_array(
265 parent: h5py.Group,
266 name: str,
267 lines: Sequence[str],
268 *,
269 width: int = 80,
270) -> h5py.Dataset:
271 """Write a sequence of strings as a 1D HDS ``_CHAR*N`` primitive.
273 Parameters
274 ----------
275 parent
276 HDF5 group to receive the dataset.
277 name
278 Name of the primitive dataset to create.
279 lines
280 Strings to write. Each string must be ASCII and no longer than
281 ``width`` characters.
282 width
283 Fixed width of each HDS character-array element.
285 Returns
286 -------
287 h5py.Dataset
288 Newly-created dataset.
290 Raises
291 ------
292 ValueError
293 Raised if any string is not ASCII or is longer than ``width``.
295 Notes
296 -----
297 Each string is padded to ``width`` with trailing spaces (HDS
298 convention). The HDF5 dataset has dtype ``|S<width>``; no HDS-specific
299 attributes are written.
300 """
301 encoded_lines: list[bytes] = []
302 for n, line in enumerate(lines):
303 try:
304 encoded_line = line.encode("ascii")
305 except UnicodeEncodeError as err:
306 raise ValueError(f"Line {n} for {name!r} is not ASCII.") from err
307 if len(encoded_line) > width:
308 raise ValueError(
309 f"Line {n} for {name!r} is {len(encoded_line)} bytes, longer than width={width}."
310 )
311 encoded_lines.append(encoded_line.ljust(width))
312 encoded = np.array(
313 encoded_lines,
314 dtype=f"|S{width}",
315 )
316 return parent.create_dataset(name, data=encoded)
319def encode_ndf_ast_data(text: str, *, width: int = NDF_AST_DATA_WIDTH) -> list[str]:
320 """Encode AST Channel text for an NDF ``WCS.DATA`` component.
322 Parameters
323 ----------
324 text
325 Text emitted by an AST Channel.
326 width
327 Fixed width of the target NDF ``WCS.DATA`` records.
329 Returns
330 -------
331 list [str]
332 HDS character-array records.
334 Notes
335 -----
336 Starlink NDF stores each AST text line in one or more fixed-width
337 ``_CHAR*32`` records. The first character of each record is a flag:
338 a space starts a new AST line and ``+`` continues the previous one.
339 The payload is the AST text line with leading indentation removed.
340 """
341 if width < NDF_AST_DATA_MIN_WIDTH:
342 raise ValueError(
343 f"NDF AST DATA record width {width} is too short; minimum is {NDF_AST_DATA_MIN_WIDTH}."
344 )
346 records: list[str] = []
347 payload_width = width - 1
348 for raw_line in text.splitlines():
349 line = raw_line.lstrip(" ").rstrip(" ")
350 if not line:
351 continue
352 for start in range(0, len(line), payload_width):
353 flag = " " if start == 0 else "+"
354 records.append(f"{flag}{line[start : start + payload_width]}")
355 return records
358def decode_ndf_ast_data(records: Sequence[str]) -> str:
359 """Decode an NDF ``WCS.DATA`` component into AST Channel text.
361 Parameters
362 ----------
363 records
364 HDS character-array records from ``WCS.DATA``.
366 Returns
367 -------
368 str
369 AST Channel text.
371 Notes
372 -----
373 This reverses `encode_ndf_ast_data`. If the input does not look like
374 NDF AST records, it is treated as plain AST Channel text for backward
375 compatibility with earlier non-canonical files.
376 """
377 if not records:
378 return ""
379 if any(record and record[0] not in {" ", "+"} for record in records):
380 return "\n".join(records) + "\n"
382 lines: list[str] = []
383 current: list[str] = []
384 for record in records:
385 if not record:
386 continue
387 flag = record[0]
388 payload = record[1:]
389 if flag == "+":
390 if current:
391 current.append(payload)
392 else:
393 current = [payload]
394 else:
395 if current:
396 lines.append("".join(current).rstrip(" "))
397 current = [payload]
398 if current:
399 lines.append("".join(current).rstrip(" "))
400 return "\n".join(lines) + ("\n" if lines else "")
403def read_char_array(dataset: h5py.Dataset) -> list[str]:
404 """Read an HDS ``_CHAR*N`` 1D primitive as a list of stripped strings.
406 Parameters
407 ----------
408 dataset
409 HDF5 dataset to read.
411 Returns
412 -------
413 list [str]
414 Dataset elements decoded from ASCII with trailing spaces stripped.
416 Notes
417 -----
418 Validates the dataset has a fixed-width byte-string dtype (``|S<N>``).
419 """
420 if dataset.dtype.kind != "S":
421 raise ValueError(f"Dataset {dataset.name!r} is not _CHAR*N (dtype {dataset.dtype}).")
422 if dataset.ndim == 0:
423 raise ValueError(f"Dataset {dataset.name!r} is a scalar _CHAR*N; only 1-D arrays are supported.")
424 raw = dataset[()]
425 return [item.decode("ascii").rstrip(" ") for item in raw]
428def set_ascii_attr(target: h5py.Group | h5py.Dataset, name: str, value: str) -> None:
429 """Write a fixed-length ASCII byte attribute.
431 Parameters
432 ----------
433 target
434 HDF5 object whose attribute should be set.
435 name
436 Attribute name.
437 value
438 ASCII value to write.
440 Notes
441 -----
442 Canonical ``hds-v5`` stores ``CLASS`` and ``HDS_ROOT_NAME`` as
443 fixed-length ASCII byte strings (e.g. ``|S5`` for ``"ARRAY"``).
444 h5py's default for Python ``str`` is variable-length UTF-8, which
445 Starlink tools (KAPPA, ``hdstrace``) can't decode — they show
446 garbage in the type-tag column. Writing as fixed-length bytes
447 matches the canonical layout.
448 """
449 encoded = value.encode("ascii")
450 if name in target.attrs:
451 del target.attrs[name]
452 target.attrs.create(name, encoded, dtype=f"|S{len(encoded)}")
455def create_structure(parent: h5py.Group, name: str, hds_type: str) -> h5py.Group:
456 """Create a named HDS structure (h5py group with ``CLASS`` attribute).
458 Parameters
459 ----------
460 parent
461 Group to create the new structure under.
462 name
463 Component name (HDS rules apply: uppercase letters/digits/underscores,
464 max 15 characters; not enforced here).
465 hds_type
466 HDS type string for the new structure (e.g. ``"NDF"``, ``"WCS"``,
467 ``"ARRAY"``, ``"EXT"``).
468 """
469 group = parent.create_group(name)
470 set_ascii_attr(group, ATTR_CLASS, hds_type)
471 return group
474def set_root_name(file: h5py.File, hds_name: str, hds_type: str) -> None:
475 """Mark a file's root group as a top-level HDS structure.
477 Parameters
478 ----------
479 file
480 HDF5 file whose root group represents the HDS root object.
481 hds_name
482 HDS root object name.
483 hds_type
484 HDS type string for the root object.
486 Notes
487 -----
488 Sets ``HDS_ROOT_NAME`` (the HDS object name) and ``CLASS`` (the HDS
489 type) on the root group, matching what ``hds-v5`` writes for a root
490 structure created via :c:func:`dat1New`.
491 """
492 set_ascii_attr(file["/"], ATTR_ROOT_NAME, hds_name)
493 set_ascii_attr(file["/"], ATTR_CLASS, hds_type)
496def open_structure(parent: h5py.Group, name: str) -> tuple[h5py.Group, str]:
497 """Open a child structure by name. Returns ``(group, hds_type)``.
499 Parameters
500 ----------
501 parent
502 HDF5 group containing the structure.
503 name
504 Name of the child structure.
506 Returns
507 -------
508 group : h5py.Group
509 Opened HDF5 group.
510 hds_type : str
511 HDS type string from the structure's ``CLASS`` attribute.
513 Notes
514 -----
515 Raises ``ValueError`` if the child is not a group, or has no
516 ``CLASS`` attribute. Accepts the legacy ``HDSTYPE`` attribute name
517 as a fallback so files written by older HDS variants can still be
518 inspected.
519 """
520 obj = parent[name]
521 if not isinstance(obj, h5py.Group):
522 raise ValueError(f"{parent.name}/{name} is a dataset, not a structure.")
523 hds_type = obj.attrs.get(ATTR_CLASS)
524 if hds_type is None:
525 # Legacy fallback for older HDS-on-HDF5 variants.
526 hds_type = obj.attrs.get("HDSTYPE")
527 if isinstance(hds_type, bytes):
528 hds_type = hds_type.decode("ascii")
529 if not isinstance(hds_type, str):
530 raise ValueError(f"Group {obj.name!r} has no {ATTR_CLASS!r} (or legacy HDSTYPE) attribute.")
531 return obj, hds_type
534def iter_children(group: h5py.Group) -> Iterator[tuple[str, h5py.Group | h5py.Dataset]]:
535 """Iterate over a structure's direct children.
537 Parameters
538 ----------
539 group
540 HDF5 group to inspect.
542 Yields ``(name, child)`` pairs where ``child`` is a group or dataset.
543 """
544 yield from group.items()