Coverage for python / lsst / images / _visit_image.py: 27%

301 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 08:46 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("VisitImage", "VisitImageSerializationModel") 

15 

16import warnings 

17from collections.abc import Callable, Mapping, MutableMapping 

18from types import EllipsisType 

19from typing import Any, Literal, cast, overload 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import pydantic 

25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator 

26 

27from ._geom import Box 

28from ._image import Image, ImageSerializationModel 

29from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes 

30from ._masked_image import MaskedImage, MaskedImageSerializationModel 

31from ._observation_summary_stats import ObservationSummaryStats 

32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel 

33from .aperture_corrections import ( 

34 ApertureCorrectionMap, 

35 ApertureCorrectionMapSerializationModel, 

36 aperture_corrections_from_legacy, 

37) 

38from .cameras import Detector, DetectorSerializationModel 

39from .fits import FitsOpaqueMetadata 

40from .psfs import ( 

41 GaussianPointSpreadFunction, 

42 GaussianPSFSerializationModel, 

43 PiffSerializationModel, 

44 PiffWrapper, 

45 PointSpreadFunction, 

46 PSFExSerializationModel, 

47 PSFExWrapper, 

48) 

49from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive 

50 

51 

52def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo: 

53 # Try to get an ObservationInfo from the primary header as if 

54 # it's a raw header. Else fallback. 

55 try: 

56 obs_info = ObservationInfo.from_header(md, quiet=True) 

57 except ValueError: 

58 # Not known translator. Must fall back to visit info. If we have 

59 # an actual VisitInfo, serialize it since we know that it will be 

60 # complete. 

61 if visit_info is not None: 

62 from lsst.afw.image import setVisitInfoMetadata 

63 from lsst.daf.base import PropertyList 

64 

65 pl = PropertyList() 

66 setVisitInfoMetadata(pl, visit_info) 

67 # Merge so that we still have access to butler provenance. 

68 md.update(pl) 

69 

70 # Try the given header looking for VisitInfo hints. 

71 # We get lots of warnings if nothing can be found. Currently 

72 # no way to disable those without capturing them. 

73 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True) 

74 return obs_info 

75 

76 

77def _update_obs_info_from_legacy( 

78 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None 

79) -> ObservationInfo: 

80 extra_md: dict[str, str | int] = {} 

81 

82 if filter_label is not None and filter_label.hasBandLabel(): 

83 extra_md["physical_filter"] = filter_label.physicalLabel 

84 

85 # Fill in detector metadata, check for consistency. 

86 # ObsInfo detector name and group can not be derived from 

87 # the getName() information without knowing how the components 

88 # are separated. 

89 if detector is not None: 

90 detector_md = { 

91 "detector_num": detector.getId(), 

92 "detector_serial": detector.getSerial(), 

93 "detector_unique_name": detector.getName(), 

94 } 

95 extra_md.update(detector_md) 

96 

97 obs_info_updates: dict[str, str | int] = {} 

98 for k, v in extra_md.items(): 

99 current = getattr(obs_info, k) 

100 if current is None: 

101 obs_info_updates[k] = v 

102 continue 

103 if current != v: 

104 raise RuntimeError( 

105 f"ObservationInfo contains value for '{k}' that is inconsistent " 

106 f"with given legacy object: {v} != {current}" 

107 ) 

108 

109 if obs_info_updates: 

110 obs_info = obs_info.model_copy(update=obs_info_updates) 

111 return obs_info 

112 

113 

114class VisitImage(MaskedImage): 

