Coverage for python/lsst/images/formatters.py: 0%
103 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 09:08 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 09:08 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""Unified butler formatter for lsst.images.
14This formatter dispatches on a write-time ``format`` parameter and on the
15file extension at read time, replacing the three per-format
16(`lsst.images.fits.formatters`, `lsst.images.json.formatters`,
17`lsst.images.ndf.formatters`) hierarchies that previously duplicated almost
18all of their logic.
19"""
21from __future__ import annotations
23__all__ = ("GenericFormatter",)
25import hashlib
26import json as _stdlib_json # disambiguates from .json subpackage
27from collections.abc import Callable, Iterator
28from contextlib import contextmanager
29from typing import Any, ClassVar
31import astropy.io.fits
33from lsst.daf.butler import DatasetProvenance, FormatterV2
34from lsst.resources import ResourcePath
36from . import fits as _fits
37from . import json as _json
38from .serialization import ArchiveTree, ButlerInfo, InputArchive, JsonRef
41class GenericFormatter(FormatterV2):
42 """Unified butler formatter for any lsst.images type.
44 The on-disk format is selected by the ``format`` write parameter
45 (``fits``, ``json``, ``sdf``) at write time and by the file
46 extension at read time. The default format is taken from
47 ``self.default_extension`` (``.fits`` for the base class).
49 Notes
50 -----
51 Subclasses (`ImageFormatter` and below) add component-level read
52 support. This base class forwards any read parameters straight to
53 the underlying ``read`` function.
54 """
56 default_extension: ClassVar[str] = ".fits"
57 supported_extensions: ClassVar[frozenset[str]] = frozenset({".fits", ".sdf", ".json"})
58 supported_write_parameters: ClassVar[frozenset[str]] = frozenset({"format", "recipe"})
59 can_read_from_uri: ClassVar[bool] = True
61 butler_provenance: DatasetProvenance | None = None
63 # --- Write parameter handling -------------------------------------------
65 def get_write_extension(self) -> str:
66 default_fmt = self.default_extension.lstrip(".")
67 fmt = self.write_parameters.get("format", default_fmt)
68 ext = "." + fmt
69 if ext not in self.supported_extensions:
70 raise RuntimeError(
71 f"Requested format {fmt!r} is not supported; expected one of {{fits, json, sdf}}."
72 )
73 return ext
75 def _validate_write_parameters(self) -> None:
76 ext = self.get_write_extension()
77 if ext != ".fits" and "recipe" in self.write_parameters:
78 raise RuntimeError("The 'recipe' write parameter is only valid for FITS output.")
80 # --- Write path ---------------------------------------------------------
82 def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None:
83 self._validate_write_parameters()
84 ext = self.get_write_extension()
85 butler_info = ButlerInfo(
86 dataset=self.dataset_ref.to_simple(),
87 provenance=self.butler_provenance if self.butler_provenance is not None else DatasetProvenance(),
88 )
89 kwargs: dict[str, Any] = {"butler_info": butler_info}
90 write_func: Callable[..., ArchiveTree]
91 match ext:
92 case ".fits":
93 kwargs["update_header"] = self._update_header
94 kwargs["compression_options"] = self._get_compression_options()
95 kwargs["compression_seed"] = self._get_compression_seed()
96 write_func = _fits.write
97 case ".json":
98 write_func = _json.write
99 case ".sdf":
100 from . import ndf as _ndf
102 write_func = _ndf.write
103 write_func(in_memory_dataset, uri.ospath, **kwargs)
105 def add_provenance(
106 self,
107 in_memory_dataset: Any,
108 /,
109 *,
110 provenance: DatasetProvenance | None = None,
111 ) -> Any:
112 # A FormatterV2 instance is used once; stash provenance on self
113 # rather than mutating the dataset.
114 self.butler_provenance = provenance
115 return in_memory_dataset
117 # --- FITS-specific helpers (kept verbatim from fits/formatters.py) ----
119 def _get_compression_seed(self) -> int:
120 # Set the seed based on data ID (all logic here duplicated from
121 # obs_base). We can't just use 'hash', since like 'set' that's not
122 # deterministic. And we can't rely on a DimensionPacker because those
123 # are only defined for certain combinations of dimensions. Doing an MD5
124 # of the JSON feels like overkill but I don't really see anything much
125 # simpler.
126 hash_bytes = hashlib.md5(
127 _stdlib_json.dumps(list(self.data_id.required_values)).encode(),
128 usedforsecurity=False,
129 ).digest()
130 # And it *really* feels like overkill when we squash that into the [1,
131 # 10000] range allowed by FITS.
132 return 1 + int.from_bytes(hash_bytes) % 9999
134 def _get_compression_options(self) -> dict[str, _fits.FitsCompressionOptions]:
135 recipe = self.write_parameters.get("recipe", "default")
136 try:
137 config = self.write_recipes[recipe]
138 except KeyError:
139 if recipe == "default":
140 # If there's no default recipe just use the software defaults.
141 return {}
142 raise RuntimeError(f"Invalid recipe for GenericFormatter: {recipe!r}.") from None
143 return {k: _fits.FitsCompressionOptions.model_validate(v) for k, v in config.items()}
145 def _update_header(self, header: astropy.io.fits.Header) -> None:
146 # Logic here largely lifted from lsst.obs.base.utils, which we
147 # can't use directly for dependency and maybe mapping-type
148 # (PropertyList vs. astropy) reasons. We assume we can always add
149 # long cards (astropy will CONTINUE them) but not comments
150 # (astropy will truncate and warn on long cards).
151 for key in list(header):
152 if key.startswith("LSST BUTLER"):
153 del header[key]
154 if self.butler_provenance is not None:
155 for key, value in self.butler_provenance.to_flat_dict(
156 self.dataset_ref,
157 prefix="HIERARCH LSST BUTLER",
158 sep=" ",
159 simple_types=True,
160 max_inputs=3_000,
161 ).items():
162 header.set(key, value)
164 # --- Read path ---------------------------------------------------------
166 def _extension_from_uri(self, uri: ResourcePath) -> str:
167 ext = uri.getExtension()
168 if ext not in self.supported_extensions:
169 raise RuntimeError(f"Cannot read {uri}: unsupported extension {ext!r}.")
170 return ext
172 @contextmanager
173 def _open_archive_and_tree(
174 self, uri: ResourcePath, partial: bool
175 ) -> Iterator[tuple[InputArchive[Any], ArchiveTree]]:
176 pytype: type[Any] = self.dataset_ref.datasetType.storageClass.pytype
177 ext = self._extension_from_uri(uri)
178 archive: InputArchive[Any]
179 match ext:
180 case ".fits":
181 tree_type = pytype._get_archive_tree_type(_fits.PointerModel)
182 with _fits.FitsInputArchive.open(uri, partial=partial) as archive:
183 tree = archive.get_tree(tree_type)
184 yield archive, tree
185 case ".json":
186 tree_type = pytype._get_archive_tree_type(JsonRef)
187 tree = tree_type.model_validate_json(ResourcePath(uri).read())
188 archive = _json.JsonInputArchive(tree.indirect)
189 yield archive, tree
190 case ".sdf":
191 from . import ndf as _ndf
193 tree_type = pytype._get_archive_tree_type(_ndf.NdfPointerModel)
194 with _ndf.NdfInputArchive.open(uri) as archive:
195 tree = archive.get_tree(tree_type)
196 yield archive, tree
198 def read_from_uri(
199 self,
200 uri: ResourcePath,
201 component: str | None = None,
202 expected_size: int = -1,
203 ) -> Any:
204 kwargs = self.file_descriptor.parameters or {}
205 with self._open_archive_and_tree(uri, partial=bool(kwargs or component)) as (archive, tree):
206 if component is None:
207 result = tree.deserialize(archive, **kwargs)
208 result._opaque_metadata = archive.get_opaque_metadata()
209 return result
210 else:
211 return tree.deserialize_component(component, archive, **kwargs)