Coverage for python / lsst / images / _visit_image.py: 28%

304 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-15 08:42 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("VisitImage", "VisitImageSerializationModel") 

15 

16import warnings 

17from collections.abc import Callable, Mapping, MutableMapping 

18from types import EllipsisType 

19from typing import Any, Literal, cast, overload 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import pydantic 

25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator 

26 

27from ._concrete_bounds import SerializableBounds 

28from ._geom import Bounds, Box 

29from ._image import Image, ImageSerializationModel 

30from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes 

31from ._masked_image import MaskedImage, MaskedImageSerializationModel 

32from ._observation_summary_stats import ObservationSummaryStats 

33from ._polygon import Polygon 

34from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel 

35from .aperture_corrections import ( 

36 ApertureCorrectionMap, 

37 ApertureCorrectionMapSerializationModel, 

38 aperture_corrections_from_legacy, 

39) 

40from .cameras import Detector, DetectorSerializationModel 

41from .fits import FitsOpaqueMetadata 

42from .psfs import ( 

43 GaussianPointSpreadFunction, 

44 GaussianPSFSerializationModel, 

45 PiffSerializationModel, 

46 PiffWrapper, 

47 PointSpreadFunction, 

48 PSFExSerializationModel, 

49 PSFExWrapper, 

50) 

51from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive 

52from .utils import is_none 

53 

54 

55def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo: 

56 # Try to get an ObservationInfo from the primary header as if 

57 # it's a raw header. Else fallback. 

58 try: 

59 obs_info = ObservationInfo.from_header(md, quiet=True) 

60 except ValueError: 

61 # Not known translator. Must fall back to visit info. If we have 

62 # an actual VisitInfo, serialize it since we know that it will be 

63 # complete. 

64 if visit_info is not None: 

65 from lsst.afw.image import setVisitInfoMetadata 

66 from lsst.daf.base import PropertyList 

67 

68 pl = PropertyList() 

69 setVisitInfoMetadata(pl, visit_info) 

70 # Merge so that we still have access to butler provenance. 

71 md.update(pl) 

72 

73 # Try the given header looking for VisitInfo hints. 

74 # We get lots of warnings if nothing can be found. Currently 

75 # no way to disable those without capturing them. 

76 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True) 

77 return obs_info 

78 

79 

80def _update_obs_info_from_legacy( 

81 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None 

82) -> ObservationInfo: 

83 extra_md: dict[str, str | int] = {} 

84 

85 if filter_label is not None and filter_label.hasBandLabel(): 

86 extra_md["physical_filter"] = filter_label.physicalLabel 

87 

88 # Fill in detector metadata, check for consistency. 

89 # ObsInfo detector name and group can not be derived from 

90 # the getName() information without knowing how the components 

91 # are separated. 

92 if detector is not None: 

93 detector_md = { 

94 "detector_num": detector.getId(), 

95 "detector_serial": detector.getSerial(), 

96 "detector_unique_name": detector.getName(), 

97 } 

98 extra_md.update(detector_md) 

99 

100 obs_info_updates: dict[str, str | int] = {} 

101 for k, v in extra_md.items(): 

102 current = getattr(obs_info, k) 

103 if current is None: 

104 obs_info_updates[k] = v 

105 continue 

106 if current != v: 

107 raise RuntimeError( 

108 f"ObservationInfo contains value for '{k}' that is inconsistent " 

109 f"with given legacy object: {v} != {current}" 

110 ) 

111 

112 if obs_info_updates: 

113 obs_info = obs_info.model_copy(update=obs_info_updates) 

114 return obs_info 

115 

116 

117class VisitImage(MaskedImage): 

