Coverage for python/lsst/images/ndf/_input_archive.py: 15%

271 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 08:10 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("NdfInputArchive", "read") 

15 

16import logging 

17from collections.abc import Callable, Iterator 

18from contextlib import contextmanager 

19from types import EllipsisType 

20from typing import Any, Self 

21 

22import astropy.io.fits 

23import astropy.table 

24import astropy.units as u 

25import h5py 

26import numpy as np 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from .._geom import Box 

31from .._image import Image 

32from .._mask import Mask, MaskPlane, MaskSchema 

33from .._masked_image import MaskedImage 

34from .._transforms import FrameSet, Projection 

35from .._transforms import _ast as astshim 

36from .._transforms._frames import GeneralFrame 

37from ..fits._common import FitsOpaqueMetadata 

38from ..serialization import ( 

39 ArchiveReadError, 

40 ArchiveTree, 

41 ArrayReferenceModel, 

42 InlineArrayModel, 

43 InputArchive, 

44 ReadResult, 

45 TableModel, 

46 no_header_updates, 

47) 

48from ..serialization._common import _check_format_version 

49from . import _hds 

50from ._common import NdfPointerModel 

51from ._model import HdsPrimitive, NdfDocument 

52 

53_LOG = logging.getLogger(__name__) 

54 

55_NDF_FORMAT_VERSION = 1 

56"""Container layout version this release of `NdfInputArchive` understands.""" 

57 

58 

59class NdfInputArchive(InputArchive[NdfPointerModel]): 

60 """Reads HDS-on-HDF5 NDF files written by `NdfOutputArchive`. 

61 

62 Instances should only be constructed via the :meth:`open` context 

63 manager. 

64 

65 Parameters 

66 ---------- 

67 file 

68 Open `h5py.File` handle. Owned by the caller of :meth:`open`; 

69 the archive does not close it. 

70 """ 

71 

72 def __init__(self, file: h5py.File) -> None: 

73 self._file = file 

74 self._document = NdfDocument.from_hdf5(file) 

75 self._opaque_metadata = FitsOpaqueMetadata() 

76 self._deserialized_pointer_cache: dict[str, Any] = {} 

77 self._frame_set_cache: dict[str, FrameSet] = {} 

78 self._read_opaque_fits_metadata() 

79 self._check_format_version() 

80 

81 @classmethod 

82 @contextmanager 

83 def open(cls, path: ResourcePathExpression) -> Iterator[Self]: 

84 """Open an NDF file for reading and yield an `NdfInputArchive`. 

85 

86 Remote ResourcePaths are materialised locally first; fsspec-direct 

87 h5py reads are a deferred follow-up. 

88 """ 

89 rp = ResourcePath(path) 

90 with rp.as_local() as local: 

91 with h5py.File(local.ospath, "r") as f: 

92 yield cls(f) 

93 

94 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T: 

95 """Read and validate the main Pydantic tree at ``/MORE/LSST/JSON``.""" 

96 json_path = self._get_main_json_path() 

97 if json_path is None: 

98 raise ArchiveReadError( 

99 "File has no /MORE/LSST/JSON tree; this is either a " 

100 "Starlink-only NDF (use ndf.read() with auto-detect) or " 

101 "the file was written by an unrelated tool." 

102 ) 

103 json_text = _read_json_record(self._get_primitive(json_path), json_path) 

104 return model_type.model_validate_json(json_text) 

105 

106 def deserialize_pointer[U: ArchiveTree, V]( 

107 self, 

108 pointer: NdfPointerModel, 

109 model_type: type[U], 

110 deserializer: Callable[[U, InputArchive[NdfPointerModel]], V], 

111 ) -> V: 

112 # Cache by pointer.path so repeated dereferences reuse the same 

113 # deserialised result and don't re-run the deserializer. 

114 if (cached := self._deserialized_pointer_cache.get(pointer.path)) is not None: 

115 return cached 

116 if not self._has_model_path(pointer.path): 

117 raise ArchiveReadError(f"Pointer reference {pointer.path!r} not found in NDF file.") 

118 primitive = self._get_primitive(pointer.path) 

119 json_text = _read_json_record(primitive, pointer.path) 

120 model = model_type.model_validate_json(json_text) 

121 result = deserializer(model, self) 

122 self._deserialized_pointer_cache[pointer.path] = result 

123 if isinstance(result, FrameSet): 

124 self._frame_set_cache[pointer.path] = result 

125 return result 

126 

127 def get_frame_set(self, pointer: NdfPointerModel) -> FrameSet: 

128 try: 

129 return self._frame_set_cache[pointer.path] 

