Coverage for python/lsst/images/ndf/_output_archive.py: 15%

330 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 01:09 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "NdfOutputArchive", 

16 "write", 

17) 

18 

19import os 

20from collections.abc import Callable, Hashable, Iterator, Mapping, Sequence 

21from contextlib import contextmanager 

22from typing import Any, Self 

23 

24import astropy.io.fits 

25import astropy.table 

26import astropy.units 

27import h5py 

28import numpy as np 

29import pydantic 

30 

31from .._color_image import ColorImage 

32from .._transforms import FrameSet 

33from .._transforms._ast import Channel, CmpFrame, CmpMap, ShiftMap, StringStream, UnitMap 

34from .._transforms._ast import Frame as AstFrame 

35from .._transforms._ast import FrameSet as AstFrameSet 

36from .._transforms._transform import _prepend_ast_shift 

37from ..fits._common import ExtensionKey, FitsOpaqueMetadata 

38from ..serialization import ( 

39 ArchiveTree, 

40 ArrayReferenceModel, 

41 ButlerInfo, 

42 MetadataValue, 

43 NestedOutputArchive, 

44 NumberType, 

45 OutputArchive, 

46 TableColumnModel, 

47 TableModel, 

48 no_header_updates, 

49) 

50from . import _hds 

51from ._common import NdfPointerModel, archive_path_to_hdf5_path, archive_path_to_hdf5_path_components 

52from ._model import HdsPrimitive, HdsStructure, Ndf, NdfArray, NdfContainer, NdfDocument, NdfQuality, NdfWcs 

53 

54_NDF_FORMAT_VERSION = 1 

55"""Container layout version for files written by `NdfOutputArchive`. 

56 

57Bumps when the NDF layout (NdfDocument shape, .MORE.LSST contents) 

58changes. Independent of any data-model ``SCHEMA_VERSION``. 

59""" 

60 

61 

62def write( 

63 obj: Any, 

64 filename: str | None = None, 

65 *, 

66 metadata: dict[str, MetadataValue] | None = None, 

67 butler_info: ButlerInfo | None = None, 

68 compression_options: Mapping[str, Any] | None = None, 

69) -> ArchiveTree: 

70 """Write a serializable object to an NDF (HDS-on-HDF5) file. 

71 

72 Parameters 

73 ---------- 

74 obj 

75 Object with a ``serialize`` method. May carry an 

76 ``_opaque_metadata`` attribute (a 

77 `~lsst.images.fits.FitsOpaqueMetadata`) 

78 whose primary-HDU header gets written to ``/MORE/FITS``. This 

79 preserves FITS cards on objects that originated from a FITS read; 

80 butler provenance is conveyed through ``butler_info`` instead. 

81 filename 

82 Path to write to. Must not already exist. If `None`, an 

83 in-memory HDF5 file is used and the on-disk artefact is 

84 discarded; the returned tree still reflects all the writes the 

85 archive made (useful for tests). 

86 metadata, butler_info 

87 Optional caller-supplied entries that are written into the 

88 returned `~lsst.images.serialization.ArchiveTree`. 

89 compression_options 

90 Optional dict forwarded to the archive constructor for h5py 

91 dataset compression. 

92 

93 Returns 

94 ------- 

95 `~lsst.images.serialization.ArchiveTree` 

96 The Pydantic tree the object's ``serialize`` produced (with 

97 ``metadata``/``butler_info`` applied). 

98 """ 

99 opaque_metadata = getattr(obj, "_opaque_metadata", None) 

100 if not isinstance(opaque_metadata, FitsOpaqueMetadata): 

101 opaque_metadata = FitsOpaqueMetadata() 

102 archive_default_name = getattr(obj, "_archive_default_name", None) 

103 with NdfOutputArchive.open( 

104 filename, 

105 compression_options=compression_options, 

106 opaque_metadata=opaque_metadata, 

107 **_get_archive_layout(obj), 

108 ) as archive: 

109 if archive_default_name is not None: 

110 tree = archive.serialize_direct(archive_default_name, obj.serialize) 

111 else: 

112 tree = obj.serialize(archive) 

113 if metadata is not None: 

114 tree.metadata.update(metadata) 

115 if butler_info is not None: 

116 tree.butler_info = butler_info 

117 archive.add_tree( 

118 tree, 

119 projection=getattr(obj, "projection", None), 

120 bbox=getattr(obj, "bbox", None), 

121 unit=getattr(obj, "unit", None), 

122 root_name=archive_default_name or type(obj).__name__, 

123 ) 

