Coverage for python / lsst / images / _visit_image.py: 23%
444 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:25 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:25 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import functools
17import warnings
18from collections.abc import Callable, Mapping, MutableMapping
19from types import EllipsisType
20from typing import TYPE_CHECKING, Any, Literal, cast, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
27from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
29from ._backgrounds import BackgroundMap, BackgroundMapSerializationModel
30from ._concrete_bounds import SerializableBounds
31from ._geom import Bounds, Box
32from ._image import Image, ImageSerializationModel
33from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
34from ._masked_image import MaskedImage, MaskedImageSerializationModel
35from ._observation_summary_stats import ObservationSummaryStats
36from ._polygon import Polygon
37from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
38from .aperture_corrections import (
39 ApertureCorrectionMap,
40 ApertureCorrectionMapSerializationModel,
41 aperture_corrections_from_legacy,
42 aperture_corrections_to_legacy,
43)
44from .cameras import Detector, DetectorSerializationModel
45from .fields import BaseField, Field, FieldSerializationModel, field_from_legacy_photo_calib
46from .fits import ExtensionKey, FitsOpaqueMetadata
47from .psfs import (
48 GaussianPointSpreadFunction,
49 GaussianPSFSerializationModel,
50 LegacyPointSpreadFunction,
51 PiffSerializationModel,
52 PiffWrapper,
53 PointSpreadFunction,
54 PSFExSerializationModel,
55 PSFExWrapper,
56)
57from .serialization import ArchiveReadError, InputArchive, InvalidParameterError, MetadataValue, OutputArchive
58from .utils import is_none
60if TYPE_CHECKING:
61 try:
62 from lsst.afw.cameraGeom import Detector as LegacyDetector
63 from lsst.afw.image import Exposure as LegacyExposure
64 from lsst.afw.image import FilterLabel as LegacyFilterLabel
65 from lsst.afw.image import VisitInfo as LegacyVisitInfo
66 except ImportError:
67 type LegacyDetector = Any # type: ignore[no-redef]
68 type LegacyExposure = Any # type: ignore[no-redef]
69 type LegacyFilterLabel = Any # type: ignore[no-redef]
70 type LegacyVisitInfo = Any # type: ignore[no-redef]
73class VisitImage(MaskedImage):
74 """A calibrated single-visit image.
76 Parameters
77 ----------
78 image
79 The main image plane. If this has a `Projection`, it will be used
80 for all planes unless a ``projection`` is passed separately.
81 mask
82 A bitmask image that annotates the main image plane. Must have the
83 same bounding box as ``image`` if provided. Any attached projection
84 is replaced (possibly by `None`).
85 variance
86 The per-pixel uncertainty of the main image as an image of variance
87 values. Must have the same bounding box as ``image`` if provided, and
88 its units must be the square of ``image.unit`` or `None`.
89 Values default to ``1.0``. Any attached projection is replaced
90 (possibly by `None`).
91 mask_schema
92 Schema for the mask plane. Must be provided if and only if ``mask`` is
93 not provided.
94 projection
95 Projection that maps the pixel grid to the sky. Can only be `None` if
96 a projection is already attached to ``image``.
97 bounds
98 The region where this image's pixels and other properties are valid.
99 If not provided, the bounding box of the image is used. Other
100 components (``psf``, ``projection``, ``aperture_corrections``, etc.)
101 are assumed to have their own bounds which may or may not be the same
102 as the image bounds. If ``bounds`` extends beyond the image bounding
103 box, the intersection between ``bounds`` and the image bounding box
104 is used instead.
105 obs_info
106 General information about this visit in standardized form.
107 summary_stats
108 Summary statistics associated with this visit. Initialized to default
109 values if not provided.
110 photometric_scaling
111 Field that can be used to multiply a post-ISR image units to yield
112 calibrated image units. This may be a scaling that was already
113 applied (so dividing by it will recover the post-ISR units) or a
114 scaling that has not been applied, depending on ``image.unit``.
115 psf
116 Point-spread function model for this image, or an exception explaining
117 why it could not be read (to be raised if the PSF is requested later).
118 detector
119 Geometry and electronic information for the detector attached to this
120 image.
121 aperture_corrections : `dict` [`str`, `~fields.BaseField`]
122 Mapping from photometry algorithm name to the aperture correction for
123 that algorithm.
124 backgrounds
125 Background models associated with this image.
126 band
127 Name of the passband the image was observed with (this is a shorter,
128 less specific version of ``obs_info.physical_filter``).
129 metadata
130 Arbitrary flexible metadata to associate with the image.
131 """
133 def __init__(
134 self,
135 image: Image,
136 *,
137 mask: Mask | None = None,
138 variance: Image | None = None,
139 mask_schema: MaskSchema | None = None,
140 projection: Projection[DetectorFrame] | None = None,
141 bounds: Bounds | None = None,
142 obs_info: ObservationInfo | None = None,
143 summary_stats: ObservationSummaryStats | None = None,
144 photometric_scaling: Field | None = None,
145 psf: PointSpreadFunction | ArchiveReadError,
146 detector: Detector,
147 aperture_corrections: ApertureCorrectionMap | None = None,
148 backgrounds: BackgroundMap | None = None,
149 band: str,
150 metadata: dict[str, MetadataValue] | None = None,
151 ):
152 super().__init__(
153 image,
154 mask=mask,
155 variance=variance,
156 mask_schema=mask_schema,
157 projection=projection,
158 metadata=metadata,
159 )
160 if self.image.unit is None:
161 raise TypeError("The image component of a VisitImage must have units.")
162 if self.image.projection is None:
163 raise TypeError("The projection component of a VisitImage cannot be None.")
164 if obs_info is None:
165 raise TypeError("The observation info component of a VisitImage cannot be None.")
166 if obs_info.physical_filter is None:
167 raise ValueError("The obs_info.physical_filter attribute of a VisitImage cannot be None.")
168 self._obs_info = obs_info
169 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
170 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
171 if summary_stats is None:
172 summary_stats = ObservationSummaryStats()
173 self._summary_stats = summary_stats
174 if photometric_scaling is not None and photometric_scaling.unit is None:
175 raise TypeError("If a photometric_scaling is provided, it must have units.")
176 self._photometric_scaling = photometric_scaling
177 self._psf = psf
178 self._detector = detector
179 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {}
180 self._bounds = bounds if bounds is not None else self.bbox
181 if not self.bbox.contains(self._bounds.bbox):
182 self._bounds = self._bounds.intersection(self.bbox)
183 self._backgrounds = backgrounds if backgrounds is not None else BackgroundMap()
184 self._band = band
186 @property
187 def unit(self) -> astropy.units.UnitBase:
188 """The units of the image plane (`astropy.units.Unit`)."""
189 return cast(astropy.units.UnitBase, super().unit)
191 @property
192 def projection(self) -> Projection[DetectorFrame]:
193 """The projection that maps the pixel grid to the sky
194 (`Projection` [`DetectorFrame`]).
195 """
196 return cast(Projection[DetectorFrame], super().projection)
198 @property
199 def bounds(self) -> Bounds:
200 """The region where pixels are valid (`Bounds`)."""
201 return self._bounds
203 @property
204 def obs_info(self) -> ObservationInfo:
205 """General information about this observation in standard form.
206 (`~astro_metadata_translator.ObservationInfo`).
207 """
208 return self._obs_info
210 @property
211 def physical_filter(self) -> str:
212 """Full name of the physical bandpass filter (`str`)."""
213 assert self._obs_info.physical_filter is not None, "Guaranteed at construction."
214 return self._obs_info.physical_filter
216 @property
217 def band(self) -> str:
218 """Short name of the bandpass filter (`str`)."""
219 return self._band
221 @property
222 def astropy_wcs(self) -> ProjectionAstropyView:
223 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
225 Notes
226 -----
227 As expected for Astropy WCS objects, this defines pixel coordinates
228 such that the first row and column in the arrays are ``(0, 0)``, not
229 ``bbox.start``, as is the case for `projection`.
231 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
232 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
233 `astropy.wcs.WCS` (use `fits_wcs` for that).
234 """
235 return cast(ProjectionAstropyView, super().astropy_wcs)
237 @property
238 def summary_stats(self) -> ObservationSummaryStats:
239 """Optional summary statistics for this observation
240 (`ObservationSummaryStats`).
241 """
242 return self._summary_stats
244 @property
245 def photometric_scaling(self) -> Field | None:
246 """Field that multiplies a post-ISR image to yield the calibrated
247 image (~`fields.BaseField`).
248 """
249 return self._photometric_scaling
251 @photometric_scaling.setter
252 def photometric_scaling(self, value: Field):
253 if value.unit is None:
254 raise TypeError("The photometric_scaling for a VisitImage must have units.")
255 self._photometric_scaling = value
257 @property
258 def psf(self) -> PointSpreadFunction:
259 """The point-spread function model for this image
260 (`.psfs.PointSpreadFunction`).
261 """
262 if isinstance(self._psf, ArchiveReadError):
263 raise self._psf
264 return self._psf
266 @property
267 def detector(self) -> Detector:
268 """Geometry and electronic information about the detector
269 (`.cameras.Detector`).
270 """
271 return self._detector
273 @property
274 def aperture_corrections(self) -> ApertureCorrectionMap:
275 """A mapping from photometry algorithm name to the aperture correction
276 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]).
277 """
278 return self._aperture_corrections
280 @property
281 def backgrounds(self) -> BackgroundMap:
282 """A mapping of backgrounds associated with this image
283 (`BackgroundMap`).
284 """
285 return self._backgrounds
287 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
288 if bbox is ...:
289 return self
290 super().__getitem__(bbox)
291 return self._transfer_metadata(
292 VisitImage(
293 self.image[bbox],
294 mask=self.mask[bbox],
295 variance=self.variance[bbox],
296 projection=self.projection,
297 psf=self.psf,
298 obs_info=self.obs_info,
299 bounds=self._bounds, # don't need to intersect here, because __init__ will do that.
300 summary_stats=self.summary_stats,
301 detector=self._detector,
302 photometric_scaling=self._photometric_scaling,
303 aperture_corrections=self.aperture_corrections,
304 backgrounds=self._backgrounds,
305 band=self._band,
306 ),
307 bbox=bbox,
308 )
310 def __str__(self) -> str:
311 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
313 def __repr__(self) -> str:
314 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
316 def copy(self, *, copy_detector: bool = False) -> VisitImage:
317 """Deep-copy the visit image.
319 Parameters
320 ----------
321 copy_detector
322 Whether to deep-copy the `detector` attribute.
323 """
324 return self._transfer_metadata(
325 VisitImage(
326 image=self._image.copy(),
327 mask=self._mask.copy(),
328 variance=self._variance.copy(),
329 psf=self._psf,
330 obs_info=self.obs_info,
331 bounds=self._bounds,
332 summary_stats=self.summary_stats.model_copy(),
333 detector=self._detector.copy() if copy_detector else self._detector,
334 photometric_scaling=self._photometric_scaling,
335 aperture_corrections=self.aperture_corrections.copy(),
336 backgrounds=self._backgrounds.copy(),
337 band=self.band,
338 ),
339 copy=True,
340 )
342 def convert_unit(
343 self,
344 unit: astropy.units.UnitBase = astropy.units.nJy,
345 copy: Literal["as-needed"] | bool = True,
346 copy_detector: bool = False,
347 ) -> VisitImage:
348 """Return an equivalent image with different pixel units.
350 Parameters
351 ----------
352 unit
353 The unit to transform to. This may be any of the following:
355 - any unit directly relatable to the current units via Astropy;
356 - any unit relatable to the product of the current units with the
357 `photometric_scaling` (i.e. if the current image is in
358 instrumental units but we know how to calibrate them)
359 - any unit relatable to the quotient of the current units with the
360 `photometric_scaling` (i.e. if the current image is in
361 calibrated units and we want to revert back to instrumental
362 units).
363 copy
364 Whether to copy the images and other components. If `True`, all
365 components that aren't controlled by some other argument will
366 always be deep-copied. If `False`, the operation will fail if the
367 image is not already in the right units. If ``as-needed``, only
368 the image and variance will be copied, and only if they are not
369 already in the right units.
370 copy_detector
371 Whether to deep-copy the `detector` attribute.
373 Returns
374 -------
375 `VisitImage`
376 An image with the given units.
377 """
378 if copy not in (True, False, "as-needed"):
379 raise TypeError(f"Invalid value for 'copy' parameter: {copy!r}.")
380 if (factor := _get_unit_conversion_factor(self.unit, unit)) is not None:
381 if factor == 1.0:
382 if copy is True: # not "as-needed"
383 return self.copy()
384 else:
385 return self[...]
386 elif copy is False:
387 raise astropy.units.UnitConversionError(
388 f"Units must be converted ({self.unit} -> {unit}), but copy=False."
389 )
390 image = Image(self._image.array * factor, bbox=self.bbox, projection=self.projection, unit=unit)
391 variance = Image(
392 self._variance.array * factor**2,
393 bbox=self.bbox,
394 unit=unit**2,
395 )
396 elif self._photometric_scaling is None:
397 raise astropy.units.UnitConversionError(
398 "VisitImage.photometric_scaling is None, and there "
399 f"is no constant conversion from {self.unit} to {unit}."
400 )
401 else:
402 if copy is False:
403 raise astropy.units.UnitConversionError(
404 f"Photometric scaling must be applied to go from ={self.unit} to {unit}, but copy=False."
405 )
406 scaling = self._photometric_scaling
407 assert scaling.unit is not None, "Checked at construction."
408 if (constant_factor := _get_unit_conversion_factor(self.unit * scaling.unit, unit)) is not None:
409 if constant_factor != 1.0:
410 scaling = scaling * constant_factor
411 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array
412 elif (constant_factor := _get_unit_conversion_factor(self.unit / scaling.unit, unit)) is not None:
413 if constant_factor != 1.0:
414 scaling = scaling / constant_factor
415 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array
416 np.true_divide(1.0, scaling_array, out=scaling_array)
417 else:
418 raise astropy.units.UnitConversionError(
419 f"photometric_scaling with units {scaling.unit} does not "
420 f"provide a path from {self.unit} to {unit}."
421 )
422 # We needed to allocate a new array to evaluate the scaling field,
423 # and then we need to allocate another to hold its square for the
424 # variance scaling. But then we can multiply those arrays in-place
425 # to get the output image and variance to avoid yet more
426 # allocations (note we can't instead multiply the visit image's
427 # image and variance arrays in place because they might have other
428 # references that are still associated with the old units).
429 image = Image(scaling_array, bbox=self.bbox, unit=unit)
430 variance = Image(np.square(scaling_array), bbox=self.bbox, unit=unit**2)
431 image.array *= self._image.array
432 variance.array *= self._variance.array
433 copy_components = copy is True
434 return self._transfer_metadata(
435 VisitImage(
436 image=image,
437 mask=self._mask if not copy_components else self._mask.copy(),
438 variance=variance,
439 projection=self.projection, # never copied; immutable
440 obs_info=self.obs_info if not copy_components else self.obs_info.model_copy(),
441 psf=self._psf, # never copied; immutable
442 bounds=self._bounds, # never copied; immutable
443 summary_stats=self.summary_stats if not copy_components else self.summary_stats.model_copy(),
444 detector=self._detector if not copy_detector else self._detector.copy(),
445 photometric_scaling=self._photometric_scaling, # never copied; immutable
446 aperture_corrections=(
447 self.aperture_corrections if not copy_components else self.aperture_corrections.copy()
448 ),
449 backgrounds=self.backgrounds if not copy_components else self.backgrounds.copy(),
450 band=self.band,
451 )
452 )
454 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
455 masked_image_model = super().serialize(archive)
456 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
457 match self._psf:
458 # MyPy is able to figure things out here with this match statement,
459 # but not a single isinstance check on both types.
460 case PiffWrapper():
461 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
462 case PSFExWrapper():
463 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
464 case GaussianPointSpreadFunction():
465 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
466 case _:
467 raise TypeError(
468 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
469 )
470 assert masked_image_model.projection is not None, "VisitImage always has a projection."
471 serialized_detector = archive.serialize_direct("detector", self._detector.serialize)
472 serialized_photometric_scaling = (
473 archive.serialize_direct("photometric_scaling", self._photometric_scaling.serialize)
474 if self._photometric_scaling is not None
475 else None
476 )
477 serialized_aperture_corrections = archive.serialize_direct(
478 "aperture_corrections",
479 functools.partial(ApertureCorrectionMapSerializationModel.serialize, self.aperture_corrections),
480 )
481 serialized_backgrounds = archive.serialize_direct("backgrounds", self._backgrounds.serialize)
482 return VisitImageSerializationModel(
483 image=masked_image_model.image,
484 mask=masked_image_model.mask,
485 variance=masked_image_model.variance,
486 projection=masked_image_model.projection,
487 obs_info=self.obs_info,
488 photometric_scaling=serialized_photometric_scaling,
489 psf=serialized_psf,
490 summary_stats=self.summary_stats,
491 detector=serialized_detector,
492 aperture_corrections=serialized_aperture_corrections,
493 bounds=self._bounds.serialize() if self._bounds != self.bbox else None,
494 backgrounds=serialized_backgrounds,
495 band=self.band,
496 metadata=self.metadata,
497 )
499 @staticmethod
500 def _get_archive_tree_type[P: pydantic.BaseModel](
501 pointer_type: type[P],
502 ) -> type[VisitImageSerializationModel[P]]:
503 """Return the serialization model type for this object for an archive
504 type that uses the given pointer type.
505 """
506 return VisitImageSerializationModel[pointer_type] # type: ignore
508 # write_fits and read_fits inherited from MaskedImage.
510 @staticmethod
511 def from_legacy(
512 legacy: LegacyExposure,
513 *,
514 unit: astropy.units.UnitBase | None = None,
515 plane_map: Mapping[str, MaskPlane] | None = None,
516 instrument: str | None = None,
517 visit: int | None = None,
518 ) -> VisitImage:
519 """Convert from an `lsst.afw.image.Exposure` instance.
521 Parameters
522 ----------
523 legacy
524 An `lsst.afw.image.Exposure` instance that will share image and
525 variance (but not mask) pixel data with the returned object.
526 unit
527 Units of the image. If not provided, the ``BUNIT`` metadata
528 key will be used, if available.
529 plane_map
530 A mapping from legacy mask plane name to the new plane name and
531 description. If `None` (default)
532 `get_legacy_visit_image_mask_planes` is used.
533 instrument
534 Name of the instrument. Extracted from the metadata if not
535 provided.
536 visit
537 ID of the visit. Extracted from the metadata if not provided.
538 """
539 if plane_map is None:
540 plane_map = get_legacy_visit_image_mask_planes()
541 md = legacy.getMetadata()
542 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
543 instrument = _extract_or_check_header(
544 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
545 )
546 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, obs_info.exposure_id, int)
547 legacy_wcs = legacy.getWcs()
548 if legacy_wcs is None:
549 raise ValueError("Exposure does not have a SkyWcs.")
550 legacy_detector = legacy.getDetector()
551 if legacy_detector is None:
552 raise ValueError("Exposure does not have a Detector.")
553 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
555 # Update the ObservationInfo from other components.
556 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
558 opaque_fits_metadata = FitsOpaqueMetadata()
559 primary_header = astropy.io.fits.Header()
560 with warnings.catch_warnings():
561 # Silence warnings about long keys becoming HIERARCH.
562 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
563 primary_header.update(md.toOrderedDict())
564 metadata = opaque_fits_metadata.extract_legacy_primary_header(primary_header)
565 instrumental_unit = opaque_fits_metadata.get_instrumental_unit() or astropy.units.electron
566 hdr_unit: astropy.units.UnitBase | None = None
567 if hdr_unit_str := md.get("BUNIT"):
568 hdr_unit = astropy.units.Unit(hdr_unit_str, format="FITS")
569 if hdr_unit == astropy.units.adu and instrumental_unit == astropy.units.electron:
570 # Fix incorrect BUNIT='adu' in LSST
571 # preliminary_visit_image.
572 hdr_unit = astropy.units.electron
573 if unit is None:
574 unit = hdr_unit
575 elif hdr_unit is not None and hdr_unit != unit:
576 raise ValueError(f"BUNIT value {hdr_unit} disagrees with given unit {unit}.")
577 projection = Projection.from_legacy(
578 legacy_wcs,
579 DetectorFrame(
580 instrument=instrument,
581 visit=visit,
582 detector=legacy_detector.getId(),
583 bbox=detector_bbox,
584 ),
585 )
586 legacy_psf = legacy.getPsf()
587 if legacy_psf is None:
588 raise ValueError("Exposure file does not have a Psf.")
589 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
590 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
591 legacy_summary_stats = legacy.info.getSummaryStats()
592 legacy_ap_corr_map = legacy.info.getApCorrMap()
593 legacy_polygon = legacy.info.getValidPolygon()
594 legacy_photo_calib = legacy.info.getPhotoCalib()
595 result = VisitImage(
596 image=masked_image.image.view(unit=unit),
597 mask=masked_image.mask,
598 variance=masked_image.variance,
599 projection=projection,
600 psf=psf,
601 obs_info=obs_info,
602 summary_stats=(
603 ObservationSummaryStats.from_legacy(legacy_summary_stats)
604 if legacy_summary_stats is not None
605 else None
606 ),
607 detector=Detector.from_legacy(
608 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
609 ),
610 aperture_corrections=(
611 aperture_corrections_from_legacy(legacy_ap_corr_map)
612 if legacy_ap_corr_map is not None
613 else None
614 ),
615 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
616 photometric_scaling=(
617 field_from_legacy_photo_calib(
618 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit
619 )
620 if legacy_photo_calib is not None
621 else None
622 ),
623 band=legacy.info.getFilter().bandLabel,
624 metadata=metadata,
625 )
626 result.metadata["id"] = legacy.info.getId()
627 result._opaque_metadata = opaque_fits_metadata
628 return result
630 def to_legacy(
631 self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None
632 ) -> LegacyExposure:
633 """Convert to an `lsst.afw.image.Exposure` instance.
635 Parameters
636 ----------
637 copy
638 If `True`, always copy the image and variance pixel data.
639 If `False`, return a view, and raise `TypeError` if the pixel data
640 is read-only (this is not supported by afw). If `None`, only copy
641 if the pixel data is read-only. Mask pixel data is always copied.
642 plane_map
643 A mapping from legacy mask plane name to the new plane name and
644 description. If `None` (default),
645 `get_legacy_visit_image_mask_planes` is used.
646 """
647 from lsst.afw.image import Exposure as LegacyExposure
648 from lsst.afw.image import FilterLabel as LegacyFilterLabel
649 from lsst.obs.base.makeRawVisitInfoViaObsInfo import MakeRawVisitInfoViaObsInfo
651 if plane_map is None:
652 plane_map = get_legacy_visit_image_mask_planes()
653 legacy_masked_image = super().to_legacy(copy=copy, plane_map=plane_map)
654 result = LegacyExposure(legacy_masked_image, dtype=self.image.array.dtype)
655 result_info = result.info
656 result_info.setId(self.metadata.get("id"))
657 result_info.setWcs(self.projection.to_legacy())
658 result_info.setDetector(self.detector.to_legacy())
659 result_info.setFilter(LegacyFilterLabel.fromBandPhysical(self.band, self.obs_info.physical_filter))
660 if self._photometric_scaling is not None:
661 result_info.setPhotoCalib(self._photometric_scaling.to_legacy_photo_calib(self.unit))
662 else:
663 result_info.setPhotoCalib(BaseField.make_legacy_photo_calib(self.unit))
664 # We just dump all of the FITS headers and non-FITS metadata into the
665 # legacy metadata component, to make sure we have everything. We dump
666 # the latter into a pair of special cards to be able to full round-trip
667 # them (including case preservation).
668 result_md = result_info.getMetadata()
669 try:
670 result_md["BUNIT"] = self.unit.to_string(format="fits")
671 except ValueError:
672 # Write units that astropy doesn't think FITS will accept anyway;
673 # FITS standard says "SHOULD" about using it's recommended units,
674 # and coloring outside the lines is better than lying.
675 result_md["BUNIT"] = self.unit.to_string()
676 if isinstance(self._opaque_metadata, FitsOpaqueMetadata):
677 result_md.update(self._opaque_metadata.headers[ExtensionKey()])
678 for n, (k, v) in enumerate(self.metadata.items()):
679 result_md[f"LSST IMAGES KEY {n + 1}"] = k
680 result_md[f"LSST IMAGES VALUE {n + 1}"] = v
681 if isinstance(self._psf, LegacyPointSpreadFunction):
682 result_info.setPsf(self._psf.legacy_psf)
683 elif isinstance(self._psf, PiffWrapper):
684 result_info.setPsf(self._psf.to_legacy())
685 if isinstance(self.bounds, Polygon):
686 result_info.setValidPolygon(self.bounds.to_legacy())
687 if self.aperture_corrections:
688 result_info.setApCorrMap(aperture_corrections_to_legacy(self.aperture_corrections))
689 result_info.setVisitInfo(MakeRawVisitInfoViaObsInfo.observationInfo2visitInfo(self.obs_info))
690 result_info.setSummaryStats(self.summary_stats.to_legacy())
691 return result
693 @overload # type: ignore[override]
694 @staticmethod
695 def read_legacy( 695 ↛ exitline 695 didn't return from function 'read_legacy' because
696 filename: str,
697 *,
698 component: Literal["bbox"],
699 ) -> Box: ...
701 @overload
702 @staticmethod
703 def read_legacy( 703 ↛ exitline 703 didn't return from function 'read_legacy' because
704 filename: str,
705 *,
706 preserve_quantization: bool = False,
707 instrument: str | None = None,
708 visit: int | None = None,
709 component: Literal["image"],
710 ) -> Image: ...
712 @overload
713 @staticmethod
714 def read_legacy( 714 ↛ exitline 714 didn't return from function 'read_legacy' because
715 filename: str,
716 *,
717 plane_map: Mapping[str, MaskPlane] | None = None,
718 instrument: str | None = None,
719 visit: int | None = None,
720 component: Literal["mask"],
721 ) -> Mask: ...
723 @overload
724 @staticmethod
725 def read_legacy( 725 ↛ exitline 725 didn't return from function 'read_legacy' because
726 filename: str,
727 *,
728 preserve_quantization: bool = False,
729 instrument: str | None = None,
730 visit: int | None = None,
731 component: Literal["variance"],
732 ) -> Image: ...
734 @overload
735 @staticmethod
736 def read_legacy( 736 ↛ exitline 736 didn't return from function 'read_legacy' because
737 filename: str,
738 *,
739 instrument: str | None = None,
740 visit: int | None = None,
741 component: Literal["projection"],
742 ) -> Projection[DetectorFrame]: ...
744 @overload
745 @staticmethod
746 def read_legacy( 746 ↛ exitline 746 didn't return from function 'read_legacy' because
747 filename: str,
748 *,
749 component: Literal["psf"],
750 ) -> PointSpreadFunction: ...
752 @overload
753 @staticmethod
754 def read_legacy( 754 ↛ exitline 754 didn't return from function 'read_legacy' because
755 filename: str,
756 *,
757 component: Literal["detector"],
758 ) -> Detector: ...
760 @overload
761 @staticmethod
762 def read_legacy( 762 ↛ exitline 762 didn't return from function 'read_legacy' because
763 filename: str,
764 *,
765 component: Literal["obs_info"],
766 ) -> ObservationInfo: ...
768 @overload
769 @staticmethod
770 def read_legacy( 770 ↛ exitline 770 didn't return from function 'read_legacy' because
771 filename: str,
772 *,
773 component: Literal["photometric_scaling"],
774 ) -> Field | None: ...
776 @overload
777 @staticmethod
778 def read_legacy( 778 ↛ exitline 778 didn't return from function 'read_legacy' because
779 filename: str,
780 *,
781 component: Literal["summary_stats"],
782 ) -> ObservationSummaryStats: ...
784 @overload
785 @staticmethod
786 def read_legacy( 786 ↛ exitline 786 didn't return from function 'read_legacy' because
787 filename: str,
788 *,
789 component: Literal["aperture_corrections"],
790 ) -> ApertureCorrectionMap: ...
792 @overload
793 @staticmethod
794 def read_legacy( 794 ↛ exitline 794 didn't return from function 'read_legacy' because
795 filename: str,
796 *,
797 preserve_quantization: bool = False,
798 plane_map: Mapping[str, MaskPlane] | None = None,
799 instrument: str | None = None,
800 visit: int | None = None,
801 component: None = None,
802 ) -> VisitImage: ...
804 @staticmethod
805 def read_legacy( # type: ignore[override]
806 filename: str,
807 *,
808 preserve_quantization: bool = False,
809 plane_map: Mapping[str, MaskPlane] | None = None,
810 instrument: str | None = None,
811 visit: int | None = None,
812 component: Literal[
813 "bbox",
814 "image",
815 "mask",
816 "variance",
817 "projection",
818 "psf",
819 "detector",
820 "photometric_scaling",
821 "obs_info",
822 "summary_stats",
823 "aperture_corrections",
824 ]
825 | None = None,
826 ) -> Any:
827 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
829 Parameters
830 ----------
831 filename
832 Full name of the file.
833 preserve_quantization
834 If `True`, ensure that writing the masked image back out again will
835 exactly preserve quantization-compressed pixel values. This causes
836 the image and variance plane arrays to be marked as read-only and
837 stores the original binary table data for those planes in memory.
838 If the `MaskedImage` is copied, the precompressed pixel values are
839 not transferred to the copy.
840 plane_map
841 A mapping from legacy mask plane name to the new plane name and
842 description. If `None` (default)
843 `get_legacy_visit_image_mask_planes` is used.
844 instrument
845 Name of the instrument. Read from the primary header if not
846 provided.
847 visit
848 ID of the visit. Read from the primary header if not
849 provided.
850 component
851 A component to read instead of the full image.
852 """
853 from lsst.afw.image import ExposureFitsReader
855 reader = ExposureFitsReader(filename)
856 if component == "bbox":
857 return Box.from_legacy(reader.readBBox())
858 legacy_detector = reader.readDetector()
859 if legacy_detector is None:
860 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
861 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
862 legacy_wcs = None
863 if component in (None, "image", "mask", "variance", "projection"):
864 legacy_wcs = reader.readWcs()
865 if legacy_wcs is None:
866 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
867 legacy_exposure_info = reader.readExposureInfo()
868 summary_stats = None
869 if component in (None, "summary_stats"):
870 legacy_stats = legacy_exposure_info.getSummaryStats()
871 if legacy_stats is not None:
872 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
873 if component == "summary_stats":
874 return summary_stats
875 if component in (None, "psf"):
876 legacy_psf = reader.readPsf()
877 if legacy_psf is None:
878 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
879 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
880 if component == "psf":
881 return psf
882 aperture_corrections: ApertureCorrectionMap = {}
883 if component in (None, "aperture_corrections"):
884 legacy_ap_corr_map = reader.readApCorrMap()
885 if legacy_ap_corr_map is not None:
886 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map)
887 if component == "aperture_corrections":
888 return aperture_corrections
889 assert component in (
890 None,
891 "image",
892 "mask",
893 "variance",
894 "projection",
895 "obs_info",
896 "detector",
897 "photometric_scaling",
898 ), component # for MyPy
899 filter_label = reader.readFilter()
900 with astropy.io.fits.open(filename) as hdu_list:
901 primary_header = hdu_list[0].header
902 obs_info = _obs_info_from_md(primary_header)
903 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, filter_label)
904 if component == "obs_info":
905 return obs_info
906 instrument = _extract_or_check_header(
907 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
908 )
909 visit = _extract_or_check_header(
910 "LSST BUTLER DATAID VISIT", visit, primary_header, obs_info.exposure_id, int
911 )
912 opaque_metadata = FitsOpaqueMetadata()
913 # This extraction is destructive, so we need to be sure to pass
914 # this opaque_metadata down to MaskedImage._read_legacy_hdus
915 # so it doesn't try to extract it again.
916 metadata = opaque_metadata.extract_legacy_primary_header(primary_header)
917 if (instrumental_unit := opaque_metadata.get_instrumental_unit()) is None:
918 instrumental_unit = astropy.units.electron
919 photometric_scaling: Field | None = None
920 if component in (None, "photometric_scaling"):
921 legacy_photo_calib = reader.readPhotoCalib()
922 if legacy_photo_calib is not None:
923 photometric_scaling = field_from_legacy_photo_calib(
924 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit
925 )
926 if component == "photometric_scaling":
927 return photometric_scaling
928 if component == "detector":
929 return Detector.from_legacy(
930 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
931 )
932 projection = Projection.from_legacy(
933 legacy_wcs,
934 DetectorFrame(
935 instrument=instrument,
936 visit=visit,
937 detector=legacy_detector.getId(),
938 bbox=detector_bbox,
939 ),
940 )
941 if component == "projection":
942 return projection
943 if plane_map is None:
944 plane_map = get_legacy_visit_image_mask_planes()
945 assert component != "psf", component # for MyPy
946 from_masked_image = MaskedImage._read_legacy_hdus(
947 hdu_list,
948 filename,
949 opaque_metadata=opaque_metadata,
950 preserve_quantization=preserve_quantization,
951 plane_map=plane_map,
952 component=component,
953 )
954 if component is not None:
955 # This is the image, mask, or variance; attach the projection and
956 # obs_info and return
957 return from_masked_image.view(projection=projection)
958 legacy_polygon = reader.readValidPolygon()
959 result = VisitImage(
960 from_masked_image.image,
961 mask=from_masked_image.mask,
962 variance=from_masked_image.variance,
963 projection=projection,
964 psf=psf,
965 detector=Detector.from_legacy(
966 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
967 ),
968 obs_info=obs_info,
969 summary_stats=summary_stats,
970 aperture_corrections=aperture_corrections,
971 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
972 photometric_scaling=photometric_scaling,
973 band=filter_label.bandLabel,
974 metadata=metadata,
975 )
976 result._opaque_metadata = from_masked_image._opaque_metadata
977 result.metadata["id"] = reader.readExposureId()
978 return result
981class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
982 """A Pydantic model used to represent a serialized `VisitImage`."""
984 # Inherited attributes are duplicated because that improves the docs
985 # (some limitation in the sphinx/pydantic integration), and these are
986 # important docs.
988 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
989 mask: MaskSerializationModel[P] = pydantic.Field(
990 description="Bitmask that annotates the main image's pixels."
991 )
992 variance: ImageSerializationModel[P] = pydantic.Field(
993 description="Per-pixel variance estimates for the main image."
994 )
995 projection: ProjectionSerializationModel[P] = pydantic.Field(
996 description="Projection that maps the pixel grid to the sky.",
997 )
998 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
999 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
1000 )
1001 obs_info: ObservationInfo = pydantic.Field(
1002 description="Standardized description of visit metadata",
1003 )
1004 photometric_scaling: FieldSerializationModel | None = pydantic.Field(
1005 default=None,
1006 description="Scaling that can be used to multiply a post-ISR image to yield calibrated pixel values.",
1007 )
1008 summary_stats: ObservationSummaryStats = pydantic.Field(
1009 description="Summary statistics for the observation."
1010 )
1011 detector: DetectorSerializationModel = pydantic.Field(
1012 description="Geometry and electronic information for the detector."
1013 )
1014 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field(
1015 default_factory=ApertureCorrectionMapSerializationModel,
1016 description="Aperture corrections, keyed by flux algorithm.",
1017 )
1018 bounds: SerializableBounds | None = pydantic.Field(
1019 default=None,
1020 description="Pixel validity region, if different from the image bounding box.",
1021 exclude_if=is_none,
1022 )
1023 backgrounds: BackgroundMapSerializationModel = pydantic.Field(
1024 default_factory=BackgroundMapSerializationModel,
1025 description="Background models associated with this image.",
1026 )
1027 band: str = pydantic.Field(description="Short name of the bandpass filter.")
1029 def deserialize(
1030 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any
1031 ) -> VisitImage:
1032 if kwargs:
1033 raise InvalidParameterError(f"Unrecognized parameters for VisitImage: {set(kwargs.keys())}.")
1034 masked_image = super().deserialize(archive, bbox=bbox)
1035 try:
1036 psf = self.psf.deserialize(archive)
1037 except ArchiveReadError as err:
1038 # Defer this until/unless somebody actually asks for the PSF.
1039 psf = err
1040 detector = self.detector.deserialize(archive)
1041 aperture_corrections = self.aperture_corrections.deserialize(archive)
1042 photometric_scaling = (
1043 self.photometric_scaling.deserialize(archive) if self.photometric_scaling is not None else None
1044 )
1045 return VisitImage(
1046 masked_image.image,
1047 mask=masked_image.mask,
1048 variance=masked_image.variance,
1049 psf=psf,
1050 projection=masked_image.projection,
1051 obs_info=self.obs_info,
1052 summary_stats=self.summary_stats,
1053 detector=detector,
1054 aperture_corrections=aperture_corrections,
1055 photometric_scaling=photometric_scaling,
1056 bounds=self.bounds.deserialize() if self.bounds is not None else None,
1057 backgrounds=self.backgrounds.deserialize(archive),
1058 band=self.band,
1059 )._finish_deserialize(self)
1061 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
1062 if kwargs and component not in ("image", "mask", "variance"):
1063 raise InvalidParameterError(
1064 f"Unsupported parameters for VisitImage component {component}: {set(kwargs.keys())}."
1065 )
1066 return super().deserialize_component(component, archive)
1069def _obs_info_from_md(
1070 md: MutableMapping[str, Any], visit_info: LegacyVisitInfo | None = None
1071) -> ObservationInfo:
1072 # Try to get an ObservationInfo from the primary header as if
1073 # it's a raw header. Else fallback.
1074 try:
1075 obs_info = ObservationInfo.from_header(md, quiet=True)
1076 except ValueError:
1077 # Not known translator. Must fall back to visit info. If we have
1078 # an actual VisitInfo, serialize it since we know that it will be
1079 # complete.
1080 if visit_info is not None:
1081 from lsst.afw.image import setVisitInfoMetadata
1082 from lsst.daf.base import PropertyList
1084 pl = PropertyList()
1085 setVisitInfoMetadata(pl, visit_info)
1086 # Merge so that we still have access to butler provenance.
1087 md.update(pl)
1089 # Try the given header looking for VisitInfo hints.
1090 # We get lots of warnings if nothing can be found. Currently
1091 # no way to disable those without capturing them.
1092 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
1093 return obs_info
1096def _update_obs_info_from_legacy(
1097 obs_info: ObservationInfo,
1098 detector: LegacyDetector | None = None,
1099 filter_label: LegacyFilterLabel | None = None,
1100) -> ObservationInfo:
1101 extra_md: dict[str, str | int] = {}
1103 if filter_label is not None and filter_label.hasBandLabel():
1104 extra_md["physical_filter"] = filter_label.physicalLabel
1106 # Fill in detector metadata, check for consistency.
1107 # ObsInfo detector name and group can not be derived from
1108 # the getName() information without knowing how the components
1109 # are separated.
1110 if detector is not None:
1111 detector_md = {
1112 "detector_num": detector.getId(),
1113 "detector_serial": detector.getSerial(),
1114 "detector_unique_name": detector.getName(),
1115 }
1116 extra_md.update(detector_md)
1118 obs_info_updates: dict[str, str | int] = {}
1119 for k, v in extra_md.items():
1120 current = getattr(obs_info, k)
1121 if current is None:
1122 obs_info_updates[k] = v
1123 continue
1124 if current != v:
1125 raise RuntimeError(
1126 f"ObservationInfo contains value for '{k}' that is inconsistent "
1127 f"with given legacy object: {v} != {current}"
1128 )
1130 if obs_info_updates:
1131 obs_info = obs_info.model_copy(update=obs_info_updates)
1132 return obs_info
1135def _extract_or_check_value[T](
1136 key: str,
1137 given_value: T | None,
1138 *sources: tuple[str, T | None],
1139) -> T:
1140 # Compare given value against multiple sources. If given value is not
1141 # supplied return the first non-None value in the reference sources.
1142 if given_value is not None:
1143 for source_name, source_value in sources:
1144 if source_value is not None and source_value != given_value:
1145 raise ValueError(
1146 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
1147 )
1148 if source_value is not None:
1149 # Only check the first non-None source rather than checking
1150 # all supplied values.
1151 break
1152 return given_value
1154 for _, source_value in sources:
1155 if source_value is not None:
1156 return source_value
1158 raise ValueError(f"No value found for {key}.")
1161def _extract_or_check_header[T](
1162 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
1163) -> T:
1164 hdr_value: T | None = None
1165 if (hdr_raw_value := header.get(key)) is not None:
1166 hdr_value = coerce(hdr_raw_value)
1167 return _extract_or_check_value(
1168 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
1169 )
1172def _get_unit_conversion_factor(
1173 original: astropy.units.UnitBase, new: astropy.units.UnitBase
1174) -> float | None:
1175 try:
1176 return original.to(new)
1177 except astropy.units.UnitConversionError:
1178 return None