115 """A calibrated single-visit image. 

116 

117 Parameters 

118 ---------- 

119 image 

120 The main image plane. If this has a `Projection`, it will be used 

121 for all planes unless a ``projection`` is passed separately. 

122 mask 

123 A bitmask image that annotates the main image plane. Must have the 

124 same bounding box as ``image`` if provided. Any attached projection 

125 is replaced (possibly by `None`). 

126 variance 

127 The per-pixel uncertainty of the main image as an image of variance 

128 values. Must have the same bounding box as ``image`` if provided, and 

129 its units must be the square of ``image.unit`` or `None`. 

130 Values default to ``1.0``. Any attached projection is replaced 

131 (possibly by `None`). 

132 mask_schema 

133 Schema for the mask plane. Must be provided if and only if ``mask`` is 

134 not provided. 

135 projection 

136 Projection that maps the pixel grid to the sky. Can only be `None` if 

137 a projection is already attached to ``image``. 

138 obs_info 

139 General information about this visit in standardized form. 

140 summary_stats 

141 Summary statistics associated with this visit. Initialized to default 

142 values if not provided. 

143 psf 

144 Point-spread function model for this image, or an exception explaining 

145 why it could not be read (to be raised if the PSF is requested later). 

146 detector 

147 Geometry and electronic information for the detector attached to this 

148 image. 

149 aperture_corrections : `dict` [`str`, `~fields.BaseField`] 

150 Mapping from photometry algorithm name to the aperture correction for 

151 that algorithm. 

152 metadata 

153 Arbitrary flexible metadata to associate with the image. 

154 """ 

155 

156 def __init__( 

157 self, 

158 image: Image, 

159 *, 

160 mask: Mask | None = None, 

161 variance: Image | None = None, 

162 mask_schema: MaskSchema | None = None, 

163 projection: Projection[DetectorFrame] | None = None, 

164 obs_info: ObservationInfo | None = None, 

165 summary_stats: ObservationSummaryStats | None = None, 

166 psf: PointSpreadFunction | ArchiveReadError, 

167 detector: Detector, 

168 aperture_corrections: ApertureCorrectionMap | None = None, 

169 metadata: dict[str, MetadataValue] | None = None, 

170 ): 

171 super().__init__( 

172 image, 

173 mask=mask, 

174 variance=variance, 

175 mask_schema=mask_schema, 

176 projection=projection, 

177 obs_info=obs_info, 

178 metadata=metadata, 

179 ) 

180 if self.image.unit is None: 

181 raise TypeError("The image component of a VisitImage must have units.") 

182 if self.image.projection is None: 

183 raise TypeError("The projection component of a VisitImage cannot be None.") 

184 if self.image.obs_info is None: 

185 raise TypeError("The observation info component of a VisitImage cannot be None.") 

186 if not isinstance(self.image.projection.pixel_frame, DetectorFrame): 

187 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.") 

188 if summary_stats is None: 

189 summary_stats = ObservationSummaryStats() 

190 self._summary_stats = summary_stats 

191 self._psf = psf 

192 self._detector = detector 

193 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {} 

194 

195 @property 

196 def unit(self) -> astropy.units.UnitBase: 

197 """The units of the image plane (`astropy.units.Unit`).""" 

198 return cast(astropy.units.UnitBase, super().unit) 

199 

200 @property 

201 def projection(self) -> Projection[DetectorFrame]: 

202 """The projection that maps the pixel grid to the sky 

203 (`Projection` [`DetectorFrame`]). 

204 """ 

205 return cast(Projection[DetectorFrame], super().projection) 

206 

207 @property 

208 def obs_info(self) -> ObservationInfo: 

209 """General information about this observation in standard form. 

210 (`~astro_metadata_translator.ObservationInfo`). 

211 """ 

212 obs_info = self.image.obs_info 

213 assert obs_info is not None 

214 return obs_info 

215 

216 @property 

217 def astropy_wcs(self) -> ProjectionAstropyView: 