124 return tree 

125 

126 

127def _origin_from_bbox(bbox: Any) -> tuple[int, ...]: 

128 """Extract NDF/Fortran-order origin tuple from an lsst.images Box. 

129 

130 The exact attribute names on Box depend on the version. Inspect the 

131 object and pick whatever exposes the integer lower bounds. For a 2D 

132 image with bbox lower bound (x_min, y_min) the returned tuple is 

133 ``(x_min, y_min)``. 

134 """ 

135 # Box exposes .x and .y properties returning Interval objects with a 

136 # .start attribute (the lower bound). 

137 if hasattr(bbox, "x") and hasattr(bbox, "y"): 

138 x = bbox.x 

139 y = bbox.y 

140 if hasattr(x, "start") and hasattr(y, "start"): 

141 return (int(x.start), int(y.start)) 

142 raise AttributeError( 

143 f"Don't know how to extract origin from bbox of type {type(bbox).__name__!r}; " 

144 "_origin_from_bbox needs updating." 

145 ) 

146 

147 

148def _unit_to_ndf_string(unit: astropy.units.UnitBase) -> str: 

149 """Return an ASCII unit string for the NDF UNITS component.""" 

150 try: 

151 return unit.to_string(format="fits") 

152 except ValueError: 

153 return unit.to_string() 

154 

155 

156def _fits_header_records(header: astropy.io.fits.Header) -> list[str]: 

157 """Return fixed-width FITS records for an opaque NDF FITS extension. 

158 

159 NDF ``.MORE.FITS`` is a ``_CHAR*80`` vector. Use Astropy's FITS 

160 header serializer to preserve multi-record logical cards such as 

161 CONTINUE strings, but omit the FITS END card and 2880-byte padding 

162 because this is an NDF extension component, not a complete FITS 

163 header block. 

164 """ 

165 block = header.tostring(sep="", endcard=False, padding=False) 

166 encoded = block.encode("ascii") 

167 if len(encoded) % 80: 

168 raise ValueError( 

169 f"FITS header block is {len(encoded)} bytes, not a multiple of the 80-byte FITS record size." 

170 ) 

171 return [block[n : n + 80] for n in range(0, len(block), 80)] 

172 

173 

174def _get_archive_layout(obj: Any) -> dict[str, Any]: 

175 """Return NDF document layout options for a top-level object.""" 

176 if isinstance(obj, ColorImage): 

177 return { 

178 "root": NdfContainer(), 

179 "lsst_path": "/LSST", 

180 "direct_ndf_array_paths": { 

181 "red": "/RED", 

182 "green": "/GREEN", 

183 "blue": "/BLUE", 

184 }, 

185 "wcs_ndf_paths": ("/RED", "/GREEN", "/BLUE"), 

186 } 

187 return {} 

188 

189 

190def _show_ast_for_ndf(ast_frame_set: Any, bbox: Any | None) -> str: 

191 """Return AST Channel text matching Starlink NDF WCS serialization. 

192 

193 Tags the original base frame with ``Domain="PIXEL"`` and prepends a 

194 new ``Domain="GRID"`` base frame related to it by a `ShiftMap` whose 

195 shift converts ``bbox``-origin pixel coordinates into 1-based grid 

196 coordinates. The result is written via an abstraction-layer 

197 ``Channel`` configured with the same options the Starlink C writer 

198 uses (``Full=-1,Comment=0``; see ``ndf1Wwrt.c``) plus ``Indent=0`` so 

199 each line is just the bare AST item with the single-space prefix 

200 that ``ndf1Rdast`` strips back off on read. 

201 """ 

202 if bbox is None: 

203 x_shift = 1.0 

204 y_shift = 1.0 

205 else: 

206 x_shift = 1.0 - float(bbox.x.start) 

207 y_shift = 1.0 - float(bbox.y.start) 

208 

209 saved_current = ast_frame_set.current 

210 ast_frame_set.current = ast_frame_set.base 

211 ast_frame_set.domain = "PIXEL" 

212 ast_frame_set.current = saved_current 

213 _prepend_ast_shift(ast_frame_set, x=x_shift, y=y_shift, ast_domain="GRID") 

214 

215 stream = StringStream() 

216 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0") 

217 channel.write(ast_frame_set) 

218 return stream.getSinkData() 

219 

220 

