Coverage for python / lsst / images / _visit_image.py: 24%

393 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 08:26 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("VisitImage", "VisitImageSerializationModel") 

15 

16import functools 

17import warnings 

18from collections.abc import Callable, Mapping, MutableMapping 

19from types import EllipsisType 

20from typing import Any, Literal, cast, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import numpy as np 

26import pydantic 

27from astro_metadata_translator import ObservationInfo, VisitInfoTranslator 

28 

29from ._backgrounds import BackgroundMap, BackgroundMapSerializationModel 

30from ._concrete_bounds import SerializableBounds 

31from ._geom import Bounds, Box 

32from ._image import Image, ImageSerializationModel 

33from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes 

34from ._masked_image import MaskedImage, MaskedImageSerializationModel 

35from ._observation_summary_stats import ObservationSummaryStats 

36from ._polygon import Polygon 

37from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel 

38from .aperture_corrections import ( 

39 ApertureCorrectionMap, 

40 ApertureCorrectionMapSerializationModel, 

41 aperture_corrections_from_legacy, 

42) 

43from .cameras import Detector, DetectorSerializationModel 

44from .fields import Field, FieldSerializationModel, field_from_legacy_photo_calib 

45from .fits import FitsOpaqueMetadata 

46from .psfs import ( 

47 GaussianPointSpreadFunction, 

48 GaussianPSFSerializationModel, 

49 PiffSerializationModel, 

50 PiffWrapper, 

51 PointSpreadFunction, 

52 PSFExSerializationModel, 

53 PSFExWrapper, 

54) 

55from .serialization import ArchiveReadError, InputArchive, InvalidParameterError, MetadataValue, OutputArchive 

56from .utils import is_none 

57 

58 

59def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo: 

60 # Try to get an ObservationInfo from the primary header as if 

61 # it's a raw header. Else fallback. 

62 try: 

63 obs_info = ObservationInfo.from_header(md, quiet=True) 

64 except ValueError: 

65 # Not known translator. Must fall back to visit info. If we have 

66 # an actual VisitInfo, serialize it since we know that it will be 

67 # complete. 

68 if visit_info is not None: 

69 from lsst.afw.image import setVisitInfoMetadata 

70 from lsst.daf.base import PropertyList 

71 

72 pl = PropertyList() 

73 setVisitInfoMetadata(pl, visit_info) 

74 # Merge so that we still have access to butler provenance. 

75 md.update(pl) 

76 

77 # Try the given header looking for VisitInfo hints. 

78 # We get lots of warnings if nothing can be found. Currently 

79 # no way to disable those without capturing them. 

80 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True) 

81 return obs_info 

82 

83 

84def _update_obs_info_from_legacy( 

85 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None 

86) -> ObservationInfo: 

87 extra_md: dict[str, str | int] = {} 

88 

89 if filter_label is not None and filter_label.hasBandLabel(): 

90 extra_md["physical_filter"] = filter_label.physicalLabel 

91 

92 # Fill in detector metadata, check for consistency. 

93 # ObsInfo detector name and group can not be derived from 

94 # the getName() information without knowing how the components 

95 # are separated. 

96 if detector is not None: 

97 detector_md = { 

98 "detector_num": detector.getId(), 

99 "detector_serial": detector.getSerial(), 

100 "detector_unique_name": detector.getName(), 

101 } 

102 extra_md.update(detector_md) 

103 

104 obs_info_updates: dict[str, str | int] = {} 

105 for k, v in extra_md.items(): 

106 current = getattr(obs_info, k) 

107 if current is None: 

108 obs_info_updates[k] = v 

109 continue 

110 if current != v: 

111 raise RuntimeError( 

112 f"ObservationInfo contains value for '{k}' that is inconsistent " 

113 f"with given legacy object: {v} != {current}" 

114 ) 

115 

116 if obs_info_updates: 

117 obs_info = obs_info.model_copy(update=obs_info_updates) 

118 return obs_info 

119 

120 

121class VisitImage(MaskedImage): 