218 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`). 

219 

220 Notes 

221 ----- 

222 As expected for Astropy WCS objects, this defines pixel coordinates 

223 such that the first row and column in the arrays are ``(0, 0)``, not 

224 ``bbox.start``, as is the case for `projection`. 

225 

226 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

227 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an 

228 `astropy.wcs.WCS` (use `fits_wcs` for that). 

229 """ 

230 return cast(ProjectionAstropyView, super().astropy_wcs) 

231 

232 @property 

233 def summary_stats(self) -> ObservationSummaryStats: 

234 """Optional summary statistics for this observation 

235 (`ObservationSummaryStats`). 

236 """ 

237 return self._summary_stats 

238 

239 @property 

240 def psf(self) -> PointSpreadFunction: 

241 """The point-spread function model for this image 

242 (`.psfs.PointSpreadFunction`). 

243 """ 

244 if isinstance(self._psf, ArchiveReadError): 

245 raise self._psf 

246 return self._psf 

247 

248 @property 

249 def detector(self) -> Detector: 

250 """Geometry and electronic information about the detector 

251 (`.cameras.Detector`). 

252 """ 

253 return self._detector 

254 

255 @property 

256 def aperture_corrections(self) -> ApertureCorrectionMap: 

257 """A mapping from photometry algorithm name to the aperture correction 

258 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]). 

259 """ 

260 return self._aperture_corrections 

261 

262 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage: 

263 if bbox is ...: 

264 return self 

265 super().__getitem__(bbox) 

266 return self._transfer_metadata( 

267 VisitImage( 

268 self.image[bbox], 

269 mask=self.mask[bbox], 

270 variance=self.variance[bbox], 

271 projection=self.projection, 

272 psf=self.psf, 

273 obs_info=self.obs_info, 

274 summary_stats=self.summary_stats, 

275 detector=self._detector, 

276 aperture_corrections=self.aperture_corrections, 

277 ), 

278 bbox=bbox, 

279 ) 

280 

281 def __str__(self) -> str: 

282 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})" 

283 

284 def __repr__(self) -> str: 

285 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})" 

286 

287 def copy(self, *, copy_detector: bool = False) -> VisitImage: 

288 """Deep-copy the visit image. 

289 

290 Parameters 

291 ---------- 

292 copy_detector 

293 Whether to deep-copy the `detector` attribute. 

294 """ 

295 return self._transfer_metadata( 

296 VisitImage( 

297 image=self._image.copy(), 

298 mask=self._mask.copy(), 

299 variance=self._variance.copy(), 

300 psf=self._psf, 

301 obs_info=self.obs_info, 

302 summary_stats=self.summary_stats.model_copy(), 

303 detector=self._detector.copy() if copy_detector else self._detector, 

304 aperture_corrections=self.aperture_corrections.copy(), 

305 ), 

306 copy=True, 

307 ) 

308 

309 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel: 

310 masked_image_model = super().serialize(archive) 

311 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel 

312 match self._psf: 

313 # MyPy is able to figure things out here with this match statement, 

314 # but not a single isinstance check on both types. 

315 case PiffWrapper(): 

316 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

317 case PSFExWrapper(): 

318 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

319 case GaussianPointSpreadFunction(): 

320 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

321 case _: 

322 raise TypeError( 

323 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}." 

324 ) 

325 assert masked_image_model.projection is not None, "VisitImage always has a projection." 

326 assert masked_image_model.obs_info is not None, "VisitImage always has observation info." 

327 serialized_detector = self._detector.serialize(archive) 

328 serialized_aperture_corrections = ApertureCorrectionMapSerializationModel.serialize( 

329 self.aperture_corrections, archive 

330 ) 

331 return VisitImageSerializationModel( 

332 image=masked_image_model.image, 

333 mask=masked_image_model.mask, 

334 variance=masked_image_model.variance, 

335 projection=masked_image_model.projection, 

336 obs_info=masked_image_model.obs_info, 

337 psf=serialized_psf, 

338 summary_stats=self.summary_stats, 

339 detector=serialized_detector, 

340 aperture_corrections=serialized_aperture_corrections, 

341 metadata=self.metadata, 

342 ) 

343 

344 # Type-checkers want the model argument to only require 

345 # MaskedImageSerializationModel[Any], and they'd be absolutely right if 

346 # this were a regular instance method. But whether Liskov substitution 

347 # applies to classmethods and staticmethods is sort of context-dependent, 

348 # and here we do not want it to. 

349 @staticmethod 

350 def deserialize( 

351 model: VisitImageSerializationModel[Any], # type: ignore[override] 

352 archive: InputArchive[Any], 

353 *, 

354 bbox: Box | None = None, 

355 ) -> VisitImage: 

356 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox) 

357 psf = model.deserialize_psf(archive) 

358 detector = Detector.deserialize(model.detector, archive) 

359 aperture_corrections = model.aperture_corrections.deserialize(archive) 

360 return VisitImage( 

361 masked_image.image, 

362 mask=masked_image.mask, 

363 variance=masked_image.variance, 

364 psf=psf, 

365 projection=masked_image.projection, 

366 obs_info=masked_image.obs_info, 

367 summary_stats=model.summary_stats, 

368 detector=detector, 

369 aperture_corrections=aperture_corrections, 

370 )._finish_deserialize(model) 

371 

372 @staticmethod 

373 def _get_archive_tree_type[P: pydantic.BaseModel]( 

374 pointer_type: type[P], 

375 ) -> type[VisitImageSerializationModel[P]]: 

376 """Return the serialization model type for this object for an archive 

377 type that uses the given pointer type. 

378 """ 

379 return VisitImageSerializationModel[pointer_type] # type: ignore 

380 

381 # write_fits and read_fits inherited from MaskedImage. 

382 

383 @staticmethod 

384 def from_legacy( 

385 legacy: Any, 

386 *, 

387 unit: astropy.units.Unit | None = None, 

388 plane_map: Mapping[str, MaskPlane] | None = None, 

389 instrument: str | None = None, 

390 visit: int | None = None, 

391 ) -> VisitImage: 

392 """Convert from an `lsst.afw.image.Exposure` instance. 

393 

394 Parameters 

395 ---------- 

396 legacy 

397 An `lsst.afw.image.Exposure` instance that will share image and 

398 variance (but not mask) pixel data with the returned object. 

399 unit 

400 Units of the image. If not provided, the ``BUNIT`` metadata 

401 key will be used, if available. 

402 plane_map 

403 A mapping from legacy mask plane name to the new plane name and 

404 description. If `None` (default) 

405 `get_legacy_visit_image_mask_planes` is used. 

406 instrument 

407 Name of the instrument. Extracted from the metadata if not 

408 provided. 

409 visit 

410 ID of the visit. Extracted from the metadata if not provided. 

411 """ 

412 if plane_map is None: 

413 plane_map = get_legacy_visit_image_mask_planes() 

414 md = legacy.getMetadata() 

415 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo()) 

416 instrument = _extract_or_check_header( 

417 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str 

418 ) 

419 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int) 

420 unit = _extract_or_check_header( 

421 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits") 

422 ) 

423 legacy_wcs = legacy.getWcs() 

424 if legacy_wcs is None: 

425 raise ValueError("Exposure does not have a SkyWcs.") 

426 legacy_detector = legacy.getDetector() 

427 if legacy_detector is None: 

428 raise ValueError("Exposure does not have a Detector.") 

429 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

430 

431 # Update the ObservationInfo from other components. 

432 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter()) 

433 

434 opaque_fits_metadata = FitsOpaqueMetadata() 

435 primary_header = astropy.io.fits.Header() 

436 with warnings.catch_warnings(): 

437 # Silence warnings about long keys becoming HIERARCH. 

438 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

439 primary_header.update(md.toOrderedDict()) 

440 opaque_fits_metadata.extract_legacy_primary_header(primary_header) 

441 projection = Projection.from_legacy( 

442 legacy_wcs, 

443 DetectorFrame( 

444 instrument=instrument, 

445 visit=visit, 

446 detector=legacy_detector.getId(), 

447 bbox=detector_bbox, 

448 ), 

449 ) 

450 legacy_psf = legacy.getPsf() 

451 if legacy_psf is None: 

452 raise ValueError("Exposure file does not have a Psf.") 

453 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

454 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map) 

455 legacy_summary_stats = legacy.info.getSummaryStats() 

456 legacy_ap_corr_map = legacy.info.getApCorrMap() 

457 result = VisitImage( 

458 image=masked_image.image.view(unit=unit), 

459 mask=masked_image.mask, 

460 variance=masked_image.variance, 

461 projection=projection, 

462 psf=psf, 

463 obs_info=obs_info, 

464 summary_stats=( 

465 ObservationSummaryStats.from_legacy(legacy_summary_stats) 

466 if legacy_summary_stats is not None 

467 else None 

468 ), 

469 detector=Detector.from_legacy( 

470 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

471 ), 

472 aperture_corrections=( 

473 aperture_corrections_from_legacy(legacy_ap_corr_map) 

474 if legacy_ap_corr_map is not None 

475 else None 

476 ), 

477 ) 

478 

479 result._opaque_metadata = opaque_fits_metadata 

480 return result 

481 

482 @overload # type: ignore[override] 

483 @staticmethod 

484 def read_legacy( 484 ↛ exitline 484 didn't return from function 'read_legacy' because

485 filename: str, 

486 *, 

487 component: Literal["bbox"], 

488 ) -> Box: ... 

489 

490 @overload 

491 @staticmethod 

492 def read_legacy( 492 ↛ exitline 492 didn't return from function 'read_legacy' because

493 filename: str, 

494 *, 

495 preserve_quantization: bool = False, 

496 instrument: str | None = None, 

497 visit: int | None = None, 

498 component: Literal["image"], 

499 ) -> Image: ... 

500 

501 @overload 

502 @staticmethod 

503 def read_legacy( 503 ↛ exitline 503 didn't return from function 'read_legacy' because

504 filename: str, 

505 *, 

506 plane_map: Mapping[str, MaskPlane] | None = None, 

507 instrument: str | None = None, 

508 visit: int | None = None, 

509 component: Literal["mask"], 

510 ) -> Mask: ... 

511 

512 @overload 

513 @staticmethod 

514 def read_legacy( 514 ↛ exitline 514 didn't return from function 'read_legacy' because

515 filename: str, 

516 *, 

517 preserve_quantization: bool = False, 

518 instrument: str | None = None, 

519 visit: int | None = None, 

520 component: Literal["variance"], 

521 ) -> Image: ... 

522 

523 @overload 

524 @staticmethod 

525 def read_legacy( 525 ↛ exitline 525 didn't return from function 'read_legacy' because

526 filename: str, 

527 *, 

528 instrument: str | None = None, 

529 visit: int | None = None, 

530 component: Literal["projection"], 

531 ) -> Projection[DetectorFrame]: ... 

532 

533 @overload 

534 @staticmethod 

535 def read_legacy( 535 ↛ exitline 535 didn't return from function 'read_legacy' because

536 filename: str, 

537 *, 

538 component: Literal["psf"], 

539 ) -> PointSpreadFunction: ... 

540 

541 @overload 

542 @staticmethod 

543 def read_legacy( 543 ↛ exitline 543 didn't return from function 'read_legacy' because

544 filename: str, 

545 *, 

546 component: Literal["detector"], 

547 ) -> Detector: ... 

548 

549 @overload 

550 @staticmethod 

551 def read_legacy( 551 ↛ exitline 551 didn't return from function 'read_legacy' because

552 filename: str, 

553 *, 

554 component: Literal["obs_info"], 

555 ) -> ObservationInfo: ... 

556 

557 @overload 

558 @staticmethod 

559 def read_legacy( 559 ↛ exitline 559 didn't return from function 'read_legacy' because

560 filename: str, 

561 *, 

562 component: Literal["summary_stats"], 

563 ) -> ObservationSummaryStats: ... 

564 

565 @overload 

566 @staticmethod 

567 def read_legacy( 567 ↛ exitline 567 didn't return from function 'read_legacy' because

568 filename: str, 

569 *, 

570 component: Literal["aperture_corrections"], 

571 ) -> ApertureCorrectionMap: ... 

572 

573 @overload 

574 @staticmethod 

575 def read_legacy( 575 ↛ exitline 575 didn't return from function 'read_legacy' because

576 filename: str, 

577 *, 

578 preserve_quantization: bool = False, 

579 plane_map: Mapping[str, MaskPlane] | None = None, 

580 instrument: str | None = None, 

581 visit: int | None = None, 

582 component: None = None, 

583 ) -> VisitImage: ... 

584 

585 @staticmethod 

586 def read_legacy( # type: ignore[override] 

587 filename: str, 

588 *, 

589 preserve_quantization: bool = False, 

590 plane_map: Mapping[str, MaskPlane] | None = None, 

591 instrument: str | None = None, 

592 visit: int | None = None, 

593 component: Literal[ 

594 "bbox", 

595 "image", 

596 "mask", 

597 "variance", 

598 "projection", 

599 "psf", 

600 "detector", 

601 "obs_info", 

602 "summary_stats", 

603 "aperture_corrections", 

604 ] 

605 | None = None, 

606 ) -> Any: 

607 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`. 

