Coverage for tests / test_visit_image.py: 14%

405 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-23 01:30 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import unittest 

16import warnings 

17from typing import Any 

18 

19import astropy.io.fits 

20import astropy.units as u 

21import astropy.wcs 

22import numpy as np 

23from astro_metadata_translator import ObservationInfo 

24 

25from lsst.images import ( 

26 BackgroundMap, 

27 Box, 

28 DetectorFrame, 

29 Image, 

30 MaskPlane, 

31 MaskSchema, 

32 ObservationSummaryStats, 

33 Polygon, 

34 ProjectionAstropyView, 

35 TractFrame, 

36 VisitImage, 

37 get_legacy_visit_image_mask_planes, 

38) 

39from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy 

40from lsst.images.cameras import Detector 

41from lsst.images.fields import ChebyshevField, field_from_legacy_photo_calib 

42from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

43from lsst.images.json import read as read_json 

44from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction 

45from lsst.images.tests import ( 

46 DP2_VISIT_DETECTOR_DATA_ID, 

47 RoundtripFits, 

48 RoundtripNdf, 

49 TemporaryButler, 

50 assert_close, 

51 assert_masked_images_equal, 

52 assert_projections_equal, 

53 compare_aperture_corrections_to_legacy, 

54 compare_detector_to_legacy, 

55 compare_photo_calib_to_legacy, 

56 compare_visit_image_to_legacy, 

57 make_random_projection, 

58) 

59 

60try: 

61 import h5py # noqa: F401 

62 

63 HAVE_H5PY = True 

64except ImportError: 

65 HAVE_H5PY = False 

66 

67EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

68LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data") 

69 

70 

71class VisitImageTestCase(unittest.TestCase): 

72 """Basic Tests for VisitImage.""" 

73 

74 @classmethod 

75 def setUpClass(cls) -> None: 

76 cls.rng = np.random.default_rng(500) 

77 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096]) 

78 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")]) 

79 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4, physical_filter="r1") 

80 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4) 

81 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]) 

82 cls.aperture_corrections: ApertureCorrectionMap = { 

83 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])), 

84 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])), 

85 } 

86 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json")) 

87 

88 opaque = FitsOpaqueMetadata() 

89 hdr = astropy.io.fits.Header() 

90 with warnings.catch_warnings(): 

91 # Silence warnings about long keys becoming HIERARCH. 

92 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

93 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"}) 

94 opaque.extract_legacy_primary_header(hdr) 

95 

96 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy) 

97 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy) 

98 # polygon is the lower triangle of the image. 

99 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5]) 

100 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096]) 

101 # API signature suggests projection and obs_info can be None but they 

102 # are required (unless you pass them in via the image plane). 

103 cls.visit_image = VisitImage( 

104 cls.image, 

105 variance=cls.variance, 

106 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

107 mask_schema=cls.mask_schema, 

108 projection=cls.projection, 

109 obs_info=cls.obs_info, 

110 summary_stats=cls.summary_stats, 

111 detector=cls.detector, 

112 bounds=cls.polygon, 

113 aperture_corrections=cls.aperture_corrections, 

114 band="r", 

115 ) 

116 cls.visit_image.backgrounds.add( 

117 "standard", 

118 ChebyshevField(det_frame.bbox, np.array([[2.0]])), 

119 description="Background subtracted from the image.", 

120 is_subtracted=True, 

121 ) 

122 cls.visit_image._opaque_metadata = opaque 

123 cls.simplest_visit_image = VisitImage( 

124 cls.image, 

125 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

126 mask_schema=cls.mask_schema, 

127 projection=cls.projection, 

128 detector=cls.detector, 

129 obs_info=cls.obs_info, 

130 band="r", 

131 ) 

132 

133 def test_basics(self) -> None: 

134 """Test basic constructor patterns.""" 

135 # Test default fill of variance. 

136 visit = self.simplest_visit_image 

137 self.assertEqual(visit.variance.array[0, 0], 1.0) 

138 self.assertIs(visit[...], visit) 

139 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])") 

140 self.assertEqual( 

141 repr(visit), 

142 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024))," 

143 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')]," 

144 " dtype=dtype('uint8')))", 

145 ) 

146 

