Coverage for python/lsst/images/ndf/_output_archive.py: 15%
330 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:08 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:08 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "NdfOutputArchive",
16 "write",
17)
19import os
20from collections.abc import Callable, Hashable, Iterator, Mapping, Sequence
21from contextlib import contextmanager
22from typing import Any, Self
24import astropy.io.fits
25import astropy.table
26import astropy.units
27import h5py
28import numpy as np
29import pydantic
31from .._color_image import ColorImage
32from .._transforms import FrameSet
33from .._transforms._ast import Channel, CmpFrame, CmpMap, ShiftMap, StringStream, UnitMap
34from .._transforms._ast import Frame as AstFrame
35from .._transforms._ast import FrameSet as AstFrameSet
36from .._transforms._transform import _prepend_ast_shift
37from ..fits._common import ExtensionKey, FitsOpaqueMetadata
38from ..serialization import (
39 ArchiveTree,
40 ArrayReferenceModel,
41 ButlerInfo,
42 MetadataValue,
43 NestedOutputArchive,
44 NumberType,
45 OutputArchive,
46 TableColumnModel,
47 TableModel,
48 no_header_updates,
49)
50from . import _hds
51from ._common import NdfPointerModel, archive_path_to_hdf5_path, archive_path_to_hdf5_path_components
52from ._model import HdsPrimitive, HdsStructure, Ndf, NdfArray, NdfContainer, NdfDocument, NdfQuality, NdfWcs
54_NDF_FORMAT_VERSION = 1
55"""Container layout version for files written by `NdfOutputArchive`.
57Bumps when the NDF layout (NdfDocument shape, .MORE.LSST contents)
58changes. Independent of any data-model ``SCHEMA_VERSION``.
59"""
62def write(
63 obj: Any,
64 filename: str | None = None,
65 *,
66 metadata: dict[str, MetadataValue] | None = None,
67 butler_info: ButlerInfo | None = None,
68 compression_options: Mapping[str, Any] | None = None,
69) -> ArchiveTree:
70 """Write a serializable object to an NDF (HDS-on-HDF5) file.
72 Parameters
73 ----------
74 obj
75 Object with a ``serialize`` method. May carry an
76 ``_opaque_metadata`` attribute (a
77 `~lsst.images.fits.FitsOpaqueMetadata`)
78 whose primary-HDU header gets written to ``/MORE/FITS``. This
79 preserves FITS cards on objects that originated from a FITS read;
80 butler provenance is conveyed through ``butler_info`` instead.
81 filename
82 Path to write to. Must not already exist. If `None`, an
83 in-memory HDF5 file is used and the on-disk artefact is
84 discarded; the returned tree still reflects all the writes the
85 archive made (useful for tests).
86 metadata, butler_info
87 Optional caller-supplied entries that are written into the
88 returned `~lsst.images.serialization.ArchiveTree`.
89 compression_options
90 Optional dict forwarded to the archive constructor for h5py
91 dataset compression.
93 Returns
94 -------
95 `~lsst.images.serialization.ArchiveTree`
96 The Pydantic tree the object's ``serialize`` produced (with
97 ``metadata``/``butler_info`` applied).
98 """
99 opaque_metadata = getattr(obj, "_opaque_metadata", None)
100 if not isinstance(opaque_metadata, FitsOpaqueMetadata):
101 opaque_metadata = FitsOpaqueMetadata()
102 archive_default_name = getattr(obj, "_archive_default_name", None)
103 with NdfOutputArchive.open(
104 filename,
105 compression_options=compression_options,
106 opaque_metadata=opaque_metadata,
107 **_get_archive_layout(obj),
108 ) as archive:
109 if archive_default_name is not None:
110 tree = archive.serialize_direct(archive_default_name, obj.serialize)
111 else:
112 tree = obj.serialize(archive)
113 if metadata is not None:
114 tree.metadata.update(metadata)
115 if butler_info is not None:
116 tree.butler_info = butler_info
117 archive.add_tree(
118 tree,
119 projection=getattr(obj, "projection", None),
120 bbox=getattr(obj, "bbox", None),
121 unit=getattr(obj, "unit", None),
122 root_name=archive_default_name or type(obj).__name__,
123 )
124 return tree
127def _origin_from_bbox(bbox: Any) -> tuple[int, ...]:
128 """Extract NDF/Fortran-order origin tuple from an lsst.images Box.
130 The exact attribute names on Box depend on the version. Inspect the
131 object and pick whatever exposes the integer lower bounds. For a 2D
132 image with bbox lower bound (x_min, y_min) the returned tuple is
133 ``(x_min, y_min)``.
134 """
135 # Box exposes .x and .y properties returning Interval objects with a
136 # .start attribute (the lower bound).
137 if hasattr(bbox, "x") and hasattr(bbox, "y"):
138 x = bbox.x
139 y = bbox.y
140 if hasattr(x, "start") and hasattr(y, "start"):
141 return (int(x.start), int(y.start))
142 raise AttributeError(
143 f"Don't know how to extract origin from bbox of type {type(bbox).__name__!r}; "
144 "_origin_from_bbox needs updating."
145 )
148def _unit_to_ndf_string(unit: astropy.units.UnitBase) -> str:
149 """Return an ASCII unit string for the NDF UNITS component."""
150 try:
151 return unit.to_string(format="fits")
152 except ValueError:
153 return unit.to_string()
156def _fits_header_records(header: astropy.io.fits.Header) -> list[str]:
157 """Return fixed-width FITS records for an opaque NDF FITS extension.
159 NDF ``.MORE.FITS`` is a ``_CHAR*80`` vector. Use Astropy's FITS
160 header serializer to preserve multi-record logical cards such as
161 CONTINUE strings, but omit the FITS END card and 2880-byte padding
162 because this is an NDF extension component, not a complete FITS
163 header block.
164 """
165 block = header.tostring(sep="", endcard=False, padding=False)
166 encoded = block.encode("ascii")
167 if len(encoded) % 80:
168 raise ValueError(
169 f"FITS header block is {len(encoded)} bytes, not a multiple of the 80-byte FITS record size."
170 )
171 return [block[n : n + 80] for n in range(0, len(block), 80)]
174def _get_archive_layout(obj: Any) -> dict[str, Any]:
175 """Return NDF document layout options for a top-level object."""
176 if isinstance(obj, ColorImage):
177 return {
178 "root": NdfContainer(),
179 "lsst_path": "/LSST",
180 "direct_ndf_array_paths": {
181 "red": "/RED",
182 "green": "/GREEN",
183 "blue": "/BLUE",
184 },
185 "wcs_ndf_paths": ("/RED", "/GREEN", "/BLUE"),
186 }
187 return {}
190def _show_ast_for_ndf(ast_frame_set: Any, bbox: Any | None) -> str:
191 """Return AST Channel text matching Starlink NDF WCS serialization.
193 Tags the original base frame with ``Domain="PIXEL"`` and prepends a
194 new ``Domain="GRID"`` base frame related to it by a `ShiftMap` whose
195 shift converts ``bbox``-origin pixel coordinates into 1-based grid
196 coordinates. The result is written via an abstraction-layer
197 ``Channel`` configured with the same options the Starlink C writer
198 uses (``Full=-1,Comment=0``; see ``ndf1Wwrt.c``) plus ``Indent=0`` so
199 each line is just the bare AST item with the single-space prefix
200 that ``ndf1Rdast`` strips back off on read.
201 """
202 if bbox is None:
203 x_shift = 1.0
204 y_shift = 1.0
205 else:
206 x_shift = 1.0 - float(bbox.x.start)
207 y_shift = 1.0 - float(bbox.y.start)
209 saved_current = ast_frame_set.current
210 ast_frame_set.current = ast_frame_set.base
211 ast_frame_set.domain = "PIXEL"
212 ast_frame_set.current = saved_current
213 _prepend_ast_shift(ast_frame_set, x=x_shift, y=y_shift, ast_domain="GRID")
215 stream = StringStream()
216 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0")
217 channel.write(ast_frame_set)
218 return stream.getSinkData()
221def _show_mask_ast_for_ndf(
222 parent_ast_frame_set: Any,
223 origin: Sequence[int],
224 *,
225 labels: Sequence[str] = (),
226) -> str:
227 """Return an NDF WCS for the 3D native mask sub-NDF.
229 The first two axes reuse the parent image's pixel-to-sky mapping. The
230 third axis is a generic mask-byte coordinate that passes through unchanged.
231 """
232 n_axes = len(origin)
233 ast_frame_set = AstFrameSet(AstFrame(n_axes, "Domain=GRID"))
234 pixel_frame = AstFrame(n_axes, "Domain=PIXEL")
235 for axis, label in enumerate(labels[:n_axes], start=1):
236 pixel_frame.setLabel(axis, label)
237 shifts = [1.0 - float(axis_origin) for axis_origin in origin]
238 ast_frame_set.addFrame(AstFrameSet.BASE, ShiftMap(shifts), pixel_frame)
240 parent_pixel_to_sky = parent_ast_frame_set.getMapping(
241 parent_ast_frame_set.base,
242 parent_ast_frame_set.current,
243 )
244 parent_sky_frame = parent_ast_frame_set.getFrame(parent_ast_frame_set.current)
245 mask_axis_frame = AstFrame(1, "Domain=MASK")
246 mask_axis_frame.setLabel(1, labels[2] if len(labels) > 2 else "mask-byte")
247 ast_frame_set.addFrame(
248 ast_frame_set.current,
249 CmpMap(parent_pixel_to_sky, UnitMap(1), False),
250 CmpFrame(parent_sky_frame, mask_axis_frame),
251 )
253 stream = StringStream()
254 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0")
255 channel.write(ast_frame_set)
256 return stream.getSinkData()
259class NdfOutputArchive(OutputArchive[NdfPointerModel]):
260 """An `~lsst.images.serialization.OutputArchive` implementation
261 that writes HDS-on-HDF5 files compatible with the Starlink NDF data
262 model.
264 Parameters
265 ----------
266 file
267 An open `h5py.File` opened in a writable mode. The archive does
268 not close the file; the caller is responsible for that.
269 compression_options
270 Optional dict passed through to `h5py.Group.create_dataset` for image
271 arrays (e.g. ``{"compression": "gzip", "compression_opts": 4}``).
272 opaque_metadata
273 Optional `~lsst.images.fits.FitsOpaqueMetadata`; if its primary-HDU
274 header is non-empty its cards will be written to ``/MORE/FITS`` by the
275 top-level `write` function.
276 """
278 def __init__(
279 self,
280 file: h5py.File,
281 compression_options: Mapping[str, Any] | None = None,
282 opaque_metadata: FitsOpaqueMetadata | None = None,
283 root: Ndf | NdfContainer | None = None,
284 lsst_path: str = "/MORE/LSST",
285 direct_ndf_array_paths: Mapping[str, str] | None = None,
286 wcs_ndf_paths: Sequence[str] = ("/",),
287 ) -> None:
288 self._file = file
289 self._document = NdfDocument(root=root if root is not None else Ndf())
290 self._lsst_path = lsst_path.rstrip("/") or "/LSST"
291 self._direct_ndf_array_paths = dict(direct_ndf_array_paths) if direct_ndf_array_paths else {}
292 self._wcs_ndf_paths = tuple(wcs_ndf_paths)
293 self._bbox_array_struct_paths: set[str] = set()
294 self._compression_options = dict(compression_options) if compression_options else {}
295 self._opaque_metadata = opaque_metadata if opaque_metadata is not None else FitsOpaqueMetadata()
296 self._frame_sets: list[tuple[FrameSet, NdfPointerModel]] = []
297 self._pointers: dict[Hashable, NdfPointerModel] = {}
298 # Keep the open file in sync so existing direct-archive tests can
299 # inspect it immediately, while all mutations go through the IR.
300 self._flush()
302 @classmethod
303 @contextmanager
304 def open(
305 cls,
306 filename: str | None,
307 *,
308 compression_options: Mapping[str, Any] | None = None,
309 opaque_metadata: FitsOpaqueMetadata | None = None,
310 root: Ndf | NdfContainer | None = None,
311 lsst_path: str = "/MORE/LSST",
312 direct_ndf_array_paths: Mapping[str, str] | None = None,
313 wcs_ndf_paths: Sequence[str] = ("/",),
314 ) -> Iterator[Self]:
315 """Open an NDF file for writing and yield an `NdfOutputArchive`.
317 ``filename=None`` uses an in-memory HDF5 file; the on-disk
318 artefact is discarded but the archive's writes still produce a
319 usable returned tree (handy for tests).
320 """
321 if filename is None:
322 h5_file = h5py.File("inmem.sdf", "w", driver="core", backing_store=False)
323 else:
324 if os.path.exists(filename) and os.path.getsize(filename) > 0:
325 raise OSError(f"File {filename!r} already exists.")
326 h5_file = h5py.File(filename, "w")
327 try:
328 yield cls(
329 h5_file,
330 compression_options=compression_options,
331 opaque_metadata=opaque_metadata,
332 root=root,
333 lsst_path=lsst_path,
334 direct_ndf_array_paths=direct_ndf_array_paths,
335 wcs_ndf_paths=wcs_ndf_paths,
336 )
337 finally:
338 h5_file.close()
340 def add_tree(
341 self,
342 tree: ArchiveTree,
343 *,
344 projection: Any = None,
345 bbox: Any = None,
346 unit: astropy.units.UnitBase | None = None,
347 root_name: str | None = None,
348 ) -> None:
349 """Finalize the file: write WCS, units, JSON tree, and ORIGIN.
351 Writes the canonical NDF ``/WCS`` HDS structure (an AST channel
352 text dump that KAPPA / hdstrace expect) when ``projection`` is
353 provided. A native mask sub-NDF at ``/MORE/LSST/MASK`` gets a 3D
354 WCS whose first two axes reuse the parent's sky projection and
355 whose third axis is the mask-byte coordinate. The JSON tree at
356 ``<lsst_path>/JSON`` remains the source of truth for symmetric
357 round-trips; ``/WCS`` is for Starlink tools. Auto-detect read of
358 ``/WCS/DATA`` into a typed ``Projection`` is a follow-up.
360 Parameters
361 ----------
362 tree
363 Pydantic tree returned by the object's ``serialize`` method,
364 with ``metadata``/``butler_info`` already applied.
365 projection, bbox, unit
366 Top-level object attributes that drive NDF-canonical writes.
367 root_name
368 Value to assign to the root group's ``HDS_ROOT_NAME``
369 attribute (fixed-length ASCII so KAPPA / hdstrace decode it).
370 """
371 if projection is not None:
372 self._write_wcs(projection, bbox)
373 if unit is not None and isinstance(self._document.root, Ndf):
374 self._document.ensure_ndf("/").set_units(_unit_to_ndf_string(unit))
375 json_text = tree.model_dump_json()
376 # Let ensure_structure derive types from path-component names so
377 # /MORE/LSST/... gets <MORE> stay as EXT and <LSST> typed LSST,
378 # mirroring HDS convention.
379 lsst = self._ensure_model_structure(self._lsst_path)
380 lsst.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text)))
381 lsst.children["DATA_MODEL"] = HdsPrimitive.char_scalar(
382 tree.schema_url, width=max(80, len(tree.schema_url))
383 )
384 lsst.children["FORMAT_VERSION"] = HdsPrimitive.array(np.array(_NDF_FORMAT_VERSION, dtype=np.int32))
385 primary = self._opaque_metadata.headers.get(ExtensionKey())
386 if primary is not None and len(primary):
387 cards = _fits_header_records(primary)
388 more = self._ensure_model_structure("/MORE")
389 more.children["FITS"] = HdsPrimitive.char_array(cards, width=80)
390 if bbox is not None:
391 origin = _origin_from_bbox(bbox)
392 for struct_path in self._bbox_array_struct_paths:
393 if self._has_model_path(struct_path):
394 self.set_array_origin(struct_path, origin)
395 if root_name is not None:
396 self._document.root_name = root_name
397 self._flush()
399 def _write_wcs(self, projection: Any, bbox: Any) -> None:
400 ast_frame_set = projection._pixel_to_sky._get_ast_frame_set()
401 text = _show_ast_for_ndf(ast_frame_set, bbox)
402 lines = _hds.encode_ndf_ast_data(text)
403 for ndf_path in self._wcs_ndf_paths:
404 if self._has_model_path(ndf_path):
405 self._document.ensure_ndf(ndf_path).set_wcs(NdfWcs(lines))
406 if self._has_model_path("/MORE/LSST/MASK"):
407 mask_origin = _origin_from_bbox(bbox) if bbox is not None else (0, 0)
408 mask_ndim = self._model_array_ndim("/MORE/LSST/MASK/DATA_ARRAY")
409 mask_origin = (*mask_origin, *((0,) * max(0, mask_ndim - len(mask_origin))))
410 mask_ast_frame_set = projection._pixel_to_sky._get_ast_frame_set()
411 mask_text = _show_mask_ast_for_ndf(
412 mask_ast_frame_set,
413 mask_origin,
414 labels=("x", "y", "mask-byte"),
415 )
416 self._document.ensure_ndf("/MORE/LSST/MASK").set_wcs(NdfWcs(_hds.encode_ndf_ast_data(mask_text)))
418 def serialize_direct[T: pydantic.BaseModel](
419 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T]
420 ) -> T:
421 nested = NestedOutputArchive[NdfPointerModel](name, self)
422 return serializer(nested)
424 def serialize_pointer[T: ArchiveTree](
425 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T], key: Hashable
426 ) -> NdfPointerModel:
427 if (pointer := self._pointers.get(key)) is not None:
428 return pointer
429 archive_path = name if name.startswith("/") else f"/{name}"
430 target_path = self._archive_path_to_hdf5_path(archive_path)
431 # Run the serializer first so any nested add_array / serialize_pointer
432 # calls write into the file before we dump this sub-tree to JSON.
433 model = self.serialize_direct(name, serializer)
434 json_text = model.model_dump_json()
435 # Store the JSON document as a "JSON" child of the target structure,
436 # mirroring the main /MORE/LSST/JSON tree. Writing it at the target
437 # path itself would clobber any nested arrays the serializer added
438 # under that path (IR.write_to_hdf5 deletes the existing node before
439 # writing a primitive).
440 # ensure_structure derives the HDS type from the leaf name, so
441 # /MORE/LSST/PSF is typed <PSF> rather than the generic <EXT>.
442 target = self._ensure_model_structure(target_path)
443 target.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text)))
444 self._flush()
445 pointer = NdfPointerModel(path=f"{target_path}/JSON")
446 self._pointers[key] = pointer
447 return pointer
449 def serialize_frame_set[T: ArchiveTree](
450 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
451 ) -> NdfPointerModel:
452 pointer = self.serialize_pointer(name, serializer, key)
453 self._frame_sets.append((frame_set, pointer))
454 return pointer
456 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, NdfPointerModel]]:
457 return iter(self._frame_sets)
459 _COMPATIBLE_MASK_DTYPES = (np.dtype(np.uint8),)
460 _prefer_native_mask_arrays = True
462 def add_array(
463 self,
464 array: np.ndarray,
465 *,
466 name: str | None = None,
467 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
468 ) -> ArrayReferenceModel:
469 # Recognised top-level names go to standard NDF locations.
470 # Anything else hoists under /MORE/LSST.
471 if name == "image":
472 root = self._document.ensure_ndf("/")
473 root.set_array_component(
474 "DATA_ARRAY",
475 array,
476 origin=np.zeros(array.ndim, dtype=np.int64),
477 compression_options=self._compression_options,
478 )
479 path = "/DATA_ARRAY/DATA"
480 self._bbox_array_struct_paths.add("/DATA_ARRAY")
481 elif name == "variance":
482 root = self._document.ensure_ndf("/")
483 root.set_array_component(
484 "VARIANCE",
485 array,
486 origin=np.zeros(array.ndim, dtype=np.int64),
487 compression_options=self._compression_options,
488 )
489 path = "/VARIANCE/DATA"
490 self._bbox_array_struct_paths.add("/VARIANCE")
491 elif name == "mask":
492 if array.ndim == 2 and array.dtype in self._COMPATIBLE_MASK_DTYPES:
493 self._set_quality_array(array)
494 path = "/QUALITY/QUALITY/DATA"
495 self._bbox_array_struct_paths.add("/QUALITY/QUALITY")
496 else:
497 # Native Mask serialization passes HDF5 shape
498 # (mask-byte, y, x). HDS reports the reverse dimension order,
499 # so Starlink tools see (x, y, mask-byte).
500 if array.ndim == 3 and array.dtype in self._COMPATIBLE_MASK_DTYPES:
501 self._set_quality_array(self._collapse_mask_to_quality(array))
502 self._bbox_array_struct_paths.add("/QUALITY/QUALITY")
503 mask_ndf = self._document.ensure_ndf("/MORE/LSST/MASK")
504 mask_ndf.set_array_component(
505 "DATA_ARRAY",
506 array,
507 origin=np.zeros(array.ndim, dtype=np.int64),
508 bad_pixel=False,
509 compression_options=self._compression_options,
510 )
511 path = "/MORE/LSST/MASK/DATA_ARRAY/DATA"
512 self._bbox_array_struct_paths.add("/MORE/LSST/MASK/DATA_ARRAY")
513 elif name in self._direct_ndf_array_paths:
514 sub_ndf_path = self._direct_ndf_array_paths[name]
515 sub_ndf = self._document.ensure_ndf(sub_ndf_path)
516 sub_ndf.set_array_component(
517 "DATA_ARRAY",
518 array,
519 origin=np.zeros(array.ndim, dtype=np.int64),
520 compression_options=self._compression_options,
521 )
522 path = f"{sub_ndf_path}/DATA_ARRAY/DATA"
523 self._bbox_array_struct_paths.add(f"{sub_ndf_path}/DATA_ARRAY")
524 else:
525 if name is None:
526 raise ValueError("Anonymous arrays are not supported in the NDF archive.")
527 archive_path = name if name.startswith("/") else f"/{name}"
528 # Hoisted numeric arrays are wrapped as sub-NDFs under
529 # /MORE/LSST/<UPPER_PATH> so Starlink tools (KAPPA `display`,
530 # `hdstrace`, etc.) can inspect them just like the main
531 # image. The sub-NDF has the canonical layout: top-level
532 # group with CLASS="NDF" containing a DATA_ARRAY structure
533 # (CLASS="ARRAY") with DATA + ORIGIN primitives. Hoisted
534 # JSON sub-trees from serialize_pointer stay as bare
535 # _CHAR*N datasets at /MORE/LSST/<NAME> (no NDF wrapper) —
536 # they're JSON documents, not numeric arrays.
537 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path)
538 sub_ndf = self._document.ensure_ndf(sub_ndf_path)
539 sub_ndf.set_array_component(
540 "DATA_ARRAY",
541 array,
542 origin=np.zeros(array.ndim, dtype=np.int64),
543 compression_options=self._compression_options,
544 )
545 path = f"{sub_ndf_path}/DATA_ARRAY/DATA"
546 self._flush()
547 # Shape is stored in the JSON tree (matching the FITS archive) because
548 # MaskSerializationModel.bbox needs it before any arrays are read.
549 # Future work: resolve shape from the HDF5 dataset on read instead.
550 return ArrayReferenceModel(
551 source=f"ndf:{path}",
552 shape=list(array.shape),
553 datatype=NumberType.from_numpy(array.dtype),
554 )
556 def _ensure_path(self, path: str) -> h5py.Group:
557 """Walk/create groups for an HDF5 absolute path.
559 Intermediate groups created on demand are tagged with
560 ``CLASS="EXT"``, the HDS type for general-purpose extension
561 containers (matches what hds-v5 writes for ``MORE``).
562 """
563 self._ensure_model_structure(path, "EXT")
564 self._flush()
565 return self._file["/" if path in ("", "/") else path]
567 def _ensure_struct(self, path: str, hds_type: str) -> h5py.Group:
568 """Ensure a structure exists at ``path`` with the given HDS type."""
569 self._ensure_model_structure(path, hds_type)
570 self._flush()
571 return self._file["/" if path in ("", "/") else path]
573 def _ensure_array_structure(self, path: str) -> h5py.Group:
574 """Ensure an HDS ``ARRAY`` structure exists at ``path``."""
575 return self._ensure_struct(path, "ARRAY")
577 def _ensure_quality_structure(self) -> h5py.Group:
578 """Ensure ``/QUALITY`` exists with ``CLASS="QUALITY"`` and BADBITS.
580 BADBITS is set to 255 so every bit of the collapsed single-byte
581 QUALITY plane is treated as bad by NDF applications.
582 """
583 root = self._document.ensure_ndf("/")
584 if "QUALITY" not in root.children:
585 root.children["QUALITY"] = HdsStructure("QUALITY")
586 quality = root.get_structure("QUALITY")
587 quality.hds_type = "QUALITY"
588 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8))
589 self._flush()
590 return self._file["/QUALITY"]
592 def _ensure_quality_array_structure(self) -> h5py.Group:
593 """Ensure the nested ``/QUALITY/QUALITY`` ARRAY structure exists."""
594 root = self._document.ensure_ndf("/")
595 if "QUALITY" not in root.children:
596 root.children["QUALITY"] = HdsStructure("QUALITY")
597 quality = root.get_structure("QUALITY")
598 quality.hds_type = "QUALITY"
599 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8))
600 if "QUALITY" not in quality.children or not isinstance(quality.children["QUALITY"], HdsStructure):
601 quality.children["QUALITY"] = HdsStructure("ARRAY")
602 quality_array = quality.get_structure("QUALITY")
603 quality_array.hds_type = "ARRAY"
604 quality_array.children.setdefault("ORIGIN", HdsPrimitive.array(np.zeros(2, dtype=np.int32)))
605 quality_array.children.setdefault("BAD_PIXEL", HdsPrimitive.array(np.array(False, dtype=np.bool_)))
606 self._flush()
607 return self._file["/QUALITY/QUALITY"]
609 def _write_quality_array(self, quality: np.ndarray) -> None:
610 """Write or replace the NDF QUALITY array."""
611 self._set_quality_array(quality)
612 self._flush()
614 def _set_quality_array(self, quality: np.ndarray) -> None:
615 """Set or replace the NDF QUALITY array in the model."""
616 root = self._document.ensure_ndf("/")
617 root.set_quality(
618 NdfQuality(
619 NdfArray(
620 quality,
621 origin=np.zeros(2, dtype=np.int32),
622 bad_pixel=False,
623 compression_options=self._compression_options,
624 )
625 )
626 )
628 def _collapse_mask_to_quality(self, array: np.ndarray) -> np.ndarray:
629 """Compress an NDF-native 3-D mask array into 2-D QUALITY.
631 The input array is in HDF5 storage order ``(mask-byte, y, x)``.
632 Single-byte masks copy directly to preserve bit values. Wider masks
633 collapse to 1 where any byte is non-zero and 0 otherwise.
634 """
635 if array.shape[0] == 1:
636 return array[0, :, :]
637 return np.any(array != 0, axis=0).astype(np.uint8)
639 def _write_origin_for_array(self, struct_path: str, array: np.ndarray) -> None:
640 """Write a placeholder ORIGIN of zeros (int64).
642 The top-level `write` function overwrites this
643 with bbox-derived values via :meth:`set_array_origin` once the
644 bbox is known.
645 """
646 struct = self._document.root.get_structure(struct_path)
647 if "ORIGIN" not in struct.children:
648 struct.children["ORIGIN"] = HdsPrimitive.array(np.zeros(array.ndim, dtype=np.int64))
650 def set_array_origin(self, struct_path: str, origin: tuple[int, ...]) -> None:
651 """Overwrite the ORIGIN of an ARRAY structure.
653 Parameters
654 ----------
655 struct_path
656 HDF5 path to the ARRAY structure (e.g. ``"/DATA_ARRAY"``).
657 origin
658 Origin in NDF/Fortran axis order (e.g. ``(x_min, y_min)``
659 for a 2D image with bbox lower bound ``(x_min, y_min)``).
660 """
661 struct = self._document.root.get_structure(struct_path)
662 origin_dtype = np.int32 if struct_path == "/QUALITY/QUALITY" else np.int64
663 origin_array = np.asarray(origin, dtype=origin_dtype)
664 data_node = struct.children.get("DATA")
665 if isinstance(data_node, HdsPrimitive):
666 data_ndim = data_node.read_array().ndim
667 if origin_array.size < data_ndim:
668 origin_array = np.pad(origin_array, (0, data_ndim - origin_array.size))
669 struct.children["ORIGIN"] = HdsPrimitive.array(origin_array)
670 self._flush()
672 def _ensure_model_structure(self, path: str, hds_type: str | None = None) -> HdsStructure:
673 """Return or create a structure in the NDF document model.
675 With ``hds_type=None`` the leaf type is derived from its name;
676 see `HdsStructure.ensure_structure`.
677 """
678 if path in ("", "/"):
679 return self._document.root
680 return self._document.root.ensure_structure(path, hds_type)
682 def _archive_path_to_hdf5_path(self, archive_path: str) -> str:
683 """Translate an archive path to this layout's HDF5 path."""
684 if self._lsst_path == "/MORE/LSST":
685 return archive_path_to_hdf5_path(archive_path)
686 if not archive_path:
687 return f"{self._lsst_path}/JSON"
688 components = archive_path_to_hdf5_path_components(archive_path)
689 return f"{self._lsst_path}/{'/'.join(components)}"
691 def _has_model_path(self, path: str) -> bool:
692 """Return `True` if a path exists in the NDF document model."""
693 try:
694 self._document.get(path)
695 except KeyError:
696 return False
697 return True
699 def _model_array_ndim(self, struct_path: str) -> int:
700 """Return the dimensionality of an ARRAY structure's DATA primitive."""
701 struct = self._document.root.get_structure(struct_path)
702 data_node = struct.children["DATA"]
703 if not isinstance(data_node, HdsPrimitive):
704 raise TypeError(f"{struct_path}/DATA is not an HDS primitive.")
705 return data_node.read_array().ndim
707 def _flush(self) -> None:
708 """Synchronize the Python NDF document model to the open HDF5 file."""
709 self._document.write_to_hdf5(self._file)
711 def add_table(
712 self,
713 table: astropy.table.Table,
714 *,
715 name: str | None = None,
716 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
717 ) -> TableModel:
718 columns = TableColumnModel.from_table(table, inline=True)
719 return TableModel(columns=columns, meta=table.meta)
721 def add_structured_array(
722 self,
723 array: np.ndarray,
724 *,
725 name: str | None = None,
726 units: Mapping[str, astropy.units.Unit] | None = None,
727 descriptions: Mapping[str, str] | None = None,
728 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
729 ) -> TableModel:
730 if name is None:
731 columns = TableColumnModel.from_record_array(array, inline=True)
732 for c in columns:
733 if units and (unit := units.get(c.name)):
734 c.unit = unit
735 if descriptions and (description := descriptions.get(c.name)):
736 c.description = description
737 return TableModel(columns=columns)
738 columns = TableColumnModel.from_record_dtype(array.dtype)
739 for c in columns:
740 column_path = name if len(columns) == 1 else f"{name}/{c.name}"
741 archive_path = column_path if column_path.startswith("/") else f"/{column_path}"
742 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path)
743 column_array = np.asarray(array[c.name])
744 sub_ndf = self._document.ensure_ndf(sub_ndf_path)
745 sub_ndf.set_array_component(
746 "DATA_ARRAY",
747 column_array,
748 origin=np.zeros(column_array.ndim, dtype=np.int64),
749 compression_options=self._compression_options,
750 )
751 assert isinstance(c.data, ArrayReferenceModel)
752 c.data.source = f"ndf:{sub_ndf_path}/DATA_ARRAY/DATA"
753 for c in columns:
754 if units and (unit := units.get(c.name)):
755 c.unit = unit
756 if descriptions and (description := descriptions.get(c.name)):
757 c.description = description
758 self._flush()
759 return TableModel(columns=columns)