130 except KeyError: 

131 raise AssertionError( 

132 f"Frame set at {pointer.path!r} must be deserialised via " 

133 f"deserialize_pointer before any dependent transform can be." 

134 ) from None 

135 

136 def get_array( 

137 self, 

138 model: ArrayReferenceModel | InlineArrayModel, 

139 *, 

140 slices: tuple[slice, ...] | EllipsisType = ..., 

141 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

142 ) -> np.ndarray: 

143 if isinstance(model, InlineArrayModel): 

144 data: np.ndarray = np.array(model.data, dtype=model.datatype.to_numpy()) 

145 return data if slices is ... else data[slices] 

146 if not isinstance(model.source, str) or not model.source.startswith("ndf:"): 

147 raise ArchiveReadError( 

148 f"NdfInputArchive cannot resolve array source {model.source!r}; " 

149 f"expected an 'ndf:<HDF5-path>' reference." 

150 ) 

151 path = model.source[len("ndf:") :] 

152 if not self._has_model_path(path): 

153 raise ArchiveReadError(f"Array reference {path!r} not in file.") 

154 primitive = self._get_primitive(path) 

155 # h5py supports lazy slicing via dataset[slices]. 

156 if isinstance(primitive.data, h5py.Dataset): 

157 return primitive.data[()] if slices is ... else primitive.data[slices] 

158 data = primitive.read_array() 

159 return data if slices is ... else data[slices] 

160 

161 def get_table( 

162 self, 

163 model: TableModel, 

164 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

165 ) -> astropy.table.Table: 

166 result = astropy.table.Table(meta=model.meta) 

167 for column_model in model.columns: 

168 if isinstance(column_model.data, InlineArrayModel): 

169 data: Any = column_model.data.data 

170 else: 

171 data = self.get_array(column_model.data, strip_header=strip_header) 

172 result[column_model.name] = astropy.table.Column( 

173 data, 

174 name=column_model.name, 

175 dtype=column_model.data.datatype.to_numpy(), 

176 unit=column_model.unit, 

177 description=column_model.description, 

178 meta=column_model.meta, 

179 ) 

180 return result 

181 

182 def get_structured_array( 

183 self, 

184 model: TableModel, 

185 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

186 ) -> np.ndarray: 

187 return self.get_table(model, strip_header).as_array() 

188 

189 def _read_opaque_fits_metadata(self) -> None: 

190 if not self._has_model_path("/MORE/FITS"): 

191 return 

192 cards = self._get_primitive("/MORE/FITS").read_char_array() 

193 # FITS Header.fromstring expects fixed-width 80-char cards 

194 # concatenated; pad each card defensively so readers tolerate 

195 # files written with shorter widths. 

196 header = astropy.io.fits.Header.fromstring("".join(c.ljust(80) for c in cards)) 

197 self._opaque_metadata.add_header(header, name="", ver=1) 

198 

199 def get_opaque_metadata(self) -> FitsOpaqueMetadata: 

200 return self._opaque_metadata 

201 

202 def _get_main_json_path(self) -> str | None: 

203 """Return the path of the main LSST JSON tree, if present.""" 

204 for path in ("/MORE/LSST/JSON", "/LSST/JSON"): 

205 if self._has_model_path(path): 

206 return path 

207 return None 

208 

209 def _check_format_version(self) -> None: 

210 """Read FORMAT_VERSION from the NDF top-level structure and check it. 

211 

212 Absence is treated as ``1`` (legacy default). DATA_MODEL is 

213 informational only on read; the JSON tree's ``schema_version`` / 

214 ``min_read_version`` drive data-model compatibility. 

215 """ 

216 on_disk = 1 

217 for prefix in ("/MORE/LSST", "/LSST"): 

218 path = f"{prefix}/FORMAT_VERSION" 

219 if self._has_model_path(path): 

220 primitive = self._get_primitive(path) 

221 # We wrote the version as a 0-d int32 numpy array; .item() 

222 # unwraps to a Python int. 

223 on_disk = int(primitive.read_array().item()) 

224 break 

225 _check_format_version("ndf", on_disk, _NDF_FORMAT_VERSION) 

226 

227 def _has_model_path(self, path: str) -> bool: 

228 """Return `True` if a path exists in the NDF document model.""" 

229 try: 

230 self._document.get(path) 

231 except KeyError: 

232 return False 

233 return True 

234 

235 def _get_primitive(self, path: str) -> HdsPrimitive: 

236 """Return a primitive component from the NDF document model.""" 

237 node = self._document.get(path) 

238 if not isinstance(node, HdsPrimitive): 