118 """A calibrated single-visit image. 

119 

120 Parameters 

121 ---------- 

122 image 

123 The main image plane. If this has a `Projection`, it will be used 

124 for all planes unless a ``projection`` is passed separately. 

125 mask 

126 A bitmask image that annotates the main image plane. Must have the 

127 same bounding box as ``image`` if provided. Any attached projection 

128 is replaced (possibly by `None`). 

129 variance 

130 The per-pixel uncertainty of the main image as an image of variance 

131 values. Must have the same bounding box as ``image`` if provided, and 

132 its units must be the square of ``image.unit`` or `None`. 

133 Values default to ``1.0``. Any attached projection is replaced 

134 (possibly by `None`). 

135 mask_schema 

136 Schema for the mask plane. Must be provided if and only if ``mask`` is 

137 not provided. 

138 projection 

139 Projection that maps the pixel grid to the sky. Can only be `None` if 

140 a projection is already attached to ``image``. 

141 bounds 

142 The region where this image's pixels and other properties are valid. 

143 If not provided, the bounding box of the image is used. Other 

144 components (``psf``, ``projection``, ``aperture_corrections``, etc.) 

145 are assumed to have their own bounds which may or may not be the same 

146 as the image bounds. If ``bounds`` extends beyond the image bounding 

147 box, the intersection between ``bounds`` and the image bounding box 

148 is used instead. 

149 obs_info 

150 General information about this visit in standardized form. 

151 summary_stats 

152 Summary statistics associated with this visit. Initialized to default 

153 values if not provided. 

154 psf 

155 Point-spread function model for this image, or an exception explaining 

156 why it could not be read (to be raised if the PSF is requested later). 

157 detector 

158 Geometry and electronic information for the detector attached to this 

159 image. 

160 aperture_corrections : `dict` [`str`, `~fields.BaseField`] 

161 Mapping from photometry algorithm name to the aperture correction for 

162 that algorithm. 

163 metadata 

164 Arbitrary flexible metadata to associate with the image. 

165 """ 

166 

167 def __init__( 

168 self, 

169 image: Image, 

170 *, 

171 mask: Mask | None = None, 

172 variance: Image | None = None, 

173 mask_schema: MaskSchema | None = None, 

174 projection: Projection[DetectorFrame] | None = None, 

175 bounds: Bounds | None = None, 

176 obs_info: ObservationInfo | None = None, 

177 summary_stats: ObservationSummaryStats | None = None, 

178 psf: PointSpreadFunction | ArchiveReadError, 

179 detector: Detector, 

180 aperture_corrections: ApertureCorrectionMap | None = None, 

181 metadata: dict[str, MetadataValue] | None = None, 

182 ): 

183 super().__init__( 

184 image, 

185 mask=mask, 

186 variance=variance, 

187 mask_schema=mask_schema, 

188 projection=projection, 

189 obs_info=obs_info, 

190 metadata=metadata, 

191 ) 

192 if self.image.unit is None: 

193 raise TypeError("The image component of a VisitImage must have units.") 

194 if self.image.projection is None: 

195 raise TypeError("The projection component of a VisitImage cannot be None.") 

196 if self.image.obs_info is None: 

197 raise TypeError("The observation info component of a VisitImage cannot be None.") 

198 if not isinstance(self.image.projection.pixel_frame, DetectorFrame): 

199 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.") 

200 if summary_stats is None: 

201 summary_stats = ObservationSummaryStats() 

202 self._summary_stats = summary_stats 

203 self._psf = psf 

204 self._detector = detector 

205 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {} 

206 self._bounds = bounds if bounds is not None else self.bbox 

207 if not self.bbox.contains(self._bounds.bbox): 

208 self._bounds = self._bounds.intersection(self.bbox) 

209 

210 @property 

211 def unit(self) -> astropy.units.UnitBase: 

212 """The units of the image plane (`astropy.units.Unit`).""" 

213 return cast(astropy.units.UnitBase, super().unit) 

214 

215 @property 

216 def projection(self) -> Projection[DetectorFrame]: 

217 """The projection that maps the pixel grid to the sky 

218 (`Projection` [`DetectorFrame`]). 

219 """ 

220 return cast(Projection[DetectorFrame], super().projection) 

221 

222 @property 

223 def bounds(self) -> Bounds: 

224 """The region where pixels are valid (`Bounds`).""" 

225 return self._bounds 

226 

227 @property 

228 def obs_info(self) -> ObservationInfo: 

229 """General information about this observation in standard form. 

230 (`~astro_metadata_translator.ObservationInfo`). 

231 """ 

232 obs_info = self.image.obs_info 

233 assert obs_info is not None 

234 return obs_info 

235 

236 @property 

237 def astropy_wcs(self) -> ProjectionAstropyView: 

