Coverage for python / lsst / images / ndf / _output_archive.py: 14%

326 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-16 07:54 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "NdfOutputArchive", 

16 "write", 

17) 

18 

19import os 

20from collections.abc import Callable, Hashable, Iterator, Mapping, Sequence 

21from contextlib import contextmanager 

22from typing import Any, Self 

23 

24import astropy.io.fits 

25import astropy.table 

26import astropy.units 

27import h5py 

28import numpy as np 

29import pydantic 

30 

31from .._color_image import ColorImage 

32from .._transforms import FrameSet 

33from .._transforms._ast import Channel, CmpFrame, CmpMap, ShiftMap, StringStream, UnitMap 

34from .._transforms._ast import Frame as AstFrame 

35from .._transforms._ast import FrameSet as AstFrameSet 

36from .._transforms._transform import _prepend_ast_shift 

37from ..fits._common import ExtensionKey, FitsOpaqueMetadata 

38from ..serialization import ( 

39 ArchiveTree, 

40 ArrayReferenceModel, 

41 ButlerInfo, 

42 MetadataValue, 

43 NestedOutputArchive, 

44 NumberType, 

45 OutputArchive, 

46 TableColumnModel, 

47 TableModel, 

48 no_header_updates, 

49) 

50from . import _hds 

51from ._common import NdfPointerModel, archive_path_to_hdf5_path, archive_path_to_hdf5_path_components 

52from ._model import HdsPrimitive, HdsStructure, Ndf, NdfArray, NdfContainer, NdfDocument, NdfQuality, NdfWcs 

53 

54 

55def write( 

56 obj: Any, 

57 filename: str | None = None, 

58 *, 

59 metadata: dict[str, MetadataValue] | None = None, 

60 butler_info: ButlerInfo | None = None, 

61 compression_options: Mapping[str, Any] | None = None, 

62) -> ArchiveTree: 

63 """Write a serializable object to an NDF (HDS-on-HDF5) file. 

64 

65 Parameters 

66 ---------- 

67 obj 

68 Object with a ``serialize`` method. May carry an 

69 ``_opaque_metadata`` attribute (a 

70 `~lsst.images.fits.FitsOpaqueMetadata`) 

71 whose primary-HDU header gets written to ``/MORE/FITS``. This 

72 preserves FITS cards on objects that originated from a FITS read; 

73 butler provenance is conveyed through ``butler_info`` instead. 

74 filename 

75 Path to write to. Must not already exist. If `None`, an 

76 in-memory HDF5 file is used and the on-disk artefact is 

77 discarded; the returned tree still reflects all the writes the 

78 archive made (useful for tests). 

79 metadata, butler_info 

80 Optional caller-supplied entries that are written into the 

81 returned `~lsst.images.serialization.ArchiveTree`. 

82 compression_options 

83 Optional dict forwarded to the archive constructor for h5py 

84 dataset compression. 

85 

86 Returns 

87 ------- 

88 `~lsst.images.serialization.ArchiveTree` 

89 The Pydantic tree the object's ``serialize`` produced (with 

90 ``metadata``/``butler_info`` applied). 

91 """ 

92 opaque_metadata = getattr(obj, "_opaque_metadata", None) 

93 if not isinstance(opaque_metadata, FitsOpaqueMetadata): 

94 opaque_metadata = FitsOpaqueMetadata() 

95 archive_default_name = getattr(obj, "_archive_default_name", None) 

96 with NdfOutputArchive.open( 

97 filename, 

98 compression_options=compression_options, 

99 opaque_metadata=opaque_metadata, 

100 **_get_archive_layout(obj), 

101 ) as archive: 

102 if archive_default_name is not None: 

103 tree = archive.serialize_direct(archive_default_name, obj.serialize) 

104 else: 

105 tree = obj.serialize(archive) 

106 if metadata is not None: 

107 tree.metadata.update(metadata) 

108 if butler_info is not None: 

109 tree.butler_info = butler_info 

110 archive.add_tree( 

111 tree, 

112 projection=getattr(obj, "projection", None), 

113 bbox=getattr(obj, "bbox", None), 

114 unit=getattr(obj, "unit", None), 

115 root_name=archive_default_name or type(obj).__name__, 

116 ) 

117 return tree 

118 

119 

120def _origin_from_bbox(bbox: Any) -> tuple[int, ...]: 

