Coverage for python / lsst / images / ndf / _output_archive.py: 14%
326 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 00:52 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 00:52 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "NdfOutputArchive",
16 "write",
17)
19import os
20from collections.abc import Callable, Hashable, Iterator, Mapping, Sequence
21from contextlib import contextmanager
22from typing import Any, Self
24import astropy.io.fits
25import astropy.table
26import astropy.units
27import h5py
28import numpy as np
29import pydantic
31from .._color_image import ColorImage
32from .._transforms import FrameSet
33from .._transforms._ast import Channel, CmpFrame, CmpMap, ShiftMap, StringStream, UnitMap
34from .._transforms._ast import Frame as AstFrame
35from .._transforms._ast import FrameSet as AstFrameSet
36from .._transforms._transform import _prepend_ast_shift
37from ..fits._common import ExtensionKey, FitsOpaqueMetadata
38from ..serialization import (
39 ArchiveTree,
40 ArrayReferenceModel,
41 ButlerInfo,
42 MetadataValue,
43 NestedOutputArchive,
44 NumberType,
45 OutputArchive,
46 TableColumnModel,
47 TableModel,
48 no_header_updates,
49)
50from . import _hds
51from ._common import NdfPointerModel, archive_path_to_hdf5_path, archive_path_to_hdf5_path_components
52from ._model import HdsPrimitive, HdsStructure, Ndf, NdfArray, NdfContainer, NdfDocument, NdfQuality, NdfWcs
55def write(
56 obj: Any,
57 filename: str | None = None,
58 *,
59 metadata: dict[str, MetadataValue] | None = None,
60 butler_info: ButlerInfo | None = None,
61 compression_options: Mapping[str, Any] | None = None,
62) -> ArchiveTree:
63 """Write a serializable object to an NDF (HDS-on-HDF5) file.
65 Parameters
66 ----------
67 obj
68 Object with a ``serialize`` method. May carry an
69 ``_opaque_metadata`` attribute (a
70 `~lsst.images.fits.FitsOpaqueMetadata`)
71 whose primary-HDU header gets written to ``/MORE/FITS``. This
72 preserves FITS cards on objects that originated from a FITS read;
73 butler provenance is conveyed through ``butler_info`` instead.
74 filename
75 Path to write to. Must not already exist. If `None`, an
76 in-memory HDF5 file is used and the on-disk artefact is
77 discarded; the returned tree still reflects all the writes the
78 archive made (useful for tests).
79 metadata, butler_info
80 Optional caller-supplied entries that are written into the
81 returned `~lsst.images.serialization.ArchiveTree`.
82 compression_options
83 Optional dict forwarded to the archive constructor for h5py
84 dataset compression.
86 Returns
87 -------
88 `~lsst.images.serialization.ArchiveTree`
89 The Pydantic tree the object's ``serialize`` produced (with
90 ``metadata``/``butler_info`` applied).
91 """
92 opaque_metadata = getattr(obj, "_opaque_metadata", None)
93 if not isinstance(opaque_metadata, FitsOpaqueMetadata):
94 opaque_metadata = FitsOpaqueMetadata()
95 archive_default_name = getattr(obj, "_archive_default_name", None)
96 with NdfOutputArchive.open(
97 filename,
98 compression_options=compression_options,
99 opaque_metadata=opaque_metadata,
100 **_get_archive_layout(obj),
101 ) as archive:
102 if archive_default_name is not None:
103 tree = archive.serialize_direct(archive_default_name, obj.serialize)
104 else:
105 tree = obj.serialize(archive)
106 if metadata is not None:
107 tree.metadata.update(metadata)
108 if butler_info is not None:
109 tree.butler_info = butler_info
110 archive.add_tree(
111 tree,
112 projection=getattr(obj, "projection", None),
113 bbox=getattr(obj, "bbox", None),
114 unit=getattr(obj, "unit", None),
115 root_name=archive_default_name or type(obj).__name__,
116 )
117 return tree
120def _origin_from_bbox(bbox: Any) -> tuple[int, ...]:
121 """Extract NDF/Fortran-order origin tuple from an lsst.images Box.
123 The exact attribute names on Box depend on the version. Inspect the
124 object and pick whatever exposes the integer lower bounds. For a 2D
125 image with bbox lower bound (x_min, y_min) the returned tuple is
126 ``(x_min, y_min)``.
127 """
128 # Box exposes .x and .y properties returning Interval objects with a
129 # .start attribute (the lower bound).
130 if hasattr(bbox, "x") and hasattr(bbox, "y"):
131 x = bbox.x
132 y = bbox.y
133 if hasattr(x, "start") and hasattr(y, "start"):
134 return (int(x.start), int(y.start))
135 raise AttributeError(
136 f"Don't know how to extract origin from bbox of type {type(bbox).__name__!r}; "
137 "_origin_from_bbox needs updating."
138 )
141def _unit_to_ndf_string(unit: astropy.units.UnitBase) -> str:
142 """Return an ASCII unit string for the NDF UNITS component."""
143 try:
144 return unit.to_string(format="fits")
145 except ValueError:
146 return unit.to_string()
149def _fits_header_records(header: astropy.io.fits.Header) -> list[str]:
150 """Return fixed-width FITS records for an opaque NDF FITS extension.
152 NDF ``.MORE.FITS`` is a ``_CHAR*80`` vector. Use Astropy's FITS
153 header serializer to preserve multi-record logical cards such as
154 CONTINUE strings, but omit the FITS END card and 2880-byte padding
155 because this is an NDF extension component, not a complete FITS
156 header block.
157 """
158 block = header.tostring(sep="", endcard=False, padding=False)
159 encoded = block.encode("ascii")
160 if len(encoded) % 80:
161 raise ValueError(
162 f"FITS header block is {len(encoded)} bytes, not a multiple of the 80-byte FITS record size."
163 )
164 return [block[n : n + 80] for n in range(0, len(block), 80)]
167def _get_archive_layout(obj: Any) -> dict[str, Any]:
168 """Return NDF document layout options for a top-level object."""
169 if isinstance(obj, ColorImage):
170 return {
171 "root": NdfContainer(),
172 "lsst_path": "/LSST",
173 "direct_ndf_array_paths": {
174 "red": "/RED",
175 "green": "/GREEN",
176 "blue": "/BLUE",
177 },
178 "wcs_ndf_paths": ("/RED", "/GREEN", "/BLUE"),
179 }
180 return {}
183def _show_ast_for_ndf(ast_frame_set: Any, bbox: Any | None) -> str:
184 """Return AST Channel text matching Starlink NDF WCS serialization.
186 Tags the original base frame with ``Domain="PIXEL"`` and prepends a
187 new ``Domain="GRID"`` base frame related to it by a `ShiftMap` whose
188 shift converts ``bbox``-origin pixel coordinates into 1-based grid
189 coordinates. The result is written via an abstraction-layer
190 ``Channel`` configured with the same options the Starlink C writer
191 uses (``Full=-1,Comment=0``; see ``ndf1Wwrt.c``) plus ``Indent=0`` so
192 each line is just the bare AST item with the single-space prefix
193 that ``ndf1Rdast`` strips back off on read.
194 """
195 if bbox is None:
196 x_shift = 1.0
197 y_shift = 1.0
198 else:
199 x_shift = 1.0 - float(bbox.x.start)
200 y_shift = 1.0 - float(bbox.y.start)
202 saved_current = ast_frame_set.current
203 ast_frame_set.current = ast_frame_set.base
204 ast_frame_set.domain = "PIXEL"
205 ast_frame_set.current = saved_current
206 _prepend_ast_shift(ast_frame_set, x=x_shift, y=y_shift, ast_domain="GRID")
208 stream = StringStream()
209 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0")
210 channel.write(ast_frame_set)
211 return stream.getSinkData()
214def _show_mask_ast_for_ndf(
215 parent_ast_frame_set: Any,
216 origin: Sequence[int],
217 *,
218 labels: Sequence[str] = (),
219) -> str:
220 """Return an NDF WCS for the 3D native mask sub-NDF.
222 The first two axes reuse the parent image's pixel-to-sky mapping. The
223 third axis is a generic mask-byte coordinate that passes through unchanged.
224 """
225 n_axes = len(origin)
226 ast_frame_set = AstFrameSet(AstFrame(n_axes, "Domain=GRID"))
227 pixel_frame = AstFrame(n_axes, "Domain=PIXEL")
228 for axis, label in enumerate(labels[:n_axes], start=1):
229 pixel_frame.setLabel(axis, label)
230 shifts = [1.0 - float(axis_origin) for axis_origin in origin]
231 ast_frame_set.addFrame(AstFrameSet.BASE, ShiftMap(shifts), pixel_frame)
233 parent_pixel_to_sky = parent_ast_frame_set.getMapping(
234 parent_ast_frame_set.base,
235 parent_ast_frame_set.current,
236 )
237 parent_sky_frame = parent_ast_frame_set.getFrame(parent_ast_frame_set.current)
238 mask_axis_frame = AstFrame(1, "Domain=MASK")
239 mask_axis_frame.setLabel(1, labels[2] if len(labels) > 2 else "mask-byte")
240 ast_frame_set.addFrame(
241 ast_frame_set.current,
242 CmpMap(parent_pixel_to_sky, UnitMap(1), False),
243 CmpFrame(parent_sky_frame, mask_axis_frame),
244 )
246 stream = StringStream()
247 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0")
248 channel.write(ast_frame_set)
249 return stream.getSinkData()
252class NdfOutputArchive(OutputArchive[NdfPointerModel]):
253 """An `~lsst.images.serialization.OutputArchive` implementation
254 that writes HDS-on-HDF5 files compatible with the Starlink NDF data
255 model.
257 Parameters
258 ----------
259 file
260 An open `h5py.File` opened in a writable mode. The archive does
261 not close the file; the caller is responsible for that.
262 compression_options
263 Optional dict passed through to `h5py.Group.create_dataset` for image
264 arrays (e.g. ``{"compression": "gzip", "compression_opts": 4}``).
265 opaque_metadata
266 Optional `~lsst.images.fits.FitsOpaqueMetadata`; if its primary-HDU
267 header is non-empty its cards will be written to ``/MORE/FITS`` by the
268 top-level `write` function.
269 """
271 def __init__(
272 self,
273 file: h5py.File,
274 compression_options: Mapping[str, Any] | None = None,
275 opaque_metadata: FitsOpaqueMetadata | None = None,
276 root: Ndf | NdfContainer | None = None,
277 lsst_path: str = "/MORE/LSST",
278 direct_ndf_array_paths: Mapping[str, str] | None = None,
279 wcs_ndf_paths: Sequence[str] = ("/",),
280 ) -> None:
281 self._file = file
282 self._document = NdfDocument(root=root if root is not None else Ndf())
283 self._lsst_path = lsst_path.rstrip("/") or "/LSST"
284 self._direct_ndf_array_paths = dict(direct_ndf_array_paths) if direct_ndf_array_paths else {}
285 self._wcs_ndf_paths = tuple(wcs_ndf_paths)
286 self._bbox_array_struct_paths: set[str] = set()
287 self._compression_options = dict(compression_options) if compression_options else {}
288 self._opaque_metadata = opaque_metadata if opaque_metadata is not None else FitsOpaqueMetadata()
289 self._frame_sets: list[tuple[FrameSet, NdfPointerModel]] = []
290 self._pointers: dict[Hashable, NdfPointerModel] = {}
291 # Keep the open file in sync so existing direct-archive tests can
292 # inspect it immediately, while all mutations go through the IR.
293 self._flush()
295 @classmethod
296 @contextmanager
297 def open(
298 cls,
299 filename: str | None,
300 *,
301 compression_options: Mapping[str, Any] | None = None,
302 opaque_metadata: FitsOpaqueMetadata | None = None,
303 root: Ndf | NdfContainer | None = None,
304 lsst_path: str = "/MORE/LSST",
305 direct_ndf_array_paths: Mapping[str, str] | None = None,
306 wcs_ndf_paths: Sequence[str] = ("/",),
307 ) -> Iterator[Self]:
308 """Open an NDF file for writing and yield an `NdfOutputArchive`.
310 ``filename=None`` uses an in-memory HDF5 file; the on-disk
311 artefact is discarded but the archive's writes still produce a
312 usable returned tree (handy for tests).
313 """
314 if filename is None:
315 h5_file = h5py.File("inmem.sdf", "w", driver="core", backing_store=False)
316 else:
317 if os.path.exists(filename) and os.path.getsize(filename) > 0:
318 raise OSError(f"File {filename!r} already exists.")
319 h5_file = h5py.File(filename, "w")
320 try:
321 yield cls(
322 h5_file,
323 compression_options=compression_options,
324 opaque_metadata=opaque_metadata,
325 root=root,
326 lsst_path=lsst_path,
327 direct_ndf_array_paths=direct_ndf_array_paths,
328 wcs_ndf_paths=wcs_ndf_paths,
329 )
330 finally:
331 h5_file.close()
333 def add_tree(
334 self,
335 tree: ArchiveTree,
336 *,
337 projection: Any = None,
338 bbox: Any = None,
339 unit: astropy.units.UnitBase | None = None,
340 root_name: str | None = None,
341 ) -> None:
342 """Finalize the file: write WCS, units, JSON tree, and ORIGIN.
344 Writes the canonical NDF ``/WCS`` HDS structure (an AST channel
345 text dump that KAPPA / hdstrace expect) when ``projection`` is
346 provided. A native mask sub-NDF at ``/MORE/LSST/MASK`` gets a 3D
347 WCS whose first two axes reuse the parent's sky projection and
348 whose third axis is the mask-byte coordinate. The JSON tree at
349 ``<lsst_path>/JSON`` remains the source of truth for symmetric
350 round-trips; ``/WCS`` is for Starlink tools. Auto-detect read of
351 ``/WCS/DATA`` into a typed ``Projection`` is a follow-up.
353 Parameters
354 ----------
355 tree
356 Pydantic tree returned by the object's ``serialize`` method,
357 with ``metadata``/``butler_info`` already applied.
358 projection, bbox, unit
359 Top-level object attributes that drive NDF-canonical writes.
360 root_name
361 Value to assign to the root group's ``HDS_ROOT_NAME``
362 attribute (fixed-length ASCII so KAPPA / hdstrace decode it).
363 """
364 if projection is not None:
365 self._write_wcs(projection, bbox)
366 if unit is not None and isinstance(self._document.root, Ndf):
367 self._document.ensure_ndf("/").set_units(_unit_to_ndf_string(unit))
368 json_text = tree.model_dump_json()
369 # Let ensure_structure derive types from path-component names so
370 # /MORE/LSST/... gets <MORE> stay as EXT and <LSST> typed LSST,
371 # mirroring HDS convention.
372 lsst = self._ensure_model_structure(self._lsst_path)
373 lsst.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text)))
374 primary = self._opaque_metadata.headers.get(ExtensionKey())
375 if primary is not None and len(primary):
376 cards = _fits_header_records(primary)
377 more = self._ensure_model_structure("/MORE")
378 more.children["FITS"] = HdsPrimitive.char_array(cards, width=80)
379 if bbox is not None:
380 origin = _origin_from_bbox(bbox)
381 for struct_path in self._bbox_array_struct_paths:
382 if self._has_model_path(struct_path):
383 self.set_array_origin(struct_path, origin)
384 if root_name is not None:
385 self._document.root_name = root_name
386 self._flush()
388 def _write_wcs(self, projection: Any, bbox: Any) -> None:
389 ast_frame_set = projection._pixel_to_sky._get_ast_frame_set()
390 text = _show_ast_for_ndf(ast_frame_set, bbox)
391 lines = _hds.encode_ndf_ast_data(text)
392 for ndf_path in self._wcs_ndf_paths:
393 if self._has_model_path(ndf_path):
394 self._document.ensure_ndf(ndf_path).set_wcs(NdfWcs(lines))
395 if self._has_model_path("/MORE/LSST/MASK"):
396 mask_origin = _origin_from_bbox(bbox) if bbox is not None else (0, 0)
397 mask_ndim = self._model_array_ndim("/MORE/LSST/MASK/DATA_ARRAY")
398 mask_origin = (*mask_origin, *((0,) * max(0, mask_ndim - len(mask_origin))))
399 mask_ast_frame_set = projection._pixel_to_sky._get_ast_frame_set()
400 mask_text = _show_mask_ast_for_ndf(
401 mask_ast_frame_set,
402 mask_origin,
403 labels=("x", "y", "mask-byte"),
404 )
405 self._document.ensure_ndf("/MORE/LSST/MASK").set_wcs(NdfWcs(_hds.encode_ndf_ast_data(mask_text)))
407 def serialize_direct[T: pydantic.BaseModel](
408 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T]
409 ) -> T:
410 nested = NestedOutputArchive[NdfPointerModel](name, self)
411 return serializer(nested)
413 def serialize_pointer[T: ArchiveTree](
414 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T], key: Hashable
415 ) -> NdfPointerModel:
416 if (pointer := self._pointers.get(key)) is not None:
417 return pointer
418 archive_path = name if name.startswith("/") else f"/{name}"
419 target_path = self._archive_path_to_hdf5_path(archive_path)
420 # Run the serializer first so any nested add_array / serialize_pointer
421 # calls write into the file before we dump this sub-tree to JSON.
422 model = self.serialize_direct(name, serializer)
423 json_text = model.model_dump_json()
424 # Store the JSON document as a "JSON" child of the target structure,
425 # mirroring the main /MORE/LSST/JSON tree. Writing it at the target
426 # path itself would clobber any nested arrays the serializer added
427 # under that path (IR.write_to_hdf5 deletes the existing node before
428 # writing a primitive).
429 # ensure_structure derives the HDS type from the leaf name, so
430 # /MORE/LSST/PSF is typed <PSF> rather than the generic <EXT>.
431 target = self._ensure_model_structure(target_path)
432 target.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text)))
433 self._flush()
434 pointer = NdfPointerModel(path=f"{target_path}/JSON")
435 self._pointers[key] = pointer
436 return pointer
438 def serialize_frame_set[T: ArchiveTree](
439 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
440 ) -> NdfPointerModel:
441 pointer = self.serialize_pointer(name, serializer, key)
442 self._frame_sets.append((frame_set, pointer))
443 return pointer
445 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, NdfPointerModel]]:
446 return iter(self._frame_sets)
448 _COMPATIBLE_MASK_DTYPES = (np.dtype(np.uint8),)
449 _prefer_native_mask_arrays = True
451 def add_array(
452 self,
453 array: np.ndarray,
454 *,
455 name: str | None = None,
456 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
457 ) -> ArrayReferenceModel:
458 # Recognised top-level names go to standard NDF locations.
459 # Anything else hoists under /MORE/LSST.
460 if name == "image":
461 root = self._document.ensure_ndf("/")
462 root.set_array_component(
463 "DATA_ARRAY",
464 array,
465 origin=np.zeros(array.ndim, dtype=np.int64),
466 compression_options=self._compression_options,
467 )
468 path = "/DATA_ARRAY/DATA"
469 self._bbox_array_struct_paths.add("/DATA_ARRAY")
470 elif name == "variance":
471 root = self._document.ensure_ndf("/")
472 root.set_array_component(
473 "VARIANCE",
474 array,
475 origin=np.zeros(array.ndim, dtype=np.int64),
476 compression_options=self._compression_options,
477 )
478 path = "/VARIANCE/DATA"
479 self._bbox_array_struct_paths.add("/VARIANCE")
480 elif name == "mask":
481 if array.ndim == 2 and array.dtype in self._COMPATIBLE_MASK_DTYPES:
482 self._set_quality_array(array)
483 path = "/QUALITY/QUALITY/DATA"
484 self._bbox_array_struct_paths.add("/QUALITY/QUALITY")
485 else:
486 # Native Mask serialization passes HDF5 shape
487 # (mask-byte, y, x). HDS reports the reverse dimension order,
488 # so Starlink tools see (x, y, mask-byte).
489 if array.ndim == 3 and array.dtype in self._COMPATIBLE_MASK_DTYPES:
490 self._set_quality_array(self._collapse_mask_to_quality(array))
491 self._bbox_array_struct_paths.add("/QUALITY/QUALITY")
492 mask_ndf = self._document.ensure_ndf("/MORE/LSST/MASK")
493 mask_ndf.set_array_component(
494 "DATA_ARRAY",
495 array,
496 origin=np.zeros(array.ndim, dtype=np.int64),
497 bad_pixel=False,
498 compression_options=self._compression_options,
499 )
500 path = "/MORE/LSST/MASK/DATA_ARRAY/DATA"
501 self._bbox_array_struct_paths.add("/MORE/LSST/MASK/DATA_ARRAY")
502 elif name in self._direct_ndf_array_paths:
503 sub_ndf_path = self._direct_ndf_array_paths[name]
504 sub_ndf = self._document.ensure_ndf(sub_ndf_path)
505 sub_ndf.set_array_component(
506 "DATA_ARRAY",
507 array,
508 origin=np.zeros(array.ndim, dtype=np.int64),
509 compression_options=self._compression_options,
510 )
511 path = f"{sub_ndf_path}/DATA_ARRAY/DATA"
512 self._bbox_array_struct_paths.add(f"{sub_ndf_path}/DATA_ARRAY")
513 else:
514 if name is None:
515 raise ValueError("Anonymous arrays are not supported in the NDF archive.")
516 archive_path = name if name.startswith("/") else f"/{name}"
517 # Hoisted numeric arrays are wrapped as sub-NDFs under
518 # /MORE/LSST/<UPPER_PATH> so Starlink tools (KAPPA `display`,
519 # `hdstrace`, etc.) can inspect them just like the main
520 # image. The sub-NDF has the canonical layout: top-level
521 # group with CLASS="NDF" containing a DATA_ARRAY structure
522 # (CLASS="ARRAY") with DATA + ORIGIN primitives. Hoisted
523 # JSON sub-trees from serialize_pointer stay as bare
524 # _CHAR*N datasets at /MORE/LSST/<NAME> (no NDF wrapper) —
525 # they're JSON documents, not numeric arrays.
526 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path)
527 sub_ndf = self._document.ensure_ndf(sub_ndf_path)
528 sub_ndf.set_array_component(
529 "DATA_ARRAY",
530 array,
531 origin=np.zeros(array.ndim, dtype=np.int64),
532 compression_options=self._compression_options,
533 )
534 path = f"{sub_ndf_path}/DATA_ARRAY/DATA"
535 self._flush()
536 # Shape is stored in the JSON tree (matching the FITS archive) because
537 # MaskSerializationModel.bbox needs it before any arrays are read.
538 # Future work: resolve shape from the HDF5 dataset on read instead.
539 return ArrayReferenceModel(
540 source=f"ndf:{path}",
541 shape=list(array.shape),
542 datatype=NumberType.from_numpy(array.dtype),
543 )
545 def _ensure_path(self, path: str) -> h5py.Group:
546 """Walk/create groups for an HDF5 absolute path.
548 Intermediate groups created on demand are tagged with
549 ``CLASS="EXT"``, the HDS type for general-purpose extension
550 containers (matches what hds-v5 writes for ``MORE``).
551 """
552 self._ensure_model_structure(path, "EXT")
553 self._flush()
554 return self._file["/" if path in ("", "/") else path]
556 def _ensure_struct(self, path: str, hds_type: str) -> h5py.Group:
557 """Ensure a structure exists at ``path`` with the given HDS type."""
558 self._ensure_model_structure(path, hds_type)
559 self._flush()
560 return self._file["/" if path in ("", "/") else path]
562 def _ensure_array_structure(self, path: str) -> h5py.Group:
563 """Ensure an HDS ``ARRAY`` structure exists at ``path``."""
564 return self._ensure_struct(path, "ARRAY")
566 def _ensure_quality_structure(self) -> h5py.Group:
567 """Ensure ``/QUALITY`` exists with ``CLASS="QUALITY"`` and BADBITS.
569 BADBITS is set to 255 so every bit of the collapsed single-byte
570 QUALITY plane is treated as bad by NDF applications.
571 """
572 root = self._document.ensure_ndf("/")
573 if "QUALITY" not in root.children:
574 root.children["QUALITY"] = HdsStructure("QUALITY")
575 quality = root.get_structure("QUALITY")
576 quality.hds_type = "QUALITY"
577 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8))
578 self._flush()
579 return self._file["/QUALITY"]
581 def _ensure_quality_array_structure(self) -> h5py.Group:
582 """Ensure the nested ``/QUALITY/QUALITY`` ARRAY structure exists."""
583 root = self._document.ensure_ndf("/")
584 if "QUALITY" not in root.children:
585 root.children["QUALITY"] = HdsStructure("QUALITY")
586 quality = root.get_structure("QUALITY")
587 quality.hds_type = "QUALITY"
588 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8))
589 if "QUALITY" not in quality.children or not isinstance(quality.children["QUALITY"], HdsStructure):
590 quality.children["QUALITY"] = HdsStructure("ARRAY")
591 quality_array = quality.get_structure("QUALITY")
592 quality_array.hds_type = "ARRAY"
593 quality_array.children.setdefault("ORIGIN", HdsPrimitive.array(np.zeros(2, dtype=np.int32)))
594 quality_array.children.setdefault("BAD_PIXEL", HdsPrimitive.array(np.array(False, dtype=np.bool_)))
595 self._flush()
596 return self._file["/QUALITY/QUALITY"]
598 def _write_quality_array(self, quality: np.ndarray) -> None:
599 """Write or replace the NDF QUALITY array."""
600 self._set_quality_array(quality)
601 self._flush()
603 def _set_quality_array(self, quality: np.ndarray) -> None:
604 """Set or replace the NDF QUALITY array in the model."""
605 root = self._document.ensure_ndf("/")
606 root.set_quality(
607 NdfQuality(
608 NdfArray(
609 quality,
610 origin=np.zeros(2, dtype=np.int32),
611 bad_pixel=False,
612 compression_options=self._compression_options,
613 )
614 )
615 )
617 def _collapse_mask_to_quality(self, array: np.ndarray) -> np.ndarray:
618 """Compress an NDF-native 3-D mask array into 2-D QUALITY.
620 The input array is in HDF5 storage order ``(mask-byte, y, x)``.
621 Single-byte masks copy directly to preserve bit values. Wider masks
622 collapse to 1 where any byte is non-zero and 0 otherwise.
623 """
624 if array.shape[0] == 1:
625 return array[0, :, :]
626 return np.any(array != 0, axis=0).astype(np.uint8)
628 def _write_origin_for_array(self, struct_path: str, array: np.ndarray) -> None:
629 """Write a placeholder ORIGIN of zeros (int64).
631 The top-level `write` function overwrites this
632 with bbox-derived values via :meth:`set_array_origin` once the
633 bbox is known.
634 """
635 struct = self._document.root.get_structure(struct_path)
636 if "ORIGIN" not in struct.children:
637 struct.children["ORIGIN"] = HdsPrimitive.array(np.zeros(array.ndim, dtype=np.int64))
639 def set_array_origin(self, struct_path: str, origin: tuple[int, ...]) -> None:
640 """Overwrite the ORIGIN of an ARRAY structure.
642 Parameters
643 ----------
644 struct_path
645 HDF5 path to the ARRAY structure (e.g. ``"/DATA_ARRAY"``).
646 origin
647 Origin in NDF/Fortran axis order (e.g. ``(x_min, y_min)``
648 for a 2D image with bbox lower bound ``(x_min, y_min)``).
649 """
650 struct = self._document.root.get_structure(struct_path)
651 origin_dtype = np.int32 if struct_path == "/QUALITY/QUALITY" else np.int64
652 origin_array = np.asarray(origin, dtype=origin_dtype)
653 data_node = struct.children.get("DATA")
654 if isinstance(data_node, HdsPrimitive):
655 data_ndim = data_node.read_array().ndim
656 if origin_array.size < data_ndim:
657 origin_array = np.pad(origin_array, (0, data_ndim - origin_array.size))
658 struct.children["ORIGIN"] = HdsPrimitive.array(origin_array)
659 self._flush()
661 def _ensure_model_structure(self, path: str, hds_type: str | None = None) -> HdsStructure:
662 """Return or create a structure in the NDF document model.
664 With ``hds_type=None`` the leaf type is derived from its name;
665 see `HdsStructure.ensure_structure`.
666 """
667 if path in ("", "/"):
668 return self._document.root
669 return self._document.root.ensure_structure(path, hds_type)
671 def _archive_path_to_hdf5_path(self, archive_path: str) -> str:
672 """Translate an archive path to this layout's HDF5 path."""
673 if self._lsst_path == "/MORE/LSST":
674 return archive_path_to_hdf5_path(archive_path)
675 if not archive_path:
676 return f"{self._lsst_path}/JSON"
677 components = archive_path_to_hdf5_path_components(archive_path)
678 return f"{self._lsst_path}/{'/'.join(components)}"
680 def _has_model_path(self, path: str) -> bool:
681 """Return `True` if a path exists in the NDF document model."""
682 try:
683 self._document.get(path)
684 except KeyError:
685 return False
686 return True
688 def _model_array_ndim(self, struct_path: str) -> int:
689 """Return the dimensionality of an ARRAY structure's DATA primitive."""
690 struct = self._document.root.get_structure(struct_path)
691 data_node = struct.children["DATA"]
692 if not isinstance(data_node, HdsPrimitive):
693 raise TypeError(f"{struct_path}/DATA is not an HDS primitive.")
694 return data_node.read_array().ndim
696 def _flush(self) -> None:
697 """Synchronize the Python NDF document model to the open HDF5 file."""
698 self._document.write_to_hdf5(self._file)
700 def add_table(
701 self,
702 table: astropy.table.Table,
703 *,
704 name: str | None = None,
705 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
706 ) -> TableModel:
707 columns = TableColumnModel.from_table(table, inline=True)
708 return TableModel(columns=columns, meta=table.meta)
710 def add_structured_array(
711 self,
712 array: np.ndarray,
713 *,
714 name: str | None = None,
715 units: Mapping[str, astropy.units.Unit] | None = None,
716 descriptions: Mapping[str, str] | None = None,
717 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
718 ) -> TableModel:
719 if name is None:
720 columns = TableColumnModel.from_record_array(array, inline=True)
721 for c in columns:
722 if units and (unit := units.get(c.name)):
723 c.unit = unit
724 if descriptions and (description := descriptions.get(c.name)):
725 c.description = description
726 return TableModel(columns=columns)
727 columns = TableColumnModel.from_record_dtype(array.dtype)
728 for c in columns:
729 column_path = name if len(columns) == 1 else f"{name}/{c.name}"
730 archive_path = column_path if column_path.startswith("/") else f"/{column_path}"
731 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path)
732 column_array = np.asarray(array[c.name])
733 sub_ndf = self._document.ensure_ndf(sub_ndf_path)
734 sub_ndf.set_array_component(
735 "DATA_ARRAY",
736 column_array,
737 origin=np.zeros(column_array.ndim, dtype=np.int64),
738 compression_options=self._compression_options,
739 )
740 assert isinstance(c.data, ArrayReferenceModel)
741 c.data.source = f"ndf:{sub_ndf_path}/DATA_ARRAY/DATA"
742 for c in columns:
743 if units and (unit := units.get(c.name)):
744 c.unit = unit
745 if descriptions and (description := descriptions.get(c.name)):
746 c.description = description
747 self._flush()
748 return TableModel(columns=columns)