147 astropy_wcs = visit.astropy_wcs 

148 self.assertIsInstance(astropy_wcs, ProjectionAstropyView) 

149 approx_wcs = visit.fits_wcs 

150 self.assertIsInstance(approx_wcs, astropy.wcs.WCS) 

151 

152 with self.assertRaises(TypeError): 

153 # Requires a PSF. 

154 VisitImage( 

155 self.image, 

156 mask_schema=self.mask_schema, 

157 projection=self.projection, 

158 obs_info=self.obs_info, 

159 detector=self.detector, 

160 band="r", 

161 ) 

162 

163 with self.assertRaises(TypeError): 

164 # Requires ObservationInfo. 

165 VisitImage( 

166 self.image, 

167 psf=self.gaussian_psf, 

168 mask_schema=self.mask_schema, 

169 projection=self.projection, 

170 detector=self.detector, 

171 band="r", 

172 ) 

173 

174 with self.assertRaises(TypeError): 

175 # Requires a projection. 

176 VisitImage( 

177 self.image, 

178 psf=self.gaussian_psf, 

179 mask_schema=self.mask_schema, 

180 obs_info=self.obs_info, 

181 detector=self.detector, 

182 band="r", 

183 ) 

184 

185 with self.assertRaises(TypeError): 

186 # Requires a detector. 

187 VisitImage( 

188 self.image, 

189 psf=self.gaussian_psf, 

190 mask_schema=self.mask_schema, 

191 projection=self.projection, 

192 obs_info=self.obs_info, 

193 band="r", 

194 ) 

195 

196 with self.assertRaises(TypeError): 

197 # Requires some form of mask. 

198 VisitImage( 

199 self.image, 

200 psf=self.gaussian_psf, 

201 projection=self.projection, 

202 obs_info=self.obs_info, 

203 detector=self.detector, 

204 band="r", 

205 ) 

206 

207 with self.assertRaises(TypeError): 

208 VisitImage( 

209 Image(42, shape=(5, 5)), 

210 psf=self.gaussian_psf, 

211 mask_schema=self.mask_schema, 

212 projection=self.projection, 

213 obs_info=self.obs_info, 

214 detector=self.detector, 

215 band="r", 

216 ) 

217 

218 # Requires a DetectorFrame. 

219 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10]) 

220 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096]) 

221 with self.assertRaises(TypeError): 

222 VisitImage( 

223 self.image, 

224 projection=tract_proj, 

225 psf=self.gaussian_psf, 

226 mask_schema=self.mask_schema, 

227 obs_info=self.obs_info, 

228 detector=self.detector, 

229 band="r", 

230 ) 

231 

232 # Variance unit mismatch. 

233 with self.assertRaises(ValueError): 

234 VisitImage( 

235 self.image, 

236 variance=self.image, 

237 psf=self.gaussian_psf, 

238 mask_schema=self.mask_schema, 

239 projection=self.projection, 

240 obs_info=self.obs_info, 

241 detector=self.detector, 

242 band="r", 

243 ) 

244 

245 def test_copy_and_slice(self) -> None: 

246 """Test that arrays and components are copied (when not immutable) by 

247 'copy' and referenced by 'slice'. 

248 """ 

249 visit = self.visit_image 

250 copy = visit.copy() 

251 copy.image.array[0, 0] = 30.0 

252 self.assertEqual(visit.image.array[0, 0], 42.0) 

253 self.assertEqual(copy.image.array[0, 0], 30.0) 

254 subvisit = visit[Box.factory[0:5, 0:5]] 

255 # Check summary stats. 

256 self.assertEqual(copy.summary_stats, visit.summary_stats) 

257 self.assertIsNot(copy.summary_stats, visit.summary_stats) 

258 self.assertEqual(subvisit.summary_stats, visit.summary_stats) 

259 self.assertIs(subvisit.summary_stats, visit.summary_stats) 

260 # Check aperture corrections. 

261 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

262 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections) 

263 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

264 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections) 

265 # Check backgrounds. 

266 self.assertEqual(copy.backgrounds.keys(), visit.backgrounds.keys()) 

267 self.assertIsNot(copy.backgrounds, visit.backgrounds) 