121 """Extract NDF/Fortran-order origin tuple from an lsst.images Box. 

122 

123 The exact attribute names on Box depend on the version. Inspect the 

124 object and pick whatever exposes the integer lower bounds. For a 2D 

125 image with bbox lower bound (x_min, y_min) the returned tuple is 

126 ``(x_min, y_min)``. 

127 """ 

128 # Box exposes .x and .y properties returning Interval objects with a 

129 # .start attribute (the lower bound). 

130 if hasattr(bbox, "x") and hasattr(bbox, "y"): 

131 x = bbox.x 

132 y = bbox.y 

133 if hasattr(x, "start") and hasattr(y, "start"): 

134 return (int(x.start), int(y.start)) 

135 raise AttributeError( 

136 f"Don't know how to extract origin from bbox of type {type(bbox).__name__!r}; " 

137 "_origin_from_bbox needs updating." 

138 ) 

139 

140 

141def _unit_to_ndf_string(unit: astropy.units.UnitBase) -> str: 

142 """Return an ASCII unit string for the NDF UNITS component.""" 

143 try: 

144 return unit.to_string(format="fits") 

145 except ValueError: 

146 return unit.to_string() 

147 

148 

149def _fits_header_records(header: astropy.io.fits.Header) -> list[str]: 

150 """Return fixed-width FITS records for an opaque NDF FITS extension. 

151 

152 NDF ``.MORE.FITS`` is a ``_CHAR*80`` vector. Use Astropy's FITS 

153 header serializer to preserve multi-record logical cards such as 

154 CONTINUE strings, but omit the FITS END card and 2880-byte padding 

155 because this is an NDF extension component, not a complete FITS 

156 header block. 

157 """ 

158 block = header.tostring(sep="", endcard=False, padding=False) 

159 encoded = block.encode("ascii") 

160 if len(encoded) % 80: 

161 raise ValueError( 

162 f"FITS header block is {len(encoded)} bytes, not a multiple of the 80-byte FITS record size." 

163 ) 

164 return [block[n : n + 80] for n in range(0, len(block), 80)] 

165 

166 

167def _get_archive_layout(obj: Any) -> dict[str, Any]: 

168 """Return NDF document layout options for a top-level object.""" 

169 if isinstance(obj, ColorImage): 

170 return { 

171 "root": NdfContainer(), 

172 "lsst_path": "/LSST", 

173 "direct_ndf_array_paths": { 

174 "red": "/RED", 

175 "green": "/GREEN", 

176 "blue": "/BLUE", 

177 }, 

178 "wcs_ndf_paths": ("/RED", "/GREEN", "/BLUE"), 

179 } 

180 return {} 

181 

182 

183def _show_ast_for_ndf(ast_frame_set: Any, bbox: Any | None) -> str: 

184 """Return AST Channel text matching Starlink NDF WCS serialization. 

185 

186 Tags the original base frame with ``Domain="PIXEL"`` and prepends a 

187 new ``Domain="GRID"`` base frame related to it by a `ShiftMap` whose 

188 shift converts ``bbox``-origin pixel coordinates into 1-based grid 

189 coordinates. The result is written via an abstraction-layer 

190 ``Channel`` configured with the same options the Starlink C writer 

191 uses (``Full=-1,Comment=0``; see ``ndf1Wwrt.c``) plus ``Indent=0`` so 

192 each line is just the bare AST item with the single-space prefix 

193 that ``ndf1Rdast`` strips back off on read. 

194 """ 

195 if bbox is None: 

196 x_shift = 1.0 

197 y_shift = 1.0 

198 else: 

199 x_shift = 1.0 - float(bbox.x.start) 

200 y_shift = 1.0 - float(bbox.y.start) 

201 

202 saved_current = ast_frame_set.current 

203 ast_frame_set.current = ast_frame_set.base 

204 ast_frame_set.domain = "PIXEL" 

205 ast_frame_set.current = saved_current 

206 _prepend_ast_shift(ast_frame_set, x=x_shift, y=y_shift, ast_domain="GRID") 

207 

208 stream = StringStream() 

209 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0") 

210 channel.write(ast_frame_set) 

211 return stream.getSinkData() 

212 

213 

214def _show_mask_ast_for_ndf( 

215 parent_ast_frame_set: Any, 

216 origin: Sequence[int], 

217 *, 

218 labels: Sequence[str] = (), 

219) -> str: 

