Coverage for python/lsst/images/fits/_output_archive.py: 21%
215 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:10 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("FitsOutputArchive", "write")
16import dataclasses
17from collections import Counter
18from collections.abc import Callable, Hashable, Iterator, Mapping
19from contextlib import contextmanager
20from typing import Any, Self
22import astropy.io.fits
23import astropy.table
24import numpy as np
25import pydantic
27from .._transforms import FrameSet
28from ..serialization import (
29 ArchiveTree,
30 ArrayReferenceModel,
31 ButlerInfo,
32 MetadataValue,
33 NestedOutputArchive,
34 NumberType,
35 OutputArchive,
36 TableColumnModel,
37 TableModel,
38 no_header_updates,
39)
40from ._common import (
41 JSON_COLUMN,
42 JSON_EXTNAME,
43 ExtensionHDU,
44 ExtensionKey,
45 FitsCompressionOptions,
46 FitsOpaqueMetadata,
47 PointerModel,
48)
50_FITS_FORMAT_VERSION = 1
51"""Container layout version for files written by `FitsOutputArchive`.
53Bumps when the on-disk FITS layout (HDU placement, INDX/JSON keyword schema)
54changes. Independent of any data-model ``SCHEMA_VERSION``.
55"""
58def write(
59 obj: Any,
60 path: str,
61 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
62 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
63 compression_seed: int | None = None,
64 metadata: dict[str, MetadataValue] | None = None,
65 butler_info: ButlerInfo | None = None,
66) -> Any:
67 """Write an object with a ``serialize`` method to a FITS file.
69 Parameters
70 ----------
71 path
72 Name of the file to write to. Must not already exist.
73 compression_options
74 Options for how to compress the FITS file, keyed by the name of
75 the attribute (with JSON pointer ``/`` separators for nested
76 attributes).
77 update_header
78 A callback that will be given the primary HDU FITS header and an
79 opportunity to modify it.
80 compression_seed
81 A FITS tile compression seed to use whenever the configured
82 compression seed is `None` or (for backwards compatibility) ``0``.
83 This value is then incremented every time it is used.
84 metadata
85 Additional metadata to save with the object. This will override any
86 flexible metadata carried by the object itself with the same keys.
87 butler_info
88 Butler information to store in the file.
90 Returns
91 -------
92 `.serialization.ArchiveTree`
93 The serialized representation of the object.
94 """
95 opaque_metadata = getattr(obj, "_opaque_metadata", None)
96 name = getattr(obj, "_archive_default_name", None)
97 with FitsOutputArchive.open(
98 path,
99 compression_options=compression_options,
100 opaque_metadata=opaque_metadata,
101 update_header=update_header,
102 compression_seed=compression_seed,
103 ) as archive:
104 tree = archive.serialize_direct(name, obj.serialize) if name is not None else obj.serialize(archive)
105 if metadata is not None:
106 tree.metadata.update(metadata)
107 if butler_info is not None:
108 tree.butler_info = butler_info
109 archive.add_tree(tree)
110 return tree
113class FitsOutputArchive(OutputArchive[PointerModel]):
114 """An implementation of the `.serialization.OutputArchive` interface that
115 writes to FITS files.
117 Instances of this class should only be constructed via the `open`
118 context manager.
119 """
121 def __init__(
122 self,
123 hdu_list: astropy.io.fits.HDUList,
124 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
125 opaque_metadata: Any = None,
126 compression_seed: int | None = None,
127 ):
128 # JSON blobs for objects we've saved as pointers:
129 self._pointer_targets: list[bytes] = []
130 # Mapping from user provided key (e.g. id(some object)) to a table
131 # pointer to where we actually saved it:
132 self._pointers_by_key: dict[Hashable, PointerModel] = {}
133 self._hdu_list = hdu_list
134 self._primary_hdu = astropy.io.fits.PrimaryHDU()
135 self._primary_hdu.header.set("FMTVER", _FITS_FORMAT_VERSION, "FITS container layout version.")
136 self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.")
137 self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.")
138 self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.")
139 self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.")
140 self._hdus_by_name = Counter[str]()
141 self._compression_options = dict(compression_options) if compression_options is not None else {}
142 self._compression_seed = compression_seed
143 self._opaque_metadata = (
144 opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata()
145 )
146 if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None:
147 self._primary_hdu.header.extend(opaque_primary_header)
148 self._hdu_list.append(self._primary_hdu)
149 self._json_hdu_added: bool = False
150 self._frame_sets: list[tuple[FrameSet, PointerModel]] = []
152 @classmethod
153 @contextmanager
154 def open(
155 cls,
156 filename: str,
157 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
158 opaque_metadata: Any = None,
159 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
160 compression_seed: int | None = None,
161 ) -> Iterator[Self]:
162 """Create an output archive that writes to the given file.
164 Parameters
165 ----------
166 filename
167 Name of the file to write to. Must not already exist.
168 compression_options
169 Options for how to compress the FITS file, keyed by the name of
170 the attribute (with JSON pointer ``/`` separators for nested
171 attributes).
172 opaque_metadata
173 Metadata read from an input archive along with the object being
174 written now. Ignored if the metadata is not from a FITS archive.
175 update_header
176 A callback that will be given the primary HDU FITS header and an
177 opportunity to modify it.
178 compression_seed
179 A FITS tile compression seed to use whenever the configured
180 compression seed is `None` or (for backwards compatibility) ``0``.
181 This value is then incremented every time it is used.
183 Returns
184 -------
185 `contextlib.AbstractContextManager` [`FitsOutputArchive`]
186 A context manager that returns a `FitsOutputArchive` when entered.
187 """
188 with astropy.io.fits.open(filename, mode="append") as hdu_list:
189 if hdu_list:
190 raise OSError(f"File {filename!r} already exists.")
191 archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed)
192 update_header(hdu_list[0].header)
193 yield archive
194 if not archive._json_hdu_added:
195 raise RuntimeError("Write context exited without 'add_tree' being called.")
196 hdu_list.flush()
197 # This multi-open dance is necessary to get Astropy to tell us the
198 # byte addresses of the HDUs. Hopefully we can get an upstream change
199 # make this unnecessary at some point.
200 with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list:
201 index_hdu = cls._make_index_table(hdu_list)
202 with astropy.io.fits.open(filename, mode="append") as hdu_list:
203 hdu_list.append(index_hdu)
204 json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1])
205 index_bytes = _HDUBytes.from_write_hdu(index_hdu)
206 # Update the primary HDU with the address and size of the index and
207 # JSON HDUs, and rewrite just that. We do this write manually, since
208 # astropy's docs on its 'update' mode are scarce and it's not obvious
209 # whether we can guarantee it won't rewrite the whole file if we edit
210 # the primary header.
211 archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address
212 archive._primary_hdu.header["INDXSIZE"] = index_bytes.size
213 archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address
214 archive._primary_hdu.header["JSONSIZE"] = json_bytes.size
215 with open(filename, "r+b") as stream:
216 stream.write(archive._primary_hdu.header.tostring().encode())
218 def serialize_direct[T: pydantic.BaseModel](
219 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T]
220 ) -> T:
221 nested = NestedOutputArchive[PointerModel](name, self)
222 return serializer(nested)
224 def serialize_pointer[T: ArchiveTree](
225 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T], key: Hashable
226 ) -> PointerModel:
227 if (pointer := self._pointers_by_key.get(key)) is not None:
228 return pointer
229 model = self.serialize_direct("", serializer)
230 json_bytes = model.model_dump_json().encode()
231 self._pointer_targets.append(json_bytes)
232 pointer = PointerModel(
233 column=TableColumnModel(
234 name=JSON_COLUMN,
235 data=ArrayReferenceModel(
236 source=f"fits:{JSON_EXTNAME}[1]",
237 shape=[len(json_bytes)],
238 datatype=NumberType.uint8,
239 ),
240 ),
241 row=len(self._pointer_targets),
242 )
243 self._pointers_by_key[key] = pointer
244 return pointer
246 def serialize_frame_set[T: ArchiveTree](
247 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
248 ) -> PointerModel:
249 # Docstring inherited.
250 pointer = self.serialize_pointer(name, serializer, key)
251 self._frame_sets.append((frame_set, pointer))
252 return pointer
254 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, PointerModel]]:
255 return iter(self._frame_sets)
257 def add_array(
258 self,
259 array: np.ndarray,
260 *,
261 name: str | None = None,
262 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
263 ) -> ArrayReferenceModel:
264 if name is None:
265 raise RuntimeError("Cannot save array with name=None unless it is nested.")
266 extname = name.upper()
267 hdu = self._opaque_metadata.maybe_use_precompressed(extname)
268 if hdu is None:
269 if (compression_options := self._get_compression_options(name)) is not None:
270 hdu = compression_options.make_hdu(array, name=extname)
271 else:
272 hdu = astropy.io.fits.ImageHDU(array, name=extname)
273 key = self._add_hdu(hdu, update_header)
274 return ArrayReferenceModel(
275 source=str(key),
276 shape=list(array.shape),
277 datatype=NumberType.from_numpy(array.dtype),
278 )
280 def add_table(
281 self,
282 table: astropy.table.Table,
283 *,
284 name: str | None = None,
285 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
286 ) -> TableModel:
287 if name is None:
288 raise RuntimeError("Cannot save table with name=None unless it is nested.")
289 extname = name.upper()
290 hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname)
291 # Extract column information directly from the input array, not the
292 # data in the binary table HDU, because we want to assume as little as
293 # possible about where Astropy does uint -> TZERO stuff.
294 columns = TableColumnModel.from_table(table)
295 key = self._add_hdu(hdu, update_header)
296 for n, c in enumerate(columns, start=1):
297 assert isinstance(c.data, ArrayReferenceModel)
298 c.data.source = f"{key}[{n}]"
299 return TableModel(columns=columns, meta=table.meta)
301 def add_structured_array(
302 self,
303 array: np.ndarray,
304 *,
305 name: str | None = None,
306 units: Mapping[str, astropy.units.Unit] | None = None,
307 descriptions: Mapping[str, str] | None = None,
308 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
309 ) -> TableModel:
310 if name is None:
311 raise RuntimeError("Cannot save structured array with name=None unless it is nested.")
312 extname = name.upper()
313 # Extract column information directly from the input array, not the
314 # data in the binary table HDU, because we want to assume as little as
315 # possible about where Astropy does uint -> TZERO stuff.
316 columns = TableColumnModel.from_record_dtype(array.dtype)
317 hdu = astropy.io.fits.BinTableHDU(array, name=extname)
318 if units is not None:
319 for c in columns:
320 c.unit = units.get(c.name)
321 if descriptions is not None:
322 for c in columns:
323 c.description = descriptions.get(c.name, "")
324 key = self._add_hdu(hdu, update_header)
325 for n, c in enumerate(columns, start=1):
326 assert isinstance(c.data, ArrayReferenceModel)
327 c.data.source = f"{key}[{n}]"
328 return TableModel(columns=columns)
330 def _add_hdu(
331 self,
332 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
333 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
334 ) -> ExtensionKey:
335 n_hdus = self._hdus_by_name.get(hdu.name, 0)
336 key = ExtensionKey(hdu.name, n_hdus + 1)
337 key.check()
338 if n_hdus:
339 hdu.header["EXTVER"] = key.ver
340 self._hdus_by_name[hdu.name] += 1
341 update_header(hdu.header)
342 if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None:
343 hdu.header.extend(opaque_headers)
344 self._hdu_list.append(hdu)
345 return key
347 def add_tree(self, tree: ArchiveTree) -> None:
348 """Write the JSON tree to the archive.
350 This method must be called exactly once, just before the `open` context
351 is exited.
353 Parameters
354 ----------
355 tree
356 Pydantic model that represents the tree.
357 """
358 self._primary_hdu.header.set("DATAMODL", tree.schema_url, "Schema URL.")
359 json_hdu = astropy.io.fits.BinTableHDU.from_columns(
360 [astropy.io.fits.Column(JSON_COLUMN, "PB")],
361 nrows=len(self._pointer_targets) + 1,
362 name=JSON_EXTNAME,
363 )
364 json_hdu.data[0][JSON_COLUMN] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte)
365 for n, json_target_data in enumerate(self._pointer_targets):
366 json_hdu.data[n + 1][JSON_COLUMN] = np.frombuffer(json_target_data, dtype=np.byte)
367 self._hdu_list.append(json_hdu)
368 self._json_hdu_added = True
370 def _get_compression_options(self, name: str) -> FitsCompressionOptions | None:
371 result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT)
372 if result is None or result.quantization is None:
373 return result
374 if self._compression_seed is not None and not result.quantization.seed:
375 result = result.model_copy(
376 update={
377 "quantization": result.quantization.model_copy(update={"seed": self._compression_seed})
378 }
379 )
380 self._compression_seed += 1
381 if self._compression_seed > 10000:
382 self._compression_seed = 1
383 # MyPy can tell that result.quantization is not None in the 'if', but
384 # forgets that by this 'else':
385 elif result.quantization.seed is None: # type: ignore[union-attr]
386 raise RuntimeError("No quantization seed provided.")
387 return result
389 @staticmethod
390 def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU:
391 # We use a fixed-length string for the EXTNAME column; it might be
392 # better to use a variable-length array, but I have not been able to
393 # figure out how to get astropy to accept a string for the the
394 # character (TFORM='A') variant of that. And that's only better if the
395 # EXTNAMEs get super long, which is not likely (but maybe something to
396 # guard against).
397 max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list)
398 index_hdu = astropy.io.fits.BinTableHDU.from_columns(
399 [
400 astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"),
401 astropy.io.fits.Column("EXTVER", "J"),
402 astropy.io.fits.Column("XTENSION", "A8"),
403 astropy.io.fits.Column("ZIMAGE", "L"),
404 ]
405 + _HDUBytes.get_index_hdu_columns(),
406 nrows=len(hdu_list),
407 name="INDEX",
408 )
409 hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU
410 for n, hdu in enumerate(hdu_list):
411 index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "")
412 index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1)
413 index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE")
414 index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False)
415 bytes = _HDUBytes.from_read_hdu(hdu)
416 bytes.update_index_row(index_hdu.data[n])
417 return index_hdu
420@dataclasses.dataclass
421class _HDUBytes:
422 """A struct that records the byte offsets into a FITS HDU."""
424 @classmethod
425 def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
426 """Construct from an Astropy HDU instance that has just been written.
428 Parameters
429 ----------
430 hdu
431 An Astropy HDU object.
433 Returns
434 -------
435 hdu_bytes
436 Struct with byte offsets.
438 Notes
439 -----
440 This method relies on internal Astropy attributes and does not work on
441 CompImageHDU objects.
442 """
443 # This is implemented by accessing private Astropy attributes because
444 # it turns out that's much more reliable than the public fileinfo()
445 # method, which seems to always return a dict with `None` entries or
446 # raise; it looks buggy, but docs are scarce enough that it's not clear
447 # what the right behavior is supposed to be.
448 if (header_address := getattr(hdu, "_header_offset", None)) is None:
449 raise RuntimeError("Failed to get Astropy's _header_offset.")
450 if (data_address := getattr(hdu, "_data_offset", None)) is None:
451 raise RuntimeError("Failed to get Astropy's _data_offset.")
452 if (data_size := getattr(hdu, "_data_size", None)) is None:
453 raise RuntimeError("Failed to get Astropy's _data_size.")
454 return cls(header_address, data_address, data_size)
456 @classmethod
457 def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
458 """Construct from an Astropy HDU instance that has just been read.
460 Parameters
461 ----------
462 hdu
463 An Astropy HDU object.
465 Returns
466 -------
467 hdu_bytes
468 Struct with byte offsets.
469 """
470 info = hdu.fileinfo()
471 header_address = info["hdrLoc"]
472 data_address = info["datLoc"]
473 data_size = info["datSpan"]
474 return cls(header_address, data_address, data_size)
476 @classmethod
477 def from_index_row(cls, row: np.void) -> Self:
478 """Construct from a row of the index HDU.
480 Parameters
481 ----------
482 row
483 A Numpy struct-like scalar.
485 Returns
486 -------
487 hdu_bytes
488 Struct with byte offsets.
489 """
490 return cls(
491 header_address=int(row["HDRADDR"]),
492 data_address=int(row["DATADDR"]),
493 data_size=int(row["DATSIZE"]),
494 )
496 @staticmethod
497 def get_index_hdu_columns() -> list[astropy.io.fits.Column]:
498 """Return the definitions of the columns this class gets and sets
499 from the index HDU.
501 Returns
502 -------
503 columns
504 A `list` of `astropy.io.fits.Column` objects that represent the
505 header address, data address, and data size.
506 """
507 return [
508 astropy.io.fits.Column("HDRADDR", "K"),
509 astropy.io.fits.Column("DATADDR", "K"),
510 astropy.io.fits.Column("DATSIZE", "K"),
511 ]
513 header_address: int
514 """Offset from the beginning of the start of the file to the header of this
515 HDU, in bytes.
516 """
518 data_address: int
519 """Offset from the beginning of the start of the file to the data section
520 of this HDU, in bytes.
521 """
523 data_size: int
524 """Size of the data section in bytes."""
526 @property
527 def header_size(self) -> int:
528 """Size of the header in bytes."""
529 return self.data_address - self.header_address
531 @property
532 def end_address(self) -> int:
533 """Offset in bytes from the start of the file to the end of the HDU."""
534 return self.data_address + self.data_size
536 @property
537 def size(self) -> int:
538 """Total size of this HDU in bytes."""
539 return self.data_size + self.data_address - self.header_address
541 def update_index_row(self, row: np.void) -> None:
542 """Set the values of a row of the index HDU from this strut.
544 Parameters
545 ----------
546 row
547 A Numpy struct-like scalar to modify in place.
548 """
549 row["HDRADDR"] = self.header_address
550 row["DATADDR"] = self.data_address
551 row["DATSIZE"] = self.data_size