221def _show_mask_ast_for_ndf( 

222 parent_ast_frame_set: Any, 

223 origin: Sequence[int], 

224 *, 

225 labels: Sequence[str] = (), 

226) -> str: 

227 """Return an NDF WCS for the 3D native mask sub-NDF. 

228 

229 The first two axes reuse the parent image's pixel-to-sky mapping. The 

230 third axis is a generic mask-byte coordinate that passes through unchanged. 

231 """ 

232 n_axes = len(origin) 

233 ast_frame_set = AstFrameSet(AstFrame(n_axes, "Domain=GRID")) 

234 pixel_frame = AstFrame(n_axes, "Domain=PIXEL") 

235 for axis, label in enumerate(labels[:n_axes], start=1): 

236 pixel_frame.setLabel(axis, label) 

237 shifts = [1.0 - float(axis_origin) for axis_origin in origin] 

238 ast_frame_set.addFrame(AstFrameSet.BASE, ShiftMap(shifts), pixel_frame) 

239 

240 parent_pixel_to_sky = parent_ast_frame_set.getMapping( 

241 parent_ast_frame_set.base, 

242 parent_ast_frame_set.current, 

243 ) 

244 parent_sky_frame = parent_ast_frame_set.getFrame(parent_ast_frame_set.current) 

245 mask_axis_frame = AstFrame(1, "Domain=MASK") 

246 mask_axis_frame.setLabel(1, labels[2] if len(labels) > 2 else "mask-byte") 

247 ast_frame_set.addFrame( 

248 ast_frame_set.current, 

249 CmpMap(parent_pixel_to_sky, UnitMap(1), False), 

250 CmpFrame(parent_sky_frame, mask_axis_frame), 

251 ) 

252 

253 stream = StringStream() 

254 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0") 

255 channel.write(ast_frame_set) 

256 return stream.getSinkData() 

257 

258 

259class NdfOutputArchive(OutputArchive[NdfPointerModel]): 

260 """An `~lsst.images.serialization.OutputArchive` implementation 

261 that writes HDS-on-HDF5 files compatible with the Starlink NDF data 

262 model. 

263 

264 Parameters 

265 ---------- 

266 file 

267 An open `h5py.File` opened in a writable mode. The archive does 

268 not close the file; the caller is responsible for that. 

269 compression_options 

270 Optional dict passed through to `h5py.Group.create_dataset` for image 

271 arrays (e.g. ``{"compression": "gzip", "compression_opts": 4}``). 

272 opaque_metadata 

273 Optional `~lsst.images.fits.FitsOpaqueMetadata`; if its primary-HDU 

274 header is non-empty its cards will be written to ``/MORE/FITS`` by the 

275 top-level `write` function. 

276 """ 

277 

278 def __init__( 

279 self, 

280 file: h5py.File, 

281 compression_options: Mapping[str, Any] | None = None, 

282 opaque_metadata: FitsOpaqueMetadata | None = None, 

283 root: Ndf | NdfContainer | None = None, 

284 lsst_path: str = "/MORE/LSST", 

285 direct_ndf_array_paths: Mapping[str, str] | None = None, 

286 wcs_ndf_paths: Sequence[str] = ("/",), 

287 ) -> None: 

288 self._file = file 

289 self._document = NdfDocument(root=root if root is not None else Ndf()) 

290 self._lsst_path = lsst_path.rstrip("/") or "/LSST" 

291 self._direct_ndf_array_paths = dict(direct_ndf_array_paths) if direct_ndf_array_paths else {} 

292 self._wcs_ndf_paths = tuple(wcs_ndf_paths) 

293 self._bbox_array_struct_paths: set[str] = set() 

294 self._compression_options = dict(compression_options) if compression_options else {} 

295 self._opaque_metadata = opaque_metadata if opaque_metadata is not None else FitsOpaqueMetadata() 

296 self._frame_sets: list[tuple[FrameSet, NdfPointerModel]] = [] 

297 self._pointers: dict[Hashable, NdfPointerModel] = {} 

298 # Keep the open file in sync so existing direct-archive tests can 

299 # inspect it immediately, while all mutations go through the IR. 

300 self._flush() 

301 

302 @classmethod 

303 @contextmanager 

