Coverage for python / lsst / images / _visit_image.py: 28%
304 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 08:01 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 08:01 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import warnings
17from collections.abc import Callable, Mapping, MutableMapping
18from types import EllipsisType
19from typing import Any, Literal, cast, overload
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import pydantic
25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
27from ._concrete_bounds import SerializableBounds
28from ._geom import Bounds, Box
29from ._image import Image, ImageSerializationModel
30from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
31from ._masked_image import MaskedImage, MaskedImageSerializationModel
32from ._observation_summary_stats import ObservationSummaryStats
33from ._polygon import Polygon
34from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
35from .aperture_corrections import (
36 ApertureCorrectionMap,
37 ApertureCorrectionMapSerializationModel,
38 aperture_corrections_from_legacy,
39)
40from .cameras import Detector, DetectorSerializationModel
41from .fits import FitsOpaqueMetadata
42from .psfs import (
43 GaussianPointSpreadFunction,
44 GaussianPSFSerializationModel,
45 PiffSerializationModel,
46 PiffWrapper,
47 PointSpreadFunction,
48 PSFExSerializationModel,
49 PSFExWrapper,
50)
51from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive
52from .utils import is_none
55def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
56 # Try to get an ObservationInfo from the primary header as if
57 # it's a raw header. Else fallback.
58 try:
59 obs_info = ObservationInfo.from_header(md, quiet=True)
60 except ValueError:
61 # Not known translator. Must fall back to visit info. If we have
62 # an actual VisitInfo, serialize it since we know that it will be
63 # complete.
64 if visit_info is not None:
65 from lsst.afw.image import setVisitInfoMetadata
66 from lsst.daf.base import PropertyList
68 pl = PropertyList()
69 setVisitInfoMetadata(pl, visit_info)
70 # Merge so that we still have access to butler provenance.
71 md.update(pl)
73 # Try the given header looking for VisitInfo hints.
74 # We get lots of warnings if nothing can be found. Currently
75 # no way to disable those without capturing them.
76 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
77 return obs_info
80def _update_obs_info_from_legacy(
81 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None
82) -> ObservationInfo:
83 extra_md: dict[str, str | int] = {}
85 if filter_label is not None and filter_label.hasBandLabel():
86 extra_md["physical_filter"] = filter_label.physicalLabel
88 # Fill in detector metadata, check for consistency.
89 # ObsInfo detector name and group can not be derived from
90 # the getName() information without knowing how the components
91 # are separated.
92 if detector is not None:
93 detector_md = {
94 "detector_num": detector.getId(),
95 "detector_serial": detector.getSerial(),
96 "detector_unique_name": detector.getName(),
97 }
98 extra_md.update(detector_md)
100 obs_info_updates: dict[str, str | int] = {}
101 for k, v in extra_md.items():
102 current = getattr(obs_info, k)
103 if current is None:
104 obs_info_updates[k] = v
105 continue
106 if current != v:
107 raise RuntimeError(
108 f"ObservationInfo contains value for '{k}' that is inconsistent "
109 f"with given legacy object: {v} != {current}"
110 )
112 if obs_info_updates:
113 obs_info = obs_info.model_copy(update=obs_info_updates)
114 return obs_info
117class VisitImage(MaskedImage):
118 """A calibrated single-visit image.
120 Parameters
121 ----------
122 image
123 The main image plane. If this has a `Projection`, it will be used
124 for all planes unless a ``projection`` is passed separately.
125 mask
126 A bitmask image that annotates the main image plane. Must have the
127 same bounding box as ``image`` if provided. Any attached projection
128 is replaced (possibly by `None`).
129 variance
130 The per-pixel uncertainty of the main image as an image of variance
131 values. Must have the same bounding box as ``image`` if provided, and
132 its units must be the square of ``image.unit`` or `None`.
133 Values default to ``1.0``. Any attached projection is replaced
134 (possibly by `None`).
135 mask_schema
136 Schema for the mask plane. Must be provided if and only if ``mask`` is
137 not provided.
138 projection
139 Projection that maps the pixel grid to the sky. Can only be `None` if
140 a projection is already attached to ``image``.
141 bounds
142 The region where this image's pixels and other properties are valid.
143 If not provided, the bounding box of the image is used. Other
144 components (``psf``, ``projection``, ``aperture_corrections``, etc.)
145 are assumed to have their own bounds which may or may not be the same
146 as the image bounds. If ``bounds`` extends beyond the image bounding
147 box, the intersection between ``bounds`` and the image bounding box
148 is used instead.
149 obs_info
150 General information about this visit in standardized form.
151 summary_stats
152 Summary statistics associated with this visit. Initialized to default
153 values if not provided.
154 psf
155 Point-spread function model for this image, or an exception explaining
156 why it could not be read (to be raised if the PSF is requested later).
157 detector
158 Geometry and electronic information for the detector attached to this
159 image.
160 aperture_corrections : `dict` [`str`, `~fields.BaseField`]
161 Mapping from photometry algorithm name to the aperture correction for
162 that algorithm.
163 metadata
164 Arbitrary flexible metadata to associate with the image.
165 """
167 def __init__(
168 self,
169 image: Image,
170 *,
171 mask: Mask | None = None,
172 variance: Image | None = None,
173 mask_schema: MaskSchema | None = None,
174 projection: Projection[DetectorFrame] | None = None,
175 bounds: Bounds | None = None,
176 obs_info: ObservationInfo | None = None,
177 summary_stats: ObservationSummaryStats | None = None,
178 psf: PointSpreadFunction | ArchiveReadError,
179 detector: Detector,
180 aperture_corrections: ApertureCorrectionMap | None = None,
181 metadata: dict[str, MetadataValue] | None = None,
182 ):
183 super().__init__(
184 image,
185 mask=mask,
186 variance=variance,
187 mask_schema=mask_schema,
188 projection=projection,
189 obs_info=obs_info,
190 metadata=metadata,
191 )
192 if self.image.unit is None:
193 raise TypeError("The image component of a VisitImage must have units.")
194 if self.image.projection is None:
195 raise TypeError("The projection component of a VisitImage cannot be None.")
196 if self.image.obs_info is None:
197 raise TypeError("The observation info component of a VisitImage cannot be None.")
198 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
199 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
200 if summary_stats is None:
201 summary_stats = ObservationSummaryStats()
202 self._summary_stats = summary_stats
203 self._psf = psf
204 self._detector = detector
205 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {}
206 self._bounds = bounds if bounds is not None else self.bbox
207 if not self.bbox.contains(self._bounds.bbox):
208 self._bounds = self._bounds.intersection(self.bbox)
210 @property
211 def unit(self) -> astropy.units.UnitBase:
212 """The units of the image plane (`astropy.units.Unit`)."""
213 return cast(astropy.units.UnitBase, super().unit)
215 @property
216 def projection(self) -> Projection[DetectorFrame]:
217 """The projection that maps the pixel grid to the sky
218 (`Projection` [`DetectorFrame`]).
219 """
220 return cast(Projection[DetectorFrame], super().projection)
222 @property
223 def bounds(self) -> Bounds:
224 """The region where pixels are valid (`Bounds`)."""
225 return self._bounds
227 @property
228 def obs_info(self) -> ObservationInfo:
229 """General information about this observation in standard form.
230 (`~astro_metadata_translator.ObservationInfo`).
231 """
232 obs_info = self.image.obs_info
233 assert obs_info is not None
234 return obs_info
236 @property
237 def astropy_wcs(self) -> ProjectionAstropyView:
238 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
240 Notes
241 -----
242 As expected for Astropy WCS objects, this defines pixel coordinates
243 such that the first row and column in the arrays are ``(0, 0)``, not
244 ``bbox.start``, as is the case for `projection`.
246 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
247 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
248 `astropy.wcs.WCS` (use `fits_wcs` for that).
249 """
250 return cast(ProjectionAstropyView, super().astropy_wcs)
252 @property
253 def summary_stats(self) -> ObservationSummaryStats:
254 """Optional summary statistics for this observation
255 (`ObservationSummaryStats`).
256 """
257 return self._summary_stats
259 @property
260 def psf(self) -> PointSpreadFunction:
261 """The point-spread function model for this image
262 (`.psfs.PointSpreadFunction`).
263 """
264 if isinstance(self._psf, ArchiveReadError):
265 raise self._psf
266 return self._psf
268 @property
269 def detector(self) -> Detector:
270 """Geometry and electronic information about the detector
271 (`.cameras.Detector`).
272 """
273 return self._detector
275 @property
276 def aperture_corrections(self) -> ApertureCorrectionMap:
277 """A mapping from photometry algorithm name to the aperture correction
278 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]).
279 """
280 return self._aperture_corrections
282 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
283 if bbox is ...:
284 return self
285 super().__getitem__(bbox)
286 return self._transfer_metadata(
287 VisitImage(
288 self.image[bbox],
289 mask=self.mask[bbox],
290 variance=self.variance[bbox],
291 projection=self.projection,
292 psf=self.psf,
293 obs_info=self.obs_info,
294 bounds=self._bounds, # don't need to intersect here, because __init__ will do that.
295 summary_stats=self.summary_stats,
296 detector=self._detector,
297 aperture_corrections=self.aperture_corrections,
298 ),
299 bbox=bbox,
300 )
302 def __str__(self) -> str:
303 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
305 def __repr__(self) -> str:
306 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
308 def copy(self, *, copy_detector: bool = False) -> VisitImage:
309 """Deep-copy the visit image.
311 Parameters
312 ----------
313 copy_detector
314 Whether to deep-copy the `detector` attribute.
315 """
316 return self._transfer_metadata(
317 VisitImage(
318 image=self._image.copy(),
319 mask=self._mask.copy(),
320 variance=self._variance.copy(),
321 psf=self._psf,
322 obs_info=self.obs_info,
323 bounds=self._bounds,
324 summary_stats=self.summary_stats.model_copy(),
325 detector=self._detector.copy() if copy_detector else self._detector,
326 aperture_corrections=self.aperture_corrections.copy(),
327 ),
328 copy=True,
329 )
331 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
332 masked_image_model = super().serialize(archive)
333 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
334 match self._psf:
335 # MyPy is able to figure things out here with this match statement,
336 # but not a single isinstance check on both types.
337 case PiffWrapper():
338 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
339 case PSFExWrapper():
340 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
341 case GaussianPointSpreadFunction():
342 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
343 case _:
344 raise TypeError(
345 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
346 )
347 assert masked_image_model.projection is not None, "VisitImage always has a projection."
348 assert masked_image_model.obs_info is not None, "VisitImage always has observation info."
349 serialized_detector = self._detector.serialize(archive)
350 serialized_aperture_corrections = ApertureCorrectionMapSerializationModel.serialize(
351 self.aperture_corrections, archive
352 )
353 return VisitImageSerializationModel(
354 image=masked_image_model.image,
355 mask=masked_image_model.mask,
356 variance=masked_image_model.variance,
357 projection=masked_image_model.projection,
358 obs_info=masked_image_model.obs_info,
359 psf=serialized_psf,
360 summary_stats=self.summary_stats,
361 detector=serialized_detector,
362 aperture_corrections=serialized_aperture_corrections,
363 bounds=self._bounds.serialize() if self._bounds != self.bbox else None,
364 metadata=self.metadata,
365 )
367 @staticmethod
368 def _get_archive_tree_type[P: pydantic.BaseModel](
369 pointer_type: type[P],
370 ) -> type[VisitImageSerializationModel[P]]:
371 """Return the serialization model type for this object for an archive
372 type that uses the given pointer type.
373 """
374 return VisitImageSerializationModel[pointer_type] # type: ignore
376 # write_fits and read_fits inherited from MaskedImage.
378 @staticmethod
379 def from_legacy(
380 legacy: Any,
381 *,
382 unit: astropy.units.Unit | None = None,
383 plane_map: Mapping[str, MaskPlane] | None = None,
384 instrument: str | None = None,
385 visit: int | None = None,
386 ) -> VisitImage:
387 """Convert from an `lsst.afw.image.Exposure` instance.
389 Parameters
390 ----------
391 legacy
392 An `lsst.afw.image.Exposure` instance that will share image and
393 variance (but not mask) pixel data with the returned object.
394 unit
395 Units of the image. If not provided, the ``BUNIT`` metadata
396 key will be used, if available.
397 plane_map
398 A mapping from legacy mask plane name to the new plane name and
399 description. If `None` (default)
400 `get_legacy_visit_image_mask_planes` is used.
401 instrument
402 Name of the instrument. Extracted from the metadata if not
403 provided.
404 visit
405 ID of the visit. Extracted from the metadata if not provided.
406 """
407 if plane_map is None:
408 plane_map = get_legacy_visit_image_mask_planes()
409 md = legacy.getMetadata()
410 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
411 instrument = _extract_or_check_header(
412 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
413 )
414 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
415 unit = _extract_or_check_header(
416 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits")
417 )
418 legacy_wcs = legacy.getWcs()
419 if legacy_wcs is None:
420 raise ValueError("Exposure does not have a SkyWcs.")
421 legacy_detector = legacy.getDetector()
422 if legacy_detector is None:
423 raise ValueError("Exposure does not have a Detector.")
424 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
426 # Update the ObservationInfo from other components.
427 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
429 opaque_fits_metadata = FitsOpaqueMetadata()
430 primary_header = astropy.io.fits.Header()
431 with warnings.catch_warnings():
432 # Silence warnings about long keys becoming HIERARCH.
433 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
434 primary_header.update(md.toOrderedDict())
435 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
436 projection = Projection.from_legacy(
437 legacy_wcs,
438 DetectorFrame(
439 instrument=instrument,
440 visit=visit,
441 detector=legacy_detector.getId(),
442 bbox=detector_bbox,
443 ),
444 )
445 legacy_psf = legacy.getPsf()
446 if legacy_psf is None:
447 raise ValueError("Exposure file does not have a Psf.")
448 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
449 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
450 legacy_summary_stats = legacy.info.getSummaryStats()
451 legacy_ap_corr_map = legacy.info.getApCorrMap()
452 legacy_polygon = legacy.info.getValidPolygon()
453 result = VisitImage(
454 image=masked_image.image.view(unit=unit),
455 mask=masked_image.mask,
456 variance=masked_image.variance,
457 projection=projection,
458 psf=psf,
459 obs_info=obs_info,
460 summary_stats=(
461 ObservationSummaryStats.from_legacy(legacy_summary_stats)
462 if legacy_summary_stats is not None
463 else None
464 ),
465 detector=Detector.from_legacy(
466 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
467 ),
468 aperture_corrections=(
469 aperture_corrections_from_legacy(legacy_ap_corr_map)
470 if legacy_ap_corr_map is not None
471 else None
472 ),
473 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
474 )
476 result._opaque_metadata = opaque_fits_metadata
477 return result
479 @overload # type: ignore[override]
480 @staticmethod
481 def read_legacy( 481 ↛ exitline 481 didn't return from function 'read_legacy' because
482 filename: str,
483 *,
484 component: Literal["bbox"],
485 ) -> Box: ...
487 @overload
488 @staticmethod
489 def read_legacy( 489 ↛ exitline 489 didn't return from function 'read_legacy' because
490 filename: str,
491 *,
492 preserve_quantization: bool = False,
493 instrument: str | None = None,
494 visit: int | None = None,
495 component: Literal["image"],
496 ) -> Image: ...
498 @overload
499 @staticmethod
500 def read_legacy( 500 ↛ exitline 500 didn't return from function 'read_legacy' because
501 filename: str,
502 *,
503 plane_map: Mapping[str, MaskPlane] | None = None,
504 instrument: str | None = None,
505 visit: int | None = None,
506 component: Literal["mask"],
507 ) -> Mask: ...
509 @overload
510 @staticmethod
511 def read_legacy( 511 ↛ exitline 511 didn't return from function 'read_legacy' because
512 filename: str,
513 *,
514 preserve_quantization: bool = False,
515 instrument: str | None = None,
516 visit: int | None = None,
517 component: Literal["variance"],
518 ) -> Image: ...
520 @overload
521 @staticmethod
522 def read_legacy( 522 ↛ exitline 522 didn't return from function 'read_legacy' because
523 filename: str,
524 *,
525 instrument: str | None = None,
526 visit: int | None = None,
527 component: Literal["projection"],
528 ) -> Projection[DetectorFrame]: ...
530 @overload
531 @staticmethod
532 def read_legacy( 532 ↛ exitline 532 didn't return from function 'read_legacy' because
533 filename: str,
534 *,
535 component: Literal["psf"],
536 ) -> PointSpreadFunction: ...
538 @overload
539 @staticmethod
540 def read_legacy( 540 ↛ exitline 540 didn't return from function 'read_legacy' because
541 filename: str,
542 *,
543 component: Literal["detector"],
544 ) -> Detector: ...
546 @overload
547 @staticmethod
548 def read_legacy( 548 ↛ exitline 548 didn't return from function 'read_legacy' because
549 filename: str,
550 *,
551 component: Literal["obs_info"],
552 ) -> ObservationInfo: ...
554 @overload
555 @staticmethod
556 def read_legacy( 556 ↛ exitline 556 didn't return from function 'read_legacy' because
557 filename: str,
558 *,
559 component: Literal["summary_stats"],
560 ) -> ObservationSummaryStats: ...
562 @overload
563 @staticmethod
564 def read_legacy( 564 ↛ exitline 564 didn't return from function 'read_legacy' because
565 filename: str,
566 *,
567 component: Literal["aperture_corrections"],
568 ) -> ApertureCorrectionMap: ...
570 @overload
571 @staticmethod
572 def read_legacy( 572 ↛ exitline 572 didn't return from function 'read_legacy' because
573 filename: str,
574 *,
575 preserve_quantization: bool = False,
576 plane_map: Mapping[str, MaskPlane] | None = None,
577 instrument: str | None = None,
578 visit: int | None = None,
579 component: None = None,
580 ) -> VisitImage: ...
582 @staticmethod
583 def read_legacy( # type: ignore[override]
584 filename: str,
585 *,
586 preserve_quantization: bool = False,
587 plane_map: Mapping[str, MaskPlane] | None = None,
588 instrument: str | None = None,
589 visit: int | None = None,
590 component: Literal[
591 "bbox",
592 "image",
593 "mask",
594 "variance",
595 "projection",
596 "psf",
597 "detector",
598 "obs_info",
599 "summary_stats",
600 "aperture_corrections",
601 ]
602 | None = None,
603 ) -> Any:
604 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
606 Parameters
607 ----------
608 filename
609 Full name of the file.
610 preserve_quantization
611 If `True`, ensure that writing the masked image back out again will
612 exactly preserve quantization-compressed pixel values. This causes
613 the image and variance plane arrays to be marked as read-only and
614 stores the original binary table data for those planes in memory.
615 If the `MaskedImage` is copied, the precompressed pixel values are
616 not transferred to the copy.
617 plane_map
618 A mapping from legacy mask plane name to the new plane name and
619 description. If `None` (default)
620 `get_legacy_visit_image_mask_planes` is used.
621 instrument
622 Name of the instrument. Read from the primary header if not
623 provided.
624 visit
625 ID of the visit. Read from the primary header if not
626 provided.
627 component
628 A component to read instead of the full image.
629 """
630 from lsst.afw.image import ExposureFitsReader
632 reader = ExposureFitsReader(filename)
633 if component == "bbox":
634 return Box.from_legacy(reader.readBBox())
635 legacy_detector = reader.readDetector()
636 if legacy_detector is None:
637 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
638 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
639 legacy_wcs = None
640 if component in (None, "image", "mask", "variance", "projection"):
641 legacy_wcs = reader.readWcs()
642 if legacy_wcs is None:
643 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
644 legacy_exposure_info = reader.readExposureInfo()
645 summary_stats = None
646 if component in (None, "summary_stats"):
647 legacy_stats = legacy_exposure_info.getSummaryStats()
648 if legacy_stats is not None:
649 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
650 if component == "summary_stats":
651 return summary_stats
652 if component in (None, "psf"):
653 legacy_psf = reader.readPsf()
654 if legacy_psf is None:
655 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
656 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
657 if component == "psf":
658 return psf
659 aperture_corrections: ApertureCorrectionMap = {}
660 if component in (None, "aperture_corrections"):
661 legacy_ap_corr_map = reader.readApCorrMap()
662 if legacy_ap_corr_map is not None:
663 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map)
664 if component == "aperture_corrections":
665 return aperture_corrections
666 assert component in (None, "image", "mask", "variance", "projection", "obs_info", "detector"), (
667 component
668 ) # for MyPy
669 with astropy.io.fits.open(filename) as hdu_list:
670 primary_header = hdu_list[0].header
671 obs_info = _obs_info_from_md(primary_header)
672 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter())
673 if component == "obs_info":
674 return obs_info
675 instrument = _extract_or_check_header(
676 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
677 )
678 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
679 if component == "detector":
680 return Detector.from_legacy(
681 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
682 )
683 projection = Projection.from_legacy(
684 legacy_wcs,
685 DetectorFrame(
686 instrument=instrument,
687 visit=visit,
688 detector=legacy_detector.getId(),
689 bbox=detector_bbox,
690 ),
691 )
692 if component == "projection":
693 return projection
694 if plane_map is None:
695 plane_map = get_legacy_visit_image_mask_planes()
696 assert component != "psf", component # for MyPy
697 from_masked_image = MaskedImage._read_legacy_hdus(
698 hdu_list,
699 filename,
700 preserve_quantization=preserve_quantization,
701 plane_map=plane_map,
702 component=component,
703 )
704 if component is not None:
705 # This is the image, mask, or variance; attach the projection and
706 # obs_info and return
707 return from_masked_image.view(projection=projection, obs_info=obs_info)
708 legacy_polygon = reader.readValidPolygon()
709 result = VisitImage(
710 from_masked_image.image,
711 mask=from_masked_image.mask,
712 variance=from_masked_image.variance,
713 projection=projection,
714 psf=psf,
715 detector=Detector.from_legacy(
716 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
717 ),
718 obs_info=obs_info,
719 summary_stats=summary_stats,
720 aperture_corrections=aperture_corrections,
721 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
722 )
723 result._opaque_metadata = from_masked_image._opaque_metadata
724 return result
727class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
728 """A Pydantic model used to represent a serialized `VisitImage`."""
730 # Inherited attributes are duplicated because that improves the docs
731 # (some limitation in the sphinx/pydantic integration), and these are
732 # important docs.
734 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
735 mask: MaskSerializationModel[P] = pydantic.Field(
736 description="Bitmask that annotates the main image's pixels."
737 )
738 variance: ImageSerializationModel[P] = pydantic.Field(
739 description="Per-pixel variance estimates for the main image."
740 )
741 projection: ProjectionSerializationModel[P] = pydantic.Field(
742 description="Projection that maps the pixel grid to the sky.",
743 )
744 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
745 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
746 )
747 obs_info: ObservationInfo = pydantic.Field(
748 description="Standardized description of visit metadata",
749 )
750 summary_stats: ObservationSummaryStats = pydantic.Field(
751 description="Summary statistics for the observation."
752 )
753 detector: DetectorSerializationModel = pydantic.Field(
754 description="Geometry and electronic information for the detector."
755 )
756 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field(
757 default_factory=ApertureCorrectionMapSerializationModel,
758 description="Aperture corrections, keyed by flux algorithm.",
759 )
760 bounds: SerializableBounds | None = pydantic.Field(
761 default=None,
762 description="Pixel validity region, if different from the image bounding box.",
763 exclude_if=is_none,
764 )
766 def deserialize(self, archive: InputArchive[Any], *, bbox: Box | None = None) -> VisitImage:
767 masked_image = super().deserialize(archive, bbox=bbox)
768 psf = self.deserialize_psf(archive)
769 detector = self.detector.deserialize(archive)
770 aperture_corrections = self.aperture_corrections.deserialize(archive)
771 return VisitImage(
772 masked_image.image,
773 mask=masked_image.mask,
774 variance=masked_image.variance,
775 psf=psf,
776 projection=masked_image.projection,
777 obs_info=masked_image.obs_info,
778 summary_stats=self.summary_stats,
779 detector=detector,
780 aperture_corrections=aperture_corrections,
781 bounds=self.bounds.deserialize() if self.bounds is not None else None,
782 )._finish_deserialize(self)
784 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError:
785 """Finish deserializing the PSF model, or *return* any exception
786 raised in the attempt.
787 """
788 try:
789 return self.psf.deserialize(archive)
790 except ArchiveReadError as err:
791 return err
794def _extract_or_check_value[T](
795 key: str,
796 given_value: T | None,
797 *sources: tuple[str, T | None],
798) -> T:
799 # Compare given value against multiple sources. If given value is not
800 # supplied return the first non-None value in the reference sources.
801 if given_value is not None:
802 for source_name, source_value in sources:
803 if source_value is not None and source_value != given_value:
804 raise ValueError(
805 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
806 )
807 if source_value is not None:
808 # Only check the first non-None source rather than checking
809 # all supplied values.
810 break
811 return given_value
813 for _, source_value in sources:
814 if source_value is not None:
815 return source_value
817 raise ValueError(f"No value found for {key}.")
820def _extract_or_check_header[T](
821 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
822) -> T:
823 hdr_value: T | None = None
824 if (hdr_raw_value := header.get(key)) is not None:
825 hdr_value = coerce(hdr_raw_value)
826 return _extract_or_check_value(
827 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
828 )