Coverage for python/lsst/images/fits/_output_archive.py: 21%
211 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:43 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:43 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("FitsOutputArchive", "write")
16import dataclasses
17from collections import Counter
18from collections.abc import Callable, Hashable, Iterator, Mapping
19from contextlib import contextmanager
20from typing import Any, Self
22import astropy.io.fits
23import astropy.table
24import numpy as np
25import pydantic
27from .._transforms import FrameSet
28from ..serialization import (
29 ArchiveTree,
30 ArrayReferenceModel,
31 ButlerInfo,
32 MetadataValue,
33 NestedOutputArchive,
34 NumberType,
35 OutputArchive,
36 TableColumnModel,
37 TableModel,
38 no_header_updates,
39)
40from ._common import (
41 JSON_COLUMN,
42 JSON_EXTNAME,
43 ExtensionHDU,
44 ExtensionKey,
45 FitsCompressionOptions,
46 FitsOpaqueMetadata,
47 PointerModel,
48)
51def write(
52 obj: Any,
53 path: str,
54 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
55 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
56 compression_seed: int | None = None,
57 metadata: dict[str, MetadataValue] | None = None,
58 butler_info: ButlerInfo | None = None,
59) -> Any:
60 """Write an object with a ``serialize`` method to a FITS file.
62 Parameters
63 ----------
64 path
65 Name of the file to write to. Must not already exist.
66 compression_options
67 Options for how to compress the FITS file, keyed by the name of
68 the attribute (with JSON pointer ``/`` separators for nested
69 attributes).
70 update_header
71 A callback that will be given the primary HDU FITS header and an
72 opportunity to modify it.
73 compression_seed
74 A FITS tile compression seed to use whenever the configured
75 compression seed is `None` or (for backwards compatibility) ``0``.
76 This value is then incremented every time it is used.
77 metadata
78 Additional metadata to save with the object. This will override any
79 flexible metadata carried by the object itself with the same keys.
80 butler_info
81 Butler information to store in the file.
83 Returns
84 -------
85 `.serialization.ArchiveTree`
86 The serialized representation of the object.
87 """
88 opaque_metadata = getattr(obj, "_opaque_metadata", None)
89 name = getattr(obj, "_archive_default_name", None)
90 with FitsOutputArchive.open(
91 path,
92 compression_options=compression_options,
93 opaque_metadata=opaque_metadata,
94 update_header=update_header,
95 compression_seed=compression_seed,
96 ) as archive:
97 tree = archive.serialize_direct(name, obj.serialize) if name is not None else obj.serialize(archive)
98 if metadata is not None:
99 tree.metadata.update(metadata)
100 if butler_info is not None:
101 tree.butler_info = butler_info
102 archive.add_tree(tree)
103 return tree
106class FitsOutputArchive(OutputArchive[PointerModel]):
107 """An implementation of the `.serialization.OutputArchive` interface that
108 writes to FITS files.
110 Instances of this class should only be constructed via the `open`
111 context manager.
112 """
114 def __init__(
115 self,
116 hdu_list: astropy.io.fits.HDUList,
117 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
118 opaque_metadata: Any = None,
119 compression_seed: int | None = None,
120 ):
121 # JSON blobs for objects we've saved as pointers:
122 self._pointer_targets: list[bytes] = []
123 # Mapping from user provided key (e.g. id(some object)) to a table
124 # pointer to where we actually saved it:
125 self._pointers_by_key: dict[Hashable, PointerModel] = {}
126 self._hdu_list = hdu_list
127 self._primary_hdu = astropy.io.fits.PrimaryHDU()
128 # TODO: add subformat description and version to primary HDU.
129 self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.")
130 self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.")
131 self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.")
132 self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.")
133 self._hdus_by_name = Counter[str]()
134 self._compression_options = dict(compression_options) if compression_options is not None else {}
135 self._compression_seed = compression_seed
136 self._opaque_metadata = (
137 opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata()
138 )
139 if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None:
140 self._primary_hdu.header.extend(opaque_primary_header)
141 self._hdu_list.append(self._primary_hdu)
142 self._json_hdu_added: bool = False
143 self._frame_sets: list[tuple[FrameSet, PointerModel]] = []
145 @classmethod
146 @contextmanager
147 def open(
148 cls,
149 filename: str,
150 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
151 opaque_metadata: Any = None,
152 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
153 compression_seed: int | None = None,
154 ) -> Iterator[Self]:
155 """Create an output archive that writes to the given file.
157 Parameters
158 ----------
159 filename
160 Name of the file to write to. Must not already exist.
161 compression_options
162 Options for how to compress the FITS file, keyed by the name of
163 the attribute (with JSON pointer ``/`` separators for nested
164 attributes).
165 opaque_metadata
166 Metadata read from an input archive along with the object being
167 written now. Ignored if the metadata is not from a FITS archive.
168 update_header
169 A callback that will be given the primary HDU FITS header and an
170 opportunity to modify it.
171 compression_seed
172 A FITS tile compression seed to use whenever the configured
173 compression seed is `None` or (for backwards compatibility) ``0``.
174 This value is then incremented every time it is used.
176 Returns
177 -------
178 `contextlib.AbstractContextManager` [`FitsOutputArchive`]
179 A context manager that returns a `FitsOutputArchive` when entered.
180 """
181 with astropy.io.fits.open(filename, mode="append") as hdu_list:
182 if hdu_list:
183 raise OSError(f"File {filename!r} already exists.")
184 archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed)
185 update_header(hdu_list[0].header)
186 yield archive
187 if not archive._json_hdu_added:
188 raise RuntimeError("Write context exited without 'add_tree' being called.")
189 hdu_list.flush()
190 # This multi-open dance is necessary to get Astropy to tell us the
191 # byte addresses of the HDUs. Hopefully we can get an upstream change
192 # make this unnecessary at some point.
193 with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list:
194 index_hdu = cls._make_index_table(hdu_list)
195 with astropy.io.fits.open(filename, mode="append") as hdu_list:
196 hdu_list.append(index_hdu)
197 json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1])
198 index_bytes = _HDUBytes.from_write_hdu(index_hdu)
199 # Update the primary HDU with the address and size of the index and
200 # JSON HDUs, and rewrite just that. We do this write manually, since
201 # astropy's docs on its 'update' mode are scarce and it's not obvious
202 # whether we can guarantee it won't rewrite the whole file if we edit
203 # the primary header.
204 archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address
205 archive._primary_hdu.header["INDXSIZE"] = index_bytes.size
206 archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address
207 archive._primary_hdu.header["JSONSIZE"] = json_bytes.size
208 with open(filename, "r+b") as stream:
209 stream.write(archive._primary_hdu.header.tostring().encode())
211 def serialize_direct[T: pydantic.BaseModel](
212 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T]
213 ) -> T:
214 nested = NestedOutputArchive[PointerModel](name, self)
215 return serializer(nested)
217 def serialize_pointer[T: ArchiveTree](
218 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T], key: Hashable
219 ) -> PointerModel:
220 if (pointer := self._pointers_by_key.get(key)) is not None:
221 return pointer
222 model = self.serialize_direct("", serializer)
223 json_bytes = model.model_dump_json().encode()
224 self._pointer_targets.append(json_bytes)
225 pointer = PointerModel(
226 column=TableColumnModel(
227 name=JSON_COLUMN,
228 data=ArrayReferenceModel(
229 source=f"fits:{JSON_EXTNAME}[1]",
230 shape=[len(json_bytes)],
231 datatype=NumberType.uint8,
232 ),
233 ),
234 row=len(self._pointer_targets),
235 )
236 self._pointers_by_key[key] = pointer
237 return pointer
239 def serialize_frame_set[T: ArchiveTree](
240 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
241 ) -> PointerModel:
242 # Docstring inherited.
243 pointer = self.serialize_pointer(name, serializer, key)
244 self._frame_sets.append((frame_set, pointer))
245 return pointer
247 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, PointerModel]]:
248 return iter(self._frame_sets)
250 def add_array(
251 self,
252 array: np.ndarray,
253 *,
254 name: str | None = None,
255 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
256 ) -> ArrayReferenceModel:
257 if name is None:
258 raise RuntimeError("Cannot save array with name=None unless it is nested.")
259 extname = name.upper()
260 hdu = self._opaque_metadata.maybe_use_precompressed(extname)
261 if hdu is None:
262 if (compression_options := self._get_compression_options(name)) is not None:
263 hdu = compression_options.make_hdu(array, name=extname)
264 else:
265 hdu = astropy.io.fits.ImageHDU(array, name=extname)
266 key = self._add_hdu(hdu, update_header)
267 return ArrayReferenceModel(
268 source=str(key),
269 shape=list(array.shape),
270 datatype=NumberType.from_numpy(array.dtype),
271 )
273 def add_table(
274 self,
275 table: astropy.table.Table,
276 *,
277 name: str | None = None,
278 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
279 ) -> TableModel:
280 if name is None:
281 raise RuntimeError("Cannot save table with name=None unless it is nested.")
282 extname = name.upper()
283 hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname)
284 # Extract column information directly from the input array, not the
285 # data in the binary table HDU, because we want to assume as little as
286 # possible about where Astropy does uint -> TZERO stuff.
287 columns = TableColumnModel.from_table(table)
288 key = self._add_hdu(hdu, update_header)
289 for n, c in enumerate(columns, start=1):
290 assert isinstance(c.data, ArrayReferenceModel)
291 c.data.source = f"{key}[{n}]"
292 return TableModel(columns=columns, meta=table.meta)
294 def add_structured_array(
295 self,
296 array: np.ndarray,
297 *,
298 name: str | None = None,
299 units: Mapping[str, astropy.units.Unit] | None = None,
300 descriptions: Mapping[str, str] | None = None,
301 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
302 ) -> TableModel:
303 if name is None:
304 raise RuntimeError("Cannot save structured array with name=None unless it is nested.")
305 extname = name.upper()
306 # Extract column information directly from the input array, not the
307 # data in the binary table HDU, because we want to assume as little as
308 # possible about where Astropy does uint -> TZERO stuff.
309 columns = TableColumnModel.from_record_dtype(array.dtype)
310 hdu = astropy.io.fits.BinTableHDU(array, name=extname)
311 if units is not None:
312 for c in columns:
313 c.unit = units.get(c.name)
314 if descriptions is not None:
315 for c in columns:
316 c.description = descriptions.get(c.name, "")
317 key = self._add_hdu(hdu, update_header)
318 for n, c in enumerate(columns, start=1):
319 assert isinstance(c.data, ArrayReferenceModel)
320 c.data.source = f"{key}[{n}]"
321 return TableModel(columns=columns)
323 def _add_hdu(
324 self,
325 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
326 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
327 ) -> ExtensionKey:
328 n_hdus = self._hdus_by_name.get(hdu.name, 0)
329 key = ExtensionKey(hdu.name, n_hdus + 1)
330 key.check()
331 if n_hdus:
332 hdu.header["EXTVER"] = key.ver
333 self._hdus_by_name[hdu.name] += 1
334 update_header(hdu.header)
335 if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None:
336 hdu.header.extend(opaque_headers)
337 self._hdu_list.append(hdu)
338 return key
340 def add_tree(self, tree: ArchiveTree) -> None:
341 """Write the JSON tree to the archive.
343 This method must be called exactly once, just before the `open` context
344 is exited.
346 Parameters
347 ----------
348 tree
349 Pydantic model that represents the tree.
350 """
351 json_hdu = astropy.io.fits.BinTableHDU.from_columns(
352 [astropy.io.fits.Column(JSON_COLUMN, "PB")],
353 nrows=len(self._pointer_targets) + 1,
354 name=JSON_EXTNAME,
355 )
356 json_hdu.data[0][JSON_COLUMN] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte)
357 for n, json_target_data in enumerate(self._pointer_targets):
358 json_hdu.data[n + 1][JSON_COLUMN] = np.frombuffer(json_target_data, dtype=np.byte)
359 self._hdu_list.append(json_hdu)
360 self._json_hdu_added = True
362 def _get_compression_options(self, name: str) -> FitsCompressionOptions | None:
363 result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT)
364 if result is None or result.quantization is None:
365 return result
366 if self._compression_seed is not None and not result.quantization.seed:
367 result = result.model_copy(
368 update={
369 "quantization": result.quantization.model_copy(update={"seed": self._compression_seed})
370 }
371 )
372 self._compression_seed += 1
373 if self._compression_seed > 10000:
374 self._compression_seed = 1
375 # MyPy can tell that result.quantization is not None in the 'if', but
376 # forgets that by this 'else':
377 elif result.quantization.seed is None: # type: ignore[union-attr]
378 raise RuntimeError("No quantization seed provided.")
379 return result
381 @staticmethod
382 def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU:
383 # We use a fixed-length string for the EXTNAME column; it might be
384 # better to use a variable-length array, but I have not been able to
385 # figure out how to get astropy to accept a string for the the
386 # character (TFORM='A') variant of that. And that's only better if the
387 # EXTNAMEs get super long, which is not likely (but maybe something to
388 # guard against).
389 max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list)
390 index_hdu = astropy.io.fits.BinTableHDU.from_columns(
391 [
392 astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"),
393 astropy.io.fits.Column("EXTVER", "J"),
394 astropy.io.fits.Column("XTENSION", "A8"),
395 astropy.io.fits.Column("ZIMAGE", "L"),
396 ]
397 + _HDUBytes.get_index_hdu_columns(),
398 nrows=len(hdu_list),
399 name="INDEX",
400 )
401 hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU
402 for n, hdu in enumerate(hdu_list):
403 index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "")
404 index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1)
405 index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE")
406 index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False)
407 bytes = _HDUBytes.from_read_hdu(hdu)
408 bytes.update_index_row(index_hdu.data[n])
409 return index_hdu
412@dataclasses.dataclass
413class _HDUBytes:
414 """A struct that records the byte offsets into a FITS HDU."""
416 @classmethod
417 def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
418 """Construct from an Astropy HDU instance that has just been written.
420 Parameters
421 ----------
422 hdu
423 An Astropy HDU object.
425 Returns
426 -------
427 hdu_bytes
428 Struct with byte offsets.
430 Notes
431 -----
432 This method relies on internal Astropy attributes and does not work on
433 CompImageHDU objects.
434 """
435 # This is implemented by accessing private Astropy attributes because
436 # it turns out that's much more reliable than the public fileinfo()
437 # method, which seems to always return a dict with `None` entries or
438 # raise; it looks buggy, but docs are scarce enough that it's not clear
439 # what the right behavior is supposed to be.
440 if (header_address := getattr(hdu, "_header_offset", None)) is None:
441 raise RuntimeError("Failed to get Astropy's _header_offset.")
442 if (data_address := getattr(hdu, "_data_offset", None)) is None:
443 raise RuntimeError("Failed to get Astropy's _data_offset.")
444 if (data_size := getattr(hdu, "_data_size", None)) is None:
445 raise RuntimeError("Failed to get Astropy's _data_size.")
446 return cls(header_address, data_address, data_size)
448 @classmethod
449 def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
450 """Construct from an Astropy HDU instance that has just been read.
452 Parameters
453 ----------
454 hdu
455 An Astropy HDU object.
457 Returns
458 -------
459 hdu_bytes
460 Struct with byte offsets.
461 """
462 info = hdu.fileinfo()
463 header_address = info["hdrLoc"]
464 data_address = info["datLoc"]
465 data_size = info["datSpan"]
466 return cls(header_address, data_address, data_size)
468 @classmethod
469 def from_index_row(cls, row: np.void) -> Self:
470 """Construct from a row of the index HDU.
472 Parameters
473 ----------
474 row
475 A Numpy struct-like scalar.
477 Returns
478 -------
479 hdu_bytes
480 Struct with byte offsets.
481 """
482 return cls(
483 header_address=int(row["HDRADDR"]),
484 data_address=int(row["DATADDR"]),
485 data_size=int(row["DATSIZE"]),
486 )
488 @staticmethod
489 def get_index_hdu_columns() -> list[astropy.io.fits.Column]:
490 """Return the definitions of the columns this class gets and sets
491 from the index HDU.
493 Returns
494 -------
495 columns
496 A `list` of `astropy.io.fits.Column` objects that represent the
497 header address, data address, and data size.
498 """
499 return [
500 astropy.io.fits.Column("HDRADDR", "K"),
501 astropy.io.fits.Column("DATADDR", "K"),
502 astropy.io.fits.Column("DATSIZE", "K"),
503 ]
505 header_address: int
506 """Offset from the beginning of the start of the file to the header of this
507 HDU, in bytes.
508 """
510 data_address: int
511 """Offset from the beginning of the start of the file to the data section
512 of this HDU, in bytes.
513 """
515 data_size: int
516 """Size of the data section in bytes."""
518 @property
519 def header_size(self) -> int:
520 """Size of the header in bytes."""
521 return self.data_address - self.header_address
523 @property
524 def end_address(self) -> int:
525 """Offset in bytes from the start of the file to the end of the HDU."""
526 return self.data_address + self.data_size
528 @property
529 def size(self) -> int:
530 """Total size of this HDU in bytes."""
531 return self.data_size + self.data_address - self.header_address
533 def update_index_row(self, row: np.void) -> None:
534 """Set the values of a row of the index HDU from this strut.
536 Parameters
537 ----------
538 row
539 A Numpy struct-like scalar to modify in place.
540 """
541 row["HDRADDR"] = self.header_address
542 row["DATADDR"] = self.data_address
543 row["DATSIZE"] = self.data_size