122 """A calibrated single-visit image. 

123 

124 Parameters 

125 ---------- 

126 image 

127 The main image plane. If this has a `Projection`, it will be used 

128 for all planes unless a ``projection`` is passed separately. 

129 mask 

130 A bitmask image that annotates the main image plane. Must have the 

131 same bounding box as ``image`` if provided. Any attached projection 

132 is replaced (possibly by `None`). 

133 variance 

134 The per-pixel uncertainty of the main image as an image of variance 

135 values. Must have the same bounding box as ``image`` if provided, and 

136 its units must be the square of ``image.unit`` or `None`. 

137 Values default to ``1.0``. Any attached projection is replaced 

138 (possibly by `None`). 

139 mask_schema 

140 Schema for the mask plane. Must be provided if and only if ``mask`` is 

141 not provided. 

142 projection 

143 Projection that maps the pixel grid to the sky. Can only be `None` if 

144 a projection is already attached to ``image``. 

145 bounds 

146 The region where this image's pixels and other properties are valid. 

147 If not provided, the bounding box of the image is used. Other 

148 components (``psf``, ``projection``, ``aperture_corrections``, etc.) 

149 are assumed to have their own bounds which may or may not be the same 

150 as the image bounds. If ``bounds`` extends beyond the image bounding 

151 box, the intersection between ``bounds`` and the image bounding box 

152 is used instead. 

153 obs_info 

154 General information about this visit in standardized form. 

155 summary_stats 

156 Summary statistics associated with this visit. Initialized to default 

157 values if not provided. 

158 photometric_scaling 

159 Field that can be used to multiply a post-ISR image units to yield 

160 calibrated image units. This may be a scaling that was already 

161 applied (so dividing by it will recover the post-ISR units) or a 

162 scaling that has not been applied, depending on ``image.unit``. 

163 psf 

164 Point-spread function model for this image, or an exception explaining 

165 why it could not be read (to be raised if the PSF is requested later). 

166 detector 

167 Geometry and electronic information for the detector attached to this 

168 image. 

169 aperture_corrections : `dict` [`str`, `~fields.BaseField`] 

170 Mapping from photometry algorithm name to the aperture correction for 

171 that algorithm. 

172 backgrounds 

173 Background models associated with this image. 

174 metadata 

175 Arbitrary flexible metadata to associate with the image. 

176 """ 

177 

178 def __init__( 

179 self, 

180 image: Image, 

181 *, 

182 mask: Mask | None = None, 

183 variance: Image | None = None, 

184 mask_schema: MaskSchema | None = None, 

185 projection: Projection[DetectorFrame] | None = None, 

186 bounds: Bounds | None = None, 

187 obs_info: ObservationInfo | None = None, 

188 summary_stats: ObservationSummaryStats | None = None, 

189 photometric_scaling: Field | None = None, 

190 psf: PointSpreadFunction | ArchiveReadError, 

191 detector: Detector, 

192 aperture_corrections: ApertureCorrectionMap | None = None, 

193 backgrounds: BackgroundMap | None = None, 

194 metadata: dict[str, MetadataValue] | None = None, 

195 ): 

196 super().__init__( 

197 image, 

198 mask=mask, 

199 variance=variance, 

200 mask_schema=mask_schema, 

201 projection=projection, 

202 metadata=metadata, 

203 ) 

204 if self.image.unit is None: 

205 raise TypeError("The image component of a VisitImage must have units.") 

206 if self.image.projection is None: 

207 raise TypeError("The projection component of a VisitImage cannot be None.") 

208 if obs_info is None: 

209 raise TypeError("The observation info component of a VisitImage cannot be None.") 

210 self._obs_info = obs_info 

211 if not isinstance(self.image.projection.pixel_frame, DetectorFrame): 

212 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.") 

213 if summary_stats is None: 

214 summary_stats = ObservationSummaryStats() 

215 self._summary_stats = summary_stats 

216 if photometric_scaling is not None and photometric_scaling.unit is None: 

217 raise TypeError("If a photometric_scaling is provided, it must have units.") 

218 self._photometric_scaling = photometric_scaling 

219 self._psf = psf 

220 self._detector = detector 

221 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {} 

222 self._bounds = bounds if bounds is not None else self.bbox 

223 if not self.bbox.contains(self._bounds.bbox): 

224 self._bounds = self._bounds.intersection(self.bbox) 

225 self._backgrounds = backgrounds if backgrounds is not None else BackgroundMap() 

226 

227 @property 

228 def unit(self) -> astropy.units.UnitBase: 

229 """The units of the image plane (`astropy.units.Unit`).""" 

230 return cast(astropy.units.UnitBase, super().unit) 

231 

232 @property 

233 def projection(self) -> Projection[DetectorFrame]: 

234 """The projection that maps the pixel grid to the sky 

235 (`Projection` [`DetectorFrame`]). 

236 """ 

237 return cast(Projection[DetectorFrame], super().projection) 

238 

239 @property 

240 def bounds(self) -> Bounds: 

241 """The region where pixels are valid (`Bounds`).""" 

242 return self._bounds 

243 

244 @property 

245 def obs_info(self) -> ObservationInfo: 

246 """General information about this observation in standard form. 

247 (`~astro_metadata_translator.ObservationInfo`). 

248 """ 

249 return self._obs_info 

250 

251 @property 

252 def astropy_wcs(self) -> ProjectionAstropyView: 