304 def open( 

305 cls, 

306 filename: str | None, 

307 *, 

308 compression_options: Mapping[str, Any] | None = None, 

309 opaque_metadata: FitsOpaqueMetadata | None = None, 

310 root: Ndf | NdfContainer | None = None, 

311 lsst_path: str = "/MORE/LSST", 

312 direct_ndf_array_paths: Mapping[str, str] | None = None, 

313 wcs_ndf_paths: Sequence[str] = ("/",), 

314 ) -> Iterator[Self]: 

315 """Open an NDF file for writing and yield an `NdfOutputArchive`. 

316 

317 ``filename=None`` uses an in-memory HDF5 file; the on-disk 

318 artefact is discarded but the archive's writes still produce a 

319 usable returned tree (handy for tests). 

320 """ 

321 if filename is None: 

322 h5_file = h5py.File("inmem.sdf", "w", driver="core", backing_store=False) 

323 else: 

324 if os.path.exists(filename) and os.path.getsize(filename) > 0: 

325 raise OSError(f"File {filename!r} already exists.") 

326 h5_file = h5py.File(filename, "w") 

327 try: 

328 yield cls( 

329 h5_file, 

330 compression_options=compression_options, 

331 opaque_metadata=opaque_metadata, 

332 root=root, 

333 lsst_path=lsst_path, 

334 direct_ndf_array_paths=direct_ndf_array_paths, 

335 wcs_ndf_paths=wcs_ndf_paths, 

336 ) 

337 finally: 

338 h5_file.close() 

339 

340 def add_tree( 

341 self, 

342 tree: ArchiveTree, 

343 *, 

344 projection: Any = None, 

345 bbox: Any = None, 

346 unit: astropy.units.UnitBase | None = None, 

347 root_name: str | None = None, 

348 ) -> None: 

349 """Finalize the file: write WCS, units, JSON tree, and ORIGIN. 

350 

351 Writes the canonical NDF ``/WCS`` HDS structure (an AST channel 

352 text dump that KAPPA / hdstrace expect) when ``projection`` is 

353 provided. A native mask sub-NDF at ``/MORE/LSST/MASK`` gets a 3D 

354 WCS whose first two axes reuse the parent's sky projection and 

355 whose third axis is the mask-byte coordinate. The JSON tree at 

356 ``<lsst_path>/JSON`` remains the source of truth for symmetric 

357 round-trips; ``/WCS`` is for Starlink tools. Auto-detect read of 

358 ``/WCS/DATA`` into a typed ``Projection`` is a follow-up. 

359 

360 Parameters 

361 ---------- 

362 tree 

363 Pydantic tree returned by the object's ``serialize`` method, 

364 with ``metadata``/``butler_info`` already applied. 

365 projection, bbox, unit 

366 Top-level object attributes that drive NDF-canonical writes. 

367 root_name 

368 Value to assign to the root group's ``HDS_ROOT_NAME`` 

369 attribute (fixed-length ASCII so KAPPA / hdstrace decode it). 

370 """ 

371 if projection is not None: 

372 self._write_wcs(projection, bbox) 

373 if unit is not None and isinstance(self._document.root, Ndf): 

374 self._document.ensure_ndf("/").set_units(_unit_to_ndf_string(unit)) 

375 json_text = tree.model_dump_json() 

376 # Let ensure_structure derive types from path-component names so 

377 # /MORE/LSST/... gets <MORE> stay as EXT and <LSST> typed LSST, 

378 # mirroring HDS convention. 

379 lsst = self._ensure_model_structure(self._lsst_path) 

380 lsst.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text))) 

381 lsst.children["DATA_MODEL"] = HdsPrimitive.char_scalar( 

382 tree.schema_url, width=max(80, len(tree.schema_url)) 

383 ) 

384 lsst.children["FORMAT_VERSION"] = HdsPrimitive.array(np.array(_NDF_FORMAT_VERSION, dtype=np.int32)) 

385 primary = self._opaque_metadata.headers.get(ExtensionKey()) 

386 if primary is not None and len(primary): 

387 cards = _fits_header_records(primary) 

388 more = self._ensure_model_structure("/MORE") 

389 more.children["FITS"] = HdsPrimitive.char_array(cards, width=80) 

390 if bbox is not None: 

391 origin = _origin_from_bbox(bbox) 

392 for struct_path in self._bbox_array_struct_paths: 

393 if self._has_model_path(struct_path): 

394 self.set_array_origin(struct_path, origin) 

395 if root_name is not None: 

396 self._document.root_name = root_name 

397 self._flush() 

398 

399 def _write_wcs(self, projection: Any, bbox: Any) -> None: 