220 """Return an NDF WCS for the 3D native mask sub-NDF. 

221 

222 The first two axes reuse the parent image's pixel-to-sky mapping. The 

223 third axis is a generic mask-byte coordinate that passes through unchanged. 

224 """ 

225 n_axes = len(origin) 

226 ast_frame_set = AstFrameSet(AstFrame(n_axes, "Domain=GRID")) 

227 pixel_frame = AstFrame(n_axes, "Domain=PIXEL") 

228 for axis, label in enumerate(labels[:n_axes], start=1): 

229 pixel_frame.setLabel(axis, label) 

230 shifts = [1.0 - float(axis_origin) for axis_origin in origin] 

231 ast_frame_set.addFrame(AstFrameSet.BASE, ShiftMap(shifts), pixel_frame) 

232 

233 parent_pixel_to_sky = parent_ast_frame_set.getMapping( 

234 parent_ast_frame_set.base, 

235 parent_ast_frame_set.current, 

236 ) 

237 parent_sky_frame = parent_ast_frame_set.getFrame(parent_ast_frame_set.current) 

238 mask_axis_frame = AstFrame(1, "Domain=MASK") 

239 mask_axis_frame.setLabel(1, labels[2] if len(labels) > 2 else "mask-byte") 

240 ast_frame_set.addFrame( 

241 ast_frame_set.current, 

242 CmpMap(parent_pixel_to_sky, UnitMap(1), False), 

243 CmpFrame(parent_sky_frame, mask_axis_frame), 

244 ) 

245 

246 stream = StringStream() 

247 channel = Channel(stream, options="Full=-1,Comment=0,Indent=0") 

248 channel.write(ast_frame_set) 

249 return stream.getSinkData() 

250 

251 

252class NdfOutputArchive(OutputArchive[NdfPointerModel]): 

253 """An `~lsst.images.serialization.OutputArchive` implementation 

254 that writes HDS-on-HDF5 files compatible with the Starlink NDF data 

255 model. 

256 

257 Parameters 

258 ---------- 

259 file 

260 An open `h5py.File` opened in a writable mode. The archive does 

261 not close the file; the caller is responsible for that. 

262 compression_options 

263 Optional dict passed through to `h5py.Group.create_dataset` for image 

264 arrays (e.g. ``{"compression": "gzip", "compression_opts": 4}``). 

265 opaque_metadata 

266 Optional `~lsst.images.fits.FitsOpaqueMetadata`; if its primary-HDU 

267 header is non-empty its cards will be written to ``/MORE/FITS`` by the 

268 top-level `write` function. 

269 """ 

270 

271 def __init__( 

272 self, 

273 file: h5py.File, 

274 compression_options: Mapping[str, Any] | None = None, 

275 opaque_metadata: FitsOpaqueMetadata | None = None, 

276 root: Ndf | NdfContainer | None = None, 

277 lsst_path: str = "/MORE/LSST", 

278 direct_ndf_array_paths: Mapping[str, str] | None = None, 

279 wcs_ndf_paths: Sequence[str] = ("/",), 

280 ) -> None: 

281 self._file = file 

282 self._document = NdfDocument(root=root if root is not None else Ndf()) 

283 self._lsst_path = lsst_path.rstrip("/") or "/LSST" 

284 self._direct_ndf_array_paths = dict(direct_ndf_array_paths) if direct_ndf_array_paths else {} 

285 self._wcs_ndf_paths = tuple(wcs_ndf_paths) 

286 self._bbox_array_struct_paths: set[str] = set() 

287 self._compression_options = dict(compression_options) if compression_options else {} 

288 self._opaque_metadata = opaque_metadata if opaque_metadata is not None else FitsOpaqueMetadata() 

289 self._frame_sets: list[tuple[FrameSet, NdfPointerModel]] = [] 

290 self._pointers: dict[Hashable, NdfPointerModel] = {} 

291 # Keep the open file in sync so existing direct-archive tests can 

292 # inspect it immediately, while all mutations go through the IR. 

293 self._flush() 

294 

295 @classmethod 

296 @contextmanager 

297 def open( 

298 cls, 

299 filename: str | None, 

300 *, 

301 compression_options: Mapping[str, Any] | None = None, 

302 opaque_metadata: FitsOpaqueMetadata | None = None, 

303 root: Ndf | NdfContainer | None = None, 

304 lsst_path: str = "/MORE/LSST", 

305 direct_ndf_array_paths: Mapping[str, str] | None = None, 

306 wcs_ndf_paths: Sequence[str] = ("/",), 

307 ) -> Iterator[Self]: 

