Coverage for python/lsst/images/fits/_input_archive.py: 29%

213 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 01:09 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "FitsInputArchive", 

16 "FitsOpaqueMetadata", 

17 "read", 

18) 

19 

20import io 

21import os 

22from collections.abc import Callable, Iterator 

23from contextlib import contextmanager 

24from functools import cached_property 

25from types import EllipsisType 

26from typing import IO, Any, Self 

27 

28import astropy.io.fits 

29import astropy.table 

30import fsspec 

31import numpy as np 

32 

33from lsst.resources import ResourcePath, ResourcePathExpression 

34 

35from .._transforms import FrameSet 

36from ..serialization import ( 

37 ArchiveReadError, 

38 ArchiveTree, 

39 ArrayReferenceModel, 

40 InlineArrayModel, 

41 InputArchive, 

42 ReadResult, 

43 TableModel, 

44 no_header_updates, 

45) 

46from ..serialization._common import _check_format_version 

47from ._common import ( 

48 JSON_COLUMN, 

49 JSON_EXTNAME, 

50 ExtensionHDU, 

51 ExtensionKey, 

52 FitsOpaqueMetadata, 

53 InvalidFitsArchiveError, 

54 PointerModel, 

55) 

56 

57_FITS_FORMAT_VERSION = 1 

58"""Container layout version this release of `FitsInputArchive` understands.""" 

59 

60 

61def read[T: Any]( 

62 cls: type[T], 

63 path: ResourcePathExpression, 

64 *, 

65 page_size: int = 2880 * 50, 

66 partial: bool | None = None, 

67 **kwargs: Any, 

68) -> ReadResult[T]: 

69 """Read an object from a FITS file. 

70 

71 Parameters 

72 ---------- 

73 path 

74 File to read; convertible to `lsst.resources.ResourcePath`. 

75 page_size 

76 Minimum number of bytes to read at at once. Making this a multiple 

77 of the FITS block size (2880) is recommended. 

78 partial 

79 Whether we will be reading only some of the archive, or if memory 

80 pressure forces us to read it only a little at a time. If `False`, 

81 the entire raw file may be read into memory up front. Defaults to 

82 `True` if any extra ``**kwargs`` are passed with values other than 

83 `None`, since those usually indicate that only some of the original 

84 object will be loaded. 

85 **kwargs 

86 Extra keyword arguments passed to ``cls.deserialize``. 

87 

88 Returns 

89 ------- 

90 ReadResult 

91 A named tuple containing the deserialized object and any additional 

92 metadata or butler information saved alongside it. 

93 

94 Notes 

95 ----- 

96 Supported types must implement ``deserialize`` and 

97 ``_get_archive_tree_type`` (see `.Image` for an example). 

98 """ 

99 if partial is None: 

100 partial = any(v is not None for v in kwargs.values()) 

101 with FitsInputArchive.open(path, page_size=page_size, partial=partial) as archive: 

102 tree = archive.get_tree(cls._get_archive_tree_type(PointerModel)) 

103 obj = tree.deserialize(archive, **kwargs) 

104 obj._opaque_metadata = archive.get_opaque_metadata() 

105 return ReadResult(obj, tree.metadata, tree.butler_info) 

106 

107 

108class FitsInputArchive(InputArchive[PointerModel]): 

109 """An implementation of the `.serialization.InputArchive` interface that 

110 reads from FITS files. 

111 

112 Instances of this class should only be constructed via the `open` 

113 context manager. 

114 """ 

115 

116 def __init__(self, stream: IO[bytes]): 

117 self._primary_hdu = astropy.io.fits.PrimaryHDU.readfrom(stream) 

118 on_disk_fmtver: int = self._primary_hdu.header.pop("FMTVER", 1) 

119 # DATAMODL is informational only on read; the JSON tree's 

120 # schema_version / min_read_version drive data-model checks. 

121 self._primary_hdu.header.pop("DATAMODL", None) 

122 _check_format_version("fits", on_disk_fmtver, _FITS_FORMAT_VERSION) 

