Coverage for python/lsst/images/ndf/_input_archive.py: 15%
271 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:08 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:08 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("NdfInputArchive", "read")
16import logging
17from collections.abc import Callable, Iterator
18from contextlib import contextmanager
19from types import EllipsisType
20from typing import Any, Self
22import astropy.io.fits
23import astropy.table
24import astropy.units as u
25import h5py
26import numpy as np
28from lsst.resources import ResourcePath, ResourcePathExpression
30from .._geom import Box
31from .._image import Image
32from .._mask import Mask, MaskPlane, MaskSchema
33from .._masked_image import MaskedImage
34from .._transforms import FrameSet, Projection
35from .._transforms import _ast as astshim
36from .._transforms._frames import GeneralFrame
37from ..fits._common import FitsOpaqueMetadata
38from ..serialization import (
39 ArchiveReadError,
40 ArchiveTree,
41 ArrayReferenceModel,
42 InlineArrayModel,
43 InputArchive,
44 ReadResult,
45 TableModel,
46 no_header_updates,
47)
48from ..serialization._common import _check_format_version
49from . import _hds
50from ._common import NdfPointerModel
51from ._model import HdsPrimitive, NdfDocument
53_LOG = logging.getLogger(__name__)
55_NDF_FORMAT_VERSION = 1
56"""Container layout version this release of `NdfInputArchive` understands."""
59class NdfInputArchive(InputArchive[NdfPointerModel]):
60 """Reads HDS-on-HDF5 NDF files written by `NdfOutputArchive`.
62 Instances should only be constructed via the :meth:`open` context
63 manager.
65 Parameters
66 ----------
67 file
68 Open `h5py.File` handle. Owned by the caller of :meth:`open`;
69 the archive does not close it.
70 """
72 def __init__(self, file: h5py.File) -> None:
73 self._file = file
74 self._document = NdfDocument.from_hdf5(file)
75 self._opaque_metadata = FitsOpaqueMetadata()
76 self._deserialized_pointer_cache: dict[str, Any] = {}
77 self._frame_set_cache: dict[str, FrameSet] = {}
78 self._read_opaque_fits_metadata()
79 self._check_format_version()
81 @classmethod
82 @contextmanager
83 def open(cls, path: ResourcePathExpression) -> Iterator[Self]:
84 """Open an NDF file for reading and yield an `NdfInputArchive`.
86 Remote ResourcePaths are materialised locally first; fsspec-direct
87 h5py reads are a deferred follow-up.
88 """
89 rp = ResourcePath(path)
90 with rp.as_local() as local:
91 with h5py.File(local.ospath, "r") as f:
92 yield cls(f)
94 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T:
95 """Read and validate the main Pydantic tree at ``/MORE/LSST/JSON``."""
96 json_path = self._get_main_json_path()
97 if json_path is None:
98 raise ArchiveReadError(
99 "File has no /MORE/LSST/JSON tree; this is either a "
100 "Starlink-only NDF (use ndf.read() with auto-detect) or "
101 "the file was written by an unrelated tool."
102 )
103 json_text = _read_json_record(self._get_primitive(json_path), json_path)
104 return model_type.model_validate_json(json_text)
106 def deserialize_pointer[U: ArchiveTree, V](
107 self,
108 pointer: NdfPointerModel,
109 model_type: type[U],
110 deserializer: Callable[[U, InputArchive[NdfPointerModel]], V],
111 ) -> V:
112 # Cache by pointer.path so repeated dereferences reuse the same
113 # deserialised result and don't re-run the deserializer.
114 if (cached := self._deserialized_pointer_cache.get(pointer.path)) is not None:
115 return cached
116 if not self._has_model_path(pointer.path):
117 raise ArchiveReadError(f"Pointer reference {pointer.path!r} not found in NDF file.")
118 primitive = self._get_primitive(pointer.path)
119 json_text = _read_json_record(primitive, pointer.path)
120 model = model_type.model_validate_json(json_text)
121 result = deserializer(model, self)
122 self._deserialized_pointer_cache[pointer.path] = result
123 if isinstance(result, FrameSet):
124 self._frame_set_cache[pointer.path] = result
125 return result
127 def get_frame_set(self, pointer: NdfPointerModel) -> FrameSet:
128 try:
129 return self._frame_set_cache[pointer.path]
130 except KeyError:
131 raise AssertionError(
132 f"Frame set at {pointer.path!r} must be deserialised via "
133 f"deserialize_pointer before any dependent transform can be."
134 ) from None
136 def get_array(
137 self,
138 model: ArrayReferenceModel | InlineArrayModel,
139 *,
140 slices: tuple[slice, ...] | EllipsisType = ...,
141 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
142 ) -> np.ndarray:
143 if isinstance(model, InlineArrayModel):
144 data: np.ndarray = np.array(model.data, dtype=model.datatype.to_numpy())
145 return data if slices is ... else data[slices]
146 if not isinstance(model.source, str) or not model.source.startswith("ndf:"):
147 raise ArchiveReadError(
148 f"NdfInputArchive cannot resolve array source {model.source!r}; "
149 f"expected an 'ndf:<HDF5-path>' reference."
150 )
151 path = model.source[len("ndf:") :]
152 if not self._has_model_path(path):
153 raise ArchiveReadError(f"Array reference {path!r} not in file.")
154 primitive = self._get_primitive(path)
155 # h5py supports lazy slicing via dataset[slices].
156 if isinstance(primitive.data, h5py.Dataset):
157 return primitive.data[()] if slices is ... else primitive.data[slices]
158 data = primitive.read_array()
159 return data if slices is ... else data[slices]
161 def get_table(
162 self,
163 model: TableModel,
164 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
165 ) -> astropy.table.Table:
166 result = astropy.table.Table(meta=model.meta)
167 for column_model in model.columns:
168 if isinstance(column_model.data, InlineArrayModel):
169 data: Any = column_model.data.data
170 else:
171 data = self.get_array(column_model.data, strip_header=strip_header)
172 result[column_model.name] = astropy.table.Column(
173 data,
174 name=column_model.name,
175 dtype=column_model.data.datatype.to_numpy(),
176 unit=column_model.unit,
177 description=column_model.description,
178 meta=column_model.meta,
179 )
180 return result
182 def get_structured_array(
183 self,
184 model: TableModel,
185 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
186 ) -> np.ndarray:
187 return self.get_table(model, strip_header).as_array()
189 def _read_opaque_fits_metadata(self) -> None:
190 if not self._has_model_path("/MORE/FITS"):
191 return
192 cards = self._get_primitive("/MORE/FITS").read_char_array()
193 # FITS Header.fromstring expects fixed-width 80-char cards
194 # concatenated; pad each card defensively so readers tolerate
195 # files written with shorter widths.
196 header = astropy.io.fits.Header.fromstring("".join(c.ljust(80) for c in cards))
197 self._opaque_metadata.add_header(header, name="", ver=1)
199 def get_opaque_metadata(self) -> FitsOpaqueMetadata:
200 return self._opaque_metadata
202 def _get_main_json_path(self) -> str | None:
203 """Return the path of the main LSST JSON tree, if present."""
204 for path in ("/MORE/LSST/JSON", "/LSST/JSON"):
205 if self._has_model_path(path):
206 return path
207 return None
209 def _check_format_version(self) -> None:
210 """Read FORMAT_VERSION from the NDF top-level structure and check it.
212 Absence is treated as ``1`` (legacy default). DATA_MODEL is
213 informational only on read; the JSON tree's ``schema_version`` /
214 ``min_read_version`` drive data-model compatibility.
215 """
216 on_disk = 1
217 for prefix in ("/MORE/LSST", "/LSST"):
218 path = f"{prefix}/FORMAT_VERSION"
219 if self._has_model_path(path):
220 primitive = self._get_primitive(path)
221 # We wrote the version as a 0-d int32 numpy array; .item()
222 # unwraps to a Python int.
223 on_disk = int(primitive.read_array().item())
224 break
225 _check_format_version("ndf", on_disk, _NDF_FORMAT_VERSION)
227 def _has_model_path(self, path: str) -> bool:
228 """Return `True` if a path exists in the NDF document model."""
229 try:
230 self._document.get(path)
231 except KeyError:
232 return False
233 return True
235 def _get_primitive(self, path: str) -> HdsPrimitive:
236 """Return a primitive component from the NDF document model."""
237 node = self._document.get(path)
238 if not isinstance(node, HdsPrimitive):
239 raise ArchiveReadError(f"NDF reference {path!r} is not a primitive dataset.")
240 return node
243def read[T: Any](cls: type[T], path: ResourcePathExpression, **kwargs: Any) -> ReadResult[T]:
244 """Read an object from an NDF (HDS-on-HDF5) file.
246 If the file has a ``/MORE/LSST/JSON`` tree it is used as the source
247 of truth and ``cls.deserialize`` is invoked with the parsed model.
248 Otherwise the reader falls back to auto-detection of a minimal
249 recognised-component set (``DATA_ARRAY``, ``VARIANCE``, ``QUALITY``,
250 ``MORE.FITS``); ``WCS`` is logged-and-dropped in v1.
252 Parameters
253 ----------
254 cls
255 Expected return type. `~lsst.images.Image` and
256 `~lsst.images.MaskedImage` are the only types the auto-detect
257 path can produce. The symmetric path accepts whatever the file's
258 discriminator says.
259 path
260 File path or ``lsst.resources.ResourcePathExpression``.
261 **kwargs
262 Forwarded to ``cls.deserialize`` on the symmetric read path.
264 Returns
265 -------
266 `~lsst.images.serialization.ReadResult` [T]
267 Named tuple of (deserialized object, metadata, butler_info).
268 """
269 with NdfInputArchive.open(path) as archive:
270 if archive._get_main_json_path() is not None:
271 tree_type = cls._get_archive_tree_type(NdfPointerModel)
272 tree = archive.get_tree(tree_type)
273 obj = tree.deserialize(archive, **kwargs)
274 obj._opaque_metadata = archive.get_opaque_metadata()
275 return ReadResult(obj, tree.metadata, tree.butler_info)
276 return _read_auto_detect(cls, archive)
279def _read_auto_detect[T: Any](cls: type[T], archive: NdfInputArchive) -> ReadResult[T]:
280 """Reconstruct an `Image` (or `MaskedImage`) from a Starlink NDF.
282 Recognised components: ``DATA_ARRAY`` (in either simple or complex
283 form), ``VARIANCE``, ``QUALITY``, ``MORE.FITS``. Other components
284 (``WCS``, ``HISTORY``, ``AXIS``, ``LABEL``, custom ``MORE.*``,
285 ``_LOGICAL`` primitives) are warned-and-dropped.
286 """
287 f = archive._file
288 ndf_group = _locate_ndf_root(f)
290 # DATA_ARRAY is required.
291 if "DATA_ARRAY" not in ndf_group:
292 raise ArchiveReadError(f"Auto-detect read of {f.filename!r}: no DATA_ARRAY component.")
293 data_arr, bbox = _read_data_array_with_bbox(ndf_group["DATA_ARRAY"])
295 # VARIANCE / QUALITY are optional.
296 variance_arr: np.ndarray | None = None
297 variance_bbox: Any | None = None
298 if "VARIANCE" in ndf_group:
299 variance_arr, variance_bbox = _read_data_array_with_bbox(ndf_group["VARIANCE"])
300 quality_arr: np.ndarray | None = None
301 quality_bbox: Any | None = None
302 quality_badbits = 255
303 if "QUALITY" in ndf_group and isinstance(ndf_group["QUALITY"], h5py.Group):
304 q = ndf_group["QUALITY"]
305 quality_badbits = _read_quality_badbits(q)
306 if "QUALITY" in q and isinstance(q["QUALITY"], h5py.Dataset):
307 quality_arr = _validate_quality_array(_hds.read_array(q["QUALITY"]))
308 quality_bbox = _make_bbox(x_min=0, y_min=0, array=quality_arr)
309 elif "QUALITY" in q and isinstance(q["QUALITY"], h5py.Group):
310 quality_arr, quality_bbox = _read_data_array_with_bbox(q["QUALITY"])
311 quality_arr = _validate_quality_array(quality_arr)
313 projection: Projection | None = None
314 if "WCS" in ndf_group:
315 try:
316 wcs_group = ndf_group["WCS"]
317 if isinstance(wcs_group, h5py.Group) and "DATA" in wcs_group:
318 wcs_lines = _hds.read_char_array(wcs_group["DATA"])
319 wcs_text = _hds.decode_ndf_ast_data(wcs_lines)
320 ast_obj = astshim.Object.fromString(wcs_text)
321 if isinstance(ast_obj, astshim.FrameSet):
322 pixel_frame = GeneralFrame(unit=u.pix)
323 projection = Projection.from_ast_frame_set(
324 ast_obj,
325 pixel_frame,
326 pixel_bounds=bbox,
327 )
328 except Exception:
329 _LOG.warning(
330 "Could not reconstruct Projection from WCS in %s; dropping.",
331 f.filename,
332 exc_info=True,
333 )
335 unit = _read_ndf_units(ndf_group)
337 # Anything unrecognised: warn-and-drop.
338 recognised = {
339 "DATA_ARRAY",
340 "VARIANCE",
341 "QUALITY",
342 "WCS",
343 "MORE",
344 "TITLE",
345 "LABEL",
346 "UNITS",
347 "HISTORY",
348 "AXIS",
349 }
350 for name in ndf_group:
351 if name not in recognised:
352 _LOG.warning(
353 "Ignoring unrecognised NDF component %s/%s during auto-detect read.",
354 ndf_group.name,
355 name,
356 )
358 # Build the requested in-memory object. Any NDF can be read as an Image;
359 # MaskedImage construction uses whatever VARIANCE/QUALITY are present and
360 # lets the MaskedImage constructor provide defaults for missing planes.
361 image = Image(data_arr, bbox=bbox, unit=unit, projection=projection)
362 obj: Any
363 if cls is Image:
364 obj = image
365 elif issubclass(cls, MaskedImage):
366 if quality_arr is not None:
367 schema = _make_quality_mask_schema(quality_badbits)
368 mask = Mask(quality_arr[:, :, np.newaxis], schema=schema, bbox=quality_bbox)
369 else:
370 schema = MaskSchema([MaskPlane(name="BAD", description="Bad pixel.")])
371 mask = None
372 variance = Image(variance_arr, bbox=variance_bbox) if variance_arr is not None else None
373 obj = cls(
374 image=image,
375 mask=mask,
376 mask_schema=schema if mask is None else None,
377 variance=variance,
378 )
379 else:
380 raise ArchiveReadError(
381 f"Auto-detect can produce Image or MaskedImage, but caller asked for {cls.__name__}."
382 )
383 obj._opaque_metadata = archive.get_opaque_metadata()
384 # Auto-detect path produces no archive-tree metadata or butler_info.
385 return ReadResult(obj, {}, None)
388def _read_ndf_units(ndf_group: h5py.Group) -> u.UnitBase | None:
389 """Read the NDF UNITS component, if present."""
390 if "UNITS" not in ndf_group or not isinstance(ndf_group["UNITS"], h5py.Dataset):
391 return None
392 dataset = ndf_group["UNITS"]
393 if dataset.dtype.kind != "S":
394 _LOG.warning("Ignoring non-character NDF UNITS component in %s.", ndf_group.name)
395 return None
396 if dataset.ndim == 0:
397 raw = dataset[()]
398 if isinstance(raw, np.bytes_):
399 raw = bytes(raw)
400 if not isinstance(raw, bytes):
401 return None
402 units_text = raw.decode("ascii").rstrip(" ")
403 else:
404 records = _hds.read_char_array(dataset)
405 units_text = records[0] if records else ""
406 if not units_text:
407 return None
408 for kwargs in ({"format": "fits"}, {}):
409 try:
410 return u.Unit(units_text, **kwargs)
411 except ValueError:
412 continue
413 _LOG.warning("Could not parse NDF UNITS value %r in %s.", units_text, ndf_group.name)
414 return None
417def _read_quality_badbits(quality_group: h5py.Group) -> int:
418 """Read the scalar NDF QUALITY.BADBITS value."""
419 badbits = quality_group.get("BADBITS")
420 if not isinstance(badbits, h5py.Dataset):
421 return 255
422 value = np.asarray(_hds.read_array(badbits)).reshape(-1)
423 if value.size == 0:
424 return 255
425 return int(value[0])
428def _validate_quality_array(quality: np.ndarray) -> np.ndarray:
429 """Return an NDF QUALITY array as a `numpy.uint8` mask plane."""
430 if quality.dtype != np.dtype(np.uint8):
431 raise ArchiveReadError(f"NDF QUALITY array has dtype {quality.dtype}; expected uint8.")
432 return quality
435def _make_quality_mask_schema(badbits: int) -> MaskSchema:
436 """Create a fallback `MaskSchema` for an unnamed 8-bit QUALITY array."""
437 planes = []
438 for bit in range(8):
439 mask = 1 << bit
440 description = f"NDF QUALITY bit {bit}."
441 if badbits & mask:
442 description += " Selected by BADBITS."
443 planes.append(MaskPlane(name=f"MASK{bit}", description=description))
444 return MaskSchema(planes, dtype=np.uint8)
447def _locate_ndf_root(f: h5py.File) -> h5py.Group:
448 """Return the group representing the top-level NDF.
450 Most files have the NDF at the root group itself. A few wrap it
451 in a single-child container at the root; we accept that shape
452 too. Anything more elaborate raises.
453 """
454 root_class = f["/"].attrs.get(_hds.ATTR_CLASS)
455 if isinstance(root_class, bytes):
456 root_class = root_class.decode("ascii")
457 if root_class == "NDF":
458 return f["/"]
459 # Maybe a one-level container.
460 candidates = []
461 for name, child in f["/"].items():
462 if isinstance(child, h5py.Group):
463 cls_attr = child.attrs.get(_hds.ATTR_CLASS)
464 if isinstance(cls_attr, bytes):
465 cls_attr = cls_attr.decode("ascii")
466 if cls_attr == "NDF":
467 candidates.append(name)
468 if len(candidates) == 1:
469 return f[candidates[0]]
470 raise ArchiveReadError(
471 f"Could not locate top-level NDF in {f.filename!r}; "
472 f"expected the root group or a single NDF-typed child."
473 )
476def _read_data_array_with_bbox(
477 obj: h5py.Group | h5py.Dataset,
478) -> tuple[np.ndarray, Any]:
479 """Read a DATA_ARRAY component in either simple or complex form.
481 The complex form (what our writer always produces) is an HDS
482 ARRAY structure (h5py group with CLASS="ARRAY") containing
483 ``DATA`` and ``ORIGIN`` primitives. The simple form is a bare
484 primitive dataset.
486 Returns
487 -------
488 array, bbox : tuple
489 ``array`` is the C-order numpy data (shape ``(height, width)``
490 for 2D images). ``bbox`` is constructed from the ORIGIN if
491 present, else from a default origin of (0, 0).
492 """
493 if isinstance(obj, h5py.Dataset):
494 # Simple form.
495 array = _hds.read_array(obj)
496 bbox = _make_bbox(x_min=0, y_min=0, array=array)
497 return array, bbox
498 # Complex form: an HDS structure with DATA + ORIGIN.
499 data = _hds.read_array(obj["DATA"])
500 if "ORIGIN" in obj:
501 origin = _hds.read_array(obj["ORIGIN"])
502 bbox = _make_bbox(x_min=int(origin[0]), y_min=int(origin[1]), array=data)
503 else:
504 bbox = _make_bbox(x_min=0, y_min=0, array=data)
505 return data, bbox
508def _read_json_record(primitive: HdsPrimitive, path: str) -> str:
509 """Read a JSON document stored as a single _CHAR*N record.
511 Our writer always emits JSON trees as a single-element character
512 array sized to the document. Joining multiple records would lose
513 trailing whitespace inside JSON string values, since
514 `read_char_array` strips trailing spaces per record.
515 """
516 records = primitive.read_char_array()
517 if len(records) != 1:
518 raise ArchiveReadError(f"Expected a single _CHAR*N record at {path!r}, got {len(records)}.")
519 return records[0]
522def _make_bbox(*, x_min: int, y_min: int, array: np.ndarray) -> Any:
523 """Build an lsst.images.Box for a 2D image array.
525 The array is C-order ``(height, width)``. NDF stores ``ORIGIN``
526 in Fortran axis order ``(x_min, y_min)``.
527 """
528 if array.ndim != 2:
529 raise ArchiveReadError(f"Auto-detect read only supports 2D arrays, got ndim={array.ndim}.")
530 # Box.from_shape takes (height, width) and start=(y_start, x_start).
531 return Box.from_shape(array.shape, start=(y_min, x_min))