308 """Open an NDF file for writing and yield an `NdfOutputArchive`. 

309 

310 ``filename=None`` uses an in-memory HDF5 file; the on-disk 

311 artefact is discarded but the archive's writes still produce a 

312 usable returned tree (handy for tests). 

313 """ 

314 if filename is None: 

315 h5_file = h5py.File("inmem.sdf", "w", driver="core", backing_store=False) 

316 else: 

317 if os.path.exists(filename) and os.path.getsize(filename) > 0: 

318 raise OSError(f"File {filename!r} already exists.") 

319 h5_file = h5py.File(filename, "w") 

320 try: 

321 yield cls( 

322 h5_file, 

323 compression_options=compression_options, 

324 opaque_metadata=opaque_metadata, 

325 root=root, 

326 lsst_path=lsst_path, 

327 direct_ndf_array_paths=direct_ndf_array_paths, 

328 wcs_ndf_paths=wcs_ndf_paths, 

329 ) 

330 finally: 

331 h5_file.close() 

332 

333 def add_tree( 

334 self, 

335 tree: ArchiveTree, 

336 *, 

337 projection: Any = None, 

338 bbox: Any = None, 

339 unit: astropy.units.UnitBase | None = None, 

340 root_name: str | None = None, 

341 ) -> None: 

342 """Finalize the file: write WCS, units, JSON tree, and ORIGIN. 

343 

344 Writes the canonical NDF ``/WCS`` HDS structure (an AST channel 

345 text dump that KAPPA / hdstrace expect) when ``projection`` is 

346 provided. A native mask sub-NDF at ``/MORE/LSST/MASK`` gets a 3D 

347 WCS whose first two axes reuse the parent's sky projection and 

348 whose third axis is the mask-byte coordinate. The JSON tree at 

349 ``<lsst_path>/JSON`` remains the source of truth for symmetric 

350 round-trips; ``/WCS`` is for Starlink tools. Auto-detect read of 

351 ``/WCS/DATA`` into a typed ``Projection`` is a follow-up. 

352 

353 Parameters 

354 ---------- 

355 tree 

356 Pydantic tree returned by the object's ``serialize`` method, 

357 with ``metadata``/``butler_info`` already applied. 

358 projection, bbox, unit 

359 Top-level object attributes that drive NDF-canonical writes. 

360 root_name 

361 Value to assign to the root group's ``HDS_ROOT_NAME`` 

362 attribute (fixed-length ASCII so KAPPA / hdstrace decode it). 

363 """ 

364 if projection is not None: 

365 self._write_wcs(projection, bbox) 

366 if unit is not None and isinstance(self._document.root, Ndf): 

367 self._document.ensure_ndf("/").set_units(_unit_to_ndf_string(unit)) 

368 json_text = tree.model_dump_json() 

369 # Let ensure_structure derive types from path-component names so 

370 # /MORE/LSST/... gets <MORE> stay as EXT and <LSST> typed LSST, 

371 # mirroring HDS convention. 

372 lsst = self._ensure_model_structure(self._lsst_path) 

373 lsst.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text))) 

374 primary = self._opaque_metadata.headers.get(ExtensionKey()) 

375 if primary is not None and len(primary): 

376 cards = _fits_header_records(primary) 

377 more = self._ensure_model_structure("/MORE") 

378 more.children["FITS"] = HdsPrimitive.char_array(cards, width=80) 

379 if bbox is not None: 

380 origin = _origin_from_bbox(bbox) 

381 for struct_path in self._bbox_array_struct_paths: 

382 if self._has_model_path(struct_path): 

383 self.set_array_origin(struct_path, origin) 

384 if root_name is not None: 

385 self._document.root_name = root_name 

386 self._flush() 

387 

388 def _write_wcs(self, projection: Any, bbox: Any) -> None: 

389 ast_frame_set = projection._pixel_to_sky._get_ast_frame_set() 

390 text = _show_ast_for_ndf(ast_frame_set, bbox) 

391 lines = _hds.encode_ndf_ast_data(text) 

392 for ndf_path in self._wcs_ndf_paths: 

393 if self._has_model_path(ndf_path): 

394 self._document.ensure_ndf(ndf_path).set_wcs(NdfWcs(lines)) 

