Coverage for python / lsst / images / ndf / _input_archive.py: 14%

258 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-27 01:31 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("NdfInputArchive", "read") 

15 

16import logging 

17from collections.abc import Callable, Iterator 

18from contextlib import contextmanager 

19from types import EllipsisType 

20from typing import Any, Self 

21 

22import astropy.io.fits 

23import astropy.table 

24import astropy.units as u 

25import h5py 

26import numpy as np 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from .._geom import Box 

31from .._image import Image 

32from .._mask import Mask, MaskPlane, MaskSchema 

33from .._masked_image import MaskedImage 

34from .._transforms import FrameSet, Projection 

35from .._transforms import _ast as astshim 

36from .._transforms._frames import GeneralFrame 

37from ..fits._common import FitsOpaqueMetadata 

38from ..serialization import ( 

39 ArchiveReadError, 

40 ArchiveTree, 

41 ArrayReferenceModel, 

42 InlineArrayModel, 

43 InputArchive, 

44 ReadResult, 

45 TableModel, 

46 no_header_updates, 

47) 

48from . import _hds 

49from ._common import NdfPointerModel 

50from ._model import HdsPrimitive, NdfDocument 

51 

52_LOG = logging.getLogger(__name__) 

53 

54 

55class NdfInputArchive(InputArchive[NdfPointerModel]): 

56 """Reads HDS-on-HDF5 NDF files written by `NdfOutputArchive`. 

57 

58 Instances should only be constructed via the :meth:`open` context 

59 manager. 

60 

61 Parameters 

62 ---------- 

63 file 

64 Open `h5py.File` handle. Owned by the caller of :meth:`open`; 

65 the archive does not close it. 

66 """ 

67 

68 def __init__(self, file: h5py.File) -> None: 

69 self._file = file 

70 self._document = NdfDocument.from_hdf5(file) 

71 self._opaque_metadata = FitsOpaqueMetadata() 

72 self._deserialized_pointer_cache: dict[str, Any] = {} 

73 self._frame_set_cache: dict[str, FrameSet] = {} 

74 self._read_opaque_fits_metadata() 

75 

76 @classmethod 

77 @contextmanager 

78 def open(cls, path: ResourcePathExpression) -> Iterator[Self]: 

79 """Open an NDF file for reading and yield an `NdfInputArchive`. 

80 

81 Remote ResourcePaths are materialised locally first; fsspec-direct 

82 h5py reads are a deferred follow-up. 

83 """ 

84 rp = ResourcePath(path) 

85 with rp.as_local() as local: 

86 with h5py.File(local.ospath, "r") as f: 

87 yield cls(f) 

88 

89 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T: 

90 """Read and validate the main Pydantic tree at ``/MORE/LSST/JSON``.""" 

91 json_path = self._get_main_json_path() 

92 if json_path is None: 

93 raise ArchiveReadError( 

94 "File has no /MORE/LSST/JSON tree; this is either a " 

95 "Starlink-only NDF (use ndf.read() with auto-detect) or " 

96 "the file was written by an unrelated tool." 

97 ) 

98 json_text = _read_json_record(self._get_primitive(json_path), json_path) 

99 return model_type.model_validate_json(json_text) 

100 

101 def deserialize_pointer[U: ArchiveTree, V]( 

102 self, 

103 pointer: NdfPointerModel, 

104 model_type: type[U], 

105 deserializer: Callable[[U, InputArchive[NdfPointerModel]], V], 

106 ) -> V: 

107 # Cache by pointer.path so repeated dereferences reuse the same 

108 # deserialised result and don't re-run the deserializer. 

109 if (cached := self._deserialized_pointer_cache.get(pointer.path)) is not None: 

110 return cached 

111 if not self._has_model_path(pointer.path): 

112 raise ArchiveReadError(f"Pointer reference {pointer.path!r} not found in NDF file.") 

113 primitive = self._get_primitive(pointer.path) 

114 json_text = _read_json_record(primitive, pointer.path) 

115 model = model_type.model_validate_json(json_text) 

116 result = deserializer(model, self) 

117 self._deserialized_pointer_cache[pointer.path] = result 

118 if isinstance(result, FrameSet): 

119 self._frame_set_cache[pointer.path] = result 

120 return result 

121 

122 def get_frame_set(self, pointer: NdfPointerModel) -> FrameSet: 

123 try: 