608 

609 Parameters 

610 ---------- 

611 filename 

612 Full name of the file. 

613 preserve_quantization 

614 If `True`, ensure that writing the masked image back out again will 

615 exactly preserve quantization-compressed pixel values. This causes 

616 the image and variance plane arrays to be marked as read-only and 

617 stores the original binary table data for those planes in memory. 

618 If the `MaskedImage` is copied, the precompressed pixel values are 

619 not transferred to the copy. 

620 plane_map 

621 A mapping from legacy mask plane name to the new plane name and 

622 description. If `None` (default) 

623 `get_legacy_visit_image_mask_planes` is used. 

624 instrument 

625 Name of the instrument. Read from the primary header if not 

626 provided. 

627 visit 

628 ID of the visit. Read from the primary header if not 

629 provided. 

630 component 

631 A component to read instead of the full image. 

632 """ 

633 from lsst.afw.image import ExposureFitsReader 

634 

635 reader = ExposureFitsReader(filename) 

636 if component == "bbox": 

637 return Box.from_legacy(reader.readBBox()) 

638 legacy_detector = reader.readDetector() 

639 if legacy_detector is None: 

640 raise ValueError(f"Exposure file {filename!r} does not have a Detector.") 

641 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

642 legacy_wcs = None 

643 if component in (None, "image", "mask", "variance", "projection"): 

644 legacy_wcs = reader.readWcs() 

645 if legacy_wcs is None: 

646 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.") 

647 legacy_exposure_info = reader.readExposureInfo() 

648 summary_stats = None 

649 if component in (None, "summary_stats"): 

650 legacy_stats = legacy_exposure_info.getSummaryStats() 

651 if legacy_stats is not None: 

652 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats) 

653 if component == "summary_stats": 

654 return summary_stats 

655 if component in (None, "psf"): 

656 legacy_psf = reader.readPsf() 

657 if legacy_psf is None: 

658 raise ValueError(f"Exposure file {filename!r} does not have a Psf.") 

659 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

660 if component == "psf": 

661 return psf 

662 aperture_corrections: ApertureCorrectionMap = {} 

663 if component in (None, "aperture_corrections"): 

664 legacy_ap_corr_map = reader.readApCorrMap() 

665 if legacy_ap_corr_map is not None: 

666 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map) 

667 if component == "aperture_corrections": 

668 return aperture_corrections 

669 assert component in (None, "image", "mask", "variance", "projection", "obs_info", "detector"), ( 

670 component 

671 ) # for MyPy 

672 with astropy.io.fits.open(filename) as hdu_list: 

673 primary_header = hdu_list[0].header 

674 obs_info = _obs_info_from_md(primary_header) 

675 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter()) 

676 if component == "obs_info": 

677 return obs_info 

678 instrument = _extract_or_check_header( 

679 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str 

680 ) 

681 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int) 

682 if component == "detector": 

683 return Detector.from_legacy( 

684 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

685 ) 

686 projection = Projection.from_legacy( 

687 legacy_wcs, 

688 DetectorFrame( 

689 instrument=instrument, 

690 visit=visit, 

691 detector=legacy_detector.getId(), 

692 bbox=detector_bbox, 

693 ), 

694 ) 

695 if component == "projection": 

696 return projection 

697 if plane_map is None: 

698 plane_map = get_legacy_visit_image_mask_planes() 

699 assert component != "psf", component # for MyPy 

700 from_masked_image = MaskedImage._read_legacy_hdus( 

701 hdu_list, 

702 filename, 

703 preserve_quantization=preserve_quantization, 

704 plane_map=plane_map, 

705 component=component, 

706 ) 

707 if component is not None: 

708 # This is the image, mask, or variance; attach the projection and 

709 # obs_info and return 

710 return from_masked_image.view(projection=projection, obs_info=obs_info) 

711 result = VisitImage( 

712 from_masked_image.image, 

713 mask=from_masked_image.mask, 

714 variance=from_masked_image.variance, 

715 projection=projection, 

716 psf=psf, 

717 detector=Detector.from_legacy( 

718 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

719 ), 

720 obs_info=obs_info, 

721 summary_stats=summary_stats, 

722 aperture_corrections=aperture_corrections, 

723 ) 

724 result._opaque_metadata = from_masked_image._opaque_metadata 

725 return result 

726 

727 

728class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]): 

729 """A Pydantic model used to represent a serialized `VisitImage`.""" 

730 

731 # Inherited attributes are duplicated because that improves the docs 

732 # (some limitation in the sphinx/pydantic integration), and these are 

733 # important docs. 

734 

735 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

736 mask: MaskSerializationModel[P] = pydantic.Field( 

737 description="Bitmask that annotates the main image's pixels." 

738 ) 

739 variance: ImageSerializationModel[P] = pydantic.Field( 

740 description="Per-pixel variance estimates for the main image." 

741 ) 

742 projection: ProjectionSerializationModel[P] = pydantic.Field( 

743 description="Projection that maps the pixel grid to the sky.", 

744 ) 

745 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = ( 

746 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.") 

747 ) 

748 obs_info: ObservationInfo = pydantic.Field( 

749 description="Standardized description of visit metadata", 

750 ) 

751 summary_stats: ObservationSummaryStats = pydantic.Field( 

752 description="Summary statistics for the observation." 

753 ) 

754 detector: DetectorSerializationModel = pydantic.Field( 

755 description="Geometry and electronic information for the detector." 

756 ) 

757 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field( 

758 default_factory=ApertureCorrectionMapSerializationModel, 

759 description="Aperture corrections, keyed by flux algorithm.", 

760 ) 

761 

762 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError: 

763 """Finish deserializing the PSF model, or *return* any exception 