395 if self._has_model_path("/MORE/LSST/MASK"): 

396 mask_origin = _origin_from_bbox(bbox) if bbox is not None else (0, 0) 

397 mask_ndim = self._model_array_ndim("/MORE/LSST/MASK/DATA_ARRAY") 

398 mask_origin = (*mask_origin, *((0,) * max(0, mask_ndim - len(mask_origin)))) 

399 mask_ast_frame_set = projection._pixel_to_sky._get_ast_frame_set() 

400 mask_text = _show_mask_ast_for_ndf( 

401 mask_ast_frame_set, 

402 mask_origin, 

403 labels=("x", "y", "mask-byte"), 

404 ) 

405 self._document.ensure_ndf("/MORE/LSST/MASK").set_wcs(NdfWcs(_hds.encode_ndf_ast_data(mask_text))) 

406 

407 def serialize_direct[T: pydantic.BaseModel]( 

408 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T] 

409 ) -> T: 

410 nested = NestedOutputArchive[NdfPointerModel](name, self) 

411 return serializer(nested) 

412 

413 def serialize_pointer[T: ArchiveTree]( 

414 self, name: str, serializer: Callable[[OutputArchive[NdfPointerModel]], T], key: Hashable 

415 ) -> NdfPointerModel: 

416 if (pointer := self._pointers.get(key)) is not None: 

417 return pointer 

418 archive_path = name if name.startswith("/") else f"/{name}" 

419 target_path = self._archive_path_to_hdf5_path(archive_path) 

420 # Run the serializer first so any nested add_array / serialize_pointer 

421 # calls write into the file before we dump this sub-tree to JSON. 

422 model = self.serialize_direct(name, serializer) 

423 json_text = model.model_dump_json() 

424 # Store the JSON document as a "JSON" child of the target structure, 

425 # mirroring the main /MORE/LSST/JSON tree. Writing it at the target 

426 # path itself would clobber any nested arrays the serializer added 

427 # under that path (IR.write_to_hdf5 deletes the existing node before 

428 # writing a primitive). 

429 # ensure_structure derives the HDS type from the leaf name, so 

430 # /MORE/LSST/PSF is typed <PSF> rather than the generic <EXT>. 

431 target = self._ensure_model_structure(target_path) 

432 target.children["JSON"] = HdsPrimitive.char_array([json_text], width=max(80, len(json_text))) 

433 self._flush() 

434 pointer = NdfPointerModel(path=f"{target_path}/JSON") 

435 self._pointers[key] = pointer 

436 return pointer 

437 

438 def serialize_frame_set[T: ArchiveTree]( 

439 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

440 ) -> NdfPointerModel: 

441 pointer = self.serialize_pointer(name, serializer, key) 

442 self._frame_sets.append((frame_set, pointer)) 

443 return pointer 

444 

445 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, NdfPointerModel]]: 

446 return iter(self._frame_sets) 

447 

448 _COMPATIBLE_MASK_DTYPES = (np.dtype(np.uint8),) 

449 _prefer_native_mask_arrays = True 

450 

451 def add_array( 

452 self, 

453 array: np.ndarray, 

454 *, 

455 name: str | None = None, 

456 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

457 ) -> ArrayReferenceModel: 

458 # Recognised top-level names go to standard NDF locations. 

459 # Anything else hoists under /MORE/LSST. 

460 if name == "image": 

461 root = self._document.ensure_ndf("/") 

462 root.set_array_component( 

463 "DATA_ARRAY", 

464 array, 

465 origin=np.zeros(array.ndim, dtype=np.int64), 

466 compression_options=self._compression_options, 

467 ) 

468 path = "/DATA_ARRAY/DATA" 

469 self._bbox_array_struct_paths.add("/DATA_ARRAY") 

470 elif name == "variance": 

471 root = self._document.ensure_ndf("/") 

472 root.set_array_component( 

473 "VARIANCE", 

474 array, 

475 origin=np.zeros(array.ndim, dtype=np.int64), 

476 compression_options=self._compression_options, 

477 ) 

478 path = "/VARIANCE/DATA" 

479 self._bbox_array_struct_paths.add("/VARIANCE") 

480 elif name == "mask": 

481 if array.ndim == 2 and array.dtype in self._COMPATIBLE_MASK_DTYPES: 

482 self._set_quality_array(array) 

483 path = "/QUALITY/QUALITY/DATA" 