124 return self._frame_set_cache[pointer.path] 

125 except KeyError: 

126 raise AssertionError( 

127 f"Frame set at {pointer.path!r} must be deserialised via " 

128 f"deserialize_pointer before any dependent transform can be." 

129 ) from None 

130 

131 def get_array( 

132 self, 

133 model: ArrayReferenceModel | InlineArrayModel, 

134 *, 

135 slices: tuple[slice, ...] | EllipsisType = ..., 

136 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

137 ) -> np.ndarray: 

138 if isinstance(model, InlineArrayModel): 

139 data: np.ndarray = np.array(model.data, dtype=model.datatype.to_numpy()) 

140 return data if slices is ... else data[slices] 

141 if not isinstance(model.source, str) or not model.source.startswith("ndf:"): 

142 raise ArchiveReadError( 

143 f"NdfInputArchive cannot resolve array source {model.source!r}; " 

144 f"expected an 'ndf:<HDF5-path>' reference." 

145 ) 

146 path = model.source[len("ndf:") :] 

147 if not self._has_model_path(path): 

148 raise ArchiveReadError(f"Array reference {path!r} not in file.") 

149 primitive = self._get_primitive(path) 

150 # h5py supports lazy slicing via dataset[slices]. 

151 if isinstance(primitive.data, h5py.Dataset): 

152 return primitive.data[()] if slices is ... else primitive.data[slices] 

153 data = primitive.read_array() 

154 return data if slices is ... else data[slices] 

155 

156 def get_table( 

157 self, 

158 model: TableModel, 

159 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

160 ) -> astropy.table.Table: 

161 result = astropy.table.Table(meta=model.meta) 

162 for column_model in model.columns: 

163 if isinstance(column_model.data, InlineArrayModel): 

164 data: Any = column_model.data.data 

165 else: 

166 data = self.get_array(column_model.data, strip_header=strip_header) 

167 result[column_model.name] = astropy.table.Column( 

168 data, 

169 name=column_model.name, 

170 dtype=column_model.data.datatype.to_numpy(), 

171 unit=column_model.unit, 

172 description=column_model.description, 

173 meta=column_model.meta, 

174 ) 

175 return result 

176 

177 def get_structured_array( 

178 self, 

179 model: TableModel, 

180 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

181 ) -> np.ndarray: 

182 return self.get_table(model, strip_header).as_array() 

183 

184 def _read_opaque_fits_metadata(self) -> None: 

185 if not self._has_model_path("/MORE/FITS"): 

186 return 

187 cards = self._get_primitive("/MORE/FITS").read_char_array() 

188 # FITS Header.fromstring expects fixed-width 80-char cards 

189 # concatenated; pad each card defensively so readers tolerate 

190 # files written with shorter widths. 

191 header = astropy.io.fits.Header.fromstring("".join(c.ljust(80) for c in cards)) 

192 self._opaque_metadata.add_header(header, name="", ver=1) 

193 

194 def get_opaque_metadata(self) -> FitsOpaqueMetadata: 

195 return self._opaque_metadata 

196 

197 def _get_main_json_path(self) -> str | None: 

198 """Return the path of the main LSST JSON tree, if present.""" 

199 for path in ("/MORE/LSST/JSON", "/LSST/JSON"): 

200 if self._has_model_path(path): 

201 return path 

202 return None 

203 

204 def _has_model_path(self, path: str) -> bool: 

205 """Return `True` if a path exists in the NDF document model.""" 

206 try: 

207 self._document.get(path) 

208 except KeyError: 

209 return False 

210 return True 

211 

212 def _get_primitive(self, path: str) -> HdsPrimitive: 

213 """Return a primitive component from the NDF document model.""" 

214 node = self._document.get(path) 

215 if not isinstance(node, HdsPrimitive): 

216 raise ArchiveReadError(f"NDF reference {path!r} is not a primitive dataset.") 

217 return node 

218 

219 

220def read[T: Any](cls: type[T], path: ResourcePathExpression, **kwargs: Any) -> ReadResult[T]: 