238 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`). 

239 

240 Notes 

241 ----- 

242 As expected for Astropy WCS objects, this defines pixel coordinates 

243 such that the first row and column in the arrays are ``(0, 0)``, not 

244 ``bbox.start``, as is the case for `projection`. 

245 

246 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

247 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an 

248 `astropy.wcs.WCS` (use `fits_wcs` for that). 

249 """ 

250 return cast(ProjectionAstropyView, super().astropy_wcs) 

251 

252 @property 

253 def summary_stats(self) -> ObservationSummaryStats: 

254 """Optional summary statistics for this observation 

255 (`ObservationSummaryStats`). 

256 """ 

257 return self._summary_stats 

258 

259 @property 

260 def psf(self) -> PointSpreadFunction: 

261 """The point-spread function model for this image 

262 (`.psfs.PointSpreadFunction`). 

263 """ 

264 if isinstance(self._psf, ArchiveReadError): 

265 raise self._psf 

266 return self._psf 

267 

268 @property 

269 def detector(self) -> Detector: 

270 """Geometry and electronic information about the detector 

271 (`.cameras.Detector`). 

272 """ 

273 return self._detector 

274 

275 @property 

276 def aperture_corrections(self) -> ApertureCorrectionMap: 

277 """A mapping from photometry algorithm name to the aperture correction 

278 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]). 

279 """ 

280 return self._aperture_corrections 

281 

282 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage: 

283 if bbox is ...: 

284 return self 

285 super().__getitem__(bbox) 

286 return self._transfer_metadata( 

287 VisitImage( 

288 self.image[bbox], 

289 mask=self.mask[bbox], 

290 variance=self.variance[bbox], 

291 projection=self.projection, 

292 psf=self.psf, 

293 obs_info=self.obs_info, 

294 bounds=self._bounds, # don't need to intersect here, because __init__ will do that. 

295 summary_stats=self.summary_stats, 

296 detector=self._detector, 

297 aperture_corrections=self.aperture_corrections, 

298 ), 

299 bbox=bbox, 

300 ) 

301 

302 def __str__(self) -> str: 

303 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})" 

304 

305 def __repr__(self) -> str: 

306 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})" 

307 

308 def copy(self, *, copy_detector: bool = False) -> VisitImage: 

309 """Deep-copy the visit image. 

310 

311 Parameters 

312 ---------- 

313 copy_detector 

314 Whether to deep-copy the `detector` attribute. 

315 """ 

316 return self._transfer_metadata( 

317 VisitImage( 

318 image=self._image.copy(), 

319 mask=self._mask.copy(), 

320 variance=self._variance.copy(), 

321 psf=self._psf, 

322 obs_info=self.obs_info, 

323 bounds=self._bounds, 

324 summary_stats=self.summary_stats.model_copy(), 

325 detector=self._detector.copy() if copy_detector else self._detector, 

326 aperture_corrections=self.aperture_corrections.copy(), 

327 ), 

328 copy=True, 

329 ) 

330 

331 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel: 

332 masked_image_model = super().serialize(archive) 

333 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel 

334 match self._psf: 

335 # MyPy is able to figure things out here with this match statement, 

336 # but not a single isinstance check on both types. 

337 case PiffWrapper(): 

338 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

339 case PSFExWrapper(): 

340 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

341 case GaussianPointSpreadFunction(): 

342 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

343 case _: 

344 raise TypeError( 

345 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}." 

346 ) 

347 assert masked_image_model.projection is not None, "VisitImage always has a projection." 

348 assert masked_image_model.obs_info is not None, "VisitImage always has observation info." 

349 serialized_detector = self._detector.serialize(archive) 

350 serialized_aperture_corrections = ApertureCorrectionMapSerializationModel.serialize( 

351 self.aperture_corrections, archive 

352 ) 

353 return VisitImageSerializationModel( 

354 image=masked_image_model.image, 

355 mask=masked_image_model.mask, 

356 variance=masked_image_model.variance, 

357 projection=masked_image_model.projection, 

358 obs_info=masked_image_model.obs_info, 

359 psf=serialized_psf, 

360 summary_stats=self.summary_stats, 

361 detector=serialized_detector, 

362 aperture_corrections=serialized_aperture_corrections, 

363 bounds=self._bounds.serialize() if self._bounds != self.bbox else None, 

364 metadata=self.metadata, 

365 ) 

366 

367 @staticmethod 

368 def _get_archive_tree_type[P: pydantic.BaseModel]( 

369 pointer_type: type[P], 

370 ) -> type[VisitImageSerializationModel[P]]: 

371 """Return the serialization model type for this object for an archive 

372 type that uses the given pointer type. 

373 """ 

374 return VisitImageSerializationModel[pointer_type] # type: ignore 

375 

376 # write_fits and read_fits inherited from MaskedImage. 

377 

378 @staticmethod 

379 def from_legacy( 

380 legacy: Any, 

381 *, 

382 unit: astropy.units.Unit | None = None, 

383 plane_map: Mapping[str, MaskPlane] | None = None, 

384 instrument: str | None = None, 

385 visit: int | None = None, 

386 ) -> VisitImage: 

387 """Convert from an `lsst.afw.image.Exposure` instance. 

388 

389 Parameters 

390 ---------- 

391 legacy 

392 An `lsst.afw.image.Exposure` instance that will share image and 

393 variance (but not mask) pixel data with the returned object. 

394 unit 

395 Units of the image. If not provided, the ``BUNIT`` metadata 

396 key will be used, if available. 

397 plane_map 

398 A mapping from legacy mask plane name to the new plane name and 

399 description. If `None` (default) 

400 `get_legacy_visit_image_mask_planes` is used. 

401 instrument 

402 Name of the instrument. Extracted from the metadata if not 

403 provided. 

404 visit 

405 ID of the visit. Extracted from the metadata if not provided. 

406 """ 

407 if plane_map is None: 

408 plane_map = get_legacy_visit_image_mask_planes() 

409 md = legacy.getMetadata() 

410 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo()) 

411 instrument = _extract_or_check_header( 

412 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str 

413 ) 

414 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int) 

415 unit = _extract_or_check_header( 

416 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits") 

417 ) 

418 legacy_wcs = legacy.getWcs() 

419 if legacy_wcs is None: 

420 raise ValueError("Exposure does not have a SkyWcs.") 

421 legacy_detector = legacy.getDetector() 

422 if legacy_detector is None: 

423 raise ValueError("Exposure does not have a Detector.") 

424 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

425 

426 # Update the ObservationInfo from other components. 

427 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter()) 

428 

429 opaque_fits_metadata = FitsOpaqueMetadata() 

430 primary_header = astropy.io.fits.Header() 

431 with warnings.catch_warnings(): 

432 # Silence warnings about long keys becoming HIERARCH. 

433 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

434 primary_header.update(md.toOrderedDict()) 

435 opaque_fits_metadata.extract_legacy_primary_header(primary_header) 

436 projection = Projection.from_legacy( 

437 legacy_wcs, 

438 DetectorFrame( 

439 instrument=instrument, 

440 visit=visit, 

441 detector=legacy_detector.getId(), 

442 bbox=detector_bbox, 

443 ), 

444 ) 

445 legacy_psf = legacy.getPsf() 

446 if legacy_psf is None: 

447 raise ValueError("Exposure file does not have a Psf.") 

448 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

449 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map) 

450 legacy_summary_stats = legacy.info.getSummaryStats() 

451 legacy_ap_corr_map = legacy.info.getApCorrMap() 

452 legacy_polygon = legacy.info.getValidPolygon() 

453 result = VisitImage( 

454 image=masked_image.image.view(unit=unit), 

455 mask=masked_image.mask, 

456 variance=masked_image.variance, 

457 projection=projection, 

458 psf=psf, 

459 obs_info=obs_info, 

460 summary_stats=( 

461 ObservationSummaryStats.from_legacy(legacy_summary_stats) 

462 if legacy_summary_stats is not None 

463 else None 

464 ), 

465 detector=Detector.from_legacy( 

466 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

467 ), 

468 aperture_corrections=( 

469 aperture_corrections_from_legacy(legacy_ap_corr_map) 

470 if legacy_ap_corr_map is not None 

471 else None 

472 ), 

473 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None, 

474 ) 

475 

476 result._opaque_metadata = opaque_fits_metadata 

477 return result 

478 

479 @overload # type: ignore[override] 

480 @staticmethod 

481 def read_legacy( 481 ↛ exitline 481 didn't return from function 'read_legacy' because

482 filename: str, 

483 *, 

484 component: Literal["bbox"], 

485 ) -> Box: ... 

486 

487 @overload 

488 @staticmethod 

489 def read_legacy( 489 ↛ exitline 489 didn't return from function 'read_legacy' because

490 filename: str, 

491 *, 

492 preserve_quantization: bool = False, 

493 instrument: str | None = None, 

494 visit: int | None = None, 

495 component: Literal["image"], 

496 ) -> Image: ... 

497 

498 @overload 

499 @staticmethod 

500 def read_legacy( 500 ↛ exitline 500 didn't return from function 'read_legacy' because

501 filename: str, 

502 *, 

503 plane_map: Mapping[str, MaskPlane] | None = None, 

504 instrument: str | None = None, 

505 visit: int | None = None, 

506 component: Literal["mask"], 

507 ) -> Mask: ... 

508 

509 @overload 

510 @staticmethod 

511 def read_legacy( 511 ↛ exitline 511 didn't return from function 'read_legacy' because

512 filename: str, 

513 *, 

514 preserve_quantization: bool = False, 

515 instrument: str | None = None, 

516 visit: int | None = None, 

517 component: Literal["variance"], 

518 ) -> Image: ... 

519 

520 @overload 

521 @staticmethod 

522 def read_legacy( 522 ↛ exitline 522 didn't return from function 'read_legacy' because

523 filename: str, 

524 *, 

525 instrument: str | None = None, 

526 visit: int | None = None, 

527 component: Literal["projection"], 

528 ) -> Projection[DetectorFrame]: ... 

529 

530 @overload 

531 @staticmethod 

532 def read_legacy( 532 ↛ exitline 532 didn't return from function 'read_legacy' because

533 filename: str, 

534 *, 

535 component: Literal["psf"], 

536 ) -> PointSpreadFunction: ... 

537 

538 @overload 

539 @staticmethod 

540 def read_legacy( 540 ↛ exitline 540 didn't return from function 'read_legacy' because

541 filename: str, 

542 *, 

543 component: Literal["detector"], 

544 ) -> Detector: ... 

545 

546 @overload 

547 @staticmethod 

548 def read_legacy( 548 ↛ exitline 548 didn't return from function 'read_legacy' because

549 filename: str, 

550 *, 

551 component: Literal["obs_info"], 

552 ) -> ObservationInfo: ... 

553 

554 @overload 

555 @staticmethod 

556 def read_legacy( 556 ↛ exitline 556 didn't return from function 'read_legacy' because

557 filename: str, 

558 *, 

559 component: Literal["summary_stats"], 

560 ) -> ObservationSummaryStats: ... 

561 

562 @overload 

563 @staticmethod 

564 def read_legacy( 564 ↛ exitline 564 didn't return from function 'read_legacy' because

565 filename: str, 

566 *, 

567 component: Literal["aperture_corrections"], 

568 ) -> ApertureCorrectionMap: ... 

569 

570 @overload 

571 @staticmethod 

572 def read_legacy( 572 ↛ exitline 572 didn't return from function 'read_legacy' because

573 filename: str, 

574 *, 

575 preserve_quantization: bool = False, 

576 plane_map: Mapping[str, MaskPlane] | None = None, 

577 instrument: str | None = None, 

578 visit: int | None = None, 

579 component: None = None, 

580 ) -> VisitImage: ... 

581 

582 @staticmethod 

583 def read_legacy( # type: ignore[override] 

584 filename: str, 

585 *, 

586 preserve_quantization: bool = False, 

587 plane_map: Mapping[str, MaskPlane] | None = None, 

588 instrument: str | None = None, 

589 visit: int | None = None, 

590 component: Literal[ 

591 "bbox", 

592 "image", 

593 "mask", 

594 "variance", 

595 "projection", 

596 "psf", 

597 "detector", 

598 "obs_info", 

599 "summary_stats", 

600 "aperture_corrections", 

601 ] 

602 | None = None, 

603 ) -> Any: 

604 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`. 