239 raise ArchiveReadError(f"NDF reference {path!r} is not a primitive dataset.") 

240 return node 

241 

242 

243def read[T: Any](cls: type[T], path: ResourcePathExpression, **kwargs: Any) -> ReadResult[T]: 

244 """Read an object from an NDF (HDS-on-HDF5) file. 

245 

246 If the file has a ``/MORE/LSST/JSON`` tree it is used as the source 

247 of truth and ``cls.deserialize`` is invoked with the parsed model. 

248 Otherwise the reader falls back to auto-detection of a minimal 

249 recognised-component set (``DATA_ARRAY``, ``VARIANCE``, ``QUALITY``, 

250 ``MORE.FITS``); ``WCS`` is logged-and-dropped in v1. 

251 

252 Parameters 

253 ---------- 

254 cls 

255 Expected return type. `~lsst.images.Image` and 

256 `~lsst.images.MaskedImage` are the only types the auto-detect 

257 path can produce. The symmetric path accepts whatever the file's 

258 discriminator says. 

259 path 

260 File path or ``lsst.resources.ResourcePathExpression``. 

261 **kwargs 

262 Forwarded to ``cls.deserialize`` on the symmetric read path. 

263 

264 Returns 

265 ------- 

266 `~lsst.images.serialization.ReadResult` [T] 

267 Named tuple of (deserialized object, metadata, butler_info). 

268 """ 

269 with NdfInputArchive.open(path) as archive: 

270 if archive._get_main_json_path() is not None: 

271 tree_type = cls._get_archive_tree_type(NdfPointerModel) 

272 tree = archive.get_tree(tree_type) 

273 obj = tree.deserialize(archive, **kwargs) 

274 obj._opaque_metadata = archive.get_opaque_metadata() 

275 return ReadResult(obj, tree.metadata, tree.butler_info) 

276 return _read_auto_detect(cls, archive) 

277 

278 

279def _read_auto_detect[T: Any](cls: type[T], archive: NdfInputArchive) -> ReadResult[T]: 

280 """Reconstruct an `Image` (or `MaskedImage`) from a Starlink NDF. 

281 

282 Recognised components: ``DATA_ARRAY`` (in either simple or complex 

283 form), ``VARIANCE``, ``QUALITY``, ``MORE.FITS``. Other components 

284 (``WCS``, ``HISTORY``, ``AXIS``, ``LABEL``, custom ``MORE.*``, 

285 ``_LOGICAL`` primitives) are warned-and-dropped. 

286 """ 

287 f = archive._file 

288 ndf_group = _locate_ndf_root(f) 

289 

290 # DATA_ARRAY is required. 

291 if "DATA_ARRAY" not in ndf_group: 

292 raise ArchiveReadError(f"Auto-detect read of {f.filename!r}: no DATA_ARRAY component.") 

293 data_arr, bbox = _read_data_array_with_bbox(ndf_group["DATA_ARRAY"]) 

294 

295 # VARIANCE / QUALITY are optional. 

296 variance_arr: np.ndarray | None = None 

297 variance_bbox: Any | None = None 

298 if "VARIANCE" in ndf_group: 

299 variance_arr, variance_bbox = _read_data_array_with_bbox(ndf_group["VARIANCE"]) 

300 quality_arr: np.ndarray | None = None 

301 quality_bbox: Any | None = None 

302 quality_badbits = 255 

303 if "QUALITY" in ndf_group and isinstance(ndf_group["QUALITY"], h5py.Group): 

304 q = ndf_group["QUALITY"] 

305 quality_badbits = _read_quality_badbits(q) 

306 if "QUALITY" in q and isinstance(q["QUALITY"], h5py.Dataset): 

307 quality_arr = _validate_quality_array(_hds.read_array(q["QUALITY"])) 

308 quality_bbox = _make_bbox(x_min=0, y_min=0, array=quality_arr) 

309 elif "QUALITY" in q and isinstance(q["QUALITY"], h5py.Group): 

310 quality_arr, quality_bbox = _read_data_array_with_bbox(q["QUALITY"]) 

311 quality_arr = _validate_quality_array(quality_arr) 

312 

313 projection: Projection | None = None 

314 if "WCS" in ndf_group: 

315 try: 

316 wcs_group = ndf_group["WCS"] 

317 if isinstance(wcs_group, h5py.Group) and "DATA" in wcs_group: 

318 wcs_lines = _hds.read_char_array(wcs_group["DATA"]) 

319 wcs_text = _hds.decode_ndf_ast_data(wcs_lines) 

