Coverage for python/lsst/images/_visit_image.py: 23%
447 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 01:09 -0700
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 01:09 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import functools
17import warnings
18from collections.abc import Callable, Mapping, MutableMapping
19from types import EllipsisType
20from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
27from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
29from ._backgrounds import BackgroundMap, BackgroundMapSerializationModel
30from ._concrete_bounds import SerializableBounds
31from ._geom import Bounds, Box
32from ._image import Image, ImageSerializationModel
33from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
34from ._masked_image import MaskedImage, MaskedImageSerializationModel
35from ._observation_summary_stats import ObservationSummaryStats
36from ._polygon import Polygon
37from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
38from .aperture_corrections import (
39 ApertureCorrectionMap,
40 ApertureCorrectionMapSerializationModel,
41 aperture_corrections_from_legacy,
42 aperture_corrections_to_legacy,
43)
44from .cameras import Detector, DetectorSerializationModel
45from .fields import BaseField, Field, FieldSerializationModel, field_from_legacy_photo_calib
46from .fits import ExtensionKey, FitsOpaqueMetadata
47from .psfs import (
48 GaussianPointSpreadFunction,
49 GaussianPSFSerializationModel,
50 LegacyPointSpreadFunction,
51 PiffSerializationModel,
52 PiffWrapper,
53 PointSpreadFunction,
54 PSFExSerializationModel,
55 PSFExWrapper,
56)
57from .serialization import ArchiveReadError, InputArchive, InvalidParameterError, MetadataValue, OutputArchive
58from .utils import is_none
60if TYPE_CHECKING:
61 try:
62 from lsst.afw.cameraGeom import Detector as LegacyDetector
63 from lsst.afw.image import Exposure as LegacyExposure
64 from lsst.afw.image import FilterLabel as LegacyFilterLabel
65 from lsst.afw.image import VisitInfo as LegacyVisitInfo
66 except ImportError:
67 type LegacyDetector = Any # type: ignore[no-redef]
68 type LegacyExposure = Any # type: ignore[no-redef]
69 type LegacyFilterLabel = Any # type: ignore[no-redef]
70 type LegacyVisitInfo = Any # type: ignore[no-redef]
73class VisitImage(MaskedImage):
74 """A calibrated single-visit image.
76 Parameters
77 ----------
78 image
79 The main image plane. If this has a `Projection`, it will be used
80 for all planes unless a ``projection`` is passed separately.
81 mask
82 A bitmask image that annotates the main image plane. Must have the
83 same bounding box as ``image`` if provided. Any attached projection
84 is replaced (possibly by `None`).
85 variance
86 The per-pixel uncertainty of the main image as an image of variance
87 values. Must have the same bounding box as ``image`` if provided, and
88 its units must be the square of ``image.unit`` or `None`.
89 Values default to ``1.0``. Any attached projection is replaced
90 (possibly by `None`).
91 mask_schema
92 Schema for the mask plane. Must be provided if and only if ``mask`` is
93 not provided.
94 projection
95 Projection that maps the pixel grid to the sky. Can only be `None` if
96 a projection is already attached to ``image``.
97 bounds
98 The region where this image's pixels and other properties are valid.
99 If not provided, the bounding box of the image is used. Other
100 components (``psf``, ``projection``, ``aperture_corrections``, etc.)
101 are assumed to have their own bounds which may or may not be the same
102 as the image bounds. If ``bounds`` extends beyond the image bounding
103 box, the intersection between ``bounds`` and the image bounding box
104 is used instead.
105 obs_info
106 General information about this visit in standardized form.
107 summary_stats
108 Summary statistics associated with this visit. Initialized to default
109 values if not provided.
110 photometric_scaling
111 Field that can be used to multiply a post-ISR image units to yield
112 calibrated image units. This may be a scaling that was already
113 applied (so dividing by it will recover the post-ISR units) or a
114 scaling that has not been applied, depending on ``image.unit``.
115 psf
116 Point-spread function model for this image, or an exception explaining
117 why it could not be read (to be raised if the PSF is requested later).
118 detector
119 Geometry and electronic information for the detector attached to this
120 image.
121 aperture_corrections : `dict` [`str`, `~fields.BaseField`]
122 Mapping from photometry algorithm name to the aperture correction for
123 that algorithm.
124 backgrounds
125 Background models associated with this image.
126 band
127 Name of the passband the image was observed with (this is a shorter,
128 less specific version of ``obs_info.physical_filter``).
129 metadata
130 Arbitrary flexible metadata to associate with the image.
131 """
133 def __init__(
134 self,
135 image: Image,
136 *,
137 mask: Mask | None = None,
138 variance: Image | None = None,
139 mask_schema: MaskSchema | None = None,
140 projection: Projection[DetectorFrame] | None = None,
141 bounds: Bounds | None = None,
142 obs_info: ObservationInfo | None = None,
143 summary_stats: ObservationSummaryStats | None = None,
144 photometric_scaling: Field | None = None,
145 psf: PointSpreadFunction | ArchiveReadError,
146 detector: Detector,
147 aperture_corrections: ApertureCorrectionMap | None = None,
148 backgrounds: BackgroundMap | None = None,
149 band: str,
150 metadata: dict[str, MetadataValue] | None = None,
151 ):
152 super().__init__(
153 image,
154 mask=mask,
155 variance=variance,
156 mask_schema=mask_schema,
157 projection=projection,
158 metadata=metadata,
159 )
160 if self.image.unit is None:
161 raise TypeError("The image component of a VisitImage must have units.")
162 if self.image.projection is None:
163 raise TypeError("The projection component of a VisitImage cannot be None.")
164 if obs_info is None:
165 raise TypeError("The observation info component of a VisitImage cannot be None.")
166 if obs_info.physical_filter is None:
167 raise ValueError("The obs_info.physical_filter attribute of a VisitImage cannot be None.")
168 self._obs_info = obs_info
169 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
170 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
171 if summary_stats is None:
172 summary_stats = ObservationSummaryStats()
173 self._summary_stats = summary_stats
174 if photometric_scaling is not None and photometric_scaling.unit is None:
175 raise TypeError("If a photometric_scaling is provided, it must have units.")
176 self._photometric_scaling = photometric_scaling
177 self._psf = psf
178 self._detector = detector
179 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {}
180 self._bounds = bounds if bounds is not None else self.bbox
181 if not self.bbox.contains(self._bounds.bbox):
182 self._bounds = self._bounds.intersection(self.bbox)
183 self._backgrounds = backgrounds if backgrounds is not None else BackgroundMap()
184 self._band = band
186 @property
187 def unit(self) -> astropy.units.UnitBase:
188 """The units of the image plane (`astropy.units.Unit`)."""
189 return cast(astropy.units.UnitBase, super().unit)
191 @property
192 def projection(self) -> Projection[DetectorFrame]:
193 """The projection that maps the pixel grid to the sky
194 (`Projection` [`DetectorFrame`]).
195 """
196 return cast(Projection[DetectorFrame], super().projection)
198 @property
199 def bounds(self) -> Bounds:
200 """The region where pixels are valid (`Bounds`)."""
201 return self._bounds
203 @property
204 def obs_info(self) -> ObservationInfo:
205 """General information about this observation in standard form.
206 (`~astro_metadata_translator.ObservationInfo`).
207 """
208 return self._obs_info
210 @property
211 def physical_filter(self) -> str:
212 """Full name of the physical bandpass filter (`str`)."""
213 assert self._obs_info.physical_filter is not None, "Guaranteed at construction."
214 return self._obs_info.physical_filter
216 @property
217 def band(self) -> str:
218 """Short name of the bandpass filter (`str`)."""
219 return self._band
221 @property
222 def astropy_wcs(self) -> ProjectionAstropyView:
223 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
225 Notes
226 -----
227 As expected for Astropy WCS objects, this defines pixel coordinates
228 such that the first row and column in the arrays are ``(0, 0)``, not
229 ``bbox.start``, as is the case for `projection`.
231 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
232 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
233 `astropy.wcs.WCS` (use `fits_wcs` for that).
234 """
235 return cast(ProjectionAstropyView, super().astropy_wcs)
237 @property
238 def summary_stats(self) -> ObservationSummaryStats:
239 """Optional summary statistics for this observation
240 (`ObservationSummaryStats`).
241 """
242 return self._summary_stats
244 @property
245 def photometric_scaling(self) -> Field | None:
246 """Field that multiplies a post-ISR image to yield the calibrated
247 image (~`fields.BaseField`).
248 """
249 return self._photometric_scaling
251 @photometric_scaling.setter
252 def photometric_scaling(self, value: Field):
253 if value.unit is None:
254 raise TypeError("The photometric_scaling for a VisitImage must have units.")
255 self._photometric_scaling = value
257 @property
258 def psf(self) -> PointSpreadFunction:
259 """The point-spread function model for this image
260 (`.psfs.PointSpreadFunction`).
261 """
262 if isinstance(self._psf, ArchiveReadError):
263 raise self._psf
264 return self._psf
266 @property
267 def detector(self) -> Detector:
268 """Geometry and electronic information about the detector
269 (`.cameras.Detector`).
270 """
271 return self._detector
273 @property
274 def aperture_corrections(self) -> ApertureCorrectionMap:
275 """A mapping from photometry algorithm name to the aperture correction
276 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]).
277 """
278 return self._aperture_corrections
280 @property
281 def backgrounds(self) -> BackgroundMap:
282 """A mapping of backgrounds associated with this image
283 (`BackgroundMap`).
284 """
285 return self._backgrounds
287 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
288 if bbox is ...:
289 return self
290 super().__getitem__(bbox)
291 return self._transfer_metadata(
292 VisitImage(
293 self.image[bbox],
294 mask=self.mask[bbox],
295 variance=self.variance[bbox],
296 projection=self.projection,
297 psf=self.psf,
298 obs_info=self.obs_info,
299 bounds=self._bounds, # don't need to intersect here, because __init__ will do that.
300 summary_stats=self.summary_stats,
301 detector=self._detector,
302 photometric_scaling=self._photometric_scaling,
303 aperture_corrections=self.aperture_corrections,
304 backgrounds=self._backgrounds,
305 band=self._band,
306 ),
307 bbox=bbox,
308 )
310 def __str__(self) -> str:
311 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
313 def __repr__(self) -> str:
314 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
316 def copy(self, *, copy_detector: bool = False) -> VisitImage:
317 """Deep-copy the visit image.
319 Parameters
320 ----------
321 copy_detector
322 Whether to deep-copy the `detector` attribute.
323 """
324 return self._transfer_metadata(
325 VisitImage(
326 image=self._image.copy(),
327 mask=self._mask.copy(),
328 variance=self._variance.copy(),
329 psf=self._psf,
330 obs_info=self.obs_info,
331 bounds=self._bounds,
332 summary_stats=self.summary_stats.model_copy(),
333 detector=self._detector.copy() if copy_detector else self._detector,
334 photometric_scaling=self._photometric_scaling,
335 aperture_corrections=self.aperture_corrections.copy(),
336 backgrounds=self._backgrounds.copy(),
337 band=self.band,
338 ),
339 copy=True,
340 )
342 def convert_unit(
343 self,
344 unit: astropy.units.UnitBase = astropy.units.nJy,
345 copy: Literal["as-needed"] | bool = True,
346 copy_detector: bool = False,
347 ) -> VisitImage:
348 """Return an equivalent image with different pixel units.
350 Parameters
351 ----------
352 unit
353 The unit to transform to. This may be any of the following:
355 - any unit directly relatable to the current units via Astropy;
356 - any unit relatable to the product of the current units with the
357 `photometric_scaling` (i.e. if the current image is in
358 instrumental units but we know how to calibrate them)
359 - any unit relatable to the quotient of the current units with the
360 `photometric_scaling` (i.e. if the current image is in
361 calibrated units and we want to revert back to instrumental
362 units).
363 copy
364 Whether to copy the images and other components. If `True`, all
365 components that aren't controlled by some other argument will
366 always be deep-copied. If `False`, the operation will fail if the
367 image is not already in the right units. If ``as-needed``, only
368 the image and variance will be copied, and only if they are not
369 already in the right units.
370 copy_detector
371 Whether to deep-copy the `detector` attribute.
373 Returns
374 -------
375 `VisitImage`
376 An image with the given units.
377 """
378 if copy not in (True, False, "as-needed"):
379 raise TypeError(f"Invalid value for 'copy' parameter: {copy!r}.")
380 if (factor := _get_unit_conversion_factor(self.unit, unit)) is not None:
381 if factor == 1.0:
382 if copy is True: # not "as-needed"
383 return self.copy()
384 else:
385 return self[...]
386 elif copy is False:
387 raise astropy.units.UnitConversionError(
388 f"Units must be converted ({self.unit} -> {unit}), but copy=False."
389 )
390 image = Image(self._image.array * factor, bbox=self.bbox, projection=self.projection, unit=unit)
391 variance = Image(
392 self._variance.array * factor**2,
393 bbox=self.bbox,
394 unit=unit**2,
395 )
396 elif self._photometric_scaling is None:
397 raise astropy.units.UnitConversionError(
398 "VisitImage.photometric_scaling is None, and there "
399 f"is no constant conversion from {self.unit} to {unit}."
400 )
401 else:
402 if copy is False:
403 raise astropy.units.UnitConversionError(
404 f"Photometric scaling must be applied to go from ={self.unit} to {unit}, but copy=False."
405 )
406 scaling = self._photometric_scaling
407 assert scaling.unit is not None, "Checked at construction."
408 if (constant_factor := _get_unit_conversion_factor(self.unit * scaling.unit, unit)) is not None:
409 if constant_factor != 1.0:
410 scaling = scaling * constant_factor
411 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array
412 elif (constant_factor := _get_unit_conversion_factor(self.unit / scaling.unit, unit)) is not None:
413 if constant_factor != 1.0:
414 scaling = scaling / constant_factor
415 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array
416 np.true_divide(1.0, scaling_array, out=scaling_array)
417 else:
418 raise astropy.units.UnitConversionError(
419 f"photometric_scaling with units {scaling.unit} does not "
420 f"provide a path from {self.unit} to {unit}."
421 )
422 # We needed to allocate a new array to evaluate the scaling field,
423 # and then we need to allocate another to hold its square for the
424 # variance scaling. But then we can multiply those arrays in-place
425 # to get the output image and variance to avoid yet more
426 # allocations (note we can't instead multiply the visit image's
427 # image and variance arrays in place because they might have other
428 # references that are still associated with the old units).
429 image = Image(scaling_array, bbox=self.bbox, unit=unit)
430 variance = Image(np.square(scaling_array), bbox=self.bbox, unit=unit**2)
431 image.array *= self._image.array
432 variance.array *= self._variance.array
433 copy_components = copy is True
434 return self._transfer_metadata(
435 VisitImage(
436 image=image,
437 mask=self._mask if not copy_components else self._mask.copy(),
438 variance=variance,
439 projection=self.projection, # never copied; immutable
440 obs_info=self.obs_info if not copy_components else self.obs_info.model_copy(),
441 psf=self._psf, # never copied; immutable
442 bounds=self._bounds, # never copied; immutable
443 summary_stats=self.summary_stats if not copy_components else self.summary_stats.model_copy(),
444 detector=self._detector if not copy_detector else self._detector.copy(),
445 photometric_scaling=self._photometric_scaling, # never copied; immutable
446 aperture_corrections=(
447 self.aperture_corrections if not copy_components else self.aperture_corrections.copy()
448 ),
449 backgrounds=self.backgrounds if not copy_components else self.backgrounds.copy(),
450 band=self.band,
451 )
452 )
454 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
455 masked_image_model = super().serialize(archive)
456 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
457 match self._psf:
458 # MyPy is able to figure things out here with this match statement,
459 # but not a single isinstance check on both types.
460 case PiffWrapper():
461 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
462 case PSFExWrapper():
463 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
464 case GaussianPointSpreadFunction():
465 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
466 case _:
467 raise TypeError(
468 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
469 )
470 assert masked_image_model.projection is not None, "VisitImage always has a projection."
471 serialized_detector = archive.serialize_direct("detector", self._detector.serialize)
472 serialized_photometric_scaling = (
473 archive.serialize_direct("photometric_scaling", self._photometric_scaling.serialize)
474 if self._photometric_scaling is not None
475 else None
476 )
477 serialized_aperture_corrections = archive.serialize_direct(
478 "aperture_corrections",
479 functools.partial(ApertureCorrectionMapSerializationModel.serialize, self.aperture_corrections),
480 )
481 serialized_backgrounds = archive.serialize_direct("backgrounds", self._backgrounds.serialize)
482 return VisitImageSerializationModel(
483 image=masked_image_model.image,
484 mask=masked_image_model.mask,
485 variance=masked_image_model.variance,
486 projection=masked_image_model.projection,
487 obs_info=self.obs_info,
488 photometric_scaling=serialized_photometric_scaling,
489 psf=serialized_psf,
490 summary_stats=self.summary_stats,
491 detector=serialized_detector,
492 aperture_corrections=serialized_aperture_corrections,
493 bounds=self._bounds.serialize() if self._bounds != self.bbox else None,
494 backgrounds=serialized_backgrounds,
495 band=self.band,
496 metadata=self.metadata,
497 )
499 @staticmethod
500 def _get_archive_tree_type[P: pydantic.BaseModel](
501 pointer_type: type[P],
502 ) -> type[VisitImageSerializationModel[P]]:
503 """Return the serialization model type for this object for an archive
504 type that uses the given pointer type.
505 """
506 return VisitImageSerializationModel[pointer_type] # type: ignore
508 # write_fits and read_fits inherited from MaskedImage.
510 @staticmethod
511 def from_legacy(
512 legacy: LegacyExposure,
513 *,
514 unit: astropy.units.UnitBase | None = None,
515 plane_map: Mapping[str, MaskPlane] | None = None,
516 instrument: str | None = None,
517 visit: int | None = None,
518 ) -> VisitImage:
519 """Convert from an `lsst.afw.image.Exposure` instance.
521 Parameters
522 ----------
523 legacy
524 An `lsst.afw.image.Exposure` instance that will share image and
525 variance (but not mask) pixel data with the returned object.
526 unit
527 Units of the image. If not provided, the ``BUNIT`` metadata
528 key will be used, if available.
529 plane_map
530 A mapping from legacy mask plane name to the new plane name and
531 description. If `None` (default)
532 `get_legacy_visit_image_mask_planes` is used.
533 instrument
534 Name of the instrument. Extracted from the metadata if not
535 provided.
536 visit
537 ID of the visit. Extracted from the metadata if not provided.
538 """
539 if plane_map is None:
540 plane_map = get_legacy_visit_image_mask_planes()
541 md = legacy.getMetadata()
542 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
543 instrument = _extract_or_check_header(
544 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
545 )
546 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, obs_info.exposure_id, int)
547 legacy_wcs = legacy.getWcs()
548 if legacy_wcs is None:
549 raise ValueError("Exposure does not have a SkyWcs.")
550 legacy_detector = legacy.getDetector()
551 if legacy_detector is None:
552 raise ValueError("Exposure does not have a Detector.")
553 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
555 # Update the ObservationInfo from other components.
556 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
558 opaque_fits_metadata = FitsOpaqueMetadata()
559 primary_header = astropy.io.fits.Header()
560 with warnings.catch_warnings():
561 # Silence warnings about long keys becoming HIERARCH.
562 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
563 primary_header.update(md.toOrderedDict())
564 metadata = opaque_fits_metadata.extract_legacy_primary_header(primary_header)
565 instrumental_unit = opaque_fits_metadata.get_instrumental_unit() or astropy.units.electron
566 hdr_unit: astropy.units.UnitBase | None = None
567 if hdr_unit_str := md.get("BUNIT"):
568 hdr_unit = astropy.units.Unit(hdr_unit_str, format="FITS")
569 if hdr_unit == astropy.units.adu and instrumental_unit == astropy.units.electron:
570 # Fix incorrect BUNIT='adu' in LSST
571 # preliminary_visit_image.
572 hdr_unit = astropy.units.electron
573 if unit is None:
574 unit = hdr_unit
575 elif hdr_unit is not None and hdr_unit != unit:
576 raise ValueError(f"BUNIT value {hdr_unit} disagrees with given unit {unit}.")
577 projection = Projection.from_legacy(
578 legacy_wcs,
579 DetectorFrame(
580 instrument=instrument,
581 visit=visit,
582 detector=legacy_detector.getId(),
583 bbox=detector_bbox,
584 ),
585 )
586 legacy_psf = legacy.getPsf()
587 if legacy_psf is None:
588 raise ValueError("Exposure file does not have a Psf.")
589 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
590 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
591 legacy_summary_stats = legacy.info.getSummaryStats()
592 legacy_ap_corr_map = legacy.info.getApCorrMap()
593 legacy_polygon = legacy.info.getValidPolygon()
594 legacy_photo_calib = legacy.info.getPhotoCalib()
595 result = VisitImage(
596 image=masked_image.image.view(unit=unit),
597 mask=masked_image.mask,
598 variance=masked_image.variance,
599 projection=projection,
600 psf=psf,
601 obs_info=obs_info,
602 summary_stats=(
603 ObservationSummaryStats.from_legacy(legacy_summary_stats)
604 if legacy_summary_stats is not None
605 else None
606 ),
607 detector=Detector.from_legacy(
608 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
609 ),
610 aperture_corrections=(
611 aperture_corrections_from_legacy(legacy_ap_corr_map)
612 if legacy_ap_corr_map is not None
613 else None
614 ),
615 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
616 photometric_scaling=(
617 field_from_legacy_photo_calib(
618 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit
619 )
620 if legacy_photo_calib is not None
621 else None
622 ),
623 band=legacy.info.getFilter().bandLabel,
624 metadata=metadata,
625 )
626 result.metadata["id"] = legacy.info.getId()
627 result._opaque_metadata = opaque_fits_metadata
628 return result
630 def to_legacy(
631 self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None
632 ) -> LegacyExposure:
633 """Convert to an `lsst.afw.image.Exposure` instance.
635 Parameters
636 ----------
637 copy
638 If `True`, always copy the image and variance pixel data.
639 If `False`, return a view, and raise `TypeError` if the pixel data
640 is read-only (this is not supported by afw). If `None`, only copy
641 if the pixel data is read-only. Mask pixel data is always copied.
642 plane_map
643 A mapping from legacy mask plane name to the new plane name and
644 description. If `None` (default),
645 `get_legacy_visit_image_mask_planes` is used.
646 """
647 from lsst.afw.image import Exposure as LegacyExposure
648 from lsst.afw.image import FilterLabel as LegacyFilterLabel
649 from lsst.obs.base.makeRawVisitInfoViaObsInfo import MakeRawVisitInfoViaObsInfo
651 if plane_map is None:
652 plane_map = get_legacy_visit_image_mask_planes()
653 legacy_masked_image = super().to_legacy(copy=copy, plane_map=plane_map)
654 result = LegacyExposure(legacy_masked_image, dtype=self.image.array.dtype)
655 result_info = result.info
656 result_info.setId(self.metadata.get("id"))
657 result_info.setWcs(self.projection.to_legacy())
658 result_info.setDetector(self.detector.to_legacy())
659 result_info.setFilter(LegacyFilterLabel.fromBandPhysical(self.band, self.obs_info.physical_filter))
660 if self._photometric_scaling is not None:
661 result_info.setPhotoCalib(self._photometric_scaling.to_legacy_photo_calib(self.unit))
662 else:
663 result_info.setPhotoCalib(BaseField.make_legacy_photo_calib(self.unit))
664 # We just dump all of the FITS headers and non-FITS metadata into the
665 # legacy metadata component, to make sure we have everything. We dump
666 # the latter into a pair of special cards to be able to full round-trip
667 # them (including case preservation).
668 result_md = result_info.getMetadata()
669 try:
670 result_md["BUNIT"] = self.unit.to_string(format="fits")
671 except ValueError:
672 # Write units that astropy doesn't think FITS will accept anyway;
673 # FITS standard says "SHOULD" about using it's recommended units,
674 # and coloring outside the lines is better than lying.
675 result_md["BUNIT"] = self.unit.to_string()
676 if isinstance(self._opaque_metadata, FitsOpaqueMetadata):
677 result_md.update(self._opaque_metadata.headers[ExtensionKey()])
678 for n, (k, v) in enumerate(self.metadata.items()):
679 result_md[f"LSST IMAGES KEY {n + 1}"] = k
680 result_md[f"LSST IMAGES VALUE {n + 1}"] = v
681 if isinstance(self._psf, LegacyPointSpreadFunction):
682 result_info.setPsf(self._psf.legacy_psf)
683 elif isinstance(self._psf, PiffWrapper):
684 result_info.setPsf(self._psf.to_legacy())
685 if isinstance(self.bounds, Polygon):
686 result_info.setValidPolygon(self.bounds.to_legacy())
687 if self.aperture_corrections:
688 result_info.setApCorrMap(aperture_corrections_to_legacy(self.aperture_corrections))
689 result_info.setVisitInfo(MakeRawVisitInfoViaObsInfo.observationInfo2visitInfo(self.obs_info))
690 result_info.setSummaryStats(self.summary_stats.to_legacy())
691 return result
693 @overload # type: ignore[override]
694 @staticmethod
695 def read_legacy( 695 ↛ exitline 695 didn't return from function 'read_legacy' because
696 filename: str,
697 *,
698 component: Literal["bbox"],
699 ) -> Box: ...
701 @overload
702 @staticmethod
703 def read_legacy( 703 ↛ exitline 703 didn't return from function 'read_legacy' because
704 filename: str,
705 *,
706 preserve_quantization: bool = False,
707 instrument: str | None = None,
708 visit: int | None = None,
709 component: Literal["image"],
710 ) -> Image: ...
712 @overload
713 @staticmethod
714 def read_legacy( 714 ↛ exitline 714 didn't return from function 'read_legacy' because
715 filename: str,
716 *,
717 plane_map: Mapping[str, MaskPlane] | None = None,
718 instrument: str | None = None,
719 visit: int | None = None,
720 component: Literal["mask"],
721 ) -> Mask: ...
723 @overload
724 @staticmethod
725 def read_legacy( 725 ↛ exitline 725 didn't return from function 'read_legacy' because
726 filename: str,
727 *,
728 preserve_quantization: bool = False,
729 instrument: str | None = None,
730 visit: int | None = None,
731 component: Literal["variance"],
732 ) -> Image: ...
734 @overload
735 @staticmethod
736 def read_legacy( 736 ↛ exitline 736 didn't return from function 'read_legacy' because
737 filename: str,
738 *,
739 instrument: str | None = None,
740 visit: int | None = None,
741 component: Literal["projection"],
742 ) -> Projection[DetectorFrame]: ...
744 @overload
745 @staticmethod
746 def read_legacy( 746 ↛ exitline 746 didn't return from function 'read_legacy' because
747 filename: str,
748 *,
749 component: Literal["psf"],
750 ) -> PointSpreadFunction: ...
752 @overload
753 @staticmethod
754 def read_legacy( 754 ↛ exitline 754 didn't return from function 'read_legacy' because
755 filename: str,
756 *,
757 component: Literal["detector"],
758 ) -> Detector: ...
760 @overload
761 @staticmethod
762 def read_legacy( 762 ↛ exitline 762 didn't return from function 'read_legacy' because
763 filename: str,
764 *,
765 component: Literal["obs_info"],
766 ) -> ObservationInfo: ...
768 @overload
769 @staticmethod
770 def read_legacy( 770 ↛ exitline 770 didn't return from function 'read_legacy' because
771 filename: str,
772 *,
773 component: Literal["photometric_scaling"],
774 ) -> Field | None: ...
776 @overload
777 @staticmethod
778 def read_legacy( 778 ↛ exitline 778 didn't return from function 'read_legacy' because
779 filename: str,
780 *,
781 component: Literal["summary_stats"],
782 ) -> ObservationSummaryStats: ...
784 @overload
785 @staticmethod
786 def read_legacy( 786 ↛ exitline 786 didn't return from function 'read_legacy' because
787 filename: str,
788 *,
789 component: Literal["aperture_corrections"],
790 ) -> ApertureCorrectionMap: ...
792 @overload
793 @staticmethod
794 def read_legacy( 794 ↛ exitline 794 didn't return from function 'read_legacy' because
795 filename: str,
796 *,
797 preserve_quantization: bool = False,
798 plane_map: Mapping[str, MaskPlane] | None = None,
799 instrument: str | None = None,
800 visit: int | None = None,
801 component: None = None,
802 ) -> VisitImage: ...
804 @staticmethod
805 def read_legacy( # type: ignore[override]
806 filename: str,
807 *,
808 preserve_quantization: bool = False,
809 plane_map: Mapping[str, MaskPlane] | None = None,
810 instrument: str | None = None,
811 visit: int | None = None,
812 component: Literal[
813 "bbox",
814 "image",
815 "mask",
816 "variance",
817 "projection",
818 "psf",
819 "detector",
820 "photometric_scaling",
821 "obs_info",
822 "summary_stats",
823 "aperture_corrections",
824 ]
825 | None = None,
826 ) -> Any:
827 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
829 Parameters
830 ----------
831 filename
832 Full name of the file.
833 preserve_quantization
834 If `True`, ensure that writing the masked image back out again will
835 exactly preserve quantization-compressed pixel values. This causes
836 the image and variance plane arrays to be marked as read-only and
837 stores the original binary table data for those planes in memory.
838 If the `MaskedImage` is copied, the precompressed pixel values are
839 not transferred to the copy.
840 plane_map
841 A mapping from legacy mask plane name to the new plane name and
842 description. If `None` (default)
843 `get_legacy_visit_image_mask_planes` is used.
844 instrument
845 Name of the instrument. Read from the primary header if not
846 provided.
847 visit
848 ID of the visit. Read from the primary header if not
849 provided.
850 component
851 A component to read instead of the full image.
852 """
853 from lsst.afw.image import ExposureFitsReader
855 reader = ExposureFitsReader(filename)
856 if component == "bbox":
857 return Box.from_legacy(reader.readBBox())
858 legacy_detector = reader.readDetector()
859 if legacy_detector is None:
860 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
861 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
862 legacy_wcs = None
863 if component in (None, "image", "mask", "variance", "projection"):
864 legacy_wcs = reader.readWcs()
865 if legacy_wcs is None:
866 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
867 legacy_exposure_info = reader.readExposureInfo()
868 summary_stats = None
869 if component in (None, "summary_stats"):
870 legacy_stats = legacy_exposure_info.getSummaryStats()
871 if legacy_stats is not None:
872 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
873 if component == "summary_stats":
874 return summary_stats
875 if component in (None, "psf"):
876 legacy_psf = reader.readPsf()
877 if legacy_psf is None:
878 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
879 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
880 if component == "psf":
881 return psf
882 aperture_corrections: ApertureCorrectionMap = {}
883 if component in (None, "aperture_corrections"):
884 legacy_ap_corr_map = reader.readApCorrMap()
885 if legacy_ap_corr_map is not None:
886 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map)
887 if component == "aperture_corrections":
888 return aperture_corrections
889 assert component in (
890 None,
891 "image",
892 "mask",
893 "variance",
894 "projection",
895 "obs_info",
896 "detector",
897 "photometric_scaling",
898 ), component # for MyPy
899 filter_label = reader.readFilter()
900 with astropy.io.fits.open(filename) as hdu_list:
901 primary_header = hdu_list[0].header
902 obs_info = _obs_info_from_md(primary_header)
903 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, filter_label)
904 if component == "obs_info":
905 return obs_info
906 instrument = _extract_or_check_header(
907 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
908 )
909 visit = _extract_or_check_header(
910 "LSST BUTLER DATAID VISIT", visit, primary_header, obs_info.exposure_id, int
911 )
912 opaque_metadata = FitsOpaqueMetadata()
913 # This extraction is destructive, so we need to be sure to pass
914 # this opaque_metadata down to MaskedImage._read_legacy_hdus
915 # so it doesn't try to extract it again.
916 metadata = opaque_metadata.extract_legacy_primary_header(primary_header)
917 if (instrumental_unit := opaque_metadata.get_instrumental_unit()) is None:
918 instrumental_unit = astropy.units.electron
919 photometric_scaling: Field | None = None
920 if component in (None, "photometric_scaling"):
921 legacy_photo_calib = reader.readPhotoCalib()
922 if legacy_photo_calib is not None:
923 photometric_scaling = field_from_legacy_photo_calib(
924 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit
925 )
926 if component == "photometric_scaling":
927 return photometric_scaling
928 if component == "detector":
929 return Detector.from_legacy(
930 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
931 )
932 projection = Projection.from_legacy(
933 legacy_wcs,
934 DetectorFrame(
935 instrument=instrument,
936 visit=visit,
937 detector=legacy_detector.getId(),
938 bbox=detector_bbox,
939 ),
940 )
941 if component == "projection":
942 return projection
943 if plane_map is None:
944 plane_map = get_legacy_visit_image_mask_planes()
945 assert component != "psf", component # for MyPy
946 from_masked_image = MaskedImage._read_legacy_hdus(
947 hdu_list,
948 filename,
949 opaque_metadata=opaque_metadata,
950 preserve_quantization=preserve_quantization,
951 plane_map=plane_map,
952 component=component,
953 )
954 if component is not None:
955 # This is the image, mask, or variance; attach the projection and
956 # obs_info and return
957 return from_masked_image.view(projection=projection)
958 legacy_polygon = reader.readValidPolygon()
959 result = VisitImage(
960 from_masked_image.image,
961 mask=from_masked_image.mask,
962 variance=from_masked_image.variance,
963 projection=projection,
964 psf=psf,
965 detector=Detector.from_legacy(
966 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True
967 ),
968 obs_info=obs_info,
969 summary_stats=summary_stats,
970 aperture_corrections=aperture_corrections,
971 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None,
972 photometric_scaling=photometric_scaling,
973 band=filter_label.bandLabel,
974 metadata=metadata,
975 )
976 result._opaque_metadata = from_masked_image._opaque_metadata
977 result.metadata["id"] = reader.readExposureId()
978 return result
981class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
982 """A Pydantic model used to represent a serialized `VisitImage`."""
984 SCHEMA_NAME: ClassVar[str] = "visit_image"
985 SCHEMA_VERSION: ClassVar[str] = "1.0.0"
986 MIN_READ_VERSION: ClassVar[int] = 1
988 # Inherited attributes are duplicated because that improves the docs
989 # (some limitation in the sphinx/pydantic integration), and these are
990 # important docs.
992 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
993 mask: MaskSerializationModel[P] = pydantic.Field(
994 description="Bitmask that annotates the main image's pixels."
995 )
996 variance: ImageSerializationModel[P] = pydantic.Field(
997 description="Per-pixel variance estimates for the main image."
998 )
999 projection: ProjectionSerializationModel[P] = pydantic.Field(
1000 description="Projection that maps the pixel grid to the sky.",
1001 )
1002 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
1003 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
1004 )
1005 obs_info: ObservationInfo = pydantic.Field(
1006 description="Standardized description of visit metadata",
1007 )
1008 photometric_scaling: FieldSerializationModel | None = pydantic.Field(
1009 default=None,
1010 description="Scaling that can be used to multiply a post-ISR image to yield calibrated pixel values.",
1011 )
1012 summary_stats: ObservationSummaryStats = pydantic.Field(
1013 description="Summary statistics for the observation."
1014 )
1015 detector: DetectorSerializationModel = pydantic.Field(
1016 description="Geometry and electronic information for the detector."
1017 )
1018 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field(
1019 default_factory=ApertureCorrectionMapSerializationModel,
1020 description="Aperture corrections, keyed by flux algorithm.",
1021 )
1022 bounds: SerializableBounds | None = pydantic.Field(
1023 default=None,
1024 description="Pixel validity region, if different from the image bounding box.",
1025 exclude_if=is_none,
1026 )
1027 backgrounds: BackgroundMapSerializationModel = pydantic.Field(
1028 default_factory=BackgroundMapSerializationModel,
1029 description="Background models associated with this image.",
1030 )
1031 band: str = pydantic.Field(description="Short name of the bandpass filter.")
1033 def deserialize(
1034 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any
1035 ) -> VisitImage:
1036 if kwargs:
1037 raise InvalidParameterError(f"Unrecognized parameters for VisitImage: {set(kwargs.keys())}.")
1038 masked_image = super().deserialize(archive, bbox=bbox)
1039 try:
1040 psf = self.psf.deserialize(archive)
1041 except ArchiveReadError as err:
1042 # Defer this until/unless somebody actually asks for the PSF.
1043 psf = err
1044 detector = self.detector.deserialize(archive)
1045 aperture_corrections = self.aperture_corrections.deserialize(archive)
1046 photometric_scaling = (
1047 self.photometric_scaling.deserialize(archive) if self.photometric_scaling is not None else None
1048 )
1049 return VisitImage(
1050 masked_image.image,
1051 mask=masked_image.mask,
1052 variance=masked_image.variance,
1053 psf=psf,
1054 projection=masked_image.projection,
1055 obs_info=self.obs_info,
1056 summary_stats=self.summary_stats,
1057 detector=detector,
1058 aperture_corrections=aperture_corrections,
1059 photometric_scaling=photometric_scaling,
1060 bounds=self.bounds.deserialize() if self.bounds is not None else None,
1061 backgrounds=self.backgrounds.deserialize(archive),
1062 band=self.band,
1063 )._finish_deserialize(self)
1065 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
1066 if kwargs and component not in ("image", "mask", "variance"):
1067 raise InvalidParameterError(
1068 f"Unsupported parameters for VisitImage component {component}: {set(kwargs.keys())}."
1069 )
1070 return super().deserialize_component(component, archive)
1073def _obs_info_from_md(
1074 md: MutableMapping[str, Any], visit_info: LegacyVisitInfo | None = None
1075) -> ObservationInfo:
1076 # Try to get an ObservationInfo from the primary header as if
1077 # it's a raw header. Else fallback.
1078 try:
1079 obs_info = ObservationInfo.from_header(md, quiet=True)
1080 except ValueError:
1081 # Not known translator. Must fall back to visit info. If we have
1082 # an actual VisitInfo, serialize it since we know that it will be
1083 # complete.
1084 if visit_info is not None:
1085 from lsst.afw.image import setVisitInfoMetadata
1086 from lsst.daf.base import PropertyList
1088 pl = PropertyList()
1089 setVisitInfoMetadata(pl, visit_info)
1090 # Merge so that we still have access to butler provenance.
1091 md.update(pl)
1093 # Try the given header looking for VisitInfo hints.
1094 # We get lots of warnings if nothing can be found. Currently
1095 # no way to disable those without capturing them.
1096 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
1097 return obs_info
1100def _update_obs_info_from_legacy(
1101 obs_info: ObservationInfo,
1102 detector: LegacyDetector | None = None,
1103 filter_label: LegacyFilterLabel | None = None,
1104) -> ObservationInfo:
1105 extra_md: dict[str, str | int] = {}
1107 if filter_label is not None and filter_label.hasBandLabel():
1108 extra_md["physical_filter"] = filter_label.physicalLabel
1110 # Fill in detector metadata, check for consistency.
1111 # ObsInfo detector name and group can not be derived from
1112 # the getName() information without knowing how the components
1113 # are separated.
1114 if detector is not None:
1115 detector_md = {
1116 "detector_num": detector.getId(),
1117 "detector_serial": detector.getSerial(),
1118 "detector_unique_name": detector.getName(),
1119 }
1120 extra_md.update(detector_md)
1122 obs_info_updates: dict[str, str | int] = {}
1123 for k, v in extra_md.items():
1124 current = getattr(obs_info, k)
1125 if current is None:
1126 obs_info_updates[k] = v
1127 continue
1128 if current != v:
1129 raise RuntimeError(
1130 f"ObservationInfo contains value for '{k}' that is inconsistent "
1131 f"with given legacy object: {v} != {current}"
1132 )
1134 if obs_info_updates:
1135 obs_info = obs_info.model_copy(update=obs_info_updates)
1136 return obs_info
1139def _extract_or_check_value[T](
1140 key: str,
1141 given_value: T | None,
1142 *sources: tuple[str, T | None],
1143) -> T:
1144 # Compare given value against multiple sources. If given value is not
1145 # supplied return the first non-None value in the reference sources.
1146 if given_value is not None:
1147 for source_name, source_value in sources:
1148 if source_value is not None and source_value != given_value:
1149 raise ValueError(
1150 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
1151 )
1152 if source_value is not None:
1153 # Only check the first non-None source rather than checking
1154 # all supplied values.
1155 break
1156 return given_value
1158 for _, source_value in sources:
1159 if source_value is not None:
1160 return source_value
1162 raise ValueError(f"No value found for {key}.")
1165def _extract_or_check_header[T](
1166 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
1167) -> T:
1168 hdr_value: T | None = None
1169 if (hdr_raw_value := header.get(key)) is not None:
1170 hdr_value = coerce(hdr_raw_value)
1171 return _extract_or_check_value(
1172 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
1173 )
1176def _get_unit_conversion_factor(
1177 original: astropy.units.UnitBase, new: astropy.units.UnitBase
1178) -> float | None:
1179 try:
1180 return original.to(new)
1181 except astropy.units.UnitConversionError:
1182 return None