Coverage for python/lsst/images/fits/_input_archive.py: 29%
213 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "FitsInputArchive",
16 "FitsOpaqueMetadata",
17 "read",
18)
20import io
21import os
22from collections.abc import Callable, Iterator
23from contextlib import contextmanager
24from functools import cached_property
25from types import EllipsisType
26from typing import IO, Any, Self
28import astropy.io.fits
29import astropy.table
30import fsspec
31import numpy as np
33from lsst.resources import ResourcePath, ResourcePathExpression
35from .._transforms import FrameSet
36from ..serialization import (
37 ArchiveReadError,
38 ArchiveTree,
39 ArrayReferenceModel,
40 InlineArrayModel,
41 InputArchive,
42 ReadResult,
43 TableModel,
44 no_header_updates,
45)
46from ..serialization._common import _check_format_version
47from ._common import (
48 JSON_COLUMN,
49 JSON_EXTNAME,
50 ExtensionHDU,
51 ExtensionKey,
52 FitsOpaqueMetadata,
53 InvalidFitsArchiveError,
54 PointerModel,
55)
57_FITS_FORMAT_VERSION = 1
58"""Container layout version this release of `FitsInputArchive` understands."""
61def read[T: Any](
62 cls: type[T],
63 path: ResourcePathExpression,
64 *,
65 page_size: int = 2880 * 50,
66 partial: bool | None = None,
67 **kwargs: Any,
68) -> ReadResult[T]:
69 """Read an object from a FITS file.
71 Parameters
72 ----------
73 path
74 File to read; convertible to `lsst.resources.ResourcePath`.
75 page_size
76 Minimum number of bytes to read at at once. Making this a multiple
77 of the FITS block size (2880) is recommended.
78 partial
79 Whether we will be reading only some of the archive, or if memory
80 pressure forces us to read it only a little at a time. If `False`,
81 the entire raw file may be read into memory up front. Defaults to
82 `True` if any extra ``**kwargs`` are passed with values other than
83 `None`, since those usually indicate that only some of the original
84 object will be loaded.
85 **kwargs
86 Extra keyword arguments passed to ``cls.deserialize``.
88 Returns
89 -------
90 ReadResult
91 A named tuple containing the deserialized object and any additional
92 metadata or butler information saved alongside it.
94 Notes
95 -----
96 Supported types must implement ``deserialize`` and
97 ``_get_archive_tree_type`` (see `.Image` for an example).
98 """
99 if partial is None:
100 partial = any(v is not None for v in kwargs.values())
101 with FitsInputArchive.open(path, page_size=page_size, partial=partial) as archive:
102 tree = archive.get_tree(cls._get_archive_tree_type(PointerModel))
103 obj = tree.deserialize(archive, **kwargs)
104 obj._opaque_metadata = archive.get_opaque_metadata()
105 return ReadResult(obj, tree.metadata, tree.butler_info)
108class FitsInputArchive(InputArchive[PointerModel]):
109 """An implementation of the `.serialization.InputArchive` interface that
110 reads from FITS files.
112 Instances of this class should only be constructed via the `open`
113 context manager.
114 """
116 def __init__(self, stream: IO[bytes]):
117 self._primary_hdu = astropy.io.fits.PrimaryHDU.readfrom(stream)
118 on_disk_fmtver: int = self._primary_hdu.header.pop("FMTVER", 1)
119 # DATAMODL is informational only on read; the JSON tree's
120 # schema_version / min_read_version drive data-model checks.
121 self._primary_hdu.header.pop("DATAMODL", None)
122 _check_format_version("fits", on_disk_fmtver, _FITS_FORMAT_VERSION)
123 # TODO: do some basic checks that the file format conforms to our
124 # expectations (e.g. primary HDU should have no data).
125 #
126 # Read and strip the addresses and sizes from the headers. We don't
127 # actually need the index address because we always want to read the
128 # JSON HDU, too, and the index HDU is always the next one (but this
129 # could change in the future).
130 json_address: int = self._primary_hdu.header.pop("JSONADDR")
131 json_size: int = self._primary_hdu.header.pop("JSONSIZE")
132 del self._primary_hdu.header["INDXADDR"]
133 index_size: int = self._primary_hdu.header.pop("INDXSIZE")
134 # Save the remaining primary header keys so we can propagate them on
135 # rewrite.
136 self._opaque_metadata = FitsOpaqueMetadata()
137 self._opaque_metadata.add_header(self._primary_hdu.header.copy(strip=True), name="", ver=1)
138 # Read the JSON and index HDUs from the end.
139 stream.seek(json_address)
140 tail_data = stream.read(json_size + index_size)
141 index_hdu = astropy.io.fits.BinTableHDU.fromstring(tail_data[json_size:])
142 # Initialize lazy readers for all of the regular HDUs and the JSON HDU.
143 self._readers = {
144 ExtensionKey.from_index_row(row): _ExtensionReader.from_index_row(row, stream)
145 for row in index_hdu.data
146 }
147 self._readers[ExtensionKey(JSON_COLUMN)] = _ExtensionReader.from_bytes(
148 astropy.io.fits.BinTableHDU, tail_data[:json_size]
149 )
150 # Make any empty dictionary to cache deserialized objects. Keys are
151 # the zero-indexed row in the JSON table.
152 self._deserialized_pointer_cache: dict[int, Any] = {}
154 @classmethod
155 @contextmanager
156 def open(
157 cls,
158 path: ResourcePathExpression,
159 *,
160 page_size: int = 2880 * 50,
161 partial: bool = False,
162 ) -> Iterator[Self]:
163 """Create an output archive that writes to the given file.
165 Parameters
166 ----------
167 path
168 File to read; convertible to `lsst.resources.ResourcePath`.
169 page_size
170 Minimum number of bytes to read at at once. Making this a multiple
171 of the FITS block size (2880) is recommended.
172 partial
173 Whether we will be reading only some of the archive, or if memory
174 pressure forces us to read it only a little at a time. If `False`
175 (default), the entire raw file may be read into memory up front.
177 Returns
178 -------
179 `contextlib.AbstractContextManager` [`FitsInputArchive`]
180 A context manager that returns a `FitsInputArchive` when entered.
181 """
182 path = ResourcePath(path)
183 stream: IO[bytes]
184 if not partial:
185 stream = io.BytesIO(path.read())
186 yield cls(stream)
187 else:
188 fs: fsspec.AbstractFileSystem
189 fs, fp = path.to_fsspec()
190 with fs.open(fp, block_size=page_size) as stream:
191 yield cls(stream)
193 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T:
194 """Read the JSON tree from the archive.
196 Parameters
197 ----------
198 model_type
199 A Pydantic model type to use to validate the JSON.
201 Returns
202 -------
203 T
204 The validated Pydantic model.
205 """
206 json_bytes = self._readers[ExtensionKey(JSON_EXTNAME)].data[0][JSON_COLUMN].tobytes()
207 return model_type.model_validate_json(json_bytes)
209 def deserialize_pointer[U: ArchiveTree, V](
210 self,
211 pointer: PointerModel,
212 model_type: type[U],
213 deserializer: Callable[[U, InputArchive[PointerModel]], V],
214 ) -> V:
215 # Docstring inherited.
216 if (cached := self._deserialized_pointer_cache.get(pointer.row)) is not None:
217 return cached
218 if not isinstance(pointer.column.data, ArrayReferenceModel):
219 raise ArchiveReadError(f"Invalid pointer with inline array:\n{pointer.model_dump_json(indent=2)}")
220 _, reader = self._get_source_reader(pointer.column.data.source, is_table=True)
221 try:
222 json_bytes = reader.data[pointer.row][JSON_COLUMN].tobytes()
223 except Exception as err:
224 raise InvalidFitsArchiveError(
225 f"Failed to access the table cell referenced by {pointer.model_dump_json()}."
226 ) from err
227 result = deserializer(model_type.model_validate_json(json_bytes), self)
228 self._deserialized_pointer_cache[pointer.row] = result
229 return result
231 def get_frame_set(self, ref: PointerModel) -> FrameSet:
232 try:
233 result = self._deserialized_pointer_cache[ref.row]
234 except KeyError:
235 raise AssertionError(
236 f"Frame set at {ref.model_dump_json(indent=2)} must be deserialized "
237 "before any dependent transform can be."
238 ) from None
239 if not isinstance(result, FrameSet):
240 raise InvalidFitsArchiveError(f"Expected a FrameSet instance at {ref.model_dump_json(indent=2)}.")
241 return result
243 def get_array(
244 self,
245 model: ArrayReferenceModel | InlineArrayModel,
246 *,
247 slices: tuple[slice, ...] | EllipsisType = ...,
248 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
249 ) -> np.ndarray:
250 if not isinstance(model, ArrayReferenceModel):
251 raise ArchiveReadError("Inline array found where a reference array was expected.")
252 key, reader = self._get_source_reader(model.source, is_table=False)
253 if slices is not ...:
254 array = reader.section[slices]
255 else:
256 array = reader.data
257 if key not in self._opaque_metadata.headers:
258 opaque_header = reader.header.copy(strip=True)
259 strip_header(opaque_header)
260 self._opaque_metadata.add_header(opaque_header, key=key)
261 return array
263 def get_table(
264 self,
265 model: TableModel,
266 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
267 ) -> astropy.table.Table:
268 # Docstring inherited.
269 array = self.get_structured_array(model, strip_header)
270 table = astropy.table.Table(array)
271 for c in model.columns:
272 c.update_table(table)
273 return table
275 def get_structured_array(
276 self,
277 model: TableModel,
278 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
279 ) -> np.ndarray:
280 # Docstring inherited.
281 if not isinstance(model.columns[0].data, ArrayReferenceModel):
282 raise ArchiveReadError("Inline array found where a reference array was expected.")
283 # All columns should have the same data.source; just use the first.
284 key, reader = self._get_source_reader(model.columns[0].data.source, is_table=True)
285 if key not in self._opaque_metadata.headers:
286 opaque_header = reader.header.copy(strip=True)
287 strip_header(opaque_header)
288 self._opaque_metadata.add_header(opaque_header, key=key)
289 return reader.hdu.data
291 def get_opaque_metadata(self) -> FitsOpaqueMetadata:
292 # Docstring inherited.
293 return self._opaque_metadata
295 def _get_source_reader(self, source: str | int, is_table: bool) -> tuple[ExtensionKey, _ExtensionReader]:
296 """Get a reader for the extension referenced by a serialiation model's
297 ``source`` field.
299 Parameters
300 ----------
301 source
302 A ``source`` field of the form ``fits:${hdu}`` or
303 ``fits:${hdu}[${col}]``.
304 is_table
305 Whether the source should be for a table HDU.
307 Returns
308 -------
309 key
310 Identifier pair for the HDU (EXTNAME, EXTVER).
311 reader
312 A reader object for the extension.
313 """
314 if not isinstance(source, str):
315 raise InvalidFitsArchiveError(f"Reference with source={source!r} is not a string.")
316 if not source.startswith("fits:"):
317 raise InvalidFitsArchiveError(f"Reference with source={source!r} does not start with 'fits:'.")
318 key = ExtensionKey.from_str(source)
319 try:
320 reader = self._readers[key]
321 except KeyError:
322 raise InvalidFitsArchiveError(f"Unrecognized source value {key}.") from None
323 if is_table and not reader.is_table:
324 raise InvalidFitsArchiveError(
325 f"Extension with source={key} was expected to be be a binary table, not an image."
326 )
327 elif not is_table and reader.is_table:
328 raise InvalidFitsArchiveError(
329 f"Extension with source={key} was expected to be be an image, not a binary table."
330 )
331 return key, reader
334class _ExtensionReader:
335 """A lazy-load reader for a single extension HDU.
337 Parameters
338 ----------
339 hdu_cls
340 The type of the astropy HDU instance to construct.
341 stream
342 The file-like object to read from.
343 """
345 def __init__(self, hdu_cls: type[ExtensionHDU], stream: IO[bytes]):
346 self._hdu_cls = hdu_cls
347 self._stream = stream
349 @classmethod
350 def from_index_row(cls, index_row: np.void, stream: IO[bytes]) -> _ExtensionReader:
351 """Construct from a row of the binary table index HDU.
353 Parameters
354 ----------
355 index_row
356 A record array row from the index HDU.
357 stream
358 The file-like object being used to read the full FITS file.
360 Returns
361 -------
362 reader
363 A reader object for the extension.
364 """
365 match index_row["XTENSION"].strip():
366 case "IMAGE":
367 hdu_cls = astropy.io.fits.ImageHDU
368 case "BINTABLE":
369 if index_row["ZIMAGE"]:
370 hdu_cls = astropy.io.fits.CompImageHDU
371 else:
372 hdu_cls = astropy.io.fits.BinTableHDU
373 case other:
374 raise AssertionError(f"Unsupported HDU type {other!r}.")
375 return _ExtensionReader(
376 hdu_cls,
377 _RangeStreamProxy(
378 stream,
379 start=int(index_row["HDRADDR"]),
380 ),
381 )
383 @classmethod
384 def from_bytes(cls, hdu_cls: type[ExtensionHDU], data: bytes) -> _ExtensionReader:
385 """Construct from already-read `bytes`
387 Parameters
388 ----------
389 hdu_cls
390 The HDU type to instantiate.
391 data
392 Raw data for the HDU.
394 Returns
395 -------
396 reader
397 A reader object for extension.
398 """
399 return _ExtensionReader(hdu_cls, io.BytesIO(data))
401 @property
402 def is_table(self) -> bool:
403 """Whether this is logically a table HDU.
405 This is `False` for compressed image HDUs, even though they are
406 represented in FITS as a binary table.
407 """
408 return issubclass(self._hdu_cls, astropy.io.fits.BinTableHDU) and not issubclass(
409 self._hdu_cls, astropy.io.fits.CompImageHDU
410 )
412 @cached_property
413 def hdu(self) -> ExtensionHDU:
414 """The Astropy HDU object."""
415 self._stream.seek(0)
416 if self._hdu_cls is astropy.io.fits.CompImageHDU:
417 # CompImageHDU.readfrom doesn't work; we need to make a minimal
418 # example and report it upstream. Happily this workaround does
419 # work.
420 bintable_hdu = astropy.io.fits.BinTableHDU.readfrom(self._stream, memmap=False, cache=False)
421 return self._hdu_cls(bintable=bintable_hdu)
422 else:
423 return self._hdu_cls.readfrom(self._stream, memmap=False, cache=False, uint=True)
425 @property
426 def header(self) -> astropy.io.fits.Header:
427 """The header of the HDU."""
428 return self.hdu.header
430 @property
431 def data(self) -> np.ndarray:
432 """The data for the HDU."""
433 return self.hdu.data
435 @property
436 def section(self) -> astropy.io.fits.Section | astropy.io.fits.CompImageSection:
437 """An Astropy expression object that reads a subset of the data when
438 sliced.
439 """
440 return self.hdu.section
443class _RangeStreamProxy(IO[bytes]):
444 """A readable IO proxy object that makes the beginning of the file appear
445 at a custom position.
447 Parameters
448 ----------
449 base
450 Underlying readable, seekable buffer to proxy.
451 start
452 Offset into the base stream that will be considered the start of the
453 proxy stream.
455 Notes
456 -----
457 This class exists because Astropy doesn't seem to provide a way to read a
458 single HDU that starts at the current seek position of a file-like object.
459 It does provide ``readfrom`` methods on its HDU objects that take a
460 file-like object, but these assume (possibly unintentionally; it only
461 happens when Astropy is trying to see whether the file was opened for
462 appending) that ``seek(0)`` will set the file-like object to the start of
463 the HDU.
464 """
466 def __init__(self, base: IO[bytes], start: int):
467 self._base = base
468 self._start = start
470 @property
471 def mode(self) -> str:
472 return "rb"
474 def __enter__(self) -> Self:
475 raise AssertionError("This proxy should not be used as a context manager.")
477 def __exit__(self, type: Any, value: Any, traceback: Any) -> None:
478 raise AssertionError("This proxy should not be used as a context manager.")
480 def __iter__(self) -> Iterator[bytes]:
481 return self._base.__iter__()
483 def __next__(self) -> bytes:
484 return self._base.__next__()
486 def close(self) -> None:
487 raise AssertionError("This proxy should not ever be closed.")
489 @property
490 def closed(self) -> bool:
491 return False
493 def fileno(self) -> int:
494 raise OSError()
496 def flush(self) -> None:
497 pass
499 def isatty(self) -> bool:
500 return False
502 def read(self, n: int = -1, /) -> bytes:
503 result = self._base.read(n)
504 return result
506 def readable(self) -> bool:
507 return True
509 def readline(self, limit: int = -1, /) -> bytes:
510 return self._base.readline(limit)
512 def readlines(self, hint: int = -1, /) -> list[bytes]:
513 return self._base.readlines(hint)
515 def seek(self, offset: int, whence: int = 0) -> int:
516 match whence:
517 case os.SEEK_SET:
518 return self._base.seek(offset + self._start, os.SEEK_SET) - self._start
519 case os.SEEK_CUR:
520 return self._base.seek(offset, os.SEEK_CUR) - self._start
521 case os.SEEK_END:
522 return self._base.seek(offset, os.SEEK_END) - self._start
523 raise TypeError(f"Invalid value for 'whence': {whence}.")
525 def seekable(self) -> bool:
526 return True
528 def tell(self) -> int:
529 return self._base.tell() - self._start
531 def truncate(self, size: int | None = None, /) -> int:
532 raise OSError()
534 def writable(self) -> bool:
535 return False
537 def write(self, arg: Any, /) -> int:
538 raise OSError()
540 def writelines(self, arg: Any, /) -> None:
541 raise OSError()