605 

606 Parameters 

607 ---------- 

608 filename 

609 Full name of the file. 

610 preserve_quantization 

611 If `True`, ensure that writing the masked image back out again will 

612 exactly preserve quantization-compressed pixel values. This causes 

613 the image and variance plane arrays to be marked as read-only and 

614 stores the original binary table data for those planes in memory. 

615 If the `MaskedImage` is copied, the precompressed pixel values are 

616 not transferred to the copy. 

617 plane_map 

618 A mapping from legacy mask plane name to the new plane name and 

619 description. If `None` (default) 

620 `get_legacy_visit_image_mask_planes` is used. 

621 instrument 

622 Name of the instrument. Read from the primary header if not 

623 provided. 

624 visit 

625 ID of the visit. Read from the primary header if not 

626 provided. 

627 component 

628 A component to read instead of the full image. 

629 """ 

630 from lsst.afw.image import ExposureFitsReader 

631 

632 reader = ExposureFitsReader(filename) 

633 if component == "bbox": 

634 return Box.from_legacy(reader.readBBox()) 

635 legacy_detector = reader.readDetector() 

636 if legacy_detector is None: 

637 raise ValueError(f"Exposure file {filename!r} does not have a Detector.") 

638 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

639 legacy_wcs = None 

640 if component in (None, "image", "mask", "variance", "projection"): 

641 legacy_wcs = reader.readWcs() 

642 if legacy_wcs is None: 

643 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.") 

644 legacy_exposure_info = reader.readExposureInfo() 

645 summary_stats = None 

646 if component in (None, "summary_stats"): 

647 legacy_stats = legacy_exposure_info.getSummaryStats() 

648 if legacy_stats is not None: 

649 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats) 

650 if component == "summary_stats": 

651 return summary_stats 

652 if component in (None, "psf"): 

653 legacy_psf = reader.readPsf() 

654 if legacy_psf is None: 

655 raise ValueError(f"Exposure file {filename!r} does not have a Psf.") 

656 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

657 if component == "psf": 

658 return psf 

659 aperture_corrections: ApertureCorrectionMap = {} 

660 if component in (None, "aperture_corrections"): 

661 legacy_ap_corr_map = reader.readApCorrMap() 

662 if legacy_ap_corr_map is not None: 

663 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map) 

664 if component == "aperture_corrections": 

665 return aperture_corrections 

666 assert component in (None, "image", "mask", "variance", "projection", "obs_info", "detector"), ( 

667 component 

668 ) # for MyPy 

669 with astropy.io.fits.open(filename) as hdu_list: 

670 primary_header = hdu_list[0].header 

671 obs_info = _obs_info_from_md(primary_header) 

672 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter()) 

673 if component == "obs_info": 

674 return obs_info 

675 instrument = _extract_or_check_header( 

676 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str 

677 ) 

678 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int) 

679 if component == "detector": 

680 return Detector.from_legacy( 

681 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

682 ) 

683 projection = Projection.from_legacy( 

684 legacy_wcs, 

685 DetectorFrame( 

686 instrument=instrument, 

687 visit=visit, 

688 detector=legacy_detector.getId(), 

689 bbox=detector_bbox, 

690 ), 

691 ) 

692 if component == "projection": 

693 return projection 

694 if plane_map is None: 

695 plane_map = get_legacy_visit_image_mask_planes() 

696 assert component != "psf", component # for MyPy 

697 from_masked_image = MaskedImage._read_legacy_hdus( 

698 hdu_list, 

699 filename, 

700 preserve_quantization=preserve_quantization, 

701 plane_map=plane_map, 

702 component=component, 

703 ) 

704 if component is not None: 

705 # This is the image, mask, or variance; attach the projection and 

706 # obs_info and return 

707 return from_masked_image.view(projection=projection, obs_info=obs_info) 

708 legacy_polygon = reader.readValidPolygon() 

709 result = VisitImage( 

710 from_masked_image.image, 

711 mask=from_masked_image.mask, 

712 variance=from_masked_image.variance, 

713 projection=projection, 

714 psf=psf, 

715 detector=Detector.from_legacy( 

716 legacy_detector, instrument=instrument, visit=visit, is_raw_assembled=True 

717 ), 

718 obs_info=obs_info, 

719 summary_stats=summary_stats, 

720 aperture_corrections=aperture_corrections, 

721 bounds=Polygon.from_legacy(legacy_polygon) if legacy_polygon is not None else None, 

722 ) 

723 result._opaque_metadata = from_masked_image._opaque_metadata 

724 return result 

725 

726 

727class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]): 

728 """A Pydantic model used to represent a serialized `VisitImage`.""" 

729 

730 # Inherited attributes are duplicated because that improves the docs 

731 # (some limitation in the sphinx/pydantic integration), and these are 

732 # important docs. 

733 

734 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

735 mask: MaskSerializationModel[P] = pydantic.Field( 

736 description="Bitmask that annotates the main image's pixels." 

737 ) 

738 variance: ImageSerializationModel[P] = pydantic.Field( 

739 description="Per-pixel variance estimates for the main image." 

740 ) 

741 projection: ProjectionSerializationModel[P] = pydantic.Field( 

742 description="Projection that maps the pixel grid to the sky.", 

743 ) 

744 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = ( 

745 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.") 

746 ) 

747 obs_info: ObservationInfo = pydantic.Field( 

748 description="Standardized description of visit metadata", 

749 ) 

750 summary_stats: ObservationSummaryStats = pydantic.Field( 

751 description="Summary statistics for the observation." 

752 ) 

753 detector: DetectorSerializationModel = pydantic.Field( 

754 description="Geometry and electronic information for the detector." 

755 ) 

756 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field( 

757 default_factory=ApertureCorrectionMapSerializationModel, 

758 description="Aperture corrections, keyed by flux algorithm.", 

759 ) 

760 bounds: SerializableBounds | None = pydantic.Field( 

761 default=None, 

762 description="Pixel validity region, if different from the image bounding box.", 

763 exclude_if=is_none, 

764 ) 

765 

766 def deserialize(self, archive: InputArchive[Any], *, bbox: Box | None = None) -> VisitImage: 

767 masked_image = super().deserialize(archive, bbox=bbox) 

768 psf = self.deserialize_psf(archive) 

769 detector = self.detector.deserialize(archive) 

770 aperture_corrections = self.aperture_corrections.deserialize(archive) 

771 return VisitImage( 

772 masked_image.image, 

773 mask=masked_image.mask, 

774 variance=masked_image.variance, 

775 psf=psf, 

776 projection=masked_image.projection, 

777 obs_info=masked_image.obs_info, 

778 summary_stats=self.summary_stats, 

779 detector=detector, 

780 aperture_corrections=aperture_corrections, 

781 bounds=self.bounds.deserialize() if self.bounds is not None else None, 

782 )._finish_deserialize(self) 

783 

784 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError: 

785 """Finish deserializing the PSF model, or *return* any exception 