400 ast_frame_set = projection._pixel_to_sky._get_ast_frame_set() 

401 text = _show_ast_for_ndf(ast_frame_set, bbox) 

402 lines = _hds.encode_ndf_ast_data(text) 

403 for ndf_path in self._wcs_ndf_paths: 

404 if self._has_model_path(ndf_path): 

405 self._document.ensure_ndf(ndf_path).set_wcs(NdfWcs(lines)) 

406 if self._has_model_path("/MORE/LSST/MASK"): 

407 mask_origin = _origin_from_bbox(bbox) if bbox is not None else (0, 0) 

408 mask_ndim = self._model_array_ndim("/MORE/LSST/MASK/DATA_ARRAY") 

409 mask_origin = (*mask_origin, *((0,) * max(0, mask_ndim - len(mask_origin)))) 

410 mask_ast_frame_set = projection._pixel_to_sky._get_ast_frame_set() 

411 mask_text = _show_mask_ast_for_ndf( 

412 mask_ast_frame_set, 

413 mask_origin, 

414 labels=("x", "y", "mask-byte"), 

415 ) 

416 self._document.ensure_ndf("/MORE/LSST/MASK").set_wcs(NdfWcs(_hds.encode_ndf_ast_data(mask_text))) 

417 

418 def serialize_direct[T: pydantic.BaseModel]( 

419 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T] 

420 ) -> T: 

421 nested = NestedOutputArchive[NdfPointerModel](name, self) 

422 return serializer(nested) 

423 

424 def serialize_pointer[T: ArchiveTree]( 

425 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T], key: Hashable 

426 ) -> NdfPointerModel: 

427 if (pointer := self._pointers.get(key)) is not None: 

428 return pointer 

429 archive_path = name if name.startswith("/") else f"/{name}" 

430 target_path = self._archive_path_to_hdf5_path(archive_path) 

431 # Run the serializer first so any nested add_array / serialize_pointer 

432 # calls write into the file before we dump this sub-tree to JSON. 

433 model = self.serialize_direct(name, serializer) 

434 json_text = model.model_dump_json() 

435 # Store the JSON document as a "JSON" child of the target structure, 

436 # mirroring the main /MORE/LSST/JSON tree. Writing it at the target 

437 # path itself would clobber any nested arrays the serializer added 

438 # under that path (IR.write_to_hdf5 deletes the existing node before 

439 # writing a primitive). 

440 # ensure_structure derives the HDS type from the leaf name, so 

441 # /MORE/LSST/PSF is typed <PSF> rather than the generic <EXT>. 

442 target = self._ensure_model_structure(target_path) 

443 target.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text))) 

444 self._flush() 

445 pointer = NdfPointerModel(path=f"{target_path}/JSON") 

446 self._pointers[key] = pointer 

447 return pointer 

448 

449 def serialize_frame_set[T: ArchiveTree]( 

450 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

451 ) -> NdfPointerModel: 

452 pointer = self.serialize_pointer(name, serializer, key) 

453 self._frame_sets.append((frame_set, pointer)) 

454 return pointer 

455 

456 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, NdfPointerModel]]: 

457 return iter(self._frame_sets) 

458 

459 _COMPATIBLE_MASK_DTYPES = (np.dtype(np.uint8),) 

460 _prefer_native_mask_arrays = True 

461 

462 def add_array( 

463 self, 

464 array: np.ndarray, 

465 *, 

466 name: str | None = None, 

467 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

468 ) -> ArrayReferenceModel: 

469 # Recognised top-level names go to standard NDF locations. 

470 # Anything else hoists under /MORE/LSST. 

471 if name == "image": 

472 root = self._document.ensure_ndf("/") 

473 root.set_array_component( 

474 "DATA_ARRAY", 

475 array, 

476 origin=np.zeros(array.ndim, dtype=np.int64), 

477 compression_options=self._compression_options, 

478 ) 

479 path = "/DATA_ARRAY/DATA" 

480 self._bbox_array_struct_paths.add("/DATA_ARRAY") 

481 elif name == "variance": 

482 root = self._document.ensure_ndf("/") 

483 root.set_array_component( 

484 "VARIANCE", 

485 array, 

486 origin=np.zeros(array.ndim, dtype=np.int64), 

487 compression_options=self._compression_options, 

488 ) 

489 path = "/VARIANCE/DATA" 

490 self._bbox_array_struct_paths.add("/VARIANCE") 