268 self.assertEqual(subvisit.backgrounds.keys(), visit.backgrounds.keys()) 

269 self.assertIs(subvisit.backgrounds, visit.backgrounds) 

270 # Check bounds. 

271 self.assertIs(copy.bounds, self.polygon) 

272 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox 

273 

274 def test_obs_info(self) -> None: 

275 """Check that ObservationInfo has been constructed.""" 

276 visit = self.visit_image 

277 self.assertIsNotNone(visit.obs_info) 

278 self.maxDiff = None 

279 assert visit.obs_info is not None # for mypy. 

280 self.assertEqual(visit.obs_info.instrument, "LSSTCam") 

281 

282 def test_summary_stats(self) -> None: 

283 """Test the comparisons and attributes of ObservationSummaryStats.""" 

284 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)) 

285 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5)) 

286 self.assertNotEqual( 

287 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2)) 

288 ) 

289 

290 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

291 def test_round_trip_ndf(self): 

292 """NDF round-trip for VisitImage.""" 

293 with RoundtripNdf(self, self.visit_image) as roundtrip: 

294 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

295 self.assertEqual(roundtrip.result.summary_stats, self.visit_image.summary_stats) 

296 self.assertEqual(type(roundtrip.result.psf), type(self.visit_image.psf)) 

297 

298 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

299 def test_fits_ndf_consistency(self): 

300 """FITS and NDF backends produce equal VisitImages on round-trip.""" 

301 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt: 

302 assert_masked_images_equal(self, self.visit_image, fits_rt.result, expect_view=False) 

303 assert_masked_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False) 

304 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

305 

306 def test_read_write(self) -> None: 

307 """Test that a visit can round trip through a FITS file.""" 

308 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

309 # Check that we're still using the right compression, and that we 

310 # wrote WCSs. 

311 fits = roundtrip.inspect() 

312 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

313 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN") 

314 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

315 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN") 

316 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

317 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN") 

318 # Check a subimage read. 

319 subbox = Box.factory[8:13, 9:30] 

320 subimage = roundtrip.get(bbox=subbox) 

321 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

322 with self.subTest(): 

323 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

324 with self.subTest(): 

325 obs_info = roundtrip.get("obs_info") 

326 self.assertIsInstance(obs_info, ObservationInfo) 

327 self.assertEqual(obs_info, self.visit_image.obs_info) 

328 with self.subTest(): 

329 summary_stats = roundtrip.get("summary_stats") 

330 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

331 self.assertEqual(summary_stats, self.visit_image.summary_stats) 

332 with self.subTest(): 

333 psf = roundtrip.get("psf") 

334 self.assertIsInstance(psf, GaussianPointSpreadFunction) 

335 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox) 

336 with self.subTest(): 

337 backgrounds = roundtrip.get("backgrounds") 

338 self.assertIsInstance(backgrounds, BackgroundMap) 

339 self.assertEqual(backgrounds.keys(), {"standard"}) 

340 self.assertIsInstance(backgrounds["standard"].field, ChebyshevField) 

341 self.assertEqual(backgrounds.subtracted.name, "standard") 

342 self.assertEqual( 

343 roundtrip.result.backgrounds.subtracted.description, 

344 "Background subtracted from the image.", 

345 ) 

346 

347 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

348 # Check that the round-tripped headers are the same (up to card order). 

349 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1) 

350 self.assertEqual( 

351 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

352 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

353 ) 

354 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

355 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

356 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

357 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info) 

358 self.assertIsNotNone(roundtrip.result.summary_stats) 

359 self.assertEqual( 

360 roundtrip.result.summary_stats.psfSigma, 

361 self.visit_image.summary_stats.psfSigma, 

362 ) 

363 self.assertEqual( 

364 roundtrip.result.summary_stats.zeroPoint, 

365 self.visit_image.summary_stats.zeroPoint, 

366 ) 

367 self.assertEqual(roundtrip.result.bounds, self.polygon) 

368 self.assertIsInstance(roundtrip.result.backgrounds, BackgroundMap) 

369 self.assertEqual(roundtrip.result.backgrounds.keys(), {"standard"}) 

370 self.assertIsInstance(roundtrip.result.backgrounds["standard"].field, ChebyshevField) 

