Coverage for python / lsst / images / _visit_image.py: 24%
393 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:57 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:57 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import functools
17import warnings
18from collections.abc import Callable, Mapping, MutableMapping
19from types import EllipsisType
20from typing import Any, Literal, cast, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
27from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
29from ._backgrounds import BackgroundMap, BackgroundMapSerializationModel
30from ._concrete_bounds import SerializableBounds
31from ._geom import Bounds, Box
32from ._image import Image, ImageSerializationModel
33from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
34from ._masked_image import MaskedImage, MaskedImageSerializationModel
35from ._observation_summary_stats import ObservationSummaryStats
36from ._polygon import Polygon
37from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
38from .aperture_corrections import (
39 ApertureCorrectionMap,
40 ApertureCorrectionMapSerializationModel,
41 aperture_corrections_from_legacy,
42)
43from .cameras import Detector, DetectorSerializationModel
44from .fields import Field, FieldSerializationModel, field_from_legacy_photo_calib
45from .fits import FitsOpaqueMetadata
46from .psfs import (
47 GaussianPointSpreadFunction,
48 GaussianPSFSerializationModel,
49 PiffSerializationModel,
50 PiffWrapper,
51 PointSpreadFunction,
52 PSFExSerializationModel,
53 PSFExWrapper,
54)
55from .serialization import ArchiveReadError, InputArchive, InvalidParameterError, MetadataValue, OutputArchive
56from .utils import is_none
59def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
60 # Try to get an ObservationInfo from the primary header as if
61 # it's a raw header. Else fallback.
62 try:
63 obs_info = ObservationInfo.from_header(md, quiet=True)
64 except ValueError:
65 # Not known translator. Must fall back to visit info. If we have
66 # an actual VisitInfo, serialize it since we know that it will be
67 # complete.
68 if visit_info is not None:
69 from lsst.afw.image import setVisitInfoMetadata
70 from lsst.daf.base import PropertyList
72 pl = PropertyList()
73 setVisitInfoMetadata(pl, visit_info)
74 # Merge so that we still have access to butler provenance.
75 md.update(pl)
77 # Try the given header looking for VisitInfo hints.
78 # We get lots of warnings if nothing can be found. Currently
79 # no way to disable those without capturing them.
80 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
81 return obs_info
84def _update_obs_info_from_legacy(
85 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None
86) -> ObservationInfo:
87 extra_md: dict[str, str | int] = {}
89 if filter_label is not None and filter_label.hasBandLabel():
90 extra_md["physical_filter"] = filter_label.physicalLabel
92 # Fill in detector metadata, check for consistency.
93 # ObsInfo detector name and group can not be derived from
94 # the getName() information without knowing how the components
95 # are separated.
96 if detector is not None:
97 detector_md = {
98 "detector_num": detector.getId(),
99 "detector_serial": detector.getSerial(),
100 "detector_unique_name": detector.getName(),
101 }
102 extra_md.update(detector_md)
104 obs_info_updates: dict[str, str | int] = {}
105 for k, v in extra_md.items():
106 current = getattr(obs_info, k)
107 if current is None:
108 obs_info_updates[k] = v
109 continue
110 if current != v:
111 raise RuntimeError(
112 f"ObservationInfo contains value for '{k}' that is inconsistent "
113 f"with given legacy object: {v} != {current}"
114 )
116 if obs_info_updates:
117 obs_info = obs_info.model_copy(update=obs_info_updates)
118 return obs_info
121class VisitImage(MaskedImage):
122 """A calibrated single-visit image.
124 Parameters
125 ----------
126 image
127 The main image plane. If this has a `Projection`, it will be used
128 for all planes unless a ``projection`` is passed separately.
129 mask
130 A bitmask image that annotates the main image plane. Must have the
131 same bounding box as ``image`` if provided. Any attached projection
132 is replaced (possibly by `None`).
133 variance
134 The per-pixel uncertainty of the main image as an image of variance
135 values. Must have the same bounding box as ``image`` if provided, and
136 its units must be the square of ``image.unit`` or `None`.
137 Values default to ``1.0``. Any attached projection is replaced
138 (possibly by `None`).
139 mask_schema
140 Schema for the mask plane. Must be provided if and only if ``mask`` is
141 not provided.
142 projection
143 Projection that maps the pixel grid to the sky. Can only be `None` if
144 a projection is already attached to ``image``.
145 bounds
146 The region where this image's pixels and other properties are valid.
147 If not provided, the bounding box of the image is used. Other
148 components (``psf``, ``projection``, ``aperture_corrections``, etc.)
149 are assumed to have their own bounds which may or may not be the same
150 as the image bounds. If ``bounds`` extends beyond the image bounding
151 box, the intersection between ``bounds`` and the image bounding box
152 is used instead.
153 obs_info
154 General information about this visit in standardized form.
155 summary_stats
156 Summary statistics associated with this visit. Initialized to default
157 values if not provided.
158 photometric_scaling
159 Field that can be used to multiply a post-ISR image units to yield
160 calibrated image units. This may be a scaling that was already
161 applied (so dividing by it will recover the post-ISR units) or a
162 scaling that has not been applied, depending on ``image.unit``.
163 psf
164 Point-spread function model for this image, or an exception explaining
165 why it could not be read (to be raised if the PSF is requested later).
166 detector
167 Geometry and electronic information for the detector attached to this
168 image.
169 aperture_corrections : `dict` [`str`, `~fields.BaseField`]
170 Mapping from photometry algorithm name to the aperture correction for
171 that algorithm.
172 backgrounds
173 Background models associated with this image.
174 metadata
175 Arbitrary flexible metadata to associate with the image.
176 """
178 def __init__(
179 self,
180 image: Image,
181 *,
182 mask: Mask | None = None,
183 variance: Image | None = None,
184 mask_schema: MaskSchema | None = None,
185 projection: Projection[DetectorFrame] | None = None,
186 bounds: Bounds | None = None,
187 obs_info: ObservationInfo | None = None,
188 summary_stats: ObservationSummaryStats | None = None,
189 photometric_scaling: Field | None = None,
190 psf: PointSpreadFunction | ArchiveReadError,
191 detector: Detector,
192 aperture_corrections: ApertureCorrectionMap | None = None,
193 backgrounds: BackgroundMap | None = None,
194 metadata: dict[str, MetadataValue] | None = None,
195 ):
196 super().__init__(
197 image,
198 mask=mask,
199 variance=variance,
200 mask_schema=mask_schema,
201 projection=projection,
202 metadata=metadata,
203 )
204 if self.image.unit is None:
205 raise TypeError("The image component of a VisitImage must have units.")
206 if self.image.projection is None:
207 raise TypeError("The projection component of a VisitImage cannot be None.")
208 if obs_info is None:
209 raise TypeError("The observation info component of a VisitImage cannot be None.")
210 self._obs_info = obs_info
211 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
212 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
213 if summary_stats is None:
214 summary_stats = ObservationSummaryStats()
215 self._summary_stats = summary_stats
216 if photometric_scaling is not None and photometric_scaling.unit is None:
217 raise TypeError("If a photometric_scaling is provided, it must have units.")
218 self._photometric_scaling = photometric_scaling
219 self._psf = psf
220 self._detector = detector
221 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {}
222 self._bounds = bounds if bounds is not None else self.bbox
223 if not self.bbox.contains(self._bounds.bbox):
224 self._bounds = self._bounds.intersection(self.bbox)
225 self._backgrounds = backgrounds if backgrounds is not None else BackgroundMap()
227 @property
228 def unit(self) -> astropy.units.UnitBase:
229 """The units of the image plane (`astropy.units.Unit`)."""
230 return cast(astropy.units.UnitBase, super().unit)
232 @property
233 def projection(self) -> Projection[DetectorFrame]:
234 """The projection that maps the pixel grid to the sky
235 (`Projection` [`DetectorFrame`]).
236 """
237 return cast(Projection[DetectorFrame], super().projection)
239 @property
240 def bounds(self) -> Bounds:
241 """The region where pixels are valid (`Bounds`)."""
242 return self._bounds
244 @property
245 def obs_info(self) -> ObservationInfo:
246 """General information about this observation in standard form.
247 (`~astro_metadata_translator.ObservationInfo`).
248 """
249 return self._obs_info
251 @property
252 def astropy_wcs(self) -> ProjectionAstropyView:
253 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
255 Notes
256 -----
257 As expected for Astropy WCS objects, this defines pixel coordinates
258 such that the first row and column in the arrays are ``(0, 0)``, not
259 ``bbox.start``, as is the case for `projection`.
261 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
262 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
263 `astropy.wcs.WCS` (use `fits_wcs` for that).
264 """
265 return cast(ProjectionAstropyView, super().astropy_wcs)
267 @property
268 def summary_stats(self) -> ObservationSummaryStats:
269 """Optional summary statistics for this observation
270 (`ObservationSummaryStats`).
271 """
272 return self._summary_stats
274 @property
275 def photometric_scaling(self) -> Field | None:
276 """Field that multiplies a post-ISR image to yield the calibrated
277 image (~`fields.BaseField`).
278 """
279 return self._photometric_scaling
281 @photometric_scaling.setter
282 def photometric_scaling(self, value: Field):
283 if value.unit is None:
284 raise TypeError("The photometric_scaling for a VisitImage must have units.")
285 self._photometric_scaling = value
287 @property
288 def psf(self) -> PointSpreadFunction:
289 """The point-spread function model for this image
290 (`.psfs.PointSpreadFunction`).
291 """
292 if isinstance(self._psf, ArchiveReadError):
293 raise self._psf
294 return self._psf
296 @property
297 def detector(self) -> Detector:
298 """Geometry and electronic information about the detector
299 (`.cameras.Detector`).
300 """
301 return self._detector
303 @property
304 def aperture_corrections(self) -> ApertureCorrectionMap:
305 """A mapping from photometry algorithm name to the aperture correction
306 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]).
307 """
308 return self._aperture_corrections
310 @property
311 def backgrounds(self) -> BackgroundMap:
312 """A mapping of backgrounds associated with this image
313 (`BackgroundMap`).
314 """
315 return self._backgrounds
317 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
318 if bbox is ...:
319 return self
320 super().__getitem__(bbox)
321 return self._transfer_metadata(
322 VisitImage(
323 self.image[bbox],
324 mask=self.mask[bbox],
325 variance=self.variance[bbox],
326 projection=self.projection,
327 psf=self.psf,
328 obs_info=self.obs_info,
329 bounds=self._bounds, # don't need to intersect here, because __init__ will do that.
330 summary_stats=self.summary_stats,
331 detector=self._detector,
332 photometric_scaling=self._photometric_scaling,
333 aperture_corrections=self.aperture_corrections,
334 backgrounds=self._backgrounds,
335 ),
336 bbox=bbox,
337 )
339 def __str__(self) -> str:
340 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
342 def __repr__(self) -> str:
343 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
345 def copy(self, *, copy_detector: bool = False) -> VisitImage:
346 """Deep-copy the visit image.
348 Parameters
349 ----------
350 copy_detector
351 Whether to deep-copy the `detector` attribute.
352 """
353 return self._transfer_metadata(
354 VisitImage(
355 image=self._image.copy(),
356 mask=self._mask.copy(),
357 variance=self._variance.copy(),
358 psf=self._psf,
359 obs_info=self.obs_info,
360 bounds=self._bounds,
361 summary_stats=self.summary_stats.model_copy(),
362 detector=self._detector.copy() if copy_detector else self._detector,
363 photometric_scaling=self._photometric_scaling,
364 aperture_corrections=self.aperture_corrections.copy(),
365 backgrounds=self._backgrounds.copy(),
366 ),
367 copy=True,
368 )
370 def convert_unit(
371 self,
372 unit: astropy.units.UnitBase = astropy.units.nJy,
373 copy: Literal["as-needed"] | bool = True,
374 copy_detector: bool = False,
375 ) -> VisitImage:
376 """Return an equivalent image with different pixel units.
378 Parameters
379 ----------
380 unit
381 The unit to transform to. This may be any of the following:
383 - any unit directly relatable to the current units via Astropy;
384 - any unit relatable to the product of the current units with the
385 `photometric_scaling` (i.e. if the current image is in
386 instrumental units but we know how to calibrate them)
387 - any unit relatable to the quotient of the current units with the
388 `photometric_scaling` (i.e. if the current image is in
389 calibrated units and we want to revert back to instrumental
390 units).
391 copy
392 Whether to copy the images and other components. If `True`, all
393 components that aren't controlled by some other argument will
394 always be deep-copied. If `False`, the operation will fail if the
395 image is not already in the right units. If ``as-needed``, only
396 the image and variance will be copied, and only if they are not
397 already in the right units.
398 copy_detector
399 Whether to deep-copy the `detector` attribute.
401 Returns
402 -------
403 `VisitImage`
404 An image with the given units.
405 """
406 if copy not in (True, False, "as-needed"):
407 raise TypeError(f"Invalid value for 'copy' parameter: {copy!r}.")
408 if (factor := _get_unit_conversion_factor(self.unit, unit)) is not None:
409 if factor == 1.0:
410 if copy is True: # not "as-needed"
411 return self.copy()
412 else:
413 return self[...]
414 elif copy is False:
415 raise astropy.units.UnitConversionError(
416 f"Units must be converted ({self.unit} -> {unit}), but copy=False."
417 )
418 image = Image(self._image.array * factor, bbox=self.bbox, projection=self.projection, unit=unit)
419 variance = Image(
420 self._variance.array * factor**2,
421 bbox=self.bbox,
422 unit=unit**2,
423 )
424 elif self._photometric_scaling is None:
425 raise astropy.units.UnitConversionError(
426 "VisitImage.photometric_scaling is None, and there "
427 f"is no constant conversion from {self.unit} to {unit}."
428 )
429 else:
430 if copy is False:
431 raise astropy.units.UnitConversionError(
432 f"Photometric scaling must be applied to go from ={self.unit} to {unit}, but copy=False."
433 )
434 scaling = self._photometric_scaling
435 assert scaling.unit is not None, "Checked at construction."
436 if (constant_factor := _get_unit_conversion_factor(self.unit * scaling.unit, unit)) is not None:
437 if constant_factor != 1.0:
438 scaling = scaling * constant_factor
439 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array
440 elif (constant_factor := _get_unit_conversion_factor(self.unit / scaling.unit, unit)) is not None:
441 if constant_factor != 1.0:
442 scaling = scaling / constant_factor
443 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array
444 np.true_divide(1.0, scaling_array, out=scaling_array)
445 else:
446 raise astropy.units.UnitConversionError(
447 f"photometric_scaling with units {scaling.unit} does not "
448 f"provide a path from {self.unit} to {unit}."
449 )
450 # We needed to allocate a new array to evaluate the scaling field,
451 # and then we need to allocate another to hold its square for the
452 # variance scaling. But then we can multiply those arrays in-place
453 # to get the output image and variance to avoid yet more
454 # allocations (note we can't instead multiply the visit image's
455 # image and variance arrays in place because they might have other
456 # references that are still associated with the old units).
457 image = Image(scaling_array, bbox=self.bbox, unit=unit)
458 variance = Image(np.square(scaling_array), bbox=self.bbox, unit=unit**2)
459 image.array *= self._image.array
460 variance.array *= self._variance.array
461 copy_components = copy is True
462 return self._transfer_metadata(
463 VisitImage(
464 image=image,
465 mask=self._mask if not copy_components else self._mask.copy(),
466 variance=variance,
467 projection=self.projection, # never copied; immutable
468 obs_info=self.obs_info if not copy_components else self.obs_info.model_copy(),
469 psf=self._psf, # never copied; immutable
470 bounds=self._bounds, # never copied; immutable
471 summary_stats=self.summary_stats if not copy_components else self.summary_stats.model_copy(),
472 detector=self._detector if not copy_detector else self._detector.copy(),
473 photometric_scaling=self._photometric_scaling, # never copied; immutable
474 aperture_corrections=(
475 self.aperture_corrections if not copy_components else self.aperture_corrections.copy()
476 ),
477 )
478 )
480 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
481 masked_image_model = super().serialize(archive)
482 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
483 match self._psf:
484 # MyPy is able to figure things out here with this match statement,
485 # but not a single isinstance check on both types.
486 case PiffWrapper():
487 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
488 case PSFExWrapper():
489 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
490 case GaussianPointSpreadFunction():
491 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
492 case _:
493 raise TypeError(
494 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
495 )
496 assert masked_image_model.projection is not None, "VisitImage always has a projection."
497 serialized_detector = archive.serialize_direct("detector", self._detector.serialize)
498 serialized_photometric_scaling = (
499 archive.serialize_direct("photometric_scaling", self._photometric_scaling.serialize)
500 if self._photometric_scaling is not None
501 else None
502 )
503 serialized_aperture_corrections = archive.serialize_direct(
504 "aperture_corrections",
505 functools.partial(ApertureCorrectionMapSerializationModel.serialize, self.aperture_corrections),
506 )
507 serialized_backgrounds = archive.serialize_direct("backgrounds", self._backgrounds.serialize)
508 return VisitImageSerializationModel(
509 image=masked_image_model.image,
510 mask=masked_image_model.mask,
511 variance=masked_image_model.variance,
512 projection=masked_image_model.projection,
513 obs_info=self.obs_info,
514 photometric_scaling=serialized_photometric_scaling,
515 psf=serialized_psf,
516 summary_stats=self.summary_stats,
517 detector=serialized_detector,
518 aperture_corrections=serialized_aperture_corrections,
519 bounds=self._bounds.serialize() if self._bounds != self.bbox else None,
520 backgrounds=serialized_backgrounds,
521 metadata=self.metadata,
522 )
524 @staticmethod
525 def _get_archive_tree_type[P: pydantic.BaseModel](
526 pointer_type: type[P],
527 ) -> type[VisitImageSerializationModel[P]]:
528 """Return the serialization model type for this object for an archive
529 type that uses the given pointer type.
530 """
531 return VisitImageSerializationModel[pointer_type] # type: ignore
533 # write_fits and read_fits inherited from MaskedImage.
535 @staticmethod
536 def from_legacy(
537 legacy: Any,
538 *,
539 unit: astropy.units.UnitBase | None = None,
540 plane_map: Mapping[str, MaskPlane] | None = None,
541 instrument: str | None = None,
542 visit: int | None = None,
543 ) -> VisitImage:
544 """Convert from an `lsst.afw.image.Exposure` instance.
546 Parameters
547 ----------
548 legacy
549 An `lsst.afw.image.Exposure` instance that will share image and
550 variance (but not mask) pixel data with the returned object.
551 unit
552 Units of the image. If not provided, the ``BUNIT`` metadata
553 key will be used, if available.
554 plane_map
555 A mapping from legacy mask plane name to the new plane name and
556 description. If `None` (default)
557 `get_legacy_visit_image_mask_planes` is used.
558 instrument
559 Name of the instrument. Extracted from the metadata if not
560 provided.
561 visit
562 ID of the visit. Extracted from the metadata if not provided.
563 """
564 if plane_map is None:
565 plane_map = get_legacy_visit_image_mask_planes()
566 md = legacy.getMetadata()
567 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
568 instrument = _extract_or_check_header(
569 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
570 )
571 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
572 legacy_wcs = legacy.getWcs()
573 if legacy_wcs is None:
574 raise ValueError("Exposure does not have a SkyWcs.")
575 legacy_detector = legacy.getDetector()
576 if legacy_detector is None:
577 raise ValueError("Exposure does not have a Detector.")
578 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
580 # Update the ObservationInfo from other components.
581 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
583 opaque_fits_metadata = FitsOpaqueMetadata()
584 primary_header = astropy.io.fits.Header()
585 with warnings.catch_warnings():
586 # Silence warnings about long keys becoming HIERARCH.
587 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
588 primary_header.update(md.toOrderedDict())
589 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
590 instrumental_unit = opaque_fits_metadata.get_instrumental_unit() or astropy.units.electron
591 hdr_unit: astropy.units.UnitBase | None = None
592 if hdr_unit_str := md.get("BUNIT"):
593 hdr_unit = astropy.units.Unit(hdr_unit_str, format="FITS")
594 if hdr_unit == astropy.units.adu and instrumental_unit == astropy.units.electron:
595 # Fix incorrect BUNIT='adu' in LSST
596 # preliminary_visit_image.
597 hdr_unit = astropy.units.electron
598 if unit is None:
599 unit = hdr_unit
600 elif hdr_unit is not None and hdr_unit != unit:
601 raise ValueError(f"BUNIT value {hdr_unit} disagrees with given unit {unit}.")
602 projection = Projection.from_legacy(
603 legacy_wcs,
604 DetectorFrame(
605 instrument=instrument,
606 visit=visit,
607 detector=legacy_detector.getId(),
608 bbox=detector_bbox,
609 ),
610 )
611 legacy_psf = legacy.getPsf()
612 if legacy_psf is None:
613 raise ValueError("Exposure file does not have a Psf.")
614 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
615 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
616 legacy_summary_stats = legacy.info.getSummaryStats()
617 legacy_ap_corr_map = legacy.info.getApCorrMap()
618 legacy_polygon = legacy.info.getValidPolygon()
619 legacy_photo_calib = legacy.info.getPhotoCalib()
620 result = VisitImage(
621 image=masked_image.image.view(unit=unit),
622 mask=masked_image.mask,
623 variance=masked_image.variance,
624 projection=projection,
625 psf=psf,
626 obs_info=obs_info,
627 summary_stats=(
628 ObservationSummaryStats.from_legacy(legacy_summary_stats)
629 if legacy_summary_stats is not None
630 else None
631 ),
632 detector=Detector.from_legacy(
633 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
634 ),
635 aperture_corrections=(
636 aperture_corrections_from_legacy(legacy_ap_corr_map)
637 if legacy_ap_corr_map is not None
638 else None
639 ),
640 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
641 photometric_scaling=(
642 field_from_legacy_photo_calib(
643 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit
644 )
645 if legacy_photo_calib is not None
646 else None
647 ),
648 )
650 result._opaque_metadata = opaque_fits_metadata
651 return result
653 @overload # type: ignore[override]
654 @staticmethod
655 def read_legacy( 655 ↛ exitline 655 didn't return from function 'read_legacy' because
656 filename: str,
657 *,
658 component: Literal["bbox"],
659 ) -> Box: ...
661 @overload
662 @staticmethod
663 def read_legacy( 663 ↛ exitline 663 didn't return from function 'read_legacy' because
664 filename: str,
665 *,
666 preserve_quantization: bool = False,
667 instrument: str | None = None,
668 visit: int | None = None,
669 component: Literal["image"],
670 ) -> Image: ...
672 @overload
673 @staticmethod
674 def read_legacy( 674 ↛ exitline 674 didn't return from function 'read_legacy' because
675 filename: str,
676 *,
677 plane_map: Mapping[str, MaskPlane] | None = None,
678 instrument: str | None = None,
679 visit: int | None = None,
680 component: Literal["mask"],
681 ) -> Mask: ...
683 @overload
684 @staticmethod
685 def read_legacy( 685 ↛ exitline 685 didn't return from function 'read_legacy' because
686 filename: str,
687 *,
688 preserve_quantization: bool = False,
689 instrument: str | None = None,
690 visit: int | None = None,
691 component: Literal["variance"],
692 ) -> Image: ...
694 @overload
695 @staticmethod
696 def read_legacy( 696 ↛ exitline 696 didn't return from function 'read_legacy' because
697 filename: str,
698 *,
699 instrument: str | None = None,
700 visit: int | None = None,
701 component: Literal["projection"],
702 ) -> Projection[DetectorFrame]: ...
704 @overload
705 @staticmethod
706 def read_legacy( 706 ↛ exitline 706 didn't return from function 'read_legacy' because
707 filename: str,
708 *,
709 component: Literal["psf"],
710 ) -> PointSpreadFunction: ...
712 @overload
713 @staticmethod
714 def read_legacy( 714 ↛ exitline 714 didn't return from function 'read_legacy' because
715 filename: str,
716 *,
717 component: Literal["detector"],
718 ) -> Detector: ...
720 @overload
721 @staticmethod
722 def read_legacy( 722 ↛ exitline 722 didn't return from function 'read_legacy' because
723 filename: str,
724 *,
725 component: Literal["obs_info"],
726 ) -> ObservationInfo: ...
728 @overload
729 @staticmethod
730 def read_legacy( 730 ↛ exitline 730 didn't return from function 'read_legacy' because
731 filename: str,
732 *,
733 component: Literal["photometric_scaling"],
734 ) -> Field | None: ...
736 @overload
737 @staticmethod
738 def read_legacy( 738 ↛ exitline 738 didn't return from function 'read_legacy' because
739 filename: str,
740 *,
741 component: Literal["summary_stats"],
742 ) -> ObservationSummaryStats: ...
744 @overload
745 @staticmethod
746 def read_legacy( 746 ↛ exitline 746 didn't return from function 'read_legacy' because
747 filename: str,
748 *,
749 component: Literal["aperture_corrections"],
750 ) -> ApertureCorrectionMap: ...
752 @overload
753 @staticmethod
754 def read_legacy( 754 ↛ exitline 754 didn't return from function 'read_legacy' because
755 filename: str,
756 *,
757 preserve_quantization: bool = False,
758 plane_map: Mapping[str, MaskPlane] | None = None,
759 instrument: str | None = None,
760 visit: int | None = None,
761 component: None = None,
762 ) -> VisitImage: ...
764 @staticmethod
765 def read_legacy( # type: ignore[override]
766 filename: str,
767 *,
768 preserve_quantization: bool = False,
769 plane_map: Mapping[str, MaskPlane] | None = None,
770 instrument: str | None = None,
771 visit: int | None = None,
772 component: Literal[
773 "bbox",
774 "image",
775 "mask",
776 "variance",
777 "projection",
778 "psf",
779 "detector",
780 "photometric_scaling",
781 "obs_info",
782 "summary_stats",
783 "aperture_corrections",
784 ]
785 | None = None,
786 ) -> Any:
787 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
789 Parameters
790 ----------
791 filename
792 Full name of the file.
793 preserve_quantization
794 If `True`, ensure that writing the masked image back out again will
795 exactly preserve quantization-compressed pixel values. This causes
796 the image and variance plane arrays to be marked as read-only and
797 stores the original binary table data for those planes in memory.
798 If the `MaskedImage` is copied, the precompressed pixel values are
799 not transferred to the copy.
800 plane_map
801 A mapping from legacy mask plane name to the new plane name and
802 description. If `None` (default)
803 `get_legacy_visit_image_mask_planes` is used.
804 instrument
805 Name of the instrument. Read from the primary header if not
806 provided.
807 visit
808 ID of the visit. Read from the primary header if not
809 provided.
810 component
811 A component to read instead of the full image.
812 """
813 from lsst.afw.image import ExposureFitsReader
815 reader = ExposureFitsReader(filename)
816 if component == "bbox":
817 return Box.from_legacy(reader.readBBox())
818 legacy_detector = reader.readDetector()
819 if legacy_detector is None:
820 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
821 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
822 legacy_wcs = None
823 if component in (None, "image", "mask", "variance", "projection"):
824 legacy_wcs = reader.readWcs()
825 if legacy_wcs is None:
826 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
827 legacy_exposure_info = reader.readExposureInfo()
828 summary_stats = None
829 if component in (None, "summary_stats"):
830 legacy_stats = legacy_exposure_info.getSummaryStats()
831 if legacy_stats is not None:
832 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
833 if component == "summary_stats":
834 return summary_stats
835 if component in (None, "psf"):
836 legacy_psf = reader.readPsf()
837 if legacy_psf is None:
838 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
839 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
840 if component == "psf":
841 return psf
842 aperture_corrections: ApertureCorrectionMap = {}
843 if component in (None, "aperture_corrections"):
844 legacy_ap_corr_map = reader.readApCorrMap()
845 if legacy_ap_corr_map is not None:
846 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map)
847 if component == "aperture_corrections":
848 return aperture_corrections
849 assert component in (
850 None,
851 "image",
852 "mask",
853 "variance",
854 "projection",
855 "obs_info",
856 "detector",
857 "photometric_scaling",
858 ), component # for MyPy
859 with astropy.io.fits.open(filename) as hdu_list:
860 primary_header = hdu_list[0].header
861 obs_info = _obs_info_from_md(primary_header)
862 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter())
863 if component == "obs_info":
864 return obs_info
865 instrument = _extract_or_check_header(
866 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
867 )
868 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
869 opaque_metadata = FitsOpaqueMetadata()
870 # This extraction is destructive, so we need to be sure to pass
871 # this opaque_metadata down to MaskedImage._read_legacy_hdus
872 # so it doesn't try to extract it again.
873 opaque_metadata.extract_legacy_primary_header(primary_header)
874 if (instrumental_unit := opaque_metadata.get_instrumental_unit()) is None:
875 instrumental_unit = astropy.units.electron
876 photometric_scaling: Field | None = None
877 if component in (None, "photometric_scaling"):
878 legacy_photo_calib = reader.readPhotoCalib()
879 if legacy_photo_calib is not None:
880 photometric_scaling = field_from_legacy_photo_calib(
881 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit
882 )
883 if component == "photometric_scaling":
884 return photometric_scaling
885 if component == "detector":
886 return Detector.from_legacy(
887 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
888 )
889 projection = Projection.from_legacy(
890 legacy_wcs,
891 DetectorFrame(
892 instrument=instrument,
893 visit=visit,
894 detector=legacy_detector.getId(),
895 bbox=detector_bbox,
896 ),
897 )
898 if component == "projection":
899 return projection
900 if plane_map is None:
901 plane_map = get_legacy_visit_image_mask_planes()
902 assert component != "psf", component # for MyPy
903 from_masked_image = MaskedImage._read_legacy_hdus(
904 hdu_list,
905 filename,
906 opaque_metadata=opaque_metadata,
907 preserve_quantization=preserve_quantization,
908 plane_map=plane_map,
909 component=component,
910 )
911 if component is not None:
912 # This is the image, mask, or variance; attach the projection and
913 # obs_info and return
914 return from_masked_image.view(projection=projection)
915 legacy_polygon = reader.readValidPolygon()
916 result = VisitImage(
917 from_masked_image.image,
918 mask=from_masked_image.mask,
919 variance=from_masked_image.variance,
920 projection=projection,
921 psf=psf,
922 detector=Detector.from_legacy(
923 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
924 ),
925 obs_info=obs_info,
926 summary_stats=summary_stats,
927 aperture_corrections=aperture_corrections,
928 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
929 photometric_scaling=photometric_scaling,
930 )
931 result._opaque_metadata = from_masked_image._opaque_metadata
932 return result
935class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
936 """A Pydantic model used to represent a serialized `VisitImage`."""
938 # Inherited attributes are duplicated because that improves the docs
939 # (some limitation in the sphinx/pydantic integration), and these are
940 # important docs.
942 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
943 mask: MaskSerializationModel[P] = pydantic.Field(
944 description="Bitmask that annotates the main image's pixels."
945 )
946 variance: ImageSerializationModel[P] = pydantic.Field(
947 description="Per-pixel variance estimates for the main image."
948 )
949 projection: ProjectionSerializationModel[P] = pydantic.Field(
950 description="Projection that maps the pixel grid to the sky.",
951 )
952 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
953 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
954 )
955 obs_info: ObservationInfo = pydantic.Field(
956 description="Standardized description of visit metadata",
957 )
958 photometric_scaling: FieldSerializationModel | None = pydantic.Field(
959 default=None,
960 description="Scaling that can be used to multiply a post-ISR image to yield calibrated pixel values.",
961 )
962 summary_stats: ObservationSummaryStats = pydantic.Field(
963 description="Summary statistics for the observation."
964 )
965 detector: DetectorSerializationModel = pydantic.Field(
966 description="Geometry and electronic information for the detector."
967 )
968 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field(
969 default_factory=ApertureCorrectionMapSerializationModel,
970 description="Aperture corrections, keyed by flux algorithm.",
971 )
972 bounds: SerializableBounds | None = pydantic.Field(
973 default=None,
974 description="Pixel validity region, if different from the image bounding box.",
975 exclude_if=is_none,
976 )
977 backgrounds: BackgroundMapSerializationModel = pydantic.Field(
978 default_factory=BackgroundMapSerializationModel,
979 description="Background models associated with this image.",
980 )
982 def deserialize(
983 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any
984 ) -> VisitImage:
985 if kwargs:
986 raise InvalidParameterError(f"Unrecognized parameters for VisitImage: {set(kwargs.keys())}.")
987 masked_image = super().deserialize(archive, bbox=bbox)
988 try:
989 psf = self.psf.deserialize(archive)
990 except ArchiveReadError as err:
991 # Defer this until/unless somebody actually asks for the PSF.
992 psf = err
993 detector = self.detector.deserialize(archive)
994 aperture_corrections = self.aperture_corrections.deserialize(archive)
995 photometric_scaling = (
996 self.photometric_scaling.deserialize(archive) if self.photometric_scaling is not None else None
997 )
998 return VisitImage(
999 masked_image.image,
1000 mask=masked_image.mask,
1001 variance=masked_image.variance,
1002 psf=psf,
1003 projection=masked_image.projection,
1004 obs_info=self.obs_info,
1005 summary_stats=self.summary_stats,
1006 detector=detector,
1007 aperture_corrections=aperture_corrections,
1008 photometric_scaling=photometric_scaling,
1009 bounds=self.bounds.deserialize() if self.bounds is not None else None,
1010 backgrounds=self.backgrounds.deserialize(archive),
1011 )._finish_deserialize(self)
1013 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
1014 if kwargs and component not in ("image", "mask", "variance"):
1015 raise InvalidParameterError(
1016 f"Unsupported parameters for VisitImage component {component}: {set(kwargs.keys())}."
1017 )
1018 return super().deserialize_component(component, archive)
1021def _extract_or_check_value[T](
1022 key: str,
1023 given_value: T | None,
1024 *sources: tuple[str, T | None],
1025) -> T:
1026 # Compare given value against multiple sources. If given value is not
1027 # supplied return the first non-None value in the reference sources.
1028 if given_value is not None:
1029 for source_name, source_value in sources:
1030 if source_value is not None and source_value != given_value:
1031 raise ValueError(
1032 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
1033 )
1034 if source_value is not None:
1035 # Only check the first non-None source rather than checking
1036 # all supplied values.
1037 break
1038 return given_value
1040 for _, source_value in sources:
1041 if source_value is not None:
1042 return source_value
1044 raise ValueError(f"No value found for {key}.")
1047def _extract_or_check_header[T](
1048 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
1049) -> T:
1050 hdr_value: T | None = None
1051 if (hdr_raw_value := header.get(key)) is not None:
1052 hdr_value = coerce(hdr_raw_value)
1053 return _extract_or_check_value(
1054 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
1055 )
1058def _get_unit_conversion_factor(
1059 original: astropy.units.UnitBase, new: astropy.units.UnitBase
1060) -> float | None:
1061 try:
1062 return original.to(new)
1063 except astropy.units.UnitConversionError:
1064 return None