484 self._bbox_array_struct_paths.add("/QUALITY/QUALITY") 

485 else: 

486 # Native Mask serialization passes HDF5 shape 

487 # (mask-byte, y, x). HDS reports the reverse dimension order, 

488 # so Starlink tools see (x, y, mask-byte). 

489 if array.ndim == 3 and array.dtype in self._COMPATIBLE_MASK_DTYPES: 

490 self._set_quality_array(self._collapse_mask_to_quality(array)) 

491 self._bbox_array_struct_paths.add("/QUALITY/QUALITY") 

492 mask_ndf = self._document.ensure_ndf("/MORE/LSST/MASK") 

493 mask_ndf.set_array_component( 

494 "DATA_ARRAY", 

495 array, 

496 origin=np.zeros(array.ndim, dtype=np.int64), 

497 bad_pixel=False, 

498 compression_options=self._compression_options, 

499 ) 

500 path = "/MORE/LSST/MASK/DATA_ARRAY/DATA" 

501 self._bbox_array_struct_paths.add("/MORE/LSST/MASK/DATA_ARRAY") 

502 elif name in self._direct_ndf_array_paths: 

503 sub_ndf_path = self._direct_ndf_array_paths[name] 

504 sub_ndf = self._document.ensure_ndf(sub_ndf_path) 

505 sub_ndf.set_array_component( 

506 "DATA_ARRAY", 

507 array, 

508 origin=np.zeros(array.ndim, dtype=np.int64), 

509 compression_options=self._compression_options, 

510 ) 

511 path = f"{sub_ndf_path}/DATA_ARRAY/DATA" 

512 self._bbox_array_struct_paths.add(f"{sub_ndf_path}/DATA_ARRAY") 

513 else: 

514 if name is None: 

515 raise ValueError("Anonymous arrays are not supported in the NDF archive.") 

516 archive_path = name if name.startswith("/") else f"/{name}" 

517 # Hoisted numeric arrays are wrapped as sub-NDFs under 

518 # /MORE/LSST/<UPPER_PATH> so Starlink tools (KAPPA `display`, 

519 # `hdstrace`, etc.) can inspect them just like the main 

520 # image. The sub-NDF has the canonical layout: top-level 

521 # group with CLASS="NDF" containing a DATA_ARRAY structure 

522 # (CLASS="ARRAY") with DATA + ORIGIN primitives. Hoisted 

523 # JSON sub-trees from serialize_pointer stay as bare 

524 # _CHAR*N datasets at /MORE/LSST/<NAME> (no NDF wrapper) — 

525 # they're JSON documents, not numeric arrays. 

526 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path) 

527 sub_ndf = self._document.ensure_ndf(sub_ndf_path) 

528 sub_ndf.set_array_component( 

529 "DATA_ARRAY", 

530 array, 

531 origin=np.zeros(array.ndim, dtype=np.int64), 

532 compression_options=self._compression_options, 

533 ) 

534 path = f"{sub_ndf_path}/DATA_ARRAY/DATA" 

535 self._flush() 

536 # Shape is stored in the JSON tree (matching the FITS archive) because 

537 # MaskSerializationModel.bbox needs it before any arrays are read. 

538 # Future work: resolve shape from the HDF5 dataset on read instead. 

539 return ArrayReferenceModel( 

540 source=f"ndf:{path}", 

541 shape=list(array.shape), 

542 datatype=NumberType.from_numpy(array.dtype), 

543 ) 

544 

545 def _ensure_path(self, path: str) -> h5py.Group: 

546 """Walk/create groups for an HDF5 absolute path. 

547 

548 Intermediate groups created on demand are tagged with 

549 ``CLASS="EXT"``, the HDS type for general-purpose extension 

550 containers (matches what hds-v5 writes for ``MORE``). 

551 """ 

552 self._ensure_model_structure(path, "EXT") 

553 self._flush() 

554 return self._file["/" if path in ("", "/") else path] 

555 

556 def _ensure_struct(self, path: str, hds_type: str) -> h5py.Group: 

557 """Ensure a structure exists at ``path`` with the given HDS type.""" 

558 self._ensure_model_structure(path, hds_type) 

559 self._flush() 

560 return self._file["/" if path in ("", "/") else path] 

561 

562 def _ensure_array_structure(self, path: str) -> h5py.Group: 

563 """Ensure an HDS ``ARRAY`` structure exists at ``path``.""" 