371 self.assertEqual(roundtrip.result.backgrounds.subtracted.name, "standard") 

372 self.assertEqual( 

373 roundtrip.result.backgrounds.subtracted.description, "Background subtracted from the image." 

374 ) 

375 

376 

377class VisitImageLegacyTestMixin: 

378 """Tests for the VisitImage class and the basics of the archive, to be 

379 specialized for a particular test image. 

380 

381 `setUp` or `setUpClass` must be implemented to set the attributes declared 

382 in the class. 

383 """ 

384 

385 filename: str 

386 legacy_exposure: Any 

387 plane_map: dict[str, MaskPlane] 

388 visit_image: VisitImage 

389 unit: u.UnitBase 

390 

391 def test_legacy_errors(self) -> None: 

392 """Legacy read failure modes.""" 

393 with self.assertRaises(ValueError): 

394 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC") 

395 with self.assertRaises(ValueError): 

396 VisitImage.from_legacy(self.legacy_exposure, visit=123456) 

397 with self.assertRaises(ValueError): 

398 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy) 

399 visit = VisitImage.from_legacy( 

400 self.legacy_exposure, instrument="LSSTCam", unit=self.unit, visit=2025052000177 

401 ) 

402 self.assertEqual(visit.unit, self.unit) 

403 

404 with self.assertRaises(ValueError): 

405 VisitImage.read_legacy(self.filename, instrument="HSC") 

406 with self.assertRaises(ValueError): 

407 VisitImage.read_legacy(self.filename, visit=123456) 

408 

409 def test_component_reads(self) -> None: 

410 """Test reads of components from legacy file.""" 

411 visit = VisitImage.read_legacy(self.filename) 

412 proj = VisitImage.read_legacy(self.filename, component="projection") 

413 assert_projections_equal(self, proj, visit.projection, expect_identity=False) 

414 image = VisitImage.read_legacy(self.filename, component="image") 

415 self.assertEqual(image, visit.image) 

416 assert_projections_equal(self, proj, image.projection, expect_identity=False) 

417 variance = VisitImage.read_legacy(self.filename, component="variance") 

418 self.assertEqual(variance, visit.variance) 

419 assert_projections_equal(self, proj, variance.projection, expect_identity=False) 

420 mask = VisitImage.read_legacy(self.filename, component="mask") 

421 self.assertEqual(mask, visit.mask) 

422 assert_projections_equal(self, proj, mask.projection, expect_identity=False) 

423 psf = VisitImage.read_legacy(self.filename, component="psf") 

424 self.assertIsInstance(psf, PointSpreadFunction) 

425 obs_info = VisitImage.read_legacy(self.filename, component="obs_info") 

426 self.check_legacy_obs_info(obs_info) 

427 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats") 

428 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

429 self.assertEqual(summary_stats.nPsfStar, self.legacy_exposure.info.getSummaryStats().nPsfStar) 

430 compare_aperture_corrections_to_legacy( 

431 self, 

432 VisitImage.read_legacy(self.filename, component="aperture_corrections"), 

433 self.legacy_exposure.info.getApCorrMap(), 

434 visit.bbox, 

435 ) 

436 detector = VisitImage.read_legacy(self.filename, component="detector") 

437 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True) 

438 photometric_scaling = VisitImage.read_legacy(self.filename, component="photometric_scaling") 

439 compare_photo_calib_to_legacy( 

440 self, 

441 photometric_scaling, 

442 self.legacy_exposure.getPhotoCalib(), 

443 subimage_bbox=visit.bbox, 

444 ) 

445 

446 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None: 

447 """Check that an `ObservationInfo` instance is not `None`, and that it 

448 matches the one in the legacy test data file. 

449 """ 

450 self.assertIsInstance(obs_info, ObservationInfo) 

451 self.assertEqual(obs_info.instrument, "LSSTCam") 

452 self.assertEqual(obs_info.detector_num, 85, obs_info) 

453 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info) 

454 self.assertEqual(obs_info.physical_filter, "r_57", obs_info) 

455 

456 def test_obs_info(self) -> None: 

457 """Check that ObservationInfo has been constructed.""" 

458 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

459 self.assertIsNotNone(legacy.obs_info) 