491 elif name == "mask": 

492 if array.ndim == 2 and array.dtype in self._COMPATIBLE_MASK_DTYPES: 

493 self._set_quality_array(array) 

494 path = "/QUALITY/QUALITY/DATA" 

495 self._bbox_array_struct_paths.add("/QUALITY/QUALITY") 

496 else: 

497 # Native Mask serialization passes HDF5 shape 

498 # (mask-byte, y, x). HDS reports the reverse dimension order, 

499 # so Starlink tools see (x, y, mask-byte). 

500 if array.ndim == 3 and array.dtype in self._COMPATIBLE_MASK_DTYPES: 

501 self._set_quality_array(self._collapse_mask_to_quality(array)) 

502 self._bbox_array_struct_paths.add("/QUALITY/QUALITY") 

503 mask_ndf = self._document.ensure_ndf("/MORE/LSST/MASK") 

504 mask_ndf.set_array_component( 

505 "DATA_ARRAY", 

506 array, 

507 origin=np.zeros(array.ndim, dtype=np.int64), 

508 bad_pixel=False, 

509 compression_options=self._compression_options, 

510 ) 

511 path = "/MORE/LSST/MASK/DATA_ARRAY/DATA" 

512 self._bbox_array_struct_paths.add("/MORE/LSST/MASK/DATA_ARRAY") 

513 elif name in self._direct_ndf_array_paths: 

514 sub_ndf_path = self._direct_ndf_array_paths[name] 

515 sub_ndf = self._document.ensure_ndf(sub_ndf_path) 

516 sub_ndf.set_array_component( 

517 "DATA_ARRAY", 

518 array, 

519 origin=np.zeros(array.ndim, dtype=np.int64), 

520 compression_options=self._compression_options, 

521 ) 

522 path = f"{sub_ndf_path}/DATA_ARRAY/DATA" 

523 self._bbox_array_struct_paths.add(f"{sub_ndf_path}/DATA_ARRAY") 

524 else: 

525 if name is None: 

526 raise ValueError("Anonymous arrays are not supported in the NDF archive.") 

527 archive_path = name if name.startswith("/") else f"/{name}" 

528 # Hoisted numeric arrays are wrapped as sub-NDFs under 

529 # /MORE/LSST/<UPPER_PATH> so Starlink tools (KAPPA `display`, 

530 # `hdstrace`, etc.) can inspect them just like the main 

531 # image. The sub-NDF has the canonical layout: top-level 

532 # group with CLASS="NDF" containing a DATA_ARRAY structure 

533 # (CLASS="ARRAY") with DATA + ORIGIN primitives. Hoisted 

534 # JSON sub-trees from serialize_pointer stay as bare 

535 # _CHAR*N datasets at /MORE/LSST/<NAME> (no NDF wrapper) — 

536 # they're JSON documents, not numeric arrays. 

537 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path) 

538 sub_ndf = self._document.ensure_ndf(sub_ndf_path) 

539 sub_ndf.set_array_component( 

540 "DATA_ARRAY", 

541 array, 

542 origin=np.zeros(array.ndim, dtype=np.int64), 

543 compression_options=self._compression_options, 

544 ) 

545 path = f"{sub_ndf_path}/DATA_ARRAY/DATA" 

546 self._flush() 

547 # Shape is stored in the JSON tree (matching the FITS archive) because 

548 # MaskSerializationModel.bbox needs it before any arrays are read. 

549 # Future work: resolve shape from the HDF5 dataset on read instead. 

550 return ArrayReferenceModel( 

551 source=f"ndf:{path}", 

552 shape=list(array.shape), 

553 datatype=NumberType.from_numpy(array.dtype), 

554 ) 

555 

556 def _ensure_path(self, path: str) -> h5py.Group: 

557 """Walk/create groups for an HDF5 absolute path. 

558 

559 Intermediate groups created on demand are tagged with 

560 ``CLASS="EXT"``, the HDS type for general-purpose extension 

561 containers (matches what hds-v5 writes for ``MORE``). 

562 """ 

563 self._ensure_model_structure(path, "EXT") 

564 self._flush() 

565 return self._file["/" if path in ("", "/") else path] 

566 

567 def _ensure_struct(self, path: str, hds_type: str) -> h5py.Group: 

568 """Ensure a structure exists at ``path`` with the given HDS type.""" 

569 self._ensure_model_structure(path, hds_type) 

570 self._flush() 