564 return self._ensure_struct(path, "ARRAY") 

565 

566 def _ensure_quality_structure(self) -> h5py.Group: 

567 """Ensure ``/QUALITY`` exists with ``CLASS="QUALITY"`` and BADBITS. 

568 

569 BADBITS is set to 255 so every bit of the collapsed single-byte 

570 QUALITY plane is treated as bad by NDF applications. 

571 """ 

572 root = self._document.ensure_ndf("/") 

573 if "QUALITY" not in root.children: 

574 root.children["QUALITY"] = HdsStructure("QUALITY") 

575 quality = root.get_structure("QUALITY") 

576 quality.hds_type = "QUALITY" 

577 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8)) 

578 self._flush() 

579 return self._file["/QUALITY"] 

580 

581 def _ensure_quality_array_structure(self) -> h5py.Group: 

582 """Ensure the nested ``/QUALITY/QUALITY`` ARRAY structure exists.""" 

583 root = self._document.ensure_ndf("/") 

584 if "QUALITY" not in root.children: 

585 root.children["QUALITY"] = HdsStructure("QUALITY") 

586 quality = root.get_structure("QUALITY") 

587 quality.hds_type = "QUALITY" 

588 quality.children["BADBITS"] = HdsPrimitive.array(np.array(255, dtype=np.uint8)) 

589 if "QUALITY" not in quality.children or not isinstance(quality.children["QUALITY"], HdsStructure): 

590 quality.children["QUALITY"] = HdsStructure("ARRAY") 

591 quality_array = quality.get_structure("QUALITY") 

592 quality_array.hds_type = "ARRAY" 

593 quality_array.children.setdefault("ORIGIN", HdsPrimitive.array(np.zeros(2, dtype=np.int32))) 

594 quality_array.children.setdefault("BAD_PIXEL", HdsPrimitive.array(np.array(False, dtype=np.bool_))) 

595 self._flush() 

596 return self._file["/QUALITY/QUALITY"] 

597 

598 def _write_quality_array(self, quality: np.ndarray) -> None: 

599 """Write or replace the NDF QUALITY array.""" 

600 self._set_quality_array(quality) 

601 self._flush() 

602 

603 def _set_quality_array(self, quality: np.ndarray) -> None: 

604 """Set or replace the NDF QUALITY array in the model.""" 

605 root = self._document.ensure_ndf("/") 

606 root.set_quality( 

607 NdfQuality( 

608 NdfArray( 

609 quality, 

610 origin=np.zeros(2, dtype=np.int32), 

611 bad_pixel=False, 

612 compression_options=self._compression_options, 

613 ) 

614 ) 

615 ) 

616 

617 def _collapse_mask_to_quality(self, array: np.ndarray) -> np.ndarray: 

618 """Compress an NDF-native 3-D mask array into 2-D QUALITY. 

619 

620 The input array is in HDF5 storage order ``(mask-byte, y, x)``. 

621 Single-byte masks copy directly to preserve bit values. Wider masks 

622 collapse to 1 where any byte is non-zero and 0 otherwise. 

623 """ 

624 if array.shape[0] == 1: 

625 return array[0, :, :] 

626 return np.any(array != 0, axis=0).astype(np.uint8) 

627 

628 def _write_origin_for_array(self, struct_path: str, array: np.ndarray) -> None: 

629 """Write a placeholder ORIGIN of zeros (int64). 

630 

631 The top-level `write` function overwrites this 

632 with bbox-derived values via :meth:`set_array_origin` once the 

633 bbox is known. 

634 """ 

635 struct = self._document.root.get_structure(struct_path) 

636 if "ORIGIN" not in struct.children: 

637 struct.children["ORIGIN"] = HdsPrimitive.array(np.zeros(array.ndim, dtype=np.int64)) 

638 

639 def set_array_origin(self, struct_path: str, origin: tuple[int, ...]) -> None: 

640 """Overwrite the ORIGIN of an ARRAY structure. 

641 

642 Parameters 

643 ---------- 

644 struct_path 

645 HDF5 path to the ARRAY structure (e.g. ``"/DATA_ARRAY"``). 

646 origin 

647 Origin in NDF/Fortran axis order (e.g. ``(x_min, y_min)`` 

648 for a 2D image with bbox lower bound ``(x_min, y_min)``). 

649 """ 

650 struct = self._document.root.get_structure(struct_path) 