764 raised in the attempt. 

765 """ 

766 try: 

767 match self.psf: 

768 case PiffSerializationModel(): 

769 return PiffWrapper.deserialize(self.psf, archive) 

770 case PSFExSerializationModel(): 

771 return PSFExWrapper.deserialize(self.psf, archive) 

772 case GaussianPSFSerializationModel(): 

773 return GaussianPointSpreadFunction.deserialize(self.psf, archive) 

774 case _: 

775 raise ArchiveReadError("PSF model type not recognized.") 

776 except ArchiveReadError as err: 

777 return err 

778 

779 

780def _extract_or_check_value[T]( 

781 key: str, 

782 given_value: T | None, 

783 *sources: tuple[str, T | None], 

784) -> T: 

785 # Compare given value against multiple sources. If given value is not 

786 # supplied return the first non-None value in the reference sources. 

787 if given_value is not None: 

788 for source_name, source_value in sources: 

789 if source_value is not None and source_value != given_value: 

790 raise ValueError( 

791 f"Given value {given_value!r} does not match {source_value!r} from {source_name}." 

792 ) 

793 if source_value is not None: 

794 # Only check the first non-None source rather than checking 

795 # all supplied values. 

796 break 

797 return given_value 

798 

799 for _, source_value in sources: 

800 if source_value is not None: 

801 return source_value 

802 

803 raise ValueError(f"No value found for {key}.") 

804 

805 

806def _extract_or_check_header[T]( 

807 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T] 

808) -> T: 

809 hdr_value: T | None = None 

810 if (hdr_raw_value := header.get(key)) is not None: 

811 hdr_value = coerce(hdr_raw_value) 

812 return _extract_or_check_value( 

813 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value) 

814 )