Coverage for python / lsst / images / formatters.py: 24%
217 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 07:54 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 07:54 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""Unified butler formatter for lsst.images.
14This formatter dispatches on a write-time ``format`` parameter and on the
15file extension at read time, replacing the three per-format
16(`lsst.images.fits.formatters`, `lsst.images.json.formatters`,
17`lsst.images.ndf.formatters`) hierarchies that previously duplicated almost
18all of their logic.
19"""
21from __future__ import annotations
23__all__ = (
24 "CellCoaddFormatter",
25 "ComponentSentinel",
26 "GenericFormatter",
27 "ImageFormatter",
28 "MaskedImageFormatter",
29 "VisitImageFormatter",
30)
32import enum
33import hashlib
34import json as _stdlib_json # disambiguates from .json subpackage
35from collections.abc import Callable
36from dataclasses import dataclass
37from typing import Any, ClassVar
39import astropy.io.fits
40from astro_metadata_translator import ObservationInfo
42from lsst.daf.butler import DatasetProvenance, FormatterV2
43from lsst.resources import ResourcePath
45from . import fits as _fits
46from . import json as _json
47from ._geom import Box
48from ._masked_image import MaskedImageSerializationModel
49from ._transforms import ProjectionSerializationModel
50from ._visit_image import VisitImageSerializationModel
51from .fits._common import FitsCompressionOptions
52from .fits._common import PointerModel as _FitsPointerModel
53from .fits._input_archive import FitsInputArchive as _FitsInputArchive
54from .serialization import ButlerInfo
56try:
57 from . import ndf as _ndf
58 from .ndf._common import NdfPointerModel as _NdfPointerModel
59 from .ndf._input_archive import NdfInputArchive as _NdfInputArchive
61 _HAVE_NDF = True
62except ImportError: # h5py is optional; see ndf/__init__.py
63 _ndf = None # type: ignore[assignment]
64 _NdfPointerModel = None # type: ignore[assignment,misc]
65 _NdfInputArchive = None # type: ignore[assignment,misc]
66 _HAVE_NDF = False
69@dataclass(frozen=True)
70class _Backend:
71 """One row of the extension-to-backend lookup table."""
73 read: Callable[..., Any]
74 write: Callable[..., Any]
75 input_archive: type | None
76 pointer_model: type | None
# Lookup table from file extension (leading dot included) to the backend that
# reads and writes that format. Positional arguments follow the `_Backend`
# field order: read, write, input_archive, pointer_model.
_BACKENDS: dict[str, _Backend] = {
    ".fits": _Backend(_fits.read, _fits.write, _FitsInputArchive, _FitsPointerModel),
    # The JSON backend has neither an input archive nor a pointer model.
    ".json": _Backend(_json.read, _json.write, None, None),
}
if _HAVE_NDF:
    # Registered only when the optional NDF import above succeeded.
    _BACKENDS[".sdf"] = _Backend(_ndf.read, _ndf.write, _NdfInputArchive, _NdfPointerModel)
class GenericFormatter(FormatterV2):
    """Unified butler formatter for any lsst.images type.

    The on-disk format is selected by the ``format`` write parameter
    (``fits``, ``json``, ``sdf``) at write time and by the file
    extension at read time. The default format is taken from
    ``self.default_extension`` (``.fits`` for the base class).

    Notes
    -----
    Subclasses (`ImageFormatter` and below) add component-level read
    support. This base class forwards any read parameters straight to
    the underlying ``read`` function.

    ``.sdf`` is always listed in ``supported_extensions``, but its backend
    is only registered when the optional NDF support (which needs h5py)
    could be imported. Reading or writing ``.sdf`` without it raises
    `RuntimeError`.
    """

    default_extension: ClassVar[str] = ".fits"
    supported_extensions: ClassVar[frozenset[str]] = frozenset({".fits", ".sdf", ".json"})
    supported_write_parameters: ClassVar[frozenset[str]] = frozenset({"format", "recipe"})
    can_read_from_uri: ClassVar[bool] = True

    # Provenance stashed by `add_provenance` for use by the write path.
    butler_provenance: DatasetProvenance | None = None

    # --- Backend lookup -----------------------------------------------------

    @staticmethod
    def _get_backend(ext: str) -> _Backend:
        """Return the backend registered for a file extension.

        Parameters
        ----------
        ext : `str`
            File extension, including the leading dot.

        Returns
        -------
        backend : `_Backend`
            The registered backend.

        Raises
        ------
        RuntimeError
            Raised if no backend is registered for ``ext``, which happens
            for ``.sdf`` when the optional NDF support is not importable.
        """
        try:
            return _BACKENDS[ext]
        except KeyError:
            raise RuntimeError(
                f"No backend is available for extension {ext!r}; "
                "NDF ('.sdf') support requires the optional h5py dependency."
            ) from None

    # --- Write parameter handling -------------------------------------------

    def get_write_extension(self) -> str:
        """Return the extension selected by the ``format`` write parameter.

        Returns
        -------
        ext : `str`
            File extension, including the leading dot. Falls back to
            ``default_extension`` when no ``format`` parameter is set.

        Raises
        ------
        RuntimeError
            Raised if the requested format is not in
            ``supported_extensions``.
        """
        default_fmt = self.default_extension.lstrip(".")
        fmt = self.write_parameters.get("format", default_fmt)
        ext = f".{fmt}"
        if ext not in self.supported_extensions:
            # Build the list from the class attribute so the message stays
            # accurate for subclasses that change the supported set.
            expected = ", ".join(sorted(e.lstrip(".") for e in self.supported_extensions))
            raise RuntimeError(
                f"Requested format {fmt!r} is not supported; expected one of {{{expected}}}."
            )
        return ext

    def _validate_write_parameters(self) -> None:
        """Reject write parameters that do not apply to the chosen format.

        Raises
        ------
        RuntimeError
            Raised if ``recipe`` is given for a non-FITS format, or if the
            requested format is unsupported.
        """
        ext = self.get_write_extension()
        if ext != ".fits" and "recipe" in self.write_parameters:
            raise RuntimeError("The 'recipe' write parameter is only valid for FITS output.")

    # --- Write path ---------------------------------------------------------

    def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None:
        """Write a dataset to a local file in the selected format.

        Parameters
        ----------
        in_memory_dataset : `object`
            Dataset to write.
        uri : `lsst.resources.ResourcePath`
            Destination; its ``ospath`` is handed to the backend writer.

        Raises
        ------
        RuntimeError
            Raised if the write parameters are invalid or the backend for
            the selected format is not available.
        """
        self._validate_write_parameters()
        ext = self.get_write_extension()
        backend = self._get_backend(ext)
        butler_info = ButlerInfo(
            dataset=self.dataset_ref.to_simple(),
            provenance=self.butler_provenance if self.butler_provenance is not None else DatasetProvenance(),
        )
        kwargs: dict[str, Any] = {"butler_info": butler_info}
        if ext == ".fits":
            # Header rewriting and compression settings only apply to FITS.
            kwargs["update_header"] = self._update_header
            kwargs["compression_options"] = self._get_compression_options()
            kwargs["compression_seed"] = self._get_compression_seed()
        backend.write(in_memory_dataset, uri.ospath, **kwargs)

    def add_provenance(
        self,
        in_memory_dataset: Any,
        /,
        *,
        provenance: DatasetProvenance | None = None,
    ) -> Any:
        """Record provenance for the upcoming write.

        Parameters
        ----------
        in_memory_dataset : `object`
            Dataset about to be written; returned unmodified.
        provenance : `lsst.daf.butler.DatasetProvenance` or `None`, optional
            Provenance to attach to the written file.

        Returns
        -------
        in_memory_dataset : `object`
            The same object that was passed in.
        """
        # A FormatterV2 instance is used once; stash provenance on self
        # rather than mutating the dataset.
        self.butler_provenance = provenance
        return in_memory_dataset

    # --- FITS-specific helpers (kept verbatim from fits/formatters.py) ----

    def _get_compression_seed(self) -> int:
        """Return a deterministic compression seed derived from the data ID.

        Returns
        -------
        seed : `int`
            Integer in the range [1, 9999].
        """
        # Set the seed based on data ID (all logic here duplicated from
        # obs_base). We can't just use 'hash', since like 'set' that's not
        # deterministic. And we can't rely on a DimensionPacker because those
        # are only defined for certain combinations of dimensions. Doing an MD5
        # of the JSON feels like overkill but I don't really see anything much
        # simpler.
        hash_bytes = hashlib.md5(
            _stdlib_json.dumps(list(self.data_id.required_values)).encode(),
            usedforsecurity=False,
        ).digest()
        # And it *really* feels like overkill when we squash that into a
        # small range: the result lies in [1, 9999], inside the [1, 10000]
        # range allowed by FITS. The byte order is spelled out because the
        # argument only gained its "big" default in Python 3.11.
        return 1 + int.from_bytes(hash_bytes, byteorder="big") % 9999

    def _get_compression_options(self) -> dict[str, FitsCompressionOptions]:
        """Return the compression options for the requested write recipe.

        Returns
        -------
        options : `dict` [`str`, `FitsCompressionOptions`]
            Validated options keyed as in the recipe; empty when the
            ``default`` recipe is requested but not configured.

        Raises
        ------
        RuntimeError
            Raised if a recipe other than ``default`` is requested and is
            not present in ``self.write_recipes``.
        """
        recipe = self.write_parameters.get("recipe", "default")
        try:
            config = self.write_recipes[recipe]
        except KeyError:
            if recipe == "default":
                # If there's no default recipe just use the software defaults.
                return {}
            raise RuntimeError(f"Invalid recipe for GenericFormatter: {recipe!r}.") from None
        return {k: FitsCompressionOptions.model_validate(v) for k, v in config.items()}

    def _update_header(self, header: astropy.io.fits.Header) -> None:
        """Replace any butler provenance cards in a FITS header, in place.

        Parameters
        ----------
        header : `astropy.io.fits.Header`
            Header to modify.
        """
        # Logic here largely lifted from lsst.obs.base.utils, which we
        # can't use directly for dependency and maybe mapping-type
        # (PropertyList vs. astropy) reasons. We assume we can always add
        # long cards (astropy will CONTINUE them) but not comments
        # (astropy will truncate and warn on long cards).
        for key in list(header):
            if key.startswith("LSST BUTLER"):
                # Deleting by keyword drops every card with that keyword, so
                # a repeated keyword would be missing by the time the
                # snapshot reaches it again; tolerate that instead of raising.
                header.remove(key, ignore_missing=True, remove_all=True)
        if self.butler_provenance is not None:
            for key, value in self.butler_provenance.to_flat_dict(
                self.dataset_ref,
                prefix="HIERARCH LSST BUTLER",
                sep=" ",
                simple_types=True,
                max_inputs=3_000,
            ).items():
                header.set(key, value)

    # --- Read path ---------------------------------------------------------

    def _extension_from_uri(self, uri: ResourcePath) -> str:
        """Return the extension of ``uri`` after checking it is supported.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to read.

        Returns
        -------
        ext : `str`
            Extension as reported by ``uri.getExtension()``.

        Raises
        ------
        RuntimeError
            Raised if the extension is not in ``supported_extensions``.
        """
        ext = uri.getExtension()
        if ext not in self.supported_extensions:
            raise RuntimeError(f"Cannot read {uri}: unsupported extension {ext!r}.")
        return ext

    def read_from_uri(
        self,
        uri: ResourcePath,
        component: str | None = None,
        expected_size: int = -1,
    ) -> Any:
        """Read a dataset using the backend selected by the file extension.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to read.
        component : `str` or `None`, optional
            Not used by this base class.
        expected_size : `int`, optional
            Not used by this implementation.

        Returns
        -------
        in_memory_dataset : `object`
            The ``deserialized`` attribute of the backend's read result.

        Raises
        ------
        RuntimeError
            Raised if the extension is unsupported or its backend is not
            available.
        """
        # NOTE(review): ``component`` is ignored here, so a component
        # request through this base class yields the full object. Confirm
        # that callers only request components via the subclasses.
        pytype = self.dataset_ref.datasetType.storageClass.pytype
        ext = self._extension_from_uri(uri)
        backend = self._get_backend(ext)
        kwargs = self.file_descriptor.parameters or {}
        return backend.read(pytype, uri, **kwargs).deserialized
@enum.unique
class ComponentSentinel(enum.Enum):
    """Marker results that `ImageFormatter.read_component` can return
    in place of a component value.
    """

    UNRECOGNIZED_COMPONENT = enum.auto()
    """The component name was not handled; a subclass may still
    recognise it.
    """

    INVALID_COMPONENT_MODEL = enum.auto()
    """The component name is known, but the attribute on the model is
    missing or has the wrong type.
    """
250class ImageFormatter(GenericFormatter):
251 """Adds component-level read support for image-like types.
253 Subclasses override `read_component` to handle additional components
254 (image/mask/variance for MaskedImage; psf/summary_stats/etc. for
255 VisitImage).
256 """
258 def read_from_uri(
259 self,
260 uri: ResourcePath,
261 component: str | None = None,
262 expected_size: int = -1,
263 ) -> Any:
264 pytype: Any = self.file_descriptor.storageClass.pytype
265 ext = self._extension_from_uri(uri)
266 backend = _BACKENDS[ext]
267 if component is None:
268 result = backend.read(pytype, uri, bbox=self.pop_bbox_from_parameters()).deserialized
269 else:
270 result = self._read_component_from_uri(component, uri)
271 self.check_unhandled_parameters()
272 return result
274 def _read_component_from_uri(self, component: str, uri: ResourcePath) -> Any:
275 ext = self._extension_from_uri(uri)
276 backend = _BACKENDS[ext]
277 pytype: Any = self.file_descriptor.storageClass.pytype
278 if ext == ".json":
279 obj = backend.read(pytype, uri).deserialized
280 try:
281 return getattr(obj, component)
282 except AttributeError as exc:
283 raise NotImplementedError(f"Unrecognized component {component!r} for JSON read.") from exc
284 # FITS/NDF archive path. backend.input_archive and pointer_model are
285 # typed as `type | None` to allow the JSON row to opt out; here we
286 # know they are populated.
287 archive_cls: Any = backend.input_archive
288 pointer_model: Any = backend.pointer_model
289 assert archive_cls is not None
290 assert pointer_model is not None
291 # FitsInputArchive uses partial=True for component reads; NDF
292 # has no such kwarg.
293 open_kwargs = {"partial": True} if ext == ".fits" else {}
294 with archive_cls.open(uri, **open_kwargs) as archive:
295 tree_type = pytype._get_archive_tree_type(pointer_model)
296 tree = archive.get_tree(tree_type)
297 result = self.read_component(component, tree, archive)
298 if result is ComponentSentinel.UNRECOGNIZED_COMPONENT:
299 raise NotImplementedError(f"Unrecognized component {component!r} for {type(self).__name__}.")
300 if result is ComponentSentinel.INVALID_COMPONENT_MODEL:
301 raise NotImplementedError(
302 f"Invalid serialization model for component {component!r} for {type(self).__name__}."
303 )
304 return result
306 def pop_bbox_from_parameters(self) -> Box | None:
307 parameters = self.file_descriptor.parameters or {}
308 return parameters.pop("bbox", None)
310 def check_unhandled_parameters(self) -> None:
311 parameters = self.file_descriptor.parameters
312 if parameters:
313 raise RuntimeError(f"Parameters {list(parameters.keys())} not recognized.")
315 def read_component(self, component: str, tree: Any, archive: Any) -> Any:
316 match component:
317 case "projection":
318 if isinstance(
319 p := getattr(tree, "projection", None),
320 ProjectionSerializationModel,
321 ):
322 return p.deserialize(archive)
323 return ComponentSentinel.INVALID_COMPONENT_MODEL
324 case "bbox":
325 if isinstance(bbox := getattr(tree, "bbox", None), Box):
326 return bbox
327 return ComponentSentinel.INVALID_COMPONENT_MODEL
328 case "obs_info":
329 if isinstance(oi := getattr(tree, "obs_info", None), ObservationInfo):
330 return oi
331 return ComponentSentinel.INVALID_COMPONENT_MODEL
332 return ComponentSentinel.UNRECOGNIZED_COMPONENT
class MaskedImageFormatter(ImageFormatter):
    """Formatter for masked images.

    Extends `ImageFormatter` with the ``image``, ``mask`` and ``variance``
    components.
    """

    def read_component(self, component: str, tree: Any, archive: Any) -> Any:
        """Read one component, deferring to the parent class first.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        tree : `object`
            Serialization model obtained from the archive.
        archive : `object`
            Open input archive, passed on to ``deserialize`` calls.

        Returns
        -------
        component_value : `object` or `ComponentSentinel`
            The component, or a sentinel when it could not be read here.
        """
        # Any sentinel from the parent, of either kind, means this class
        # gets a turn at the component.
        inherited = super().read_component(component, tree, archive)
        if not isinstance(inherited, ComponentSentinel):
            return inherited
        if not isinstance(tree, MaskedImageSerializationModel):
            return ComponentSentinel.INVALID_COMPONENT_MODEL
        if component in ("image", "mask", "variance"):
            # The three planes share a call shape; the attribute on the
            # tree carries the same name as the component.
            plane_model = getattr(tree, component)
            return plane_model.deserialize(archive, bbox=self.pop_bbox_from_parameters())
        return ComponentSentinel.UNRECOGNIZED_COMPONENT
class VisitImageFormatter(MaskedImageFormatter):
    """Formatter for visit images.

    Extends `MaskedImageFormatter` with the ``psf``, ``summary_stats``,
    ``detector`` and ``aperture_corrections`` components.
    """

    def read_component(self, component: str, tree: Any, archive: Any) -> Any:
        """Read one component, deferring to the parent class first.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        tree : `object`
            Serialization model obtained from the archive.
        archive : `object`
            Open input archive, passed on to ``deserialize`` calls.

        Returns
        -------
        component_value : `object` or `ComponentSentinel`
            The component, or a sentinel when it could not be read here.
        """
        # Any sentinel from the parent means this class gets a turn.
        inherited = super().read_component(component, tree, archive)
        if not isinstance(inherited, ComponentSentinel):
            return inherited
        if not isinstance(tree, VisitImageSerializationModel):
            return ComponentSentinel.INVALID_COMPONENT_MODEL
        if component == "psf":
            # The FITS path uses tree.psf.deserialize; the NDF tree
            # exposes deserialize_psf for the same effect.
            if hasattr(tree, "deserialize_psf"):
                return tree.deserialize_psf(archive)
            return tree.psf.deserialize(archive)
        if component == "summary_stats":
            # Returned as stored on the tree, without a deserialize step.
            return tree.summary_stats
        if component == "detector":
            if getattr(tree, "detector", None) is None:
                return ComponentSentinel.INVALID_COMPONENT_MODEL
            return tree.detector.deserialize(archive)
        if component == "aperture_corrections":
            return tree.aperture_corrections.deserialize(archive)
        return ComponentSentinel.UNRECOGNIZED_COMPONENT
class CellCoaddFormatter(MaskedImageFormatter):
    """Formatter for cell coadds.

    Extends `MaskedImageFormatter` with the CellCoadd-specific ``psf`` and
    ``provenance`` components.
    """

    def read_component(self, component: str, tree: Any, archive: Any) -> Any:
        """Read one component, deferring to the parent class first.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        tree : `object`
            Serialization model obtained from the archive.
        archive : `object`
            Open input archive, passed on to the ``deserialize_*`` calls.

        Returns
        -------
        component_value : `object` or `ComponentSentinel`
            The component, or a sentinel when it could not be read here.
        """
        from .cells import CellCoaddSerializationModel  # avoid cycles

        # Any sentinel from the parent means this class gets a turn.
        inherited = super().read_component(component, tree, archive)
        if not isinstance(inherited, ComponentSentinel):
            return inherited
        if not isinstance(tree, CellCoaddSerializationModel):
            return ComponentSentinel.INVALID_COMPONENT_MODEL
        if component == "psf":
            # The PSF read takes the optional ``bbox`` read parameter.
            region = self.pop_bbox_from_parameters()
            return tree.deserialize_psf(archive, bbox=region)
        if component == "provenance":
            return tree.deserialize_provenance(archive)
        return ComponentSentinel.UNRECOGNIZED_COMPONENT