253 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`). 

254 

255 Notes 

256 ----- 

257 As expected for Astropy WCS objects, this defines pixel coordinates 

258 such that the first row and column in the arrays are ``(0, 0)``, not 

259 ``bbox.start``, as is the case for `projection`. 

260 

261 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

262 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an 

263 `astropy.wcs.WCS` (use `fits_wcs` for that). 

264 """ 

265 return cast(ProjectionAstropyView, super().astropy_wcs) 

266 

267 @property 

268 def summary_stats(self) -> ObservationSummaryStats: 

269 """Optional summary statistics for this observation 

270 (`ObservationSummaryStats`). 

271 """ 

272 return self._summary_stats 

273 

274 @property 

275 def photometric_scaling(self) -> Field | None: 

276 """Field that multiplies a post-ISR image to yield the calibrated 

277 image (~`fields.BaseField`). 

278 """ 

279 return self._photometric_scaling 

280 

281 @photometric_scaling.setter 

282 def photometric_scaling(self, value: Field): 

283 if value.unit is None: 

284 raise TypeError("The photometric_scaling for a VisitImage must have units.") 

285 self._photometric_scaling = value 

286 

287 @property 

288 def psf(self) -> PointSpreadFunction: 

289 """The point-spread function model for this image 

290 (`.psfs.PointSpreadFunction`). 

291 """ 

292 if isinstance(self._psf, ArchiveReadError): 

293 raise self._psf 

294 return self._psf 

295 

296 @property 

297 def detector(self) -> Detector: 

298 """Geometry and electronic information about the detector 

299 (`.cameras.Detector`). 

300 """ 

301 return self._detector 

302 

303 @property 

304 def aperture_corrections(self) -> ApertureCorrectionMap: 

305 """A mapping from photometry algorithm name to the aperture correction 

306 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]). 

307 """ 

308 return self._aperture_corrections 

309 

310 @property 

311 def backgrounds(self) -> BackgroundMap: 

312 """A mapping of backgrounds associated with this image 

313 (`BackgroundMap`). 

314 """ 

315 return self._backgrounds 

316 

317 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage: 

318 if bbox is ...: 

319 return self 

320 super().__getitem__(bbox) 

321 return self._transfer_metadata( 

322 VisitImage( 

323 self.image[bbox], 

324 mask=self.mask[bbox], 

325 variance=self.variance[bbox], 

326 projection=self.projection, 

327 psf=self.psf, 

328 obs_info=self.obs_info, 

329 bounds=self._bounds, # don't need to intersect here, because __init__ will do that. 

330 summary_stats=self.summary_stats, 

331 detector=self._detector, 

332 photometric_scaling=self._photometric_scaling, 

333 aperture_corrections=self.aperture_corrections, 

334 backgrounds=self._backgrounds, 

335 ), 

336 bbox=bbox, 

337 ) 

338 

339 def __str__(self) -> str: 

340 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})" 

341 

342 def __repr__(self) -> str: 

343 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})" 

344 

345 def copy(self, *, copy_detector: bool = False) -> VisitImage: 

346 """Deep-copy the visit image. 

347 

348 Parameters 

349 ---------- 

350 copy_detector 

351 Whether to deep-copy the `detector` attribute. 

352 """ 

353 return self._transfer_metadata( 

354 VisitImage( 

355 image=self._image.copy(), 

356 mask=self._mask.copy(), 

357 variance=self._variance.copy(), 

358 psf=self._psf, 

359 obs_info=self.obs_info, 

360 bounds=self._bounds, 

361 summary_stats=self.summary_stats.model_copy(), 

362 detector=self._detector.copy() if copy_detector else self._detector, 

363 photometric_scaling=self._photometric_scaling, 

364 aperture_corrections=self.aperture_corrections.copy(), 

365 backgrounds=self._backgrounds.copy(), 

366 ), 

367 copy=True, 

368 ) 

369 

370 def convert_unit( 

371 self, 

372 unit: astropy.units.UnitBase = astropy.units.nJy, 

373 copy: Literal["as-needed"] | bool = True, 

374 copy_detector: bool = False, 

375 ) -> VisitImage: 

376 """Return an equivalent image with different pixel units. 

377 

378 Parameters 

379 ---------- 

380 unit 

381 The unit to transform to. This may be any of the following: 

382 

383 - any unit directly relatable to the current units via Astropy; 

384 - any unit relatable to the product of the current units with the 

385 `photometric_scaling` (i.e. if the current image is in 

386 instrumental units but we know how to calibrate them) 

387 - any unit relatable to the quotient of the current units with the 

388 `photometric_scaling` (i.e. if the current image is in 

389 calibrated units and we want to revert back to instrumental 

390 units). 

391 copy 

392 Whether to copy the images and other components. If `True`, all 

393 components that aren't controlled by some other argument will 

394 always be deep-copied. If `False`, the operation will fail if the 

395 image is not already in the right units. If ``as-needed``, only 

396 the image and variance will be copied, and only if they are not 

397 already in the right units. 

398 copy_detector 

399 Whether to deep-copy the `detector` attribute. 

400 

401 Returns 

402 ------- 

403 `VisitImage` 

404 An image with the given units. 

405 """ 

406 if copy not in (True, False, "as-needed"): 

407 raise TypeError(f"Invalid value for 'copy' parameter: {copy!r}.") 

408 if (factor := _get_unit_conversion_factor(self.unit, unit)) is not None: 

409 if factor == 1.0: 

410 if copy is True: # not "as-needed" 

411 return self.copy() 

412 else: 

413 return self[...] 

414 elif copy is False: 

415 raise astropy.units.UnitConversionError( 

416 f"Units must be converted ({self.unit} -> {unit}), but copy=False." 

417 ) 

418 image = Image(self._image.array * factor, bbox=self.bbox, projection=self.projection, unit=unit) 

419 variance = Image( 

420 self._variance.array * factor**2, 

421 bbox=self.bbox, 

422 unit=unit**2, 

423 ) 

424 elif self._photometric_scaling is None: 

425 raise astropy.units.UnitConversionError( 

426 "VisitImage.photometric_scaling is None, and there " 

427 f"is no constant conversion from {self.unit} to {unit}." 

428 ) 

429 else: 

430 if copy is False: 

431 raise astropy.units.UnitConversionError( 

432 f"Photometric scaling must be applied to go from ={self.unit} to {unit}, but copy=False." 

433 ) 

434 scaling = self._photometric_scaling 

435 assert scaling.unit is not None, "Checked at construction." 

436 if (constant_factor := _get_unit_conversion_factor(self.unit * scaling.unit, unit)) is not None: 

437 if constant_factor != 1.0: 

438 scaling = scaling * constant_factor 

439 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array 

440 elif (constant_factor := _get_unit_conversion_factor(self.unit / scaling.unit, unit)) is not None: 

441 if constant_factor != 1.0: 

442 scaling = scaling / constant_factor 

443 scaling_array = scaling.render(self.bbox, dtype=self.image.array.dtype).array 

444 np.true_divide(1.0, scaling_array, out=scaling_array) 

445 else: 

446 raise astropy.units.UnitConversionError( 

447 f"photometric_scaling with units {scaling.unit} does not " 

448 f"provide a path from {self.unit} to {unit}." 

449 ) 

450 # We needed to allocate a new array to evaluate the scaling field, 

451 # and then we need to allocate another to hold its square for the 

452 # variance scaling. But then we can multiply those arrays in-place 

453 # to get the output image and variance to avoid yet more 

454 # allocations (note we can't instead multiply the visit image's 

455 # image and variance arrays in place because they might have other 

456 # references that are still associated with the old units). 

457 image = Image(scaling_array, bbox=self.bbox, unit=unit) 

458 variance = Image(np.square(scaling_array), bbox=self.bbox, unit=unit**2) 

459 image.array *= self._image.array 

460 variance.array *= self._variance.array 

461 copy_components = copy is True 

462 return self._transfer_metadata( 

463 VisitImage( 

464 image=image, 

465 mask=self._mask if not copy_components else self._mask.copy(), 

466 variance=variance, 

467 projection=self.projection, # never copied; immutable 

468 obs_info=self.obs_info if not copy_components else self.obs_info.model_copy(), 

469 psf=self._psf, # never copied; immutable 

470 bounds=self._bounds, # never copied; immutable 

471 summary_stats=self.summary_stats if not copy_components else self.summary_stats.model_copy(), 

472 detector=self._detector if not copy_detector else self._detector.copy(), 

473 photometric_scaling=self._photometric_scaling, # never copied; immutable 

474 aperture_corrections=( 

475 self.aperture_corrections if not copy_components else self.aperture_corrections.copy() 

476 ), 

477 ) 

478 ) 

479 

480 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel: 

481 masked_image_model = super().serialize(archive) 

482 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel 

483 match self._psf: 

484 # MyPy is able to figure things out here with this match statement, 

485 # but not a single isinstance check on both types. 

486 case PiffWrapper(): 

487 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

488 case PSFExWrapper(): 

489 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

490 case GaussianPointSpreadFunction(): 

491 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

492 case _: 

493 raise TypeError( 

494 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}." 

495 ) 

496 assert masked_image_model.projection is not None, "VisitImage always has a projection." 

497 serialized_detector = archive.serialize_direct("detector", self._detector.serialize) 

498 serialized_photometric_scaling = ( 

499 archive.serialize_direct("photometric_scaling", self._photometric_scaling.serialize) 

500 if self._photometric_scaling is not None 

501 else None 

502 ) 

503 serialized_aperture_corrections = archive.serialize_direct( 

504 "aperture_corrections", 

505 functools.partial(ApertureCorrectionMapSerializationModel.serialize, self.aperture_corrections), 

506 ) 

507 serialized_backgrounds = archive.serialize_direct("backgrounds", self._backgrounds.serialize) 

508 return VisitImageSerializationModel( 

509 image=masked_image_model.image, 

510 mask=masked_image_model.mask, 

511 variance=masked_image_model.variance, 

512 projection=masked_image_model.projection, 

513 obs_info=self.obs_info, 

514 photometric_scaling=serialized_photometric_scaling, 

515 psf=serialized_psf, 

516 summary_stats=self.summary_stats, 

517 detector=serialized_detector, 

518 aperture_corrections=serialized_aperture_corrections, 

519 bounds=self._bounds.serialize() if self._bounds != self.bbox else None, 

520 backgrounds=serialized_backgrounds, 

521 metadata=self.metadata, 

522 ) 

523 

524 @staticmethod 

525 def _get_archive_tree_type[P: pydantic.BaseModel]( 

526 pointer_type: type[P], 

527 ) -> type[VisitImageSerializationModel[P]]: 

528 """Return the serialization model type for this object for an archive 

529 type that uses the given pointer type. 

530 """ 

531 return VisitImageSerializationModel[pointer_type] # type: ignore 

532 

533 # write_fits and read_fits inherited from MaskedImage. 

534 

535 @staticmethod 

536 def from_legacy( 

537 legacy: Any, 

538 *, 

539 unit: astropy.units.UnitBase | None = None, 

540 plane_map: Mapping[str, MaskPlane] | None = None, 

541 instrument: str | None = None, 

542 visit: int | None = None, 

543 ) -> VisitImage: 

544 """Convert from an `lsst.afw.image.Exposure` instance. 

545 

546 Parameters 

547 ---------- 

548 legacy 

549 An `lsst.afw.image.Exposure` instance that will share image and 

550 variance (but not mask) pixel data with the returned object. 

551 unit 

552 Units of the image. If not provided, the ``BUNIT`` metadata 

553 key will be used, if available. 

554 plane_map 

555 A mapping from legacy mask plane name to the new plane name and 

556 description. If `None` (default) 

557 `get_legacy_visit_image_mask_planes` is used. 

558 instrument 

559 Name of the instrument. Extracted from the metadata if not 

560 provided. 

561 visit 

562 ID of the visit. Extracted from the metadata if not provided. 

563 """ 

564 if plane_map is None: 

565 plane_map = get_legacy_visit_image_mask_planes() 

566 md = legacy.getMetadata() 

567 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo()) 

568 instrument = _extract_or_check_header( 

569 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str 

570 ) 

571 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int) 

572 legacy_wcs = legacy.getWcs() 

573 if legacy_wcs is None: 

574 raise ValueError("Exposure does not have a SkyWcs.") 

575 legacy_detector = legacy.getDetector() 

576 if legacy_detector is None: 

577 raise ValueError("Exposure does not have a Detector.") 

578 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

579 

580 # Update the ObservationInfo from other components. 

581 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter()) 

582 

583 opaque_fits_metadata = FitsOpaqueMetadata() 

584 primary_header = astropy.io.fits.Header() 

585 with warnings.catch_warnings(): 

586 # Silence warnings about long keys becoming HIERARCH. 

587 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

588 primary_header.update(md.toOrderedDict()) 

589 opaque_fits_metadata.extract_legacy_primary_header(primary_header) 

590 instrumental_unit = opaque_fits_metadata.get_instrumental_unit() or astropy.units.electron 

591 hdr_unit: astropy.units.UnitBase | None = None 

592 if hdr_unit_str := md.get("BUNIT"): 

593 hdr_unit = astropy.units.Unit(hdr_unit_str, format="FITS") 

594 if hdr_unit == astropy.units.adu and instrumental_unit == astropy.units.electron: 

595 # Fix incorrect BUNIT='adu' in LSST 

596 # preliminary_visit_image. 

597 hdr_unit = astropy.units.electron 

598 if unit is None: 

599 unit = hdr_unit 

600 elif hdr_unit is not None and hdr_unit != unit: 

601 raise ValueError(f"BUNIT value {hdr_unit} disagrees with given unit {unit}.") 

602 projection = Projection.from_legacy( 

603 legacy_wcs, 

604 DetectorFrame( 

605 instrument=instrument, 

606 visit=visit, 

607 detector=legacy_detector.getId(), 

608 bbox=detector_bbox, 

609 ), 

610 ) 

611 legacy_psf = legacy.getPsf() 

612 if legacy_psf is None: 

613 raise ValueError("Exposure file does not have a Psf.") 

614 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

615 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map) 

616 legacy_summary_stats = legacy.info.getSummaryStats() 

617 legacy_ap_corr_map = legacy.info.getApCorrMap() 

618 legacy_polygon = legacy.info.getValidPolygon() 

619 legacy_photo_calib = legacy.info.getPhotoCalib() 

620 result = VisitImage( 

621 image=masked_image.image.view(unit=unit), 

622 mask=masked_image.mask, 

623 variance=masked_image.variance, 

624 projection=projection, 

625 psf=psf, 

626 obs_info=obs_info, 

627 summary_stats=( 

628 ObservationSummaryStats.from_legacy(legacy_summary_stats) 

629 if legacy_summary_stats is not None 

630 else None 

631 ), 

632 detector=Detector.from_legacy( 

633 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

634 ), 

635 aperture_corrections=( 

636 aperture_corrections_from_legacy(legacy_ap_corr_map) 

637 if legacy_ap_corr_map is not None 

638 else None 

639 ), 

640 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None, 

641 photometric_scaling=( 

642 field_from_legacy_photo_calib( 

643 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit 

644 ) 

645 if legacy_photo_calib is not None 

646 else None 

647 ), 

648 ) 

649 

650 result._opaque_metadata = opaque_fits_metadata 

651 return result 

652 

653 @overload # type: ignore[override] 

654 @staticmethod 

655 def read_legacy( 655 ↛ exitline 655 didn't return from function 'read_legacy' because

656 filename: str, 

657 *, 

658 component: Literal["bbox"], 

659 ) -> Box: ... 

660 

661 @overload 

662 @staticmethod 

663 def read_legacy( 663 ↛ exitline 663 didn't return from function 'read_legacy' because

664 filename: str, 

665 *, 

666 preserve_quantization: bool = False, 

667 instrument: str | None = None, 

668 visit: int | None = None, 

669 component: Literal["image"], 

670 ) -> Image: ... 

671 

672 @overload 

673 @staticmethod 

674 def read_legacy( 674 ↛ exitline 674 didn't return from function 'read_legacy' because

675 filename: str, 

676 *, 

677 plane_map: Mapping[str, MaskPlane] | None = None, 

678 instrument: str | None = None, 

679 visit: int | None = None, 

680 component: Literal["mask"], 

681 ) -> Mask: ... 

682 

683 @overload 

684 @staticmethod 

685 def read_legacy( 685 ↛ exitline 685 didn't return from function 'read_legacy' because

686 filename: str, 

687 *, 

688 preserve_quantization: bool = False, 

689 instrument: str | None = None, 

690 visit: int | None = None, 

691 component: Literal["variance"], 

692 ) -> Image: ... 

693 

694 @overload 

695 @staticmethod 

696 def read_legacy( 696 ↛ exitline 696 didn't return from function 'read_legacy' because

697 filename: str, 

698 *, 

699 instrument: str | None = None, 

700 visit: int | None = None, 

701 component: Literal["projection"], 

702 ) -> Projection[DetectorFrame]: ... 

703 

704 @overload 

705 @staticmethod 

706 def read_legacy( 706 ↛ exitline 706 didn't return from function 'read_legacy' because

707 filename: str, 

708 *, 

709 component: Literal["psf"], 

710 ) -> PointSpreadFunction: ... 

711 

712 @overload 

713 @staticmethod 

714 def read_legacy( 714 ↛ exitline 714 didn't return from function 'read_legacy' because

715 filename: str, 

716 *, 

717 component: Literal["detector"], 

718 ) -> Detector: ... 

719 

720 @overload 

721 @staticmethod 

722 def read_legacy( 722 ↛ exitline 722 didn't return from function 'read_legacy' because

723 filename: str, 

724 *, 

725 component: Literal["obs_info"], 

726 ) -> ObservationInfo: ... 

727 

728 @overload 

729 @staticmethod 

730 def read_legacy( 730 ↛ exitline 730 didn't return from function 'read_legacy' because

731 filename: str, 

732 *, 

733 component: Literal["photometric_scaling"], 

734 ) -> Field | None: ... 

735 

736 @overload 

737 @staticmethod 

738 def read_legacy( 738 ↛ exitline 738 didn't return from function 'read_legacy' because

739 filename: str, 

740 *, 

741 component: Literal["summary_stats"], 

742 ) -> ObservationSummaryStats: ... 

743 

744 @overload 

745 @staticmethod 

746 def read_legacy( 746 ↛ exitline 746 didn't return from function 'read_legacy' because

747 filename: str, 

748 *, 

749 component: Literal["aperture_corrections"], 

750 ) -> ApertureCorrectionMap: ... 

751 

752 @overload 

753 @staticmethod 

754 def read_legacy( 754 ↛ exitline 754 didn't return from function 'read_legacy' because

755 filename: str, 

756 *, 

757 preserve_quantization: bool = False, 

758 plane_map: Mapping[str, MaskPlane] | None = None, 

759 instrument: str | None = None, 

760 visit: int | None = None, 

761 component: None = None, 

762 ) -> VisitImage: ... 

763 

764 @staticmethod 

765 def read_legacy( # type: ignore[override] 

766 filename: str, 

767 *, 

768 preserve_quantization: bool = False, 

769 plane_map: Mapping[str, MaskPlane] | None = None, 

770 instrument: str | None = None, 

771 visit: int | None = None, 

772 component: Literal[ 

773 "bbox", 

774 "image", 

775 "mask", 

776 "variance", 

777 "projection", 

778 "psf", 

779 "detector", 

780 "photometric_scaling", 

781 "obs_info", 

782 "summary_stats", 

783 "aperture_corrections", 

784 ] 

785 | None = None, 

786 ) -> Any: 

787 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`. 

