Coverage for python / lsst / images / _visit_image.py: 27%
301 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:43 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:43 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import warnings
17from collections.abc import Callable, Mapping, MutableMapping
18from types import EllipsisType
19from typing import Any, Literal, cast, overload
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import pydantic
25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
27from ._geom import Box
28from ._image import Image, ImageSerializationModel
29from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
30from ._masked_image import MaskedImage, MaskedImageSerializationModel
31from ._observation_summary_stats import ObservationSummaryStats
32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
33from .aperture_corrections import (
34 ApertureCorrectionMap,
35 ApertureCorrectionMapSerializationModel,
36 aperture_corrections_from_legacy,
37)
38from .cameras import Detector, DetectorSerializationModel
39from .fits import FitsOpaqueMetadata
40from .psfs import (
41 GaussianPointSpreadFunction,
42 GaussianPSFSerializationModel,
43 PiffSerializationModel,
44 PiffWrapper,
45 PointSpreadFunction,
46 PSFExSerializationModel,
47 PSFExWrapper,
48)
49from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive
def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
    """Build an `ObservationInfo` from header-like metadata.

    Parameters
    ----------
    md
        Header metadata to translate. When the fallback path is taken and
        ``visit_info`` is given, this mapping is updated in place with the
        serialized visit info.
    visit_info
        Optional legacy ``VisitInfo`` object used to fill in ``md`` when no
        translator recognizes the header.

    Returns
    -------
    obs_info
        The translated observation information.
    """
    try:
        # First treat the header as if it were a raw header and let the
        # translator machinery identify it.
        return ObservationInfo.from_header(md, quiet=True)
    except ValueError:
        # No known translator: fall back to the visit info. A real VisitInfo
        # is serialized into the header because it is known to be complete.
        if visit_info is not None:
            from lsst.afw.image import setVisitInfoMetadata
            from lsst.daf.base import PropertyList

            serialized_visit_info = PropertyList()
            setVisitInfoMetadata(serialized_visit_info, visit_info)
            # Merge rather than replace so butler provenance stays available.
            md.update(serialized_visit_info)

        # Look for VisitInfo hints in the (possibly augmented) header. This
        # emits many warnings when nothing can be found; currently there is
        # no way to disable those without capturing them.
        return ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