221 """Read an object from an NDF (HDS-on-HDF5) file. 

222 

223 If the file has a ``/MORE/LSST/JSON`` tree it is used as the source 

224 of truth and ``cls.deserialize`` is invoked with the parsed model. 

225 Otherwise the reader falls back to auto-detection of a minimal 

226 recognised-component set (``DATA_ARRAY``, ``VARIANCE``, ``QUALITY``, 

227 ``MORE.FITS``); ``WCS`` is logged-and-dropped in v1. 

228 

229 Parameters 

230 ---------- 

231 cls 

232 Expected return type. `~lsst.images.Image` and 

233 `~lsst.images.MaskedImage` are the only types the auto-detect 

234 path can produce. The symmetric path accepts whatever the file's 

235 discriminator says. 

236 path 

237 File path or ``lsst.resources.ResourcePathExpression``. 

238 **kwargs 

239 Forwarded to ``cls.deserialize`` on the symmetric read path. 

240 

241 Returns 

242 ------- 

243 `~lsst.images.serialization.ReadResult` [T] 

244 Named tuple of (deserialized object, metadata, butler_info). 

245 """ 

246 with NdfInputArchive.open(path) as archive: 

247 if archive._get_main_json_path() is not None: 

248 tree_type = cls._get_archive_tree_type(NdfPointerModel) 

249 tree = archive.get_tree(tree_type) 

250 obj = tree.deserialize(archive, **kwargs) 

251 obj._opaque_metadata = archive.get_opaque_metadata() 

252 return ReadResult(obj, tree.metadata, tree.butler_info) 

253 return _read_auto_detect(cls, archive) 

254 

255 

256def _read_auto_detect[T: Any](cls: type[T], archive: NdfInputArchive) -> ReadResult[T]: 

257 """Reconstruct an `Image` (or `MaskedImage`) from a Starlink NDF. 

258 

259 Recognised components: ``DATA_ARRAY`` (in either simple or complex 

260 form), ``VARIANCE``, ``QUALITY``, ``MORE.FITS``. Other components 

261 (``WCS``, ``HISTORY``, ``AXIS``, ``LABEL``, custom ``MORE.*``, 

262 ``_LOGICAL`` primitives) are warned-and-dropped. 

263 """ 

264 f = archive._file 

265 ndf_group = _locate_ndf_root(f) 

266 

267 # DATA_ARRAY is required. 

268 if "DATA_ARRAY" not in ndf_group: 

269 raise ArchiveReadError(f"Auto-detect read of {f.filename!r}: no DATA_ARRAY component.") 

270 data_arr, bbox = _read_data_array_with_bbox(ndf_group["DATA_ARRAY"]) 

271 

272 # VARIANCE / QUALITY are optional. 

273 variance_arr: np.ndarray | None = None 

274 variance_bbox: Any | None = None 

275 if "VARIANCE" in ndf_group: 

276 variance_arr, variance_bbox = _read_data_array_with_bbox(ndf_group["VARIANCE"]) 

277 quality_arr: np.ndarray | None = None 

278 quality_bbox: Any | None = None 

279 quality_badbits = 255 

280 if "QUALITY" in ndf_group and isinstance(ndf_group["QUALITY"], h5py.Group): 

281 q = ndf_group["QUALITY"] 

282 quality_badbits = _read_quality_badbits(q) 

283 if "QUALITY" in q and isinstance(q["QUALITY"], h5py.Dataset): 

284 quality_arr = _validate_quality_array(_hds.read_array(q["QUALITY"])) 

285 quality_bbox = _make_bbox(x_min=0, y_min=0, array=quality_arr) 

286 elif "QUALITY" in q and isinstance(q["QUALITY"], h5py.Group): 

287 quality_arr, quality_bbox = _read_data_array_with_bbox(q["QUALITY"]) 

288 quality_arr = _validate_quality_array(quality_arr) 

289 

290 projection: Projection | None = None 

291 if "WCS" in ndf_group: 

292 try: 

293 wcs_group = ndf_group["WCS"] 

294 if isinstance(wcs_group, h5py.Group) and "DATA" in wcs_group: 

295 wcs_lines = _hds.read_char_array(wcs_group["DATA"]) 

296 wcs_text = _hds.decode_ndf_ast_data(wcs_lines) 

297 ast_obj = astshim.Object.fromString(wcs_text) 

298 if isinstance(ast_obj, astshim.FrameSet): 

299 pixel_frame = GeneralFrame(unit=u.pix) 

300 projection = Projection.from_ast_frame_set( 

301 ast_obj, 

302 pixel_frame, 

303 pixel_bounds=bbox, 

304 ) 

305 except Exception: 