123 # TODO: do some basic checks that the file format conforms to our 

124 # expectations (e.g. primary HDU should have no data). 

125 # 

126 # Read and strip the addresses and sizes from the headers. We don't 

127 # actually need the index address because we always want to read the 

128 # JSON HDU, too, and the index HDU is always the next one (but this 

129 # could change in the future). 

130 json_address: int = self._primary_hdu.header.pop("JSONADDR") 

131 json_size: int = self._primary_hdu.header.pop("JSONSIZE") 

132 del self._primary_hdu.header["INDXADDR"] 

133 index_size: int = self._primary_hdu.header.pop("INDXSIZE") 

134 # Save the remaining primary header keys so we can propagate them on 

135 # rewrite. 

136 self._opaque_metadata = FitsOpaqueMetadata() 

137 self._opaque_metadata.add_header(self._primary_hdu.header.copy(strip=True), name="", ver=1) 

138 # Read the JSON and index HDUs from the end. 

139 stream.seek(json_address) 

140 tail_data = stream.read(json_size + index_size) 

141 index_hdu = astropy.io.fits.BinTableHDU.fromstring(tail_data[json_size:]) 

142 # Initialize lazy readers for all of the regular HDUs and the JSON HDU. 

143 self._readers = { 

144 ExtensionKey.from_index_row(row): _ExtensionReader.from_index_row(row, stream) 

145 for row in index_hdu.data 

146 } 

147 self._readers[ExtensionKey(JSON_COLUMN)] = _ExtensionReader.from_bytes( 

148 astropy.io.fits.BinTableHDU, tail_data[:json_size] 

149 ) 

150 # Make any empty dictionary to cache deserialized objects. Keys are 

151 # the zero-indexed row in the JSON table. 

152 self._deserialized_pointer_cache: dict[int, Any] = {} 

153 

154 @classmethod 

155 @contextmanager 

156 def open( 

157 cls, 

158 path: ResourcePathExpression, 

159 *, 

160 page_size: int = 2880 * 50, 

161 partial: bool = False, 

162 ) -> Iterator[Self]: 

163 """Create an output archive that writes to the given file. 

164 

165 Parameters 

166 ---------- 

167 path 

168 File to read; convertible to `lsst.resources.ResourcePath`. 

169 page_size 

170 Minimum number of bytes to read at at once. Making this a multiple 

171 of the FITS block size (2880) is recommended. 

172 partial 

173 Whether we will be reading only some of the archive, or if memory 

174 pressure forces us to read it only a little at a time. If `False` 

175 (default), the entire raw file may be read into memory up front. 

176 

177 Returns 

178 ------- 

179 `contextlib.AbstractContextManager` [`FitsInputArchive`] 

180 A context manager that returns a `FitsInputArchive` when entered. 

181 """ 

182 path = ResourcePath(path) 

183 stream: IO[bytes] 

184 if not partial: 

185 stream = io.BytesIO(path.read()) 

186 yield cls(stream) 

187 else: 

188 fs: fsspec.AbstractFileSystem 

189 fs, fp = path.to_fsspec() 

190 with fs.open(fp, block_size=page_size) as stream: 

191 yield cls(stream) 

192 

193 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T: 

194 """Read the JSON tree from the archive. 

195 

196 Parameters 

197 ---------- 

198 model_type 

199 A Pydantic model type to use to validate the JSON. 

200 

201 Returns 

202 ------- 

203 T 

204 The validated Pydantic model. 

205 """ 

206 json_bytes = self._readers[ExtensionKey(JSON_EXTNAME)].data[0][JSON_COLUMN].tobytes() 

207 return model_type.model_validate_json(json_bytes) 

208 

209 def deserialize_pointer[U: ArchiveTree, V]( 

210 self, 

211 pointer: PointerModel, 

212 model_type: type[U], 

213 deserializer: Callable[[U, InputArchive[PointerModel]], V], 

214 ) -> V: 

215 # Docstring inherited. 