460 self.maxDiff = None 

461 self.assertEqual(legacy.obs_info, self.visit_image.obs_info) 

462 assert legacy.obs_info is not None # for mypy. 

463 self.assertEqual(legacy.obs_info.instrument, "LSSTCam") 

464 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info) 

465 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info) 

466 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info) 

467 

468 def test_aperture_corrections_to_legacy(self) -> None: 

469 """Test that we can convert an aperture correction map back to a 

470 legacy `lsst.afw.image.ApCorrMap`. 

471 """ 

472 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections) 

473 compare_aperture_corrections_to_legacy( 

474 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox 

475 ) 

476 

477 def test_read_legacy_headers(self) -> None: 

478 """Test that headers were correctly stripped and interpreted in 

479 `VisitImage.read_legacy`. 

480 """ 

481 # Check that we read the units from BUNIT. 

482 self.assertEqual(self.visit_image.unit, self.unit) 

483 # Check that the primary header has the keys we want, and none of the 

484 # keys we don't want. 

485 header = self.visit_image._opaque_metadata.headers[ExtensionKey()] 

486 self.assertIn("EXPTIME", header) 

487 self.assertEqual(header["PLATFORM"], "lsstcam") 

488 self.assertNotIn("LSST BUTLER ID", header) 

489 self.assertNotIn("AR HDU", header) 

490 self.assertNotIn("A_ORDER", header) 

491 # Check that the extension HDUs do not have any custom headers. 

492 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

493 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

494 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

495 

496 def test_from_legacy_headers(self) -> None: 

497 """Test that from_legacy handles headers properly.""" 

498 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

499 header = legacy._opaque_metadata.headers[ExtensionKey()] 

500 self.assertIn("EXPTIME", header) 

501 self.assertEqual(header["PLATFORM"], "lsstcam") 

502 self.assertNotIn("LSST BUTLER ID", header) 

503 self.assertNotIn("AR HDU", header) 

504 self.assertNotIn("A_ORDER", header) 

505 # Check that the extension HDUs do not have any custom headers. 

506 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

507 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

508 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

509 

510 def test_rewrite(self) -> None: 

511 """Test that we can rewrite the visit image and preserve both 

512 lossy-compressed pixel values and components exactly. 

513 """ 

514 import lsst.afw.image 

515 

516 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

517 # Check that we're still using the right compression, and that we 

518 # wrote WCSs. 

519 fits = roundtrip.inspect() 

520 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

521 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP") 

522 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

523 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP") 

524 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

525 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP") 

526 # Check a subimage read. 

527 subbox = Box.factory[8:13, 9:30] 

528 subimage = roundtrip.get(bbox=subbox) 

529 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

530 alternates: dict[str, Any] = {} 

531 with self.subTest(): 

532 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

533 alternates = { 

534 k: roundtrip.get(k) 

535 for k in [ 

536 "projection", 

537 "image", 

538 "mask", 

539 "variance", 

540 "psf", 

541 "obs_info", 

542 "summary_stats", 

543 "aperture_corrections", 

544 "detector", 

545 "photometric_scaling", 

546 ] 

547 } 

548 # Test reading back in as an Exposure. 

549 with self.subTest(): 

550 legacy_exposure = roundtrip.get(storageClass="Exposure") 

551 self.assertIsInstance(legacy_exposure, lsst.afw.image.Exposure) 

552 # This covers most of the compnents, which have clean 1-1 

553 # mappings from legacy to new: 

554 compare_visit_image_to_legacy( 

555 self, 

556 self.visit_image, 

557 legacy_exposure, 

558 expect_view=False, 

559 plane_map=self.plane_map, 

560 **DP2_VISIT_DETECTOR_DATA_ID, 

561 ) 

562 # A few components are different enough to merit extra 

563 # attention: 

564 if self.visit_image.unit == u.nJy: 

565 self.assertTrue(legacy_exposure.getPhotoCalib()._isConstant) 

566 self.assertEqual(legacy_exposure.getPhotoCalib().getCalibrationMean(), 1.0) 

567 else: 

568 compare_photo_calib_to_legacy( 

569 self, 

570 self.visit_image.photometric_scaling, 

571 legacy_exposure.getPhotoCalib(), 

572 subimage_bbox=subbox, 

573 ) 