306 _LOG.warning( 

307 "Could not reconstruct Projection from WCS in %s; dropping.", 

308 f.filename, 

309 exc_info=True, 

310 ) 

311 

312 unit = _read_ndf_units(ndf_group) 

313 

314 # Anything unrecognised: warn-and-drop. 

315 recognised = { 

316 "DATA_ARRAY", 

317 "VARIANCE", 

318 "QUALITY", 

319 "WCS", 

320 "MORE", 

321 "TITLE", 

322 "LABEL", 

323 "UNITS", 

324 "HISTORY", 

325 "AXIS", 

326 } 

327 for name in ndf_group: 

328 if name not in recognised: 

329 _LOG.warning( 

330 "Ignoring unrecognised NDF component %s/%s during auto-detect read.", 

331 ndf_group.name, 

332 name, 

333 ) 

334 

335 # Build the requested in-memory object. Any NDF can be read as an Image; 

336 # MaskedImage construction uses whatever VARIANCE/QUALITY are present and 

337 # lets the MaskedImage constructor provide defaults for missing planes. 

338 image = Image(data_arr, bbox=bbox, unit=unit, projection=projection) 

339 obj: Any 

340 if cls is Image: 

341 obj = image 

342 elif issubclass(cls, MaskedImage): 

343 if quality_arr is not None: 

344 schema = _make_quality_mask_schema(quality_badbits) 

345 mask = Mask(quality_arr[:, :, np.newaxis], schema=schema, bbox=quality_bbox) 

346 else: 

347 schema = MaskSchema([MaskPlane(name="BAD", description="Bad pixel.")]) 

348 mask = None 

349 variance = Image(variance_arr, bbox=variance_bbox) if variance_arr is not None else None 

350 obj = cls( 

351 image=image, 

352 mask=mask, 

353 mask_schema=schema if mask is None else None, 

354 variance=variance, 

355 ) 

356 else: 

357 raise ArchiveReadError( 

358 f"Auto-detect can produce Image or MaskedImage, but caller asked for {cls.__name__}." 

359 ) 

360 obj._opaque_metadata = archive.get_opaque_metadata() 

361 # Auto-detect path produces no archive-tree metadata or butler_info. 

362 return ReadResult(obj, {}, None) 

363 

364 

365def _read_ndf_units(ndf_group: h5py.Group) -> u.UnitBase | None: 

366 """Read the NDF UNITS component, if present.""" 

367 if "UNITS" not in ndf_group or not isinstance(ndf_group["UNITS"], h5py.Dataset): 

368 return None 

369 dataset = ndf_group["UNITS"] 

370 if dataset.dtype.kind != "S": 

371 _LOG.warning("Ignoring non-character NDF UNITS component in %s.", ndf_group.name) 

372 return None 

373 if dataset.ndim == 0: 

374 raw = dataset[()] 

375 if isinstance(raw, np.bytes_): 

376 raw = bytes(raw) 

377 if not isinstance(raw, bytes): 

378 return None 

379 units_text = raw.decode("ascii").rstrip(" ") 

380 else: 

381 records = _hds.read_char_array(dataset) 

382 units_text = records[0] if records else "" 

383 if not units_text: 

384 return None 

385 for kwargs in ({"format": "fits"}, {}): 

386 try: 

387 return u.Unit(units_text, **kwargs) 

388 except ValueError: 

389 continue 

390 _LOG.warning("Could not parse NDF UNITS value %r in %s.", units_text, ndf_group.name) 

391 return None 

392 

393 

394def _read_quality_badbits(quality_group: h5py.Group) -> int: 

395 """Read the scalar NDF QUALITY.BADBITS value.""" 

396 badbits = quality_group.get("BADBITS") 

397 if not isinstance(badbits, h5py.Dataset): 

398 return 255 

399 value = np.asarray(_hds.read_array(badbits)).reshape(-1) 

400 if value.size == 0: 

401 return 255 

402 return int(value[0]) 

403 

404 

405def _validate_quality_array(quality: np.ndarray) -> np.ndarray: 

406 """Return an NDF QUALITY array as a `numpy.uint8` mask plane.""" 

407 if quality.dtype != np.dtype(np.uint8): 

408 raise ArchiveReadError(f"NDF QUALITY array has dtype {quality.dtype}; expected uint8.") 

409 return quality 

410 

411 

412def _make_quality_mask_schema(badbits: int) -> MaskSchema: 