216 if (cached := self._deserialized_pointer_cache.get(pointer.row)) is not None: 

217 return cached 

218 if not isinstance(pointer.column.data, ArrayReferenceModel): 

219 raise ArchiveReadError(f"Invalid pointer with inline array:\n{pointer.model_dump_json(indent=2)}") 

220 _, reader = self._get_source_reader(pointer.column.data.source, is_table=True) 

221 try: 

222 json_bytes = reader.data[pointer.row][JSON_COLUMN].tobytes() 

223 except Exception as err: 

224 raise InvalidFitsArchiveError( 

225 f"Failed to access the table cell referenced by {pointer.model_dump_json()}." 

226 ) from err 

227 result = deserializer(model_type.model_validate_json(json_bytes), self) 

228 self._deserialized_pointer_cache[pointer.row] = result 

229 return result 

230 

231 def get_frame_set(self, ref: PointerModel) -> FrameSet: 

232 try: 

233 result = self._deserialized_pointer_cache[ref.row] 

234 except KeyError: 

235 raise AssertionError( 

236 f"Frame set at {ref.model_dump_json(indent=2)} must be deserialized " 

237 "before any dependent transform can be." 

238 ) from None 

239 if not isinstance(result, FrameSet): 

240 raise InvalidFitsArchiveError(f"Expected a FrameSet instance at {ref.model_dump_json(indent=2)}.") 

241 return result 

242 

243 def get_array( 

244 self, 

245 model: ArrayReferenceModel | InlineArrayModel, 

246 *, 

247 slices: tuple[slice, ...] | EllipsisType = ..., 

248 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

249 ) -> np.ndarray: 

250 if not isinstance(model, ArrayReferenceModel): 

251 raise ArchiveReadError("Inline array found where a reference array was expected.") 

252 key, reader = self._get_source_reader(model.source, is_table=False) 

253 if slices is not ...: 

254 array = reader.section[slices] 

255 else: 

256 array = reader.data 

257 if key not in self._opaque_metadata.headers: 

258 opaque_header = reader.header.copy(strip=True) 

259 strip_header(opaque_header) 

260 self._opaque_metadata.add_header(opaque_header, key=key) 

261 return array 

262 

263 def get_table( 

264 self, 

265 model: TableModel, 

266 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

267 ) -> astropy.table.Table: 

268 # Docstring inherited. 

269 array = self.get_structured_array(model, strip_header) 

270 table = astropy.table.Table(array) 

271 for c in model.columns: 

272 c.update_table(table) 

273 return table 

274 

275 def get_structured_array( 

276 self, 

277 model: TableModel, 

278 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

279 ) -> np.ndarray: 

280 # Docstring inherited. 

281 if not isinstance(model.columns[0].data, ArrayReferenceModel): 

282 raise ArchiveReadError("Inline array found where a reference array was expected.") 

283 # All columns should have the same data.source; just use the first. 

284 key, reader = self._get_source_reader(model.columns[0].data.source, is_table=True) 

285 if key not in self._opaque_metadata.headers: 

286 opaque_header = reader.header.copy(strip=True) 

287 strip_header(opaque_header) 

288 self._opaque_metadata.add_header(opaque_header, key=key) 

289 return reader.hdu.data 

290 

291 def get_opaque_metadata(self) -> FitsOpaqueMetadata: 

292 # Docstring inherited. 

293 return self._opaque_metadata 

294 

295 def _get_source_reader(self, source: str | int, is_table: bool) -> tuple[ExtensionKey, _ExtensionReader]: 

296 """Get a reader for the extension referenced by a serialiation model's 

297 ``source`` field. 

298 

299 Parameters 

300 ---------- 

301 source 

302 A ``source`` field of the form ``fits:${hdu}`` or 

303 ``fits:${hdu}[${col}]``. 

304 is_table 

305 Whether the source should be for a table HDU. 

306 

307 Returns 

308 ------- 

309 key 

310 Identifier pair for the HDU (EXTNAME, EXTVER). 

311 reader 

312 A reader object for the extension. 

313 """ 