651 origin_dtype = np.int32 if struct_path == "/QUALITY/QUALITY" else np.int64 

652 origin_array = np.asarray(origin, dtype=origin_dtype) 

653 data_node = struct.children.get("DATA") 

654 if isinstance(data_node, HdsPrimitive): 

655 data_ndim = data_node.read_array().ndim 

656 if origin_array.size < data_ndim: 

657 origin_array = np.pad(origin_array, (0, data_ndim - origin_array.size)) 

658 struct.children["ORIGIN"] = HdsPrimitive.array(origin_array) 

659 self._flush() 

660 

661 def _ensure_model_structure(self, path: str, hds_type: str | None = None) -> HdsStructure: 

662 """Return or create a structure in the NDF document model. 

663 

664 With ``hds_type=None`` the leaf type is derived from its name; 

665 see `HdsStructure.ensure_structure`. 

666 """ 

667 if path in ("", "/"): 

668 return self._document.root 

669 return self._document.root.ensure_structure(path, hds_type) 

670 

671 def _archive_path_to_hdf5_path(self, archive_path: str) -> str: 

672 """Translate an archive path to this layout's HDF5 path.""" 

673 if self._lsst_path == "/MORE/LSST": 

674 return archive_path_to_hdf5_path(archive_path) 

675 if not archive_path: 

676 return f"{self._lsst_path}/JSON" 

677 components = archive_path_to_hdf5_path_components(archive_path) 

678 return f"{self._lsst_path}/{'/'.join(components)}" 

679 

680 def _has_model_path(self, path: str) -> bool: 

681 """Return `True` if a path exists in the NDF document model.""" 

682 try: 

683 self._document.get(path) 

684 except KeyError: 

685 return False 

686 return True 

687 

688 def _model_array_ndim(self, struct_path: str) -> int: 

689 """Return the dimensionality of an ARRAY structure's DATA primitive.""" 

690 struct = self._document.root.get_structure(struct_path) 

691 data_node = struct.children["DATA"] 

692 if not isinstance(data_node, HdsPrimitive): 

693 raise TypeError(f"{struct_path}/DATA is not an HDS primitive.") 

694 return data_node.read_array().ndim 

695 

696 def _flush(self) -> None: 

697 """Synchronize the Python NDF document model to the open HDF5 file.""" 

698 self._document.write_to_hdf5(self._file) 

699 

700 def add_table( 

701 self, 

702 table: astropy.table.Table, 

703 *, 

704 name: str | None = None, 

705 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

706 ) -> TableModel: 

707 columns = TableColumnModel.from_table(table, inline=True) 

708 return TableModel(columns=columns, meta=table.meta) 

709 

710 def add_structured_array( 

711 self, 

712 array: np.ndarray, 

713 *, 

714 name: str | None = None, 

715 units: Mapping[str, astropy.units.Unit] | None = None, 

716 descriptions: Mapping[str, str] | None = None, 

717 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

718 ) -> TableModel: 

719 if name is None: 

720 columns = TableColumnModel.from_record_array(array, inline=True) 

721 for c in columns: 

722 if units and (unit := units.get(c.name)): 

723 c.unit = unit 

724 if descriptions and (description := descriptions.get(c.name)): 

725 c.description = description 

726 return TableModel(columns=columns) 

727 columns = TableColumnModel.from_record_dtype(array.dtype) 

728 for c in columns: 

729 column_path = name if len(columns) == 1 else f"{name}/{c.name}" 

730 archive_path = column_path if column_path.startswith("/") else f"/{column_path}" 

731 sub_ndf_path = self._archive_path_to_hdf5_path(archive_path) 

732 column_array = np.asarray(array[c.name]) 

733 sub_ndf = self._document.ensure_ndf(sub_ndf_path) 

734 sub_ndf.set_array_component( 

735 "DATA_ARRAY", 

736 column_array, 

737 origin=np.zeros(column_array.ndim, dtype=np.int64), 

738 compression_options=self._compression_options, 

739 ) 

740 assert isinstance(c.data, ArrayReferenceModel) 

741 c.data.source = f"ndf:{sub_ndf_path}/DATA_ARRAY/DATA" 

742 for c in columns: 

743 if units and (unit := units.get(c.name)): 

744 c.unit = unit 

745 if descriptions and (description := descriptions.get(c.name)): 

746 c.description = description 

747 self._flush() 

748 return TableModel(columns=columns)