320 ast_obj = astshim.Object.fromString(wcs_text) 

321 if isinstance(ast_obj, astshim.FrameSet): 

322 pixel_frame = GeneralFrame(unit=u.pix) 

323 projection = Projection.from_ast_frame_set( 

324 ast_obj, 

325 pixel_frame, 

326 pixel_bounds=bbox, 

327 ) 

328 except Exception: 

329 _LOG.warning( 

330 "Could not reconstruct Projection from WCS in %s; dropping.", 

331 f.filename, 

332 exc_info=True, 

333 ) 

334 

335 unit = _read_ndf_units(ndf_group) 

336 

337 # Anything unrecognised: warn-and-drop. 

338 recognised = { 

339 "DATA_ARRAY", 

340 "VARIANCE", 

341 "QUALITY", 

342 "WCS", 

343 "MORE", 

344 "TITLE", 

345 "LABEL", 

346 "UNITS", 

347 "HISTORY", 

348 "AXIS", 

349 } 

350 for name in ndf_group: 

351 if name not in recognised: 

352 _LOG.warning( 

353 "Ignoring unrecognised NDF component %s/%s during auto-detect read.", 

354 ndf_group.name, 

355 name, 

356 ) 

357 

358 # Build the requested in-memory object. Any NDF can be read as an Image; 

359 # MaskedImage construction uses whatever VARIANCE/QUALITY are present and 

360 # lets the MaskedImage constructor provide defaults for missing planes. 

361 image = Image(data_arr, bbox=bbox, unit=unit, projection=projection) 

362 obj: Any 

363 if cls is Image: 

364 obj = image 

365 elif issubclass(cls, MaskedImage): 

366 if quality_arr is not None: 

367 schema = _make_quality_mask_schema(quality_badbits) 

368 mask = Mask(quality_arr[:, :, np.newaxis], schema=schema, bbox=quality_bbox) 

369 else: 

370 schema = MaskSchema([MaskPlane(name="BAD", description="Bad pixel.")]) 

371 mask = None 

372 variance = Image(variance_arr, bbox=variance_bbox) if variance_arr is not None else None 

373 obj = cls( 

374 image=image, 

375 mask=mask, 

376 mask_schema=schema if mask is None else None, 

377 variance=variance, 

378 ) 

379 else: 

380 raise ArchiveReadError( 

381 f"Auto-detect can produce Image or MaskedImage, but caller asked for {cls.__name__}." 

382 ) 

383 obj._opaque_metadata = archive.get_opaque_metadata() 

384 # Auto-detect path produces no archive-tree metadata or butler_info. 

385 return ReadResult(obj, {}, None) 

386 

387 

388def _read_ndf_units(ndf_group: h5py.Group) -> u.UnitBase | None: 

389 """Read the NDF UNITS component, if present.""" 

390 if "UNITS" not in ndf_group or not isinstance(ndf_group["UNITS"], h5py.Dataset): 

391 return None 

392 dataset = ndf_group["UNITS"] 

393 if dataset.dtype.kind != "S": 

394 _LOG.warning("Ignoring non-character NDF UNITS component in %s.", ndf_group.name) 

395 return None 

396 if dataset.ndim == 0: 

397 raw = dataset[()] 

398 if isinstance(raw, np.bytes_): 

399 raw = bytes(raw) 

400 if not isinstance(raw, bytes): 

401 return None 

402 units_text = raw.decode("ascii").rstrip(" ") 

403 else: 

404 records = _hds.read_char_array(dataset) 

405 units_text = records[0] if records else "" 

406 if not units_text: 

407 return None 

408 for kwargs in ({"format": "fits"}, {}): 

409 try: 

410 return u.Unit(units_text, **kwargs) 

411 except ValueError: 

412 continue 

413 _LOG.warning("Could not parse NDF UNITS value %r in %s.", units_text, ndf_group.name) 

414 return None 

415 

416 

417def _read_quality_badbits(quality_group: h5py.Group) -> int: 

418 """Read the scalar NDF QUALITY.BADBITS value.""" 

419 badbits = quality_group.get("BADBITS") 

420 if not isinstance(badbits, h5py.Dataset): 

421 return 255 

422 value = np.asarray(_hds.read_array(badbits)).reshape(-1) 

423 if value.size == 0: 

424 return 255 

425 return int(value[0]) 

426 

427 

428def _validate_quality_array(quality: np.ndarray) -> np.ndarray: 

429 """Return an NDF QUALITY array as a `numpy.uint8` mask plane.""" 

430 if quality.dtype != np.dtype(np.uint8): 