77def _update_obs_info_from_legacy(
78 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None
79) -> ObservationInfo:
80 extra_md: dict[str, str | int] = {}
82 if filter_label is not None and filter_label.hasBandLabel():
83 extra_md["physical_filter"] = filter_label.physicalLabel
85 # Fill in detector metadata, check for consistency.
86 # ObsInfo detector name and group can not be derived from
87 # the getName() information without knowing how the components
88 # are separated.
89 if detector is not None:
90 detector_md = {
91 "detector_num": detector.getId(),
92 "detector_serial": detector.getSerial(),
93 "detector_unique_name": detector.getName(),
94 }
95 extra_md.update(detector_md)
97 obs_info_updates: dict[str, str | int] = {}
98 for k, v in extra_md.items():
99 current = getattr(obs_info, k)
100 if current is None:
101 obs_info_updates[k] = v
102 continue
103 if current != v:
104 raise RuntimeError(
105 f"ObservationInfo contains value for '{k}' that is inconsistent "
106 f"with given legacy object: {v} != {current}"
107 )
109 if obs_info_updates:
110 obs_info = obs_info.model_copy(update=obs_info_updates)
111 return obs_info
114class VisitImage(MaskedImage):
115 """A calibrated single-visit image.
117 Parameters
118 ----------
119 image
120 The main image plane. If this has a `Projection`, it will be used
121 for all planes unless a ``projection`` is passed separately.
122 mask
123 A bitmask image that annotates the main image plane. Must have the
124 same bounding box as ``image`` if provided. Any attached projection
125 is replaced (possibly by `None`).
126 variance
127 The per-pixel uncertainty of the main image as an image of variance
128 values. Must have the same bounding box as ``image`` if provided, and
129 its units must be the square of ``image.unit`` or `None`.
130 Values default to ``1.0``. Any attached projection is replaced
131 (possibly by `None`).
132 mask_schema
133 Schema for the mask plane. Must be provided if and only if ``mask`` is
134 not provided.
135 projection
136 Projection that maps the pixel grid to the sky. Can only be `None` if
137 a projection is already attached to ``image``.
138 obs_info
139 General information about this visit in standardized form.
140 summary_stats
141 Summary statistics associated with this visit. Initialized to default
142 values if not provided.
143 psf
144 Point-spread function model for this image, or an exception explaining
145 why it could not be read (to be raised if the PSF is requested later).
146 detector
147 Geometry and electronic information for the detector attached to this
148 image.
149 aperture_corrections : `dict` [`str`, `~fields.BaseField`]
150 Mapping from photometry algorithm name to the aperture correction for
151 that algorithm.
152 metadata
153 Arbitrary flexible metadata to associate with the image.
154 """
156 def __init__(
157 self,
158 image: Image,
159 *,
160 mask: Mask | None = None,
161 variance: Image | None = None,
162 mask_schema: MaskSchema | None = None,
163 projection: Projection[DetectorFrame] | None = None,
164 obs_info: ObservationInfo | None = None,
165 summary_stats: ObservationSummaryStats | None = None,
166 psf: PointSpreadFunction | ArchiveReadError,
167 detector: Detector,
168 aperture_corrections: ApertureCorrectionMap | None = None,
169 metadata: dict[str, MetadataValue] | None = None,
170 ):
171 super().__init__(
172 image,
173 mask=mask,
174 variance=variance,
175 mask_schema=mask_schema,
176 projection=projection,
177 obs_info=obs_info,
178 metadata=metadata,
179 )
180 if self.image.unit is None:
181 raise TypeError("The image component of a VisitImage must have units.")
182 if self.image.projection is None:
183 raise TypeError("The projection component of a VisitImage cannot be None.")
184 if self.image.obs_info is None:
185 raise TypeError("The observation info component of a VisitImage cannot be None.")
186 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
187 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
188 if summary_stats is None:
189 summary_stats = ObservationSummaryStats()
190 self._summary_stats = summary_stats
191 self._psf = psf
192 self._detector = detector
193 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {}
195 @property
196 def unit(self) -> astropy.units.UnitBase:
197 """The units of the image plane (`astropy.units.Unit`)."""
198 return cast(astropy.units.UnitBase, super().unit)
200 @property
201 def projection(self) -> Projection[DetectorFrame]:
202 """The projection that maps the pixel grid to the sky
203 (`Projection` [`DetectorFrame`]).
204 """
205 return cast(Projection[DetectorFrame], super().projection)
207 @property
208 def obs_info(self) -> ObservationInfo:
209 """General information about this observation in standard form.
210 (`~astro_metadata_translator.ObservationInfo`).
211 """
212 obs_info = self.image.obs_info
213 assert obs_info is not None
214 return obs_info
216 @property
217 def astropy_wcs(self) -> ProjectionAstropyView:
218 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
220 Notes
221 -----
222 As expected for Astropy WCS objects, this defines pixel coordinates
223 such that the first row and column in the arrays are ``(0, 0)``, not
224 ``bbox.start``, as is the case for `projection`.
226 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
227 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
228 `astropy.wcs.WCS` (use `fits_wcs` for that).
229 """
230 return cast(ProjectionAstropyView, super().astropy_wcs)
232 @property
233 def summary_stats(self) -> ObservationSummaryStats:
234 """Optional summary statistics for this observation
235 (`ObservationSummaryStats`).
236 """
237 return self._summary_stats
239 @property
240 def psf(self) -> PointSpreadFunction:
241 """The point-spread function model for this image
242 (`.psfs.PointSpreadFunction`).
243 """
244 if isinstance(self._psf, ArchiveReadError):
245 raise self._psf
246 return self._psf
248 @property
249 def detector(self) -> Detector:
250 """Geometry and electronic information about the detector
251 (`.cameras.Detector`).
252 """
253 return self._detector
255 @property
256 def aperture_corrections(self) -> ApertureCorrectionMap:
257 """A mapping from photometry algorithm name to the aperture correction
258 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]).
259 """
260 return self._aperture_corrections
262 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
263 if bbox is ...:
264 return self
265 super().__getitem__(bbox)
266 return self._transfer_metadata(
267 VisitImage(
268 self.image[bbox],
269 mask=self.mask[bbox],
270 variance=self.variance[bbox],
271 projection=self.projection,
272 psf=self.psf,
273 obs_info=self.obs_info,
274 summary_stats=self.summary_stats,
275 detector=self._detector,
276 aperture_corrections=self.aperture_corrections,
277 ),
278 bbox=bbox,
279 )
281 def __str__(self) -> str:
282 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
284 def __repr__(self) -> str:
285 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
287 def copy(self, *, copy_detector: bool = False) -> VisitImage:
288 """Deep-copy the visit image.
290 Parameters
291 ----------
292 copy_detector
293 Whether to deep-copy the `detector` attribute.
294 """
295 return self._transfer_metadata(
296 VisitImage(
297 image=self._image.copy(),
298 mask=self._mask.copy(),
299 variance=self._variance.copy(),
300 psf=self._psf,
301 obs_info=self.obs_info,
302 summary_stats=self.summary_stats.model_copy(),
303 detector=self._detector.copy() if copy_detector else self._detector,
304 aperture_corrections=self.aperture_corrections.copy(),
305 ),
306 copy=True,
307 )
309 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
310 masked_image_model = super().serialize(archive)
311 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
312 match self._psf:
313 # MyPy is able to figure things out here with this match statement,
314 # but not a single isinstance check on both types.
315 case PiffWrapper():
316 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
317 case PSFExWrapper():
318 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
319 case GaussianPointSpreadFunction():
320 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
321 case _:
322 raise TypeError(
323 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
324 )
325 assert masked_image_model.projection is not None, "VisitImage always has a projection."
326 assert masked_image_model.obs_info is not None, "VisitImage always has observation info."
327 serialized_detector = self._detector.serialize(archive)
328 serialized_aperture_corrections = ApertureCorrectionMapSerializationModel.serialize(
329 self.aperture_corrections, archive
330 )
331 return VisitImageSerializationModel(
332 image=masked_image_model.image,
333 mask=masked_image_model.mask,
334 variance=masked_image_model.variance,
335 projection=masked_image_model.projection,
336 obs_info=masked_image_model.obs_info,
337 psf=serialized_psf,
338 summary_stats=self.summary_stats,
339 detector=serialized_detector,
340 aperture_corrections=serialized_aperture_corrections,
341 metadata=self.metadata,
342 )
344 # Type-checkers want the model argument to only require
345 # MaskedImageSerializationModel[Any], and they'd be absolutely right if
346 # this were a regular instance method. But whether Liskov substitution
347 # applies to classmethods and staticmethods is sort of context-dependent,
348 # and here we do not want it to.
349 @staticmethod
350 def deserialize(
351 model: VisitImageSerializationModel[Any], # type: ignore[override]
352 archive: InputArchive[Any],
353 *,
354 bbox: Box | None = None,
355 ) -> VisitImage:
356 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox)
357 psf = model.deserialize_psf(archive)
358 detector = Detector.deserialize(model.detector, archive)
359 aperture_corrections = model.aperture_corrections.deserialize(archive)
360 return VisitImage(
361 masked_image.image,
362 mask=masked_image.mask,
363 variance=masked_image.variance,
364 psf=psf,
365 projection=masked_image.projection,
366 obs_info=masked_image.obs_info,
367 summary_stats=model.summary_stats,
368 detector=detector,
369 aperture_corrections=aperture_corrections,
370 )._finish_deserialize(model)
372 @staticmethod
373 def _get_archive_tree_type[P: pydantic.BaseModel](
374 pointer_type: type[P],
375 ) -> type[VisitImageSerializationModel[P]]:
376 """Return the serialization model type for this object for an archive
377 type that uses the given pointer type.
378 """
379 return VisitImageSerializationModel[pointer_type] # type: ignore
381 # write_fits and read_fits inherited from MaskedImage.
383 @staticmethod
384 def from_legacy(
385 legacy: Any,
386 *,
387 unit: astropy.units.Unit | None = None,
388 plane_map: Mapping[str, MaskPlane] | None = None,
389 instrument: str | None = None,
390 visit: int | None = None,
391 ) -> VisitImage:
392 """Convert from an `lsst.afw.image.Exposure` instance.
394 Parameters
395 ----------
396 legacy
397 An `lsst.afw.image.Exposure` instance that will share image and
398 variance (but not mask) pixel data with the returned object.
399 unit
400 Units of the image. If not provided, the ``BUNIT`` metadata
401 key will be used, if available.
402 plane_map
403 A mapping from legacy mask plane name to the new plane name and
404 description. If `None` (default)
405 `get_legacy_visit_image_mask_planes` is used.
406 instrument
407 Name of the instrument. Extracted from the metadata if not
408 provided.
409 visit
410 ID of the visit. Extracted from the metadata if not provided.
411 """
412 if plane_map is None:
413 plane_map = get_legacy_visit_image_mask_planes()
414 md = legacy.getMetadata()
415 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
416 instrument = _extract_or_check_header(
417 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
418 )
419 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
420 unit = _extract_or_check_header(
421 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits")
422 )
423 legacy_wcs = legacy.getWcs()
424 if legacy_wcs is None:
425 raise ValueError("Exposure does not have a SkyWcs.")
426 legacy_detector = legacy.getDetector()
427 if legacy_detector is None:
428 raise ValueError("Exposure does not have a Detector.")
429 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
431 # Update the ObservationInfo from other components.
432 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
434 opaque_fits_metadata = FitsOpaqueMetadata()
435 primary_header = astropy.io.fits.Header()
436 with warnings.catch_warnings():
437 # Silence warnings about long keys becoming HIERARCH.
438 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
439 primary_header.update(md.toOrderedDict())
440 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
441 projection = Projection.from_legacy(
442 legacy_wcs,
443 DetectorFrame(
444 instrument=instrument,
445 visit=visit,
446 detector=legacy_detector.getId(),
447 bbox=detector_bbox,
448 ),
449 )
450 legacy_psf = legacy.getPsf()
451 if legacy_psf is None:
452 raise ValueError("Exposure file does not have a Psf.")
453 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
454 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
455 legacy_summary_stats = legacy.info.getSummaryStats()
456 legacy_ap_corr_map = legacy.info.getApCorrMap()
457 result = VisitImage(
458 image=masked_image.image.view(unit=unit),
459 mask=masked_image.mask,
460 variance=masked_image.variance,
461 projection=projection,
462 psf=psf,
463 obs_info=obs_info,
464 summary_stats=(
465 ObservationSummaryStats.from_legacy(legacy_summary_stats)
466 if legacy_summary_stats is not None
467 else None
468 ),
469 detector=Detector.from_legacy(
470 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
471 ),
472 aperture_corrections=(
473 aperture_corrections_from_legacy(legacy_ap_corr_map)
474 if legacy_ap_corr_map is not None
475 else None
476 ),
477 )
479 result._opaque_metadata = opaque_fits_metadata
480 return result
482 @overload # type: ignore[override]
483 @staticmethod
484 def read_legacy( 484 ↛ exitline 484 didn't return from function 'read_legacy' because
485 filename: str,
486 *,
487 component: Literal["bbox"],
488 ) -> Box: ...
490 @overload
491 @staticmethod
492 def read_legacy( 492 ↛ exitline 492 didn't return from function 'read_legacy' because
493 filename: str,
494 *,
495 preserve_quantization: bool = False,
496 instrument: str | None = None,
497 visit: int | None = None,
498 component: Literal["image"],
499 ) -> Image: ...
501 @overload
502 @staticmethod
503 def read_legacy( 503 ↛ exitline 503 didn't return from function 'read_legacy' because
504 filename: str,
505 *,
506 plane_map: Mapping[str, MaskPlane] | None = None,
507 instrument: str | None = None,
508 visit: int | None = None,
509 component: Literal["mask"],
510 ) -> Mask: ...
512 @overload
513 @staticmethod
514 def read_legacy( 514 ↛ exitline 514 didn't return from function 'read_legacy' because
515 filename: str,
516 *,
517 preserve_quantization: bool = False,
518 instrument: str | None = None,
519 visit: int | None = None,
520 component: Literal["variance"],
521 ) -> Image: ...
523 @overload
524 @staticmethod
525 def read_legacy( 525 ↛ exitline 525 didn't return from function 'read_legacy' because
526 filename: str,
527 *,
528 instrument: str | None = None,
529 visit: int | None = None,
530 component: Literal["projection"],
531 ) -> Projection[DetectorFrame]: ...
533 @overload
534 @staticmethod
535 def read_legacy( 535 ↛ exitline 535 didn't return from function 'read_legacy' because
536 filename: str,
537 *,
538 component: Literal["psf"],
539 ) -> PointSpreadFunction: ...
541 @overload
542 @staticmethod
543 def read_legacy( 543 ↛ exitline 543 didn't return from function 'read_legacy' because
544 filename: str,
545 *,
546 component: Literal["detector"],
547 ) -> Detector: ...
549 @overload
550 @staticmethod
551 def read_legacy( 551 ↛ exitline 551 didn't return from function 'read_legacy' because
552 filename: str,
553 *,
554 component: Literal["obs_info"],
555 ) -> ObservationInfo: ...
557 @overload
558 @staticmethod
559 def read_legacy( 559 ↛ exitline 559 didn't return from function 'read_legacy' because
560 filename: str,
561 *,
562 component: Literal["summary_stats"],
563 ) -> ObservationSummaryStats: ...
565 @overload
566 @staticmethod
567 def read_legacy( 567 ↛ exitline 567 didn't return from function 'read_legacy' because
568 filename: str,
569 *,
570 component: Literal["aperture_corrections"],
571 ) -> ApertureCorrectionMap: ...
573 @overload
574 @staticmethod
575 def read_legacy( 575 ↛ exitline 575 didn't return from function 'read_legacy' because
576 filename: str,
577 *,
578 preserve_quantization: bool = False,
579 plane_map: Mapping[str, MaskPlane] | None = None,
580 instrument: str | None = None,
581 visit: int | None = None,
582 component: None = None,
583 ) -> VisitImage: ...
585 @staticmethod
586 def read_legacy( # type: ignore[override]
587 filename: str,
588 *,
589 preserve_quantization: bool = False,
590 plane_map: Mapping[str, MaskPlane] | None = None,
591 instrument: str | None = None,
592 visit: int | None = None,
593 component: Literal[
594 "bbox",
595 "image",
596 "mask",
597 "variance",
598 "projection",
599 "psf",
600 "detector",
601 "obs_info",
602 "summary_stats",
603 "aperture_corrections",
604 ]
605 | None = None,
606 ) -> Any:
607 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
609 Parameters
610 ----------
611 filename
612 Full name of the file.
613 preserve_quantization
614 If `True`, ensure that writing the masked image back out again will
615 exactly preserve quantization-compressed pixel values. This causes
616 the image and variance plane arrays to be marked as read-only and
617 stores the original binary table data for those planes in memory.
618 If the `MaskedImage` is copied, the precompressed pixel values are
619 not transferred to the copy.
620 plane_map
621 A mapping from legacy mask plane name to the new plane name and
622 description. If `None` (default)
623 `get_legacy_visit_image_mask_planes` is used.
624 instrument
625 Name of the instrument. Read from the primary header if not
626 provided.
627 visit
628 ID of the visit. Read from the primary header if not
629 provided.
630 component
631 A component to read instead of the full image.
632 """
633 from lsst.afw.image import ExposureFitsReader
635 reader = ExposureFitsReader(filename)
636 if component == "bbox":
637 return Box.from_legacy(reader.readBBox())
638 legacy_detector = reader.readDetector()
639 if legacy_detector is None:
640 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
641 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
642 legacy_wcs = None
643 if component in (None, "image", "mask", "variance", "projection"):
644 legacy_wcs = reader.readWcs()
645 if legacy_wcs is None:
646 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
647 legacy_exposure_info = reader.readExposureInfo()
648 summary_stats = None
649 if component in (None, "summary_stats"):
650 legacy_stats = legacy_exposure_info.getSummaryStats()
651 if legacy_stats is not None:
652 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
653 if component == "summary_stats":
654 return summary_stats
655 if component in (None, "psf"):
656 legacy_psf = reader.readPsf()
657 if legacy_psf is None:
658 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
659 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
660 if component == "psf":
661 return psf
662 aperture_corrections: ApertureCorrectionMap = {}
663 if component in (None, "aperture_corrections"):
664 legacy_ap_corr_map = reader.readApCorrMap()
665 if legacy_ap_corr_map is not None:
666 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map)
667 if component == "aperture_corrections":
668 return aperture_corrections
669 assert component in (None, "image", "mask", "variance", "projection", "obs_info", "detector"), (
670 component
671 ) # for MyPy
672 with astropy.io.fits.open(filename) as hdu_list:
673 primary_header = hdu_list[0].header
674 obs_info = _obs_info_from_md(primary_header)
675 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter())
676 if component == "obs_info":
677 return obs_info
678 instrument = _extract_or_check_header(
679 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
680 )
681 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
682 if component == "detector":
683 return Detector.from_legacy(
684 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
685 )
686 projection = Projection.from_legacy(
687 legacy_wcs,
688 DetectorFrame(
689 instrument=instrument,
690 visit=visit,
691 detector=legacy_detector.getId(),
692 bbox=detector_bbox,
693 ),
694 )
695 if component == "projection":
696 return projection
697 if plane_map is None:
698 plane_map = get_legacy_visit_image_mask_planes()
699 assert component != "psf", component # for MyPy
700 from_masked_image = MaskedImage._read_legacy_hdus(
701 hdu_list,
702 filename,
703 preserve_quantization=preserve_quantization,
704 plane_map=plane_map,
705 component=component,
706 )
707 if component is not None:
708 # This is the image, mask, or variance; attach the projection and
709 # obs_info and return
710 return from_masked_image.view(projection=projection, obs_info=obs_info)
711 result = VisitImage(
712 from_masked_image.image,
713 mask=from_masked_image.mask,
714 variance=from_masked_image.variance,
715 projection=projection,
716 psf=psf,
717 detector=Detector.from_legacy(
718 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
719 ),
720 obs_info=obs_info,
721 summary_stats=summary_stats,
722 aperture_corrections=aperture_corrections,
723 )
724 result._opaque_metadata = from_masked_image._opaque_metadata
725 return result
728class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
729 """A Pydantic model used to represent a serialized `VisitImage`."""
731 # Inherited attributes are duplicated because that improves the docs
732 # (some limitation in the sphinx/pydantic integration), and these are
733 # important docs.
735 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
736 mask: MaskSerializationModel[P] = pydantic.Field(
737 description="Bitmask that annotates the main image's pixels."
738 )
739 variance: ImageSerializationModel[P] = pydantic.Field(
740 description="Per-pixel variance estimates for the main image."
741 )
742 projection: ProjectionSerializationModel[P] = pydantic.Field(
743 description="Projection that maps the pixel grid to the sky.",
744 )
745 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
746 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
747 )
748 obs_info: ObservationInfo = pydantic.Field(
749 description="Standardized description of visit metadata",
750 )
751 summary_stats: ObservationSummaryStats = pydantic.Field(
752 description="Summary statistics for the observation."
753 )
754 detector: DetectorSerializationModel = pydantic.Field(
755 description="Geometry and electronic information for the detector."
756 )
757 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field(
758 default_factory=ApertureCorrectionMapSerializationModel,
759 description="Aperture corrections, keyed by flux algorithm.",
760 )
762 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError:
763 """Finish deserializing the PSF model, or *return* any exception
764 raised in the attempt.
765 """
766 try:
767 match self.psf:
768 case PiffSerializationModel():
769 return PiffWrapper.deserialize(self.psf, archive)
770 case PSFExSerializationModel():
771 return PSFExWrapper.deserialize(self.psf, archive)
772 case GaussianPSFSerializationModel():
773 return GaussianPointSpreadFunction.deserialize(self.psf, archive)
774 case _:
775 raise ArchiveReadError("PSF model type not recognized.")
776 except ArchiveReadError as err:
777 return err
780def _extract_or_check_value[T](
781 key: str,
782 given_value: T | None,
783 *sources: tuple[str, T | None],
784) -> T:
785 # Compare given value against multiple sources. If given value is not
786 # supplied return the first non-None value in the reference sources.
787 if given_value is not None:
788 for source_name, source_value in sources:
789 if source_value is not None and source_value != given_value:
790 raise ValueError(
791 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
792 )
793 if source_value is not None:
794 # Only check the first non-None source rather than checking
795 # all supplied values.
796 break
797 return given_value
799 for _, source_value in sources:
800 if source_value is not None:
801 return source_value
803 raise ValueError(f"No value found for {key}.")
806def _extract_or_check_header[T](
807 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
808) -> T:
809 hdr_value: T | None = None
810 if (hdr_raw_value := header.get(key)) is not None:
811 hdr_value = coerce(hdr_raw_value)
812 return _extract_or_check_value(
813 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
814 )