786 raised in the attempt. 

787 """ 

788 try: 

789 return self.psf.deserialize(archive) 

790 except ArchiveReadError as err: 

791 return err 

792 

793 

794def _extract_or_check_value[T]( 

795 key: str, 

796 given_value: T | None, 

797 *sources: tuple[str, T | None], 

798) -> T: 

799 # Compare given value against multiple sources. If given value is not 

800 # supplied return the first non-None value in the reference sources. 

801 if given_value is not None: 

802 for source_name, source_value in sources: 

803 if source_value is not None and source_value != given_value: 

804 raise ValueError( 

805 f"Given value {given_value!r} does not match {source_value!r} from {source_name}." 

806 ) 

807 if source_value is not None: 

808 # Only check the first non-None source rather than checking 

809 # all supplied values. 

810 break 

811 return given_value 

812 

813 for _, source_value in sources: 

814 if source_value is not None: 

815 return source_value 

816 

817 raise ValueError(f"No value found for {key}.") 

818 

819 

820def _extract_or_check_header[T]( 

821 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T] 

822) -> T: 

823 hdr_value: T | None = None 

824 if (hdr_raw_value := header.get(key)) is not None: 

825 hdr_value = coerce(hdr_raw_value) 

826 return _extract_or_check_value( 

827 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value) 

828 )