431 raise ArchiveReadError(f"NDF QUALITY array has dtype {quality.dtype}; expected uint8.") 

432 return quality 

433 

434 

435def _make_quality_mask_schema(badbits: int) -> MaskSchema: 

436 """Create a fallback `MaskSchema` for an unnamed 8-bit QUALITY array.""" 

437 planes = [] 

438 for bit in range(8): 

439 mask = 1 << bit 

440 description = f"NDF QUALITY bit {bit}." 

441 if badbits & mask: 

442 description += " Selected by BADBITS." 

443 planes.append(MaskPlane(name=f"MASK{bit}", description=description)) 

444 return MaskSchema(planes, dtype=np.uint8) 

445 

446 

447def _locate_ndf_root(f: h5py.File) -> h5py.Group: 

448 """Return the group representing the top-level NDF. 

449 

450 Most files have the NDF at the root group itself. A few wrap it 

451 in a single-child container at the root; we accept that shape 

452 too. Anything more elaborate raises. 

453 """ 

454 root_class = f["/"].attrs.get(_hds.ATTR_CLASS) 

455 if isinstance(root_class, bytes): 

456 root_class = root_class.decode("ascii") 

457 if root_class == "NDF": 

458 return f["/"] 

459 # Maybe a one-level container. 

460 candidates = [] 

461 for name, child in f["/"].items(): 

462 if isinstance(child, h5py.Group): 

463 cls_attr = child.attrs.get(_hds.ATTR_CLASS) 

464 if isinstance(cls_attr, bytes): 

465 cls_attr = cls_attr.decode("ascii") 

466 if cls_attr == "NDF": 

467 candidates.append(name) 

468 if len(candidates) == 1: 

469 return f[candidates[0]] 

470 raise ArchiveReadError( 

471 f"Could not locate top-level NDF in {f.filename!r}; " 

472 f"expected the root group or a single NDF-typed child." 

473 ) 

474 

475 

476def _read_data_array_with_bbox( 

477 obj: h5py.Group | h5py.Dataset, 

478) -> tuple[np.ndarray, Any]: 

479 """Read a DATA_ARRAY component in either simple or complex form. 

480 

481 The complex form (what our writer always produces) is an HDS 

482 ARRAY structure (h5py group with CLASS="ARRAY") containing 

483 ``DATA`` and ``ORIGIN`` primitives. The simple form is a bare 

484 primitive dataset. 

485 

486 Returns 

487 ------- 

488 array, bbox : tuple 

489 ``array`` is the C-order numpy data (shape ``(height, width)`` 

490 for 2D images). ``bbox`` is constructed from the ORIGIN if 

491 present, else from a default origin of (0, 0). 

492 """ 

493 if isinstance(obj, h5py.Dataset): 

494 # Simple form. 

495 array = _hds.read_array(obj) 

496 bbox = _make_bbox(x_min=0, y_min=0, array=array) 

497 return array, bbox 

498 # Complex form: an HDS structure with DATA + ORIGIN. 

499 data = _hds.read_array(obj["DATA"]) 

500 if "ORIGIN" in obj: 

501 origin = _hds.read_array(obj["ORIGIN"]) 

502 bbox = _make_bbox(x_min=int(origin[0]), y_min=int(origin[1]), array=data) 

503 else: 

504 bbox = _make_bbox(x_min=0, y_min=0, array=data) 

505 return data, bbox 

506 

507 

508def _read_json_record(primitive: HdsPrimitive, path: str) -> str: 

509 """Read a JSON document stored as a single _CHAR*N record. 

510 

511 Our writer always emits JSON trees as a single-element character 

512 array sized to the document. Joining multiple records would lose 

513 trailing whitespace inside JSON string values, since 

514 `read_char_array` strips trailing spaces per record. 

515 """ 

516 records = primitive.read_char_array() 

517 if len(records) != 1: 

518 raise ArchiveReadError(f"Expected a single _CHAR*N record at {path!r}, got {len(records)}.") 

519 return records[0] 

520 

521 

522def _make_bbox(*, x_min: int, y_min: int, array: np.ndarray) -> Any: 

523 """Build an lsst.images.Box for a 2D image array. 

524 

525 The array is C-order ``(height, width)``. NDF stores ``ORIGIN`` 

526 in Fortran axis order ``(x_min, y_min)``. 

527 """ 

528 if array.ndim != 2: 

529 raise ArchiveReadError(f"Auto-detect read only supports 2D arrays, got ndim={array.ndim}.") 

530 # Box.from_shape takes (height, width) and start=(y_start, x_start). 

531 return Box.from_shape(array.shape, start=(y_min, x_min))