314 if not isinstance(source, str): 

315 raise InvalidFitsArchiveError(f"Reference with source={source!r} is not a string.") 

316 if not source.startswith("fits:"): 

317 raise InvalidFitsArchiveError(f"Reference with source={source!r} does not start with 'fits:'.") 

318 key = ExtensionKey.from_str(source) 

319 try: 

320 reader = self._readers[key] 

321 except KeyError: 

322 raise InvalidFitsArchiveError(f"Unrecognized source value {key}.") from None 

323 if is_table and not reader.is_table: 

324 raise InvalidFitsArchiveError( 

325 f"Extension with source={key} was expected to be be a binary table, not an image." 

326 ) 

327 elif not is_table and reader.is_table: 

328 raise InvalidFitsArchiveError( 

329 f"Extension with source={key} was expected to be be an image, not a binary table." 

330 ) 

331 return key, reader 

332 

333 

334class _ExtensionReader: 

335 """A lazy-load reader for a single extension HDU. 

336 

337 Parameters 

338 ---------- 

339 hdu_cls 

340 The type of the astropy HDU instance to construct. 

341 stream 

342 The file-like object to read from. 

343 """ 

344 

345 def __init__(self, hdu_cls: type[ExtensionHDU], stream: IO[bytes]): 

346 self._hdu_cls = hdu_cls 

347 self._stream = stream 

348 

349 @classmethod 

350 def from_index_row(cls, index_row: np.void, stream: IO[bytes]) -> _ExtensionReader: 

351 """Construct from a row of the binary table index HDU. 

352 

353 Parameters 

354 ---------- 

355 index_row 

356 A record array row from the index HDU. 

357 stream 

358 The file-like object being used to read the full FITS file. 

359 

360 Returns 

361 ------- 

362 reader 

363 A reader object for the extension. 

364 """ 

365 match index_row["XTENSION"].strip(): 

366 case "IMAGE": 

367 hdu_cls = astropy.io.fits.ImageHDU 

368 case "BINTABLE": 

369 if index_row["ZIMAGE"]: 

370 hdu_cls = astropy.io.fits.CompImageHDU 

371 else: 

372 hdu_cls = astropy.io.fits.BinTableHDU 

373 case other: 

374 raise AssertionError(f"Unsupported HDU type {other!r}.") 

375 return _ExtensionReader( 

376 hdu_cls, 

377 _RangeStreamProxy( 

378 stream, 

379 start=int(index_row["HDRADDR"]), 

380 ), 

381 ) 

382 

383 @classmethod 

384 def from_bytes(cls, hdu_cls: type[ExtensionHDU], data: bytes) -> _ExtensionReader: 

385 """Construct from already-read `bytes` 

386 

387 Parameters 

388 ---------- 

389 hdu_cls 

390 The HDU type to instantiate. 

391 data 

392 Raw data for the HDU. 

393 

394 Returns 

395 ------- 

396 reader 

397 A reader object for extension. 

398 """ 

399 return _ExtensionReader(hdu_cls, io.BytesIO(data)) 

400 

401 @property 

402 def is_table(self) -> bool: 

403 """Whether this is logically a table HDU. 

404 

405 This is `False` for compressed image HDUs, even though they are 

406 represented in FITS as a binary table. 

407 """ 

408 return issubclass(self._hdu_cls, astropy.io.fits.BinTableHDU) and not issubclass( 

409 self._hdu_cls, astropy.io.fits.CompImageHDU 

410 ) 

411 

412 @cached_property 

413 def hdu(self) -> ExtensionHDU: 

414 """The Astropy HDU object.""" 

415 self._stream.seek(0) 

416 if self._hdu_cls is astropy.io.fits.CompImageHDU: 

417 # CompImageHDU.readfrom doesn't work; we need to make a minimal 

418 # example and report it upstream. Happily this workaround does 

419 # work. 

420 bintable_hdu = astropy.io.fits.BinTableHDU.readfrom(self._stream, memmap=False, cache=False) 