788 

789 Parameters 

790 ---------- 

791 filename 

792 Full name of the file. 

793 preserve_quantization 

794 If `True`, ensure that writing the masked image back out again will 

795 exactly preserve quantization-compressed pixel values. This causes 

796 the image and variance plane arrays to be marked as read-only and 

797 stores the original binary table data for those planes in memory. 

798 If the `MaskedImage` is copied, the precompressed pixel values are 

799 not transferred to the copy. 

800 plane_map 

801 A mapping from legacy mask plane name to the new plane name and 

802 description. If `None` (default) 

803 `get_legacy_visit_image_mask_planes` is used. 

804 instrument 

805 Name of the instrument. Read from the primary header if not 

806 provided. 

807 visit 

808 ID of the visit. Read from the primary header if not 

809 provided. 

810 component 

811 A component to read instead of the full image. 

812 """ 

813 from lsst.afw.image import ExposureFitsReader 

814 

815 reader = ExposureFitsReader(filename) 

816 if component == "bbox": 

817 return Box.from_legacy(reader.readBBox()) 

818 legacy_detector = reader.readDetector() 

819 if legacy_detector is None: 

820 raise ValueError(f"Exposure file {filename!r} does not have a Detector.") 

821 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

822 legacy_wcs = None 

823 if component in (None, "image", "mask", "variance", "projection"): 

824 legacy_wcs = reader.readWcs() 

825 if legacy_wcs is None: 

826 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.") 

827 legacy_exposure_info = reader.readExposureInfo() 

828 summary_stats = None 

829 if component in (None, "summary_stats"): 

830 legacy_stats = legacy_exposure_info.getSummaryStats() 

831 if legacy_stats is not None: 

832 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats) 

833 if component == "summary_stats": 

834 return summary_stats 

835 if component in (None, "psf"): 

836 legacy_psf = reader.readPsf() 

837 if legacy_psf is None: 

838 raise ValueError(f"Exposure file {filename!r} does not have a Psf.") 

839 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

840 if component == "psf": 

841 return psf 

842 aperture_corrections: ApertureCorrectionMap = {} 

843 if component in (None, "aperture_corrections"): 

844 legacy_ap_corr_map = reader.readApCorrMap() 

845 if legacy_ap_corr_map is not None: 

846 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map) 

847 if component == "aperture_corrections": 

848 return aperture_corrections 

849 assert component in ( 

850 None, 

851 "image", 

852 "mask", 

853 "variance", 

854 "projection", 

855 "obs_info", 

856 "detector", 

857 "photometric_scaling", 

858 ), component # for MyPy 

859 with astropy.io.fits.open(filename) as hdu_list: 

860 primary_header = hdu_list[0].header 

861 obs_info = _obs_info_from_md(primary_header) 

862 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter()) 

863 if component == "obs_info": 

864 return obs_info 

865 instrument = _extract_or_check_header( 

866 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str 

867 ) 

868 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int) 

869 opaque_metadata = FitsOpaqueMetadata() 

870 # This extraction is destructive, so we need to be sure to pass 

871 # this opaque_metadata down to MaskedImage._read_legacy_hdus 

872 # so it doesn't try to extract it again. 

873 opaque_metadata.extract_legacy_primary_header(primary_header) 

874 if (instrumental_unit := opaque_metadata.get_instrumental_unit()) is None: 

875 instrumental_unit = astropy.units.electron 

876 photometric_scaling: Field | None = None 

877 if component in (None, "photometric_scaling"): 

878 legacy_photo_calib = reader.readPhotoCalib() 

879 if legacy_photo_calib is not None: 

880 photometric_scaling = field_from_legacy_photo_calib( 

881 legacy_photo_calib, bounds=detector_bbox, instrumental_unit=instrumental_unit 

882 ) 

883 if component == "photometric_scaling": 

884 return photometric_scaling 

885 if component == "detector": 

886 return Detector.from_legacy( 

887 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

888 ) 

889 projection = Projection.from_legacy( 

890 legacy_wcs, 

891 DetectorFrame( 

892 instrument=instrument, 

893 visit=visit, 

894 detector=legacy_detector.getId(), 

895 bbox=detector_bbox, 

896 ), 

897 ) 

898 if component == "projection": 

899 return projection 

900 if plane_map is None: 

901 plane_map = get_legacy_visit_image_mask_planes() 

902 assert component != "psf", component # for MyPy 

903 from_masked_image = MaskedImage._read_legacy_hdus( 

904 hdu_list, 

905 filename, 

906 opaque_metadata=opaque_metadata, 

907 preserve_quantization=preserve_quantization, 

908 plane_map=plane_map, 

909 component=component, 

910 ) 

911 if component is not None: 

912 # This is the image, mask, or variance; attach the projection and 

913 # obs_info and return 

914 return from_masked_image.view(projection=projection) 

915 legacy_polygon = reader.readValidPolygon() 

916 result = VisitImage( 

917 from_masked_image.image, 

918 mask=from_masked_image.mask, 

919 variance=from_masked_image.variance, 

920 projection=projection, 

921 psf=psf, 

922 detector=Detector.from_legacy( 

923 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

924 ), 

925 obs_info=obs_info, 

926 summary_stats=summary_stats, 

927 aperture_corrections=aperture_corrections, 

928 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None, 

929 photometric_scaling=photometric_scaling, 

930 ) 

931 result._opaque_metadata = from_masked_image._opaque_metadata 

932 return result 

933 

934 

935class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]): 

936 """A Pydantic model used to represent a serialized `VisitImage`.""" 

937 

938 # Inherited attributes are duplicated because that improves the docs 

939 # (some limitation in the sphinx/pydantic integration), and these are 

940 # important docs. 

941 

942 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

943 mask: MaskSerializationModel[P] = pydantic.Field( 

944 description="Bitmask that annotates the main image's pixels." 

945 ) 

946 variance: ImageSerializationModel[P] = pydantic.Field( 

947 description="Per-pixel variance estimates for the main image." 

948 ) 

949 projection: ProjectionSerializationModel[P] = pydantic.Field( 

950 description="Projection that maps the pixel grid to the sky.", 

951 ) 

952 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = ( 

953 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.") 

954 ) 

955 obs_info: ObservationInfo = pydantic.Field( 

956 description="Standardized description of visit metadata", 

957 ) 

958 photometric_scaling: FieldSerializationModel | None = pydantic.Field( 

959 default=None, 

960 description="Scaling that can be used to multiply a post-ISR image to yield calibrated pixel values.", 

961 ) 

962 summary_stats: ObservationSummaryStats = pydantic.Field( 

963 description="Summary statistics for the observation." 

964 ) 

965 detector: DetectorSerializationModel = pydantic.Field( 

966 description="Geometry and electronic information for the detector." 

967 ) 

968 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field( 

969 default_factory=ApertureCorrectionMapSerializationModel, 

970 description="Aperture corrections, keyed by flux algorithm.", 

971 ) 

972 bounds: SerializableBounds | None = pydantic.Field( 

973 default=None, 

974 description="Pixel validity region, if different from the image bounding box.", 

975 exclude_if=is_none, 

976 ) 

977 backgrounds: BackgroundMapSerializationModel = pydantic.Field( 

978 default_factory=BackgroundMapSerializationModel, 

979 description="Background models associated with this image.", 

980 ) 

981 

982 def deserialize( 

983 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any 

984 ) -> VisitImage: 

985 if kwargs: 

986 raise InvalidParameterError(f"Unrecognized parameters for VisitImage: {set(kwargs.keys())}.") 

987 masked_image = super().deserialize(archive, bbox=bbox) 

988 try: 

989 psf = self.psf.deserialize(archive) 

990 except ArchiveReadError as err: 

991 # Defer this until/unless somebody actually asks for the PSF. 

992 psf = err 

993 detector = self.detector.deserialize(archive) 

994 aperture_corrections = self.aperture_corrections.deserialize(archive) 

995 photometric_scaling = ( 

996 self.photometric_scaling.deserialize(archive) if self.photometric_scaling is not None else None 

997 ) 

998 return VisitImage( 

999 masked_image.image, 

1000 mask=masked_image.mask, 

1001 variance=masked_image.variance, 

1002 psf=psf, 

1003 projection=masked_image.projection, 

1004 obs_info=self.obs_info, 

1005 summary_stats=self.summary_stats, 

1006 detector=detector, 

1007 aperture_corrections=aperture_corrections, 

1008 photometric_scaling=photometric_scaling, 

1009 bounds=self.bounds.deserialize() if self.bounds is not None else None, 

1010 backgrounds=self.backgrounds.deserialize(archive), 

1011 )._finish_deserialize(self) 

1012 

1013 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

1014 if kwargs and component not in ("image", "mask", "variance"): 

1015 raise InvalidParameterError( 

1016 f"Unsupported parameters for VisitImage component {component}: {set(kwargs.keys())}." 

1017 ) 

1018 return super().deserialize_component(component, archive) 

1019 

1020 

1021def _extract_or_check_value[T]( 

1022 key: str, 

1023 given_value: T | None, 

1024 *sources: tuple[str, T | None], 

1025) -> T: 

1026 # Compare given value against multiple sources. If given value is not 

1027 # supplied return the first non-None value in the reference sources. 

1028 if given_value is not None: 

1029 for source_name, source_value in sources: 

1030 if source_value is not None and source_value != given_value: 

1031 raise ValueError( 

1032 f"Given value {given_value!r} does not match {source_value!r} from {source_name}." 

1033 ) 

1034 if source_value is not None: 

1035 # Only check the first non-None source rather than checking 

1036 # all supplied values. 

1037 break 

1038 return given_value 

1039 

1040 for _, source_value in sources: 

1041 if source_value is not None: 

1042 return source_value 

1043 

1044 raise ValueError(f"No value found for {key}.") 

1045 

1046 

1047def _extract_or_check_header[T]( 

1048 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T] 

1049) -> T: 

1050 hdr_value: T | None = None 

1051 if (hdr_raw_value := header.get(key)) is not None: 

1052 hdr_value = coerce(hdr_raw_value) 

1053 return _extract_or_check_value( 

1054 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value) 

1055 ) 

1056 

1057 

1058def _get_unit_conversion_factor( 

1059 original: astropy.units.UnitBase, new: astropy.units.UnitBase 

1060) -> float | None: 

1061 try: 

1062 return original.to(new) 

1063 except astropy.units.UnitConversionError: 

1064 return None