574 self.assertEqual(legacy_exposure.info.getId(), self.legacy_exposure.info.getId()) 

575 # Try to do a butler get of a component with storage class 

576 # override. 

577 with self.subTest(): 

578 # We have VisitInfo available. 

579 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo") 

580 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo) 

581 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam") 

582 

583 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

584 # Check that the round-tripped headers are the same (up to card order). 

585 self.assertEqual( 

586 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

587 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

588 ) 

589 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

590 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

591 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

592 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam") 

593 compare_visit_image_to_legacy( 

594 self, 

595 roundtrip.result, 

596 self.legacy_exposure, 

597 expect_view=False, 

598 plane_map=self.plane_map, 

599 **DP2_VISIT_DETECTOR_DATA_ID, 

600 alternates=alternates, 

601 ) 

602 # Check converting from the legacy object in-memory. 

603 compare_visit_image_to_legacy( 

604 self, 

605 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map), 

606 self.legacy_exposure, 

607 expect_view=True, 

608 plane_map=self.plane_map, 

609 **DP2_VISIT_DETECTOR_DATA_ID, 

610 ) 

611 

612 def test_butler_converters(self) -> None: 

613 """Test that we can read a VisitImage and its components from a butler 

614 dataset written as an `lsst.afw.image.Exposure`. 

615 """ 

616 if self.legacy_exposure is None: 

617 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.") 

618 with TemporaryButler(legacy="ExposureF") as helper: 

619 from lsst.daf.butler import FileDataset 

620 

621 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink") 

622 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage") 

623 with warnings.catch_warnings(): 

624 # Silence warnings about data ID and filter label disagreeing. 

625 warnings.simplefilter("ignore", category=UserWarning) 

626 visit_image = helper.butler.get(visit_image_ref) 

627 # We didn't ask for the quantization to be preserved, so it 

628 # shouldn't be. 

629 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), set()) 

630 # This time preserve the quantization 

631 visit_image = helper.butler.get(visit_image_ref, parameters={"preserve_quantization": True}) 

632 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), {"IMAGE", "VARIANCE"}) 

633 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox")) 

634 self.assertEqual(bbox, visit_image.bbox) 

635 alternates = { 

636 k: helper.butler.get(visit_image_ref.makeComponentRef(k)) 

637 # TODO: including "projection" or "obs_info" here fails because 

638 # there's code in daf_butler that expects any component to be 

639 # valid for the *internal* storage class, not the requested 

640 # one, and that's difficult to fix because it's tied up with 

641 # the data ID standardization logic. 

642 for k in ["image", "mask", "variance", "bbox", "psf", "detector"] 

643 } 

644 compare_visit_image_to_legacy( 

645 self, 

646 visit_image, 

647 self.legacy_exposure, 

648 expect_view=False, 

649 plane_map=self.plane_map, 

650 alternates=alternates, 

651 **DP2_VISIT_DETECTOR_DATA_ID, 

652 ) 

653 # Add some metadata to the new VisitImage and then do a converting 

654 # `put` that should write to the old format (we have to delete the 

655 # old one first, which just deletes a symlink). 

656 helper.butler.pruneDatasets([helper.legacy], purge=True, unstore=True, disassociate=True) 

657 visit_image.metadata["MixedCaseKey"] = 52 

658 helper.butler.put(visit_image, visit_image_ref) 

659 # Check that we can read *that* back in as a legacy exposure. 

660 legacy_exposure = helper.butler.get(helper.legacy) 

661 compare_visit_image_to_legacy( 

662 self, 

663 visit_image, 

664 legacy_exposure, 

665 expect_view=False, 

666 plane_map=self.plane_map, 

667 alternates=alternates, 

668 **DP2_VISIT_DETECTOR_DATA_ID, 

669 ) 

670 # Check that we can read it back in as a VisitImage, and that the 

671 # new metadata is preserved. 

672 visit_image_2 = helper.butler.get(visit_image_ref) 

673 compare_visit_image_to_legacy( 

674 self, 

675 visit_image_2, 

676 legacy_exposure, 

677 expect_view=False, 

678 plane_map=self.plane_map, 

679 alternates=alternates, 

680 **DP2_VISIT_DETECTOR_DATA_ID, 

681 ) 