413 """Create a fallback `MaskSchema` for an unnamed 8-bit QUALITY array.""" 

414 planes = [] 

415 for bit in range(8): 

416 mask = 1 << bit 

417 description = f"NDF QUALITY bit {bit}." 

418 if badbits & mask: 

419 description += " Selected by BADBITS." 

420 planes.append(MaskPlane(name=f"MASK{bit}", description=description)) 

421 return MaskSchema(planes, dtype=np.uint8) 

422 

423 

424def _locate_ndf_root(f: h5py.File) -> h5py.Group: 

425 """Return the group representing the top-level NDF. 

426 

427 Most files have the NDF at the root group itself. A few wrap it 

428 in a single-child container at the root; we accept that shape 

429 too. Anything more elaborate raises. 

430 """ 

431 root_class = f["/"].attrs.get(_hds.ATTR_CLASS) 

432 if isinstance(root_class, bytes): 

433 root_class = root_class.decode("ascii") 

434 if root_class == "NDF": 

435 return f["/"] 

436 # Maybe a one-level container. 

437 candidates = [] 

438 for name, child in f["/"].items(): 

439 if isinstance(child, h5py.Group): 

440 cls_attr = child.attrs.get(_hds.ATTR_CLASS) 

441 if isinstance(cls_attr, bytes): 

442 cls_attr = cls_attr.decode("ascii") 

443 if cls_attr == "NDF": 

444 candidates.append(name) 

445 if len(candidates) == 1: 

446 return f[candidates[0]] 

447 raise ArchiveReadError( 

448 f"Could not locate top-level NDF in {f.filename!r}; " 

449 f"expected the root group or a single NDF-typed child." 

450 ) 

451 

452 

453def _read_data_array_with_bbox( 

454 obj: h5py.Group | h5py.Dataset, 

455) -> tuple[np.ndarray, Any]: 

456 """Read a DATA_ARRAY component in either simple or complex form. 

457 

458 The complex form (what our writer always produces) is an HDS 

459 ARRAY structure (h5py group with CLASS="ARRAY") containing 

460 ``DATA`` and ``ORIGIN`` primitives. The simple form is a bare 

461 primitive dataset. 

462 

463 Returns 

464 ------- 

465 array, bbox : tuple 

466 ``array`` is the C-order numpy data (shape ``(height, width)`` 

467 for 2D images). ``bbox`` is constructed from the ORIGIN if 

468 present, else from a default origin of (0, 0). 

469 """ 

470 if isinstance(obj, h5py.Dataset): 

471 # Simple form. 

472 array = _hds.read_array(obj) 

473 bbox = _make_bbox(x_min=0, y_min=0, array=array) 

474 return array, bbox 

475 # Complex form: an HDS structure with DATA + ORIGIN. 

476 data = _hds.read_array(obj["DATA"]) 

477 if "ORIGIN" in obj: 

478 origin = _hds.read_array(obj["ORIGIN"]) 

479 bbox = _make_bbox(x_min=int(origin[0]), y_min=int(origin[1]), array=data) 

480 else: 

481 bbox = _make_bbox(x_min=0, y_min=0, array=data) 

482 return data, bbox 

483 

484 

485def _read_json_record(primitive: HdsPrimitive, path: str) -> str: 

486 """Read a JSON document stored as a single _CHAR*N record. 

487 

488 Our writer always emits JSON trees as a single-element character 

489 array sized to the document. Joining multiple records would lose 

490 trailing whitespace inside JSON string values, since 

491 `read_char_array` strips trailing spaces per record. 

492 """ 

493 records = primitive.read_char_array() 

494 if len(records) != 1: 

495 raise ArchiveReadError(f"Expected a single _CHAR*N record at {path!r}, got {len(records)}.") 

496 return records[0] 

497 

498 

499def _make_bbox(*, x_min: int, y_min: int, array: np.ndarray) -> Any: 

500 """Build an lsst.images.Box for a 2D image array. 

501 

502 The array is C-order ``(height, width)``. NDF stores ``ORIGIN`` 

503 in Fortran axis order ``(x_min, y_min)``. 

504 """ 

505 if array.ndim != 2: 

506 raise ArchiveReadError(f"Auto-detect read only supports 2D arrays, got ndim={array.ndim}.") 

507 # Box.from_shape takes (height, width) and start=(y_start, x_start). 

508 return Box.from_shape(array.shape, start=(y_min, x_min))