571 return self._file["/" if path in ("", "/") else path] 

572 

573 def _ensure_array_structure(self, path: str) -> h5py.Group: 

574 """Ensure an HDS ``ARRAY`` structure exists at ``path``.""" 

575 return self._ensure_struct(path, "ARRAY") 

576 

577 def _ensure_quality_structure(self) -> h5py.Group: 

578 """Ensure ``/QUALITY`` exists with ``CLASS="QUALITY"`` and BADBITS. 

579 

580 BADBITS is set to 255 so every bit of the collapsed single-byte 

581 QUALITY plane is treated as bad by NDF applications. 

582 """ 

583 root = self._document.ensure_ndf("/") 

584 if "QUALITY" not in root.children: 

585 root.children["QUALITY"] = HdsStructure("QUALITY") 

586 quality = root.get_structure("QUALITY") 

587 quality.hds_type = "QUALITY" 

588 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8)) 

589 self._flush() 

590 return self._file["/QUALITY"] 

591 

592 def _ensure_quality_array_structure(self) -> h5py.Group: 

593 """Ensure the nested ``/QUALITY/QUALITY`` ARRAY structure exists.""" 

594 root = self._document.ensure_ndf("/") 

595 if "QUALITY" not in root.children: 

596 root.children["QUALITY"] = HdsStructure("QUALITY") 

597 quality = root.get_structure("QUALITY") 

598 quality.hds_type = "QUALITY" 

599 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8)) 

600 if "QUALITY" not in quality.children or not isinstance(quality.children["QUALITY"], HdsStructure): 

601 quality.children["QUALITY"] = HdsStructure("ARRAY") 

602 quality_array = quality.get_structure("QUALITY") 

603 quality_array.hds_type = "ARRAY" 

604 quality_array.children.setdefault("ORIGIN", HdsPrimitive.array(np.zeros(2, dtype=np.int32))) 

605 quality_array.children.setdefault("BAD_PIXEL", HdsPrimitive.array(np.array(False, dtype=np.bool_))) 

606 self._flush() 

607 return self._file["/QUALITY/QUALITY"] 

608 

609 def _write_quality_array(self, quality: np.ndarray) -> None: 

610 """Write or replace the NDF QUALITY array.""" 

611 self._set_quality_array(quality) 

612 self._flush() 

613 

614 def _set_quality_array(self, quality: np.ndarray) -> None: 

615 """Set or replace the NDF QUALITY array in the model.""" 

616 root = self._document.ensure_ndf("/") 

617 root.set_quality( 

618 NdfQuality( 

619 NdfArray( 

620 quality, 

621 origin=np.zeros(2, dtype=np.int32), 

622 bad_pixel=False, 

623 compression_options=self._compression_options, 

624 ) 

625 ) 

626 ) 

627 

628 def _collapse_mask_to_quality(self, array: np.ndarray) -> np.ndarray: 

629 """Compress an NDF-native 3-D mask array into 2-D QUALITY. 

630 

631 The input array is in HDF5 storage order ``(mask-byte, y, x)``. 

632 Single-byte masks copy directly to preserve bit values. Wider masks 

633 collapse to 1 where any byte is non-zero and 0 otherwise. 

634 """ 

635 if array.shape[0] == 1: 

636 return array[0, :, :] 

637 return np.any(array != 0, axis=0).astype(np.uint8) 

638 

639 def _write_origin_for_array(self, struct_path: str, array: np.ndarray) -> None: 

640 """Write a placeholder ORIGIN of zeros (int64). 

641 

642 The top-level `write` function overwrites this 

643 with bbox-derived values via :meth:`set_array_origin` once the 

644 bbox is known. 

645 """ 

646 struct = self._document.root.get_structure(struct_path) 

647 if "ORIGIN" not in struct.children: 

648 struct.children["ORIGIN"] = HdsPrimitive.array(np.zeros(array.ndim, dtype=np.int64)) 

649 

650 def set_array_origin(self, struct_path: str, origin: tuple[int, ...]) -> None: 

651 """Overwrite the ORIGIN of an ARRAY structure. 

652 

653 Parameters 

654 ---------- 

655 struct_path 

656 HDF5 path to the ARRAY structure (e.g. ``"/DATA_ARRAY"``). 

657 origin 

658 Origin in NDF/Fortran axis order (e.g. ``(x_min, y_min)`` 

659 for a 2D image with bbox lower bound ``(x_min, y_min)``). 

660 """ 

