Coverage for python / lsst / images / ndf / _input_archive.py: 14%
258 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:57 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:57 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("NdfInputArchive", "read")
16import logging
17from collections.abc import Callable, Iterator
18from contextlib import contextmanager
19from types import EllipsisType
20from typing import Any, Self
22import astropy.io.fits
23import astropy.table
24import astropy.units as u
25import h5py
26import numpy as np
28from lsst.resources import ResourcePath, ResourcePathExpression
30from .._geom import Box
31from .._image import Image
32from .._mask import Mask, MaskPlane, MaskSchema
33from .._masked_image import MaskedImage
34from .._transforms import FrameSet, Projection
35from .._transforms import _ast as astshim
36from .._transforms._frames import GeneralFrame
37from ..fits._common import FitsOpaqueMetadata
38from ..serialization import (
39 ArchiveReadError,
40 ArchiveTree,
41 ArrayReferenceModel,
42 InlineArrayModel,
43 InputArchive,
44 ReadResult,
45 TableModel,
46 no_header_updates,
47)
48from . import _hds
49from ._common import NdfPointerModel
50from ._model import HdsPrimitive, NdfDocument
52_LOG = logging.getLogger(__name__)
55class NdfInputArchive(InputArchive[NdfPointerModel]):
56 """Reads HDS-on-HDF5 NDF files written by `NdfOutputArchive`.
58 Instances should only be constructed via the :meth:`open` context
59 manager.
61 Parameters
62 ----------
63 file
64 Open `h5py.File` handle. Owned by the caller of :meth:`open`;
65 the archive does not close it.
66 """
68 def __init__(self, file: h5py.File) -> None:
69 self._file = file
70 self._document = NdfDocument.from_hdf5(file)
71 self._opaque_metadata = FitsOpaqueMetadata()
72 self._deserialized_pointer_cache: dict[str, Any] = {}
73 self._frame_set_cache: dict[str, FrameSet] = {}
74 self._read_opaque_fits_metadata()
76 @classmethod
77 @contextmanager
78 def open(cls, path: ResourcePathExpression) -> Iterator[Self]:
79 """Open an NDF file for reading and yield an `NdfInputArchive`.
81 Remote ResourcePaths are materialised locally first; fsspec-direct
82 h5py reads are a deferred follow-up.
83 """
84 rp = ResourcePath(path)
85 with rp.as_local() as local:
86 with h5py.File(local.ospath, "r") as f:
87 yield cls(f)
89 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T:
90 """Read and validate the main Pydantic tree at ``/MORE/LSST/JSON``."""
91 json_path = self._get_main_json_path()
92 if json_path is None:
93 raise ArchiveReadError(
94 "File has no /MORE/LSST/JSON tree; this is either a "
95 "Starlink-only NDF (use ndf.read() with auto-detect) or "
96 "the file was written by an unrelated tool."
97 )
98 json_text = _read_json_record(self._get_primitive(json_path), json_path)
99 return model_type.model_validate_json(json_text)
101 def deserialize_pointer[U: ArchiveTree, V](
102 self,
103 pointer: NdfPointerModel,
104 model_type: type[U],
105 deserializer: Callable[[U, InputArchive[NdfPointerModel]], V],
106 ) -> V:
107 # Cache by pointer.path so repeated dereferences reuse the same
108 # deserialised result and don't re-run the deserializer.
109 if (cached := self._deserialized_pointer_cache.get(pointer.path)) is not None:
110 return cached
111 if not self._has_model_path(pointer.path):
112 raise ArchiveReadError(f"Pointer reference {pointer.path!r} not found in NDF file.")
113 primitive = self._get_primitive(pointer.path)
114 json_text = _read_json_record(primitive, pointer.path)
115 model = model_type.model_validate_json(json_text)
116 result = deserializer(model, self)
117 self._deserialized_pointer_cache[pointer.path] = result
118 if isinstance(result, FrameSet):
119 self._frame_set_cache[pointer.path] = result
120 return result
122 def get_frame_set(self, pointer: NdfPointerModel) -> FrameSet:
123 try:
124 return self._frame_set_cache[pointer.path]
125 except KeyError:
126 raise AssertionError(
127 f"Frame set at {pointer.path!r} must be deserialised via "
128 f"deserialize_pointer before any dependent transform can be."
129 ) from None
131 def get_array(
132 self,
133 model: ArrayReferenceModel | InlineArrayModel,
134 *,
135 slices: tuple[slice, ...] | EllipsisType = ...,
136 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
137 ) -> np.ndarray:
138 if isinstance(model, InlineArrayModel):
139 data: np.ndarray = np.array(model.data, dtype=model.datatype.to_numpy())
140 return data if slices is ... else data[slices]
141 if not isinstance(model.source, str) or not model.source.startswith("ndf:"):
142 raise ArchiveReadError(
143 f"NdfInputArchive cannot resolve array source {model.source!r}; "
144 f"expected an 'ndf:<HDF5-path>' reference."
145 )
146 path = model.source[len("ndf:") :]
147 if not self._has_model_path(path):
148 raise ArchiveReadError(f"Array reference {path!r} not in file.")
149 primitive = self._get_primitive(path)
150 # h5py supports lazy slicing via dataset[slices].
151 if isinstance(primitive.data, h5py.Dataset):
152 return primitive.data[()] if slices is ... else primitive.data[slices]
153 data = primitive.read_array()
154 return data if slices is ... else data[slices]
156 def get_table(
157 self,
158 model: TableModel,
159 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
160 ) -> astropy.table.Table:
161 result = astropy.table.Table(meta=model.meta)
162 for column_model in model.columns:
163 if isinstance(column_model.data, InlineArrayModel):
164 data: Any = column_model.data.data
165 else:
166 data = self.get_array(column_model.data, strip_header=strip_header)
167 result[column_model.name] = astropy.table.Column(
168 data,
169 name=column_model.name,
170 dtype=column_model.data.datatype.to_numpy(),
171 unit=column_model.unit,
172 description=column_model.description,
173 meta=column_model.meta,
174 )
175 return result
177 def get_structured_array(
178 self,
179 model: TableModel,
180 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
181 ) -> np.ndarray:
182 return self.get_table(model, strip_header).as_array()
184 def _read_opaque_fits_metadata(self) -> None:
185 if not self._has_model_path("/MORE/FITS"):
186 return
187 cards = self._get_primitive("/MORE/FITS").read_char_array()
188 # FITS Header.fromstring expects fixed-width 80-char cards
189 # concatenated; pad each card defensively so readers tolerate
190 # files written with shorter widths.
191 header = astropy.io.fits.Header.fromstring("".join(c.ljust(80) for c in cards))
192 self._opaque_metadata.add_header(header, name="", ver=1)
194 def get_opaque_metadata(self) -> FitsOpaqueMetadata:
195 return self._opaque_metadata
197 def _get_main_json_path(self) -> str | None:
198 """Return the path of the main LSST JSON tree, if present."""
199 for path in ("/MORE/LSST/JSON", "/LSST/JSON"):
200 if self._has_model_path(path):
201 return path
202 return None
204 def _has_model_path(self, path: str) -> bool:
205 """Return `True` if a path exists in the NDF document model."""
206 try:
207 self._document.get(path)
208 except KeyError:
209 return False
210 return True
212 def _get_primitive(self, path: str) -> HdsPrimitive:
213 """Return a primitive component from the NDF document model."""
214 node = self._document.get(path)
215 if not isinstance(node, HdsPrimitive):
216 raise ArchiveReadError(f"NDF reference {path!r} is not a primitive dataset.")
217 return node
220def read[T: Any](cls: type[T], path: ResourcePathExpression, **kwargs: Any) -> ReadResult[T]:
221 """Read an object from an NDF (HDS-on-HDF5) file.
223 If the file has a ``/MORE/LSST/JSON`` tree it is used as the source
224 of truth and ``cls.deserialize`` is invoked with the parsed model.
225 Otherwise the reader falls back to auto-detection of a minimal
226 recognised-component set (``DATA_ARRAY``, ``VARIANCE``, ``QUALITY``,
227 ``MORE.FITS``); ``WCS`` is logged-and-dropped in v1.
229 Parameters
230 ----------
231 cls
232 Expected return type. `~lsst.images.Image` and
233 `~lsst.images.MaskedImage` are the only types the auto-detect
234 path can produce. The symmetric path accepts whatever the file's
235 discriminator says.
236 path
237 File path or ``lsst.resources.ResourcePathExpression``.
238 **kwargs
239 Forwarded to ``cls.deserialize`` on the symmetric read path.
241 Returns
242 -------
243 `~lsst.images.serialization.ReadResult` [T]
244 Named tuple of (deserialized object, metadata, butler_info).
245 """
246 with NdfInputArchive.open(path) as archive:
247 if archive._get_main_json_path() is not None:
248 tree_type = cls._get_archive_tree_type(NdfPointerModel)
249 tree = archive.get_tree(tree_type)
250 obj = tree.deserialize(archive, **kwargs)
251 obj._opaque_metadata = archive.get_opaque_metadata()
252 return ReadResult(obj, tree.metadata, tree.butler_info)
253 return _read_auto_detect(cls, archive)
256def _read_auto_detect[T: Any](cls: type[T], archive: NdfInputArchive) -> ReadResult[T]:
257 """Reconstruct an `Image` (or `MaskedImage`) from a Starlink NDF.
259 Recognised components: ``DATA_ARRAY`` (in either simple or complex
260 form), ``VARIANCE``, ``QUALITY``, ``MORE.FITS``. Other components
261 (``WCS``, ``HISTORY``, ``AXIS``, ``LABEL``, custom ``MORE.*``,
262 ``_LOGICAL`` primitives) are warned-and-dropped.
263 """
264 f = archive._file
265 ndf_group = _locate_ndf_root(f)
267 # DATA_ARRAY is required.
268 if "DATA_ARRAY" not in ndf_group:
269 raise ArchiveReadError(f"Auto-detect read of {f.filename!r}: no DATA_ARRAY component.")
270 data_arr, bbox = _read_data_array_with_bbox(ndf_group["DATA_ARRAY"])
272 # VARIANCE / QUALITY are optional.
273 variance_arr: np.ndarray | None = None
274 variance_bbox: Any | None = None
275 if "VARIANCE" in ndf_group:
276 variance_arr, variance_bbox = _read_data_array_with_bbox(ndf_group["VARIANCE"])
277 quality_arr: np.ndarray | None = None
278 quality_bbox: Any | None = None
279 quality_badbits = 255
280 if "QUALITY" in ndf_group and isinstance(ndf_group["QUALITY"], h5py.Group):
281 q = ndf_group["QUALITY"]
282 quality_badbits = _read_quality_badbits(q)
283 if "QUALITY" in q and isinstance(q["QUALITY"], h5py.Dataset):
284 quality_arr = _validate_quality_array(_hds.read_array(q["QUALITY"]))
285 quality_bbox = _make_bbox(x_min=0, y_min=0, array=quality_arr)
286 elif "QUALITY" in q and isinstance(q["QUALITY"], h5py.Group):
287 quality_arr, quality_bbox = _read_data_array_with_bbox(q["QUALITY"])
288 quality_arr = _validate_quality_array(quality_arr)
290 projection: Projection | None = None
291 if "WCS" in ndf_group:
292 try:
293 wcs_group = ndf_group["WCS"]
294 if isinstance(wcs_group, h5py.Group) and "DATA" in wcs_group:
295 wcs_lines = _hds.read_char_array(wcs_group["DATA"])
296 wcs_text = _hds.decode_ndf_ast_data(wcs_lines)
297 ast_obj = astshim.Object.fromString(wcs_text)
298 if isinstance(ast_obj, astshim.FrameSet):
299 pixel_frame = GeneralFrame(unit=u.pix)
300 projection = Projection.from_ast_frame_set(
301 ast_obj,
302 pixel_frame,
303 pixel_bounds=bbox,
304 )
305 except Exception:
306 _LOG.warning(
307 "Could not reconstruct Projection from WCS in %s; dropping.",
308 f.filename,
309 exc_info=True,
310 )
312 unit = _read_ndf_units(ndf_group)
314 # Anything unrecognised: warn-and-drop.
315 recognised = {
316 "DATA_ARRAY",
317 "VARIANCE",
318 "QUALITY",
319 "WCS",
320 "MORE",
321 "TITLE",
322 "LABEL",
323 "UNITS",
324 "HISTORY",
325 "AXIS",
326 }
327 for name in ndf_group:
328 if name not in recognised:
329 _LOG.warning(
330 "Ignoring unrecognised NDF component %s/%s during auto-detect read.",
331 ndf_group.name,
332 name,
333 )
335 # Build the requested in-memory object. Any NDF can be read as an Image;
336 # MaskedImage construction uses whatever VARIANCE/QUALITY are present and
337 # lets the MaskedImage constructor provide defaults for missing planes.
338 image = Image(data_arr, bbox=bbox, unit=unit, projection=projection)
339 obj: Any
340 if cls is Image:
341 obj = image
342 elif issubclass(cls, MaskedImage):
343 if quality_arr is not None:
344 schema = _make_quality_mask_schema(quality_badbits)
345 mask = Mask(quality_arr[:, :, np.newaxis], schema=schema, bbox=quality_bbox)
346 else:
347 schema = MaskSchema([MaskPlane(name="BAD", description="Bad pixel.")])
348 mask = None
349 variance = Image(variance_arr, bbox=variance_bbox) if variance_arr is not None else None
350 obj = cls(
351 image=image,
352 mask=mask,
353 mask_schema=schema if mask is None else None,
354 variance=variance,
355 )
356 else:
357 raise ArchiveReadError(
358 f"Auto-detect can produce Image or MaskedImage, but caller asked for {cls.__name__}."
359 )
360 obj._opaque_metadata = archive.get_opaque_metadata()
361 # Auto-detect path produces no archive-tree metadata or butler_info.
362 return ReadResult(obj, {}, None)
365def _read_ndf_units(ndf_group: h5py.Group) -> u.UnitBase | None:
366 """Read the NDF UNITS component, if present."""
367 if "UNITS" not in ndf_group or not isinstance(ndf_group["UNITS"], h5py.Dataset):
368 return None
369 dataset = ndf_group["UNITS"]
370 if dataset.dtype.kind != "S":
371 _LOG.warning("Ignoring non-character NDF UNITS component in %s.", ndf_group.name)
372 return None
373 if dataset.ndim == 0:
374 raw = dataset[()]
375 if isinstance(raw, np.bytes_):
376 raw = bytes(raw)
377 if not isinstance(raw, bytes):
378 return None
379 units_text = raw.decode("ascii").rstrip(" ")
380 else:
381 records = _hds.read_char_array(dataset)
382 units_text = records[0] if records else ""
383 if not units_text:
384 return None
385 for kwargs in ({"format": "fits"}, {}):
386 try:
387 return u.Unit(units_text, **kwargs)
388 except ValueError:
389 continue
390 _LOG.warning("Could not parse NDF UNITS value %r in %s.", units_text, ndf_group.name)
391 return None
394def _read_quality_badbits(quality_group: h5py.Group) -> int:
395 """Read the scalar NDF QUALITY.BADBITS value."""
396 badbits = quality_group.get("BADBITS")
397 if not isinstance(badbits, h5py.Dataset):
398 return 255
399 value = np.asarray(_hds.read_array(badbits)).reshape(-1)
400 if value.size == 0:
401 return 255
402 return int(value[0])
405def _validate_quality_array(quality: np.ndarray) -> np.ndarray:
406 """Return an NDF QUALITY array as a `numpy.uint8` mask plane."""
407 if quality.dtype != np.dtype(np.uint8):
408 raise ArchiveReadError(f"NDF QUALITY array has dtype {quality.dtype}; expected uint8.")
409 return quality
412def _make_quality_mask_schema(badbits: int) -> MaskSchema:
413 """Create a fallback `MaskSchema` for an unnamed 8-bit QUALITY array."""
414 planes = []
415 for bit in range(8):
416 mask = 1 << bit
417 description = f"NDF QUALITY bit {bit}."
418 if badbits & mask:
419 description += " Selected by BADBITS."
420 planes.append(MaskPlane(name=f"MASK{bit}", description=description))
421 return MaskSchema(planes, dtype=np.uint8)
424def _locate_ndf_root(f: h5py.File) -> h5py.Group:
425 """Return the group representing the top-level NDF.
427 Most files have the NDF at the root group itself. A few wrap it
428 in a single-child container at the root; we accept that shape
429 too. Anything more elaborate raises.
430 """
431 root_class = f["/"].attrs.get(_hds.ATTR_CLASS)
432 if isinstance(root_class, bytes):
433 root_class = root_class.decode("ascii")
434 if root_class == "NDF":
435 return f["/"]
436 # Maybe a one-level container.
437 candidates = []
438 for name, child in f["/"].items():
439 if isinstance(child, h5py.Group):
440 cls_attr = child.attrs.get(_hds.ATTR_CLASS)
441 if isinstance(cls_attr, bytes):
442 cls_attr = cls_attr.decode("ascii")
443 if cls_attr == "NDF":
444 candidates.append(name)
445 if len(candidates) == 1:
446 return f[candidates[0]]
447 raise ArchiveReadError(
448 f"Could not locate top-level NDF in {f.filename!r}; "
449 f"expected the root group or a single NDF-typed child."
450 )
453def _read_data_array_with_bbox(
454 obj: h5py.Group | h5py.Dataset,
455) -> tuple[np.ndarray, Any]:
456 """Read a DATA_ARRAY component in either simple or complex form.
458 The complex form (what our writer always produces) is an HDS
459 ARRAY structure (h5py group with CLASS="ARRAY") containing
460 ``DATA`` and ``ORIGIN`` primitives. The simple form is a bare
461 primitive dataset.
463 Returns
464 -------
465 array, bbox : tuple
466 ``array`` is the C-order numpy data (shape ``(height, width)``
467 for 2D images). ``bbox`` is constructed from the ORIGIN if
468 present, else from a default origin of (0, 0).
469 """
470 if isinstance(obj, h5py.Dataset):
471 # Simple form.
472 array = _hds.read_array(obj)
473 bbox = _make_bbox(x_min=0, y_min=0, array=array)
474 return array, bbox
475 # Complex form: an HDS structure with DATA + ORIGIN.
476 data = _hds.read_array(obj["DATA"])
477 if "ORIGIN" in obj:
478 origin = _hds.read_array(obj["ORIGIN"])
479 bbox = _make_bbox(x_min=int(origin[0]), y_min=int(origin[1]), array=data)
480 else:
481 bbox = _make_bbox(x_min=0, y_min=0, array=data)
482 return data, bbox
485def _read_json_record(primitive: HdsPrimitive, path: str) -> str:
486 """Read a JSON document stored as a single _CHAR*N record.
488 Our writer always emits JSON trees as a single-element character
489 array sized to the document. Joining multiple records would lose
490 trailing whitespace inside JSON string values, since
491 `read_char_array` strips trailing spaces per record.
492 """
493 records = primitive.read_char_array()
494 if len(records) != 1:
495 raise ArchiveReadError(f"Expected a single _CHAR*N record at {path!r}, got {len(records)}.")
496 return records[0]
499def _make_bbox(*, x_min: int, y_min: int, array: np.ndarray) -> Any:
500 """Build an lsst.images.Box for a 2D image array.
502 The array is C-order ``(height, width)``. NDF stores ``ORIGIN``
503 in Fortran axis order ``(x_min, y_min)``.
504 """
505 if array.ndim != 2:
506 raise ArchiveReadError(f"Auto-detect read only supports 2D arrays, got ndim={array.ndim}.")
507 # Box.from_shape takes (height, width) and start=(y_start, x_start).
508 return Box.from_shape(array.shape, start=(y_min, x_min))