Coverage for python / lsst / images / ndf / _model.py: 20%
297 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 01:30 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 01:30 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""Python intermediate representation for NDF/HDS content."""
14from __future__ import annotations
16__all__ = (
17 "HdsExtension",
18 "HdsPrimitive",
19 "HdsStructure",
20 "Ndf",
21 "NdfArray",
22 "NdfContainer",
23 "NdfDocument",
24 "NdfQuality",
25 "NdfWcs",
26)
28from collections.abc import Iterable, Mapping, Sequence
29from dataclasses import dataclass, field
30from typing import Any, Self, cast
32import h5py
33import numpy as np
35from . import _hds
38def _decode_ascii_attr(value: Any) -> str | None:
39 """Decode an HDS ASCII attribute value from h5py."""
40 if value is None:
41 return None
42 if isinstance(value, bytes):
43 return value.decode("ascii")
44 if isinstance(value, np.bytes_):
45 return bytes(value).decode("ascii")
46 if isinstance(value, str):
47 return value
48 return str(value)
51@dataclass
52class HdsPrimitive:
53 """An HDS primitive component.
55 Parameters
56 ----------
57 data
58 Numeric/logical array data, or an open HDF5 dataset when this
59 primitive was read lazily from an existing file.
60 char_lines
61 Character data for ``_CHAR*N`` primitives. A scalar string is
62 represented as a one-element list with ``char_scalar=True``.
63 char_width
64 Fixed HDS character width for ``char_lines``.
65 is_char_scalar
66 If `True`, write ``char_lines`` as a scalar ``_CHAR*N`` primitive.
67 Otherwise write them as a one-dimensional character array.
68 compression_options
69 Optional h5py compression options for numeric array data.
70 """
72 data: np.ndarray | h5py.Dataset | None = None
73 char_lines: list[str] | None = None
74 char_width: int | None = None
75 is_char_scalar: bool = False
76 compression_options: dict[str, Any] = field(default_factory=dict)
78 @classmethod
79 def array(
80 cls,
81 data: np.ndarray | h5py.Dataset,
82 *,
83 compression_options: Mapping[str, Any] | None = None,
84 ) -> Self:
85 """Create a numeric/logical HDS primitive."""
86 return cls(
87 data=data if isinstance(data, h5py.Dataset) else np.asarray(data),
88 compression_options=dict(compression_options) if compression_options else {},
89 )
91 @classmethod
92 def char_array(cls, lines: Sequence[str], *, width: int = 80) -> Self:
93 """Create a one-dimensional HDS ``_CHAR*N`` primitive."""
94 return cls(char_lines=list(lines), char_width=width)
96 @classmethod
97 def char_scalar(cls, text: str, *, width: int | None = None) -> Self:
98 """Create a scalar HDS ``_CHAR*N`` primitive."""
99 encoded = text.encode("ascii")
100 return cls(char_lines=[text], char_width=max(width or 0, len(encoded), 1), is_char_scalar=True)
102 @classmethod
103 def from_hdf5(cls, dataset: h5py.Dataset) -> Self:
104 """Build an HDS primitive model from an open HDF5 dataset."""
105 if dataset.dtype.kind == "S":
106 if dataset.ndim == 0:
107 raw = dataset[()]
108 if isinstance(raw, np.bytes_):
109 raw = bytes(raw)
110 assert isinstance(raw, bytes)
111 return cls(
112 char_lines=[raw.decode("ascii").rstrip(" ")],
113 char_width=dataset.dtype.itemsize,
114 is_char_scalar=True,
115 )
116 return cls(
117 char_lines=_hds.read_char_array(dataset),
118 char_width=dataset.dtype.itemsize,
119 is_char_scalar=False,
120 )
121 return cls(data=dataset)
123 def read_array(self) -> np.ndarray:
124 """Return this primitive as a numpy array."""
125 if self.data is None:
126 raise TypeError("Character HDS primitives cannot be read as numeric arrays.")
127 if isinstance(self.data, h5py.Dataset):
128 return _hds.read_array(self.data)
129 return self.data
131 def read_char_array(self) -> list[str]:
132 """Return this primitive as stripped ASCII strings."""
133 if self.char_lines is not None:
134 return list(self.char_lines)
135 if isinstance(self.data, h5py.Dataset) and self.data.dtype.kind == "S":
136 if self.data.ndim == 0:
137 raw = self.data[()]
138 if isinstance(raw, np.bytes_):
139 raw = bytes(raw)
140 assert isinstance(raw, bytes)
141 return [raw.decode("ascii").rstrip(" ")]
142 return _hds.read_char_array(self.data)
143 raise TypeError("Numeric HDS primitives cannot be read as character arrays.")
145 def write_to_hdf5(self, parent: h5py.Group, name: str) -> h5py.Dataset:
146 """Write this primitive to an HDF5 group."""
147 if name in parent:
148 del parent[name]
149 if self.char_lines is not None:
150 width = self.char_width
151 if width is None:
152 width = max((len(line.encode("ascii")) for line in self.char_lines), default=1)
153 if self.is_char_scalar:
154 if len(self.char_lines) != 1:
155 raise ValueError("Scalar _CHAR*N primitives require exactly one string.")
156 line = self.char_lines[0]
157 encoded = line.encode("ascii")
158 if len(encoded) > width:
159 raise ValueError(
160 f"Scalar _CHAR*N primitive {name!r} is {len(encoded)} bytes, "
161 f"longer than width={width}."
162 )
163 return parent.create_dataset(name, data=np.bytes_(encoded.ljust(width)))
164 return _hds.write_char_array(parent, name, self.char_lines, width=width)
165 data = self.read_array()
166 return _hds.write_array(
167 parent,
168 name,
169 data,
170 compression=self.compression_options.get("compression"),
171 compression_opts=self.compression_options.get("compression_opts"),
172 )
175class HdsStructure:
176 """An HDS structure component with named child components."""
178 def __init__(
179 self,
180 hds_type: str,
181 children: Mapping[str, HdsStructure | HdsPrimitive] | None = None,
182 ) -> None:
183 self.hds_type = hds_type
184 self.children: dict[str, HdsStructure | HdsPrimitive] = dict(children) if children else {}
186 @classmethod
187 def from_hdf5(cls, group: h5py.Group) -> HdsStructure:
188 """Build a structure model from an open HDF5 group."""
189 hds_type = _decode_ascii_attr(group.attrs.get(_hds.ATTR_CLASS))
190 if hds_type is None:
191 hds_type = _decode_ascii_attr(group.attrs.get("HDSTYPE")) or "EXT"
192 structure = _new_structure(hds_type)
193 for name, child in group.items():
194 if isinstance(child, h5py.Group):
195 structure.children[name] = cls.from_hdf5(child)
196 elif isinstance(child, h5py.Dataset):
197 structure.children[name] = HdsPrimitive.from_hdf5(child)
198 return structure
200 def __contains__(self, path: str) -> bool:
201 try:
202 self.get(path)
203 except KeyError:
204 return False
205 return True
207 def __getitem__(self, path: str) -> HdsStructure | HdsPrimitive:
208 return self.get(path)
210 def items(self) -> Iterable[tuple[str, HdsStructure | HdsPrimitive]]:
211 """Iterate over direct child components."""
212 return self.children.items()
214 def get(self, path: str) -> HdsStructure | HdsPrimitive:
215 """Return a child component by relative or absolute path."""
216 if path in ("", "/"):
217 return self
218 cursor: HdsStructure | HdsPrimitive = self
219 for part in _split_path(path):
220 if not isinstance(cursor, HdsStructure):
221 raise KeyError(path)
222 cursor = cursor.children[part]
223 return cursor
225 def get_structure(self, path: str) -> HdsStructure:
226 """Return a child structure by relative or absolute path."""
227 node = self.get(path)
228 if not isinstance(node, HdsStructure):
229 raise KeyError(f"{path!r} is an HDS primitive, not a structure.")
230 return node
232 def set(self, path: str, node: HdsStructure | HdsPrimitive) -> None:
233 """Set a child component by relative or absolute path."""
234 parts = _split_path(path)
235 if not parts:
236 raise ValueError("Cannot replace an HDS structure with itself.")
237 parent = self.ensure_structure("/".join(parts[:-1]), "EXT")
238 parent.children[parts[-1]] = node
240 def delete(self, path: str) -> None:
241 """Delete a child component if it exists."""
242 parts = _split_path(path)
243 if not parts:
244 raise ValueError("Cannot delete an HDS structure root.")
245 parent = self.get_structure("/".join(parts[:-1]))
246 parent.children.pop(parts[-1], None)
248 def ensure_structure(self, path: str, hds_type: str | None = None) -> HdsStructure:
249 """Return an existing structure or create it and its parents.
251 In HDS each structure carries a type tag distinct from its name.
252 Newly-created structures here default to having the part name as
253 their type (e.g. ``/MORE/LSST/PSF`` → types ``EXT``, ``LSST``,
254 ``PSF``). The well-known NDF extension container ``MORE`` is the
255 sole exception and defaults to ``EXT``. Pass ``hds_type`` to
256 override the type on the leaf component only; existing structures
257 retain their type unless overridden.
258 """
259 if path in ("", "/"):
260 return self
261 cursor: HdsStructure = self
262 parts = _split_path(path)
263 for index, part in enumerate(parts):
264 existing = cursor.children.get(part)
265 is_leaf = index == len(parts) - 1
266 if existing is None:
267 if is_leaf and hds_type is not None:
268 child_type = hds_type
269 else:
270 child_type = _default_hds_type_for_name(part)
271 existing = _new_structure(child_type)
272 cursor.children[part] = existing
273 if not isinstance(existing, HdsStructure):
274 raise KeyError(f"{part!r} already exists as an HDS primitive.")
275 cursor = existing
276 if hds_type is not None and cursor.hds_type != hds_type:
277 cursor.hds_type = hds_type
278 return cursor
280 def write_to_hdf5(self, parent: h5py.Group, name: str | None = None) -> h5py.Group:
281 """Write this structure to an HDF5 group."""
282 if name is None:
283 group = parent
284 _clear_hdf5_group(group)
285 _hds.set_ascii_attr(group, _hds.ATTR_CLASS, self.hds_type)
286 else:
287 if name in parent:
288 del parent[name]
289 group = _hds.create_structure(parent, name, self.hds_type)
290 for child_name, child in self.children.items():
291 if isinstance(child, HdsStructure):
292 child.write_to_hdf5(group, child_name)
293 else:
294 child.write_to_hdf5(group, child_name)
295 return group
298class HdsExtension(HdsStructure):
299 """A general-purpose HDS extension structure."""
301 def __init__(self, children: Mapping[str, HdsStructure | HdsPrimitive] | None = None) -> None:
302 super().__init__("EXT", children)
305class NdfContainer(HdsStructure):
306 """A top-level HDS container for multiple NDFs and shared metadata."""
308 def __init__(self, children: Mapping[str, HdsStructure | HdsPrimitive] | None = None) -> None:
309 super().__init__("EXT", children)
311 def ensure_ndf(self, path: str) -> Ndf:
312 """Return or create a child NDF at ``path``."""
313 structure = self.ensure_structure(path, "NDF")
314 if not isinstance(structure, Ndf):
315 structure.__class__ = Ndf
316 return cast(Ndf, structure)
319@dataclass
320class NdfArray:
321 """An NDF ARRAY component."""
323 data: np.ndarray | h5py.Dataset
324 origin: np.ndarray | Sequence[int] | None = None
325 bad_pixel: bool | None = None
326 compression_options: dict[str, Any] = field(default_factory=dict)
328 def to_hds_structure(self) -> HdsStructure:
329 """Convert this array component to an HDS ``ARRAY`` structure."""
330 structure = HdsStructure("ARRAY")
331 structure.children["DATA"] = HdsPrimitive.array(
332 self.data,
333 compression_options=self.compression_options,
334 )
335 if self.origin is not None:
336 structure.children["ORIGIN"] = HdsPrimitive.array(np.asarray(self.origin))
337 if self.bad_pixel is not None:
338 structure.children["BAD_PIXEL"] = HdsPrimitive.array(np.array(self.bad_pixel, dtype=np.bool_))
339 return structure
341 @classmethod
342 def from_hds_structure(cls, structure: HdsStructure) -> Self:
343 """Build an `NdfArray` facade from an HDS ``ARRAY`` structure."""
344 data = structure.children["DATA"]
345 if not isinstance(data, HdsPrimitive):
346 raise TypeError("ARRAY.DATA must be an HDS primitive.")
347 origin: np.ndarray | None = None
348 if (origin_node := structure.children.get("ORIGIN")) is not None:
349 if not isinstance(origin_node, HdsPrimitive):
350 raise TypeError("ARRAY.ORIGIN must be an HDS primitive.")
351 origin = origin_node.read_array()
352 bad_pixel: bool | None = None
353 if (bad_pixel_node := structure.children.get("BAD_PIXEL")) is not None:
354 if not isinstance(bad_pixel_node, HdsPrimitive):
355 raise TypeError("ARRAY.BAD_PIXEL must be an HDS primitive.")
356 bad_pixel = bool(np.asarray(bad_pixel_node.read_array()).reshape(-1)[0])
357 array_data = data.data if data.data is not None else data.read_array()
358 return cls(data=array_data, origin=origin, bad_pixel=bad_pixel)
361@dataclass
362class NdfQuality:
363 """An NDF QUALITY component."""
365 quality: NdfArray
366 badbits: int = 255
368 def to_hds_structure(self) -> HdsStructure:
369 """Convert this quality component to an HDS ``QUALITY`` structure."""
370 structure = HdsStructure("QUALITY")
371 structure.children["BADBITS"] = HdsPrimitive.array(np.array(self.badbits, dtype=np.uint8))
372 structure.children["QUALITY"] = self.quality.to_hds_structure()
373 return structure
376@dataclass
377class NdfWcs:
378 """An NDF WCS component represented by encoded AST channel records."""
380 lines: list[str]
381 width: int = _hds.NDF_AST_DATA_WIDTH
383 def to_hds_structure(self) -> HdsStructure:
384 """Convert this WCS component to an HDS ``WCS`` structure."""
385 structure = HdsStructure("WCS")
386 structure.children["DATA"] = HdsPrimitive.char_array(self.lines, width=self.width)
387 return structure
390class Ndf(HdsStructure):
391 """An NDF structure with convenience accessors for standard components."""
393 def __init__(self, children: Mapping[str, HdsStructure | HdsPrimitive] | None = None) -> None:
394 super().__init__("NDF", children)
396 def set_array_component(
397 self,
398 name: str,
399 data: np.ndarray | h5py.Dataset,
400 *,
401 origin: Sequence[int] | np.ndarray | None = None,
402 bad_pixel: bool | None = None,
403 compression_options: Mapping[str, Any] | None = None,
404 ) -> None:
405 """Set an NDF array-like component such as ``DATA_ARRAY``."""
406 self.children[name] = NdfArray(
407 data,
408 origin=origin,
409 bad_pixel=bad_pixel,
410 compression_options=dict(compression_options) if compression_options else {},
411 ).to_hds_structure()
413 def set_quality(self, quality: NdfQuality) -> None:
414 """Set the NDF ``QUALITY`` component."""
415 self.children["QUALITY"] = quality.to_hds_structure()
417 def set_wcs(self, wcs: NdfWcs) -> None:
418 """Set the NDF ``WCS`` component."""
419 self.children["WCS"] = wcs.to_hds_structure()
421 def set_units(self, units: str | None) -> None:
422 """Set or remove the NDF ``UNITS`` component."""
423 if units is None:
424 self.children.pop("UNITS", None)
425 else:
426 self.children["UNITS"] = HdsPrimitive.char_scalar(units)
428 def get_units(self) -> str | None:
429 """Return the NDF ``UNITS`` component, if present."""
430 node = self.children.get("UNITS")
431 if node is None:
432 return None
433 if not isinstance(node, HdsPrimitive):
434 raise TypeError("NDF.UNITS must be an HDS primitive.")
435 lines = node.read_char_array()
436 return lines[0] if lines else ""
438 def ensure_lsst_extension(self, *, base_path: str = "MORE/LSST") -> HdsStructure:
439 """Return or create the LSST extension structure for this NDF."""
440 return self.ensure_structure(base_path, "EXT")
443@dataclass
444class NdfDocument:
445 """A complete HDS root object containing one NDF or an NDF container."""
447 root: Ndf | NdfContainer = field(default_factory=Ndf)
448 root_name: str | None = None
450 @classmethod
451 def from_hdf5(cls, file: h5py.File) -> Self:
452 """Read an NDF document model from an open HDF5 file."""
453 root = HdsStructure.from_hdf5(file["/"])
454 typed_root: Ndf | NdfContainer
455 if isinstance(root, Ndf | NdfContainer):
456 typed_root = root
457 elif root.hds_type == "NDF":
458 root.__class__ = Ndf
459 typed_root = cast(Ndf, root)
460 else:
461 root.__class__ = NdfContainer
462 typed_root = cast(NdfContainer, root)
463 return cls(root=typed_root, root_name=_decode_ascii_attr(file["/"].attrs.get(_hds.ATTR_ROOT_NAME)))
465 def write_to_hdf5(self, file: h5py.File) -> None:
466 """Write this document to an open HDF5 file."""
467 self.root.write_to_hdf5(file["/"])
468 if self.root_name is not None:
469 _hds.set_root_name(file, self.root_name, self.root.hds_type)
471 def ensure_ndf(self, path: str = "/") -> Ndf:
472 """Return or create an NDF at the requested absolute path."""
473 if path in ("", "/"):
474 if not isinstance(self.root, Ndf):
475 raise TypeError("The document root is an NDF container, not an NDF.")
476 return self.root
477 if isinstance(self.root, NdfContainer):
478 return self.root.ensure_ndf(path)
479 structure = self.root.ensure_structure(path, "NDF")
480 if not isinstance(structure, Ndf):
481 structure.__class__ = Ndf
482 return cast(Ndf, structure)
484 def get(self, path: str) -> HdsStructure | HdsPrimitive:
485 """Return a component by absolute path."""
486 return self.root.get(path)
489def _new_structure(hds_type: str) -> HdsStructure:
490 """Create a typed HDS structure model for an HDS type."""
491 if hds_type == "NDF":
492 return Ndf()
493 if hds_type == "EXT":
494 return HdsExtension()
495 return HdsStructure(hds_type)
498def _default_hds_type_for_name(name: str) -> str:
499 """Return the HDS type to use for a structure with the given name.
501 Mirrors the convention in real NDF files where each named structure
502 carries a type tag describing what it is. ``MORE`` is the standard
503 NDF extension container and is always ``EXT``; every other name is
504 used verbatim as its own type.
505 """
506 return "EXT" if name == "MORE" else name
509def _split_path(path: str) -> list[str]:
510 """Split an HDS/HDF5 path into component names."""
511 return [part for part in path.strip("/").split("/") if part]
514def _clear_hdf5_group(group: h5py.Group) -> None:
515 """Remove children and HDS root attributes before rewriting a group."""
516 for name in list(group.keys()):
517 del group[name]
518 for name in (_hds.ATTR_CLASS, _hds.ATTR_ROOT_NAME, _hds.ATTR_STRUCTURE_DIMS, "HDSTYPE"):
519 if name in group.attrs:
520 del group.attrs[name]