682 self.assertEqual(visit_image_2.metadata["MixedCaseKey"], 52) 

683 

684 

685@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

686class VisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin): 

687 """Tests for the VisitImage class using a DRP-final visit_image dataset. 

688 

689 Requires legacy code. 

690 """ 

691 

692 @classmethod 

693 def setUpClass(cls) -> None: 

694 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

695 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits") 

696 try: 

697 from lsst.afw.image import ExposureFitsReader 

698 

699 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

700 except ImportError: 

701 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

702 cls.plane_map = get_legacy_visit_image_mask_planes() 

703 cls.visit_image = VisitImage.read_legacy( 

704 cls.filename, preserve_quantization=True, plane_map=cls.plane_map 

705 ) 

706 cls.unit = u.nJy 

707 

708 def test_convert_unit(self) -> None: 

709 """Test using the ``photometric_scaling`` to swap between 

710 calibrated and instrumental units. 

711 

712 This includes tests of the `VisitImage.to_legacy` logic for units that 

713 don't map directly to `lsst.afw.image.PhotoCalib` conventions. 

714 """ 

715 from lsst.afw.table import ExposureCatalog 

716 

717 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

718 # Make a copy of class state so we can modify it without breaking 

719 # other tests. 

720 original = self.visit_image.copy() 

721 # We should not be able to convert to instrumental units when there is 

722 # no photometric scaling. 

723 with self.assertRaises(u.UnitConversionError): 

724 original.convert_unit(u.electron) 

725 # Converting to the current unit should be a no-op that does not need 

726 # to copy. 

727 visit_image_nJy = original.convert_unit(u.nJy, copy=False) 

728 self.assertTrue(np.may_share_memory(visit_image_nJy.image.array, original.image.array)) 

729 self.assertTrue(np.may_share_memory(visit_image_nJy.variance.array, original.variance.array)) 

730 # Even without a photometric_scaling attached, we should be able to 

731 # convert to a compatible unit, but only if we allow copies. 

732 with self.assertRaises(u.UnitConversionError): 

733 original.convert_unit(u.mJy, copy=False) 

734 visit_image_mJy = original.convert_unit(u.mJy, copy="as-needed") 

735 self.assertEqual(visit_image_mJy.unit, u.mJy) 

736 assert_close(self, visit_image_mJy.image.array, original.image.array * 1e-6) 

737 self.assertTrue(np.may_share_memory(visit_image_nJy.mask.array, original.mask.array)) 

738 assert_close(self, visit_image_mJy.variance.array, original.variance.array * 1e-12) 

739 # Converting a mJy image to legacy should make a PhotoCalib that maps 

740 # mJy to nJy. 

741 legacy_exposure_mJy = visit_image_mJy.to_legacy() 

742 assert_close(self, legacy_exposure_mJy.getPhotoCalib().getCalibrationMean(), 1e6) 

743 legacy_masked_image_nJy = legacy_exposure_mJy.getPhotoCalib().calibrateImage( 

744 legacy_exposure_mJy.maskedImage 

745 ) 

746 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array) 

747 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array) 

748 # Test that we haven't dropped any component objects along the way, 

749 # and that they're all still the same objects or thin views. 

750 self.assertTrue(np.may_share_memory(visit_image_mJy.mask.array, original.mask.array)) 

751 self.assertIs(visit_image_mJy.projection, original.projection) 

752 self.assertIs(visit_image_mJy.obs_info, original.obs_info) 

753 self.assertIs(visit_image_mJy.summary_stats, original.summary_stats) 

754 self.assertIs(visit_image_mJy.psf, original.psf) 

755 self.assertIs(visit_image_mJy.detector, original.detector) 

756 self.assertIs(visit_image_mJy.bounds, original.bounds) 

757 self.assertIs(visit_image_mJy.aperture_corrections, original.aperture_corrections) 

758 self.assertIs(visit_image_mJy.photometric_scaling, original.photometric_scaling) 

759 # Attach the final PhotoCalib (this isn't stored with the legacy file 

760 # because that is the mapping to nJy, which is trivial). 