421 return self._hdu_cls(bintable=bintable_hdu) 

422 else: 

423 return self._hdu_cls.readfrom(self._stream, memmap=False, cache=False, uint=True) 

424 

425 @property 

426 def header(self) -> astropy.io.fits.Header: 

427 """The header of the HDU.""" 

428 return self.hdu.header 

429 

430 @property 

431 def data(self) -> np.ndarray: 

432 """The data for the HDU.""" 

433 return self.hdu.data 

434 

435 @property 

436 def section(self) -> astropy.io.fits.Section | astropy.io.fits.CompImageSection: 

437 """An Astropy expression object that reads a subset of the data when 

438 sliced. 

439 """ 

440 return self.hdu.section 

441 

442 

443class _RangeStreamProxy(IO[bytes]): 

444 """A readable IO proxy object that makes the beginning of the file appear 

445 at a custom position. 

446 

447 Parameters 

448 ---------- 

449 base 

450 Underlying readable, seekable buffer to proxy. 

451 start 

452 Offset into the base stream that will be considered the start of the 

453 proxy stream. 

454 

455 Notes 

456 ----- 

457 This class exists because Astropy doesn't seem to provide a way to read a 

458 single HDU that starts at the current seek position of a file-like object. 

459 It does provide ``readfrom`` methods on its HDU objects that take a 

460 file-like object, but these assume (possibly unintentionally; it only 

461 happens when Astropy is trying to see whether the file was opened for 

462 appending) that ``seek(0)`` will set the file-like object to the start of 

463 the HDU. 

464 """ 

465 

466 def __init__(self, base: IO[bytes], start: int): 

467 self._base = base 

468 self._start = start 

469 

470 @property 

471 def mode(self) -> str: 

472 return "rb" 

473 

474 def __enter__(self) -> Self: 

475 raise AssertionError("This proxy should not be used as a context manager.") 

476 

477 def __exit__(self, type: Any, value: Any, traceback: Any) -> None: 

478 raise AssertionError("This proxy should not be used as a context manager.") 

479 

480 def __iter__(self) -> Iterator[bytes]: 

481 return self._base.__iter__() 

482 

483 def __next__(self) -> bytes: 

484 return self._base.__next__() 

485 

486 def close(self) -> None: 

487 raise AssertionError("This proxy should not ever be closed.") 

488 

489 @property 

490 def closed(self) -> bool: 

491 return False 

492 

493 def fileno(self) -> int: 

494 raise OSError() 

495 

496 def flush(self) -> None: 

497 pass 

498 

499 def isatty(self) -> bool: 

500 return False 

501 

502 def read(self, n: int = -1, /) -> bytes: 

503 result = self._base.read(n) 

504 return result 

505 

506 def readable(self) -> bool: 

507 return True 

508 

509 def readline(self, limit: int = -1, /) -> bytes: 

510 return self._base.readline(limit) 

511 

512 def readlines(self, hint: int = -1, /) -> list[bytes]: 

513 return self._base.readlines(hint) 

514 

515 def seek(self, offset: int, whence: int = 0) -> int: 

516 match whence: 

517 case os.SEEK_SET: 

518 return self._base.seek(offset + self._start, os.SEEK_SET) - self._start 

519 case os.SEEK_CUR: 

520 return self._base.seek(offset, os.SEEK_CUR) - self._start 

521 case os.SEEK_END: 

522 return self._base.seek(offset, os.SEEK_END) - self._start 

523 raise TypeError(f"Invalid value for 'whence': {whence}.") 

524 

525 def seekable(self) -> bool: 

526 return True 

527 

528 def tell(self) -> int: 

529 return self._base.tell() - self._start 

530 

531 def truncate(self, size: int | None = None, /) -> int: 

532 raise OSError() 

533 

534 def writable(self) -> bool: 

535 return False 

536 

537 def write(self, arg: Any, /) -> int: 

538 raise OSError() 

539 

540 def writelines(self, arg: Any, /) -> None: 

541 raise OSError()