661 struct = self._document.root.get_structure(struct_path) 

662 origin_dtype = np.int32 if struct_path == "/QUALITY/QUALITY" else np.int64 

663 origin_array = np.asarray(origin, dtype=origin_dtype) 

664 data_node = struct.children.get("DATA") 

665 if isinstance(data_node, HdsPrimitive): 

666 data_ndim = data_node.read_array().ndim 

667 if origin_array.size < data_ndim: 

668 origin_array = np.pad(origin_array, (0, data_ndim - origin_array.size)) 

669 struct.children["ORIGIN"] = HdsPrimitive.array(origin_array) 

670 self._flush() 

671 

672 def _ensure_model_structure(self, path: str, hds_type: str | None = None) -> HdsStructure: 

673 """Return or create a structure in the NDF document model. 

674 

675 With ``hds_type=None`` the leaf type is derived from its name; 

676 see `HdsStructure.ensure_structure`. 

677 """ 

678 if path in ("", "/"): 

679 return self._document.root 

680 return self._document.root.ensure_structure(path, hds_type) 

681 

682 def _archive_path_to_hdf5_path(self, archive_path: str) -> str: 

683 """Translate an archive path to this layout's HDF5 path.""" 

684 if self._lsst_path == "/MORE/LSST": 

685 return archive_path_to_hdf5_path(archive_path) 

686 if not archive_path: 

687 return f"{self._lsst_path}/JSON" 

688 components = archive_path_to_hdf5_path_components(archive_path) 

689 return f"{self._lsst_path}/{'/'.join(components)}" 

690 

691 def _has_model_path(self, path: str) -> bool: 

692 """Return `True` if a path exists in the NDF document model.""" 

693 try: 

694 self._document.get(path) 

695 except KeyError: 

696 return False 

697 return True 

698 

699 def _model_array_ndim(self, struct_path: str) -> int: 

700 """Return the dimensionality of an ARRAY structure's DATA primitive.""" 

701 struct = self._document.root.get_structure(struct_path) 

702 data_node = struct.children["DATA"] 

703 if not isinstance(data_node, HdsPrimitive): 

704 raise TypeError(f"{struct_path}/DATA is not an HDS primitive.") 

705 return data_node.read_array().ndim 

706 

707 def _flush(self) -> None: 

708 """Synchronize the Python NDF document model to the open HDF5 file.""" 

709 self._document.write_to_hdf5(self._file) 

710 

711 def add_table( 

712 self, 

713 table: astropy.table.Table, 

714 *, 

715 name: str | None = None, 

716 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

717 ) -> TableModel: 

718 columns = TableColumnModel.from_table(table, inline=True) 

719 return TableModel(columns=columns, meta=table.meta) 

720 

721 def add_structured_array( 

722 self, 

723 array: np.ndarray, 

724 *, 

725 name: str | None = None, 

726 units: Mapping[str, astropy.units.Unit] | None = None, 

727 descriptions: Mapping[str, str] | None = None, 

728 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

729 ) -> TableModel: 

730 if name is None: 

731 columns = TableColumnModel.from_record_array(array, inline=True) 

732 for c in columns: 

733 if units and (unit := units.get(c.name)): 

734 c.unit = unit 

735 if descriptions and (description := descriptions.get(c.name)): 

736 c.description = description 

737 return TableModel(columns=columns) 

738 columns = TableColumnModel.from_record_dtype(array.dtype) 

739 for c in columns: 

740 column_path = name if len(columns) == 1 else f"{name}/{c.name}" 

741 archive_path = column_path if column_path.startswith("/") else f"/{column_path}" 

742 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path) 

743 column_array = np.asarray(array[c.name]) 

744 sub_ndf = self._document.ensure_ndf(sub_ndf_path) 

745 sub_ndf.set_array_component( 

746 "DATA_ARRAY", 

747 column_array, 

748 origin=np.zeros(column_array.ndim, dtype=np.int64), 

749 compression_options=self._compression_options, 

750 ) 

751 assert isinstance(c.data, ArrayReferenceModel) 

752 c.data.source = f"ndf:{sub_ndf_path}/DATA_ARRAY/DATA" 

753 for c in columns: 

754 if units and (unit := units.get(c.name)): 

755 c.unit = unit 

756 if descriptions and (description := descriptions.get(c.name)): 

757 c.description = description 

758 self._flush() 

759 return TableModel(columns=columns)