761 visit_summary = ExposureCatalog.readFits( 

762 os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_summary.fits") 

763 ) 

764 legacy_photo_calib = visit_summary.find(DP2_VISIT_DETECTOR_DATA_ID["detector"]).getPhotoCalib() 

765 visit_image_nJy.photometric_scaling = field_from_legacy_photo_calib( 

766 legacy_photo_calib, bounds=original.detector.bbox, instrumental_unit=u.electron 

767 ) 

768 compare_photo_calib_to_legacy( 

769 self, 

770 visit_image_nJy.photometric_scaling, 

771 self.legacy_exposure.getPhotoCalib(), 

772 applied_legacy_photo_calib=legacy_photo_calib, 

773 subimage_bbox=visit_image_nJy.bbox, 

774 ) 

775 # We still can't convert to completely unrelated units. 

776 with self.assertRaises(u.UnitConversionError): 

777 visit_image_nJy.convert_unit(u.mm) 

778 # Uncalibrating via the photometric_scaling matches what legacy code 

779 # does, and by default it copies everything. 

780 with self.assertRaises(u.UnitConversionError): 

781 visit_image_nJy.convert_unit(u.electron, copy=False) 

782 legacy_masked_image_e = legacy_photo_calib.uncalibrateImage(self.legacy_exposure.maskedImage) 

783 visit_image_e = visit_image_nJy.convert_unit(u.electron) 

784 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array) 

785 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array) 

786 self.assertFalse(np.may_share_memory(visit_image_e.mask.array, visit_image_nJy.mask.array)) 

787 # We can also uncalibrate if we start with an image that has units 

788 # that are compatible with the photometric_scaling but not identical 

789 # to it. 

790 visit_image_mJy.photometric_scaling = visit_image_nJy.photometric_scaling 

791 visit_image_e = visit_image_mJy.convert_unit(u.electron) 

792 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array) 

793 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array) 

794 # We can re-apply the scaling to go back to calibrated units. 

795 visit_image_nJy_2 = visit_image_e.convert_unit(u.nJy) 

796 assert_close(self, visit_image_nJy_2.image.array, visit_image_nJy.image.array) 

797 assert_close(self, visit_image_nJy_2.variance.array, original.variance.array) 

798 # Try calibrating an image with a scaling that has units other than 

799 # nJy in the numerator. 

800 visit_image_e.photometric_scaling = visit_image_nJy.photometric_scaling * (1e-6 * u.mJy / u.nJy) 

801 visit_image_nJy_3 = visit_image_e.convert_unit(u.nJy) 

802 assert_close(self, visit_image_nJy_3.image.array, visit_image_nJy.image.array) 

803 assert_close(self, visit_image_nJy_3.variance.array, original.variance.array) 

804 # Try converting that uncalibrated image to legacy; the extra mJy/nJy 

805 # factor should get included in the PhotoCalib to recover the original 

806 # PhotoCalib. 

807 legacy_exposure_e = visit_image_e.to_legacy() 

808 assert_close( 

809 self, 

810 legacy_exposure_e.getPhotoCalib().getCalibrationMean(), 

811 legacy_photo_calib.getCalibrationMean(), 

812 ) 

813 legacy_masked_image_nJy = legacy_exposure_e.getPhotoCalib().calibrateImage( 

814 legacy_exposure_e.maskedImage 

815 ) 

816 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array) 

817 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array) 

818 

819 

820@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

821class PreliminaryVisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin): 

822 """Tests for the VisitImage class using a DRP preliminary_visit_image 

823 dataset. 

824 

825 Requires legacy code. 

826 """ 

827 

828 @classmethod 

829 def setUpClass(cls) -> None: 

830 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

831 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "preliminary_visit_image.fits") 

832 try: 

833 from lsst.afw.image import ExposureFitsReader 

834 

835 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

836 except ImportError: 

837 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

838 cls.plane_map = get_legacy_visit_image_mask_planes() 

839 cls.visit_image = VisitImage.read_legacy( 

840 cls.filename, preserve_quantization=True, plane_map=cls.plane_map 

841 ) 

842 cls.unit = u.electron 

843 

844 

845if __name__ == "__main__": 

846 unittest.main()