Coverage for tests/test_visit_image.py: 15%

403 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 08:40 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import unittest 

16import warnings 

17from typing import Any 

18 

19import astropy.io.fits 

20import astropy.units as u 

21import astropy.wcs 

22import numpy as np 

23from astro_metadata_translator import ObservationInfo 

24 

25from lsst.images import ( 

26 BackgroundMap, 

27 Box, 

28 DetectorFrame, 

29 Image, 

30 MaskPlane, 

31 MaskSchema, 

32 ObservationSummaryStats, 

33 Polygon, 

34 ProjectionAstropyView, 

35 TractFrame, 

36 VisitImage, 

37 get_legacy_visit_image_mask_planes, 

38) 

39from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy 

40from lsst.images.cameras import Detector 

41from lsst.images.fields import ChebyshevField, field_from_legacy_photo_calib 

42from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

43from lsst.images.json import read as read_json 

44from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction 

45from lsst.images.tests import ( 

46 DP2_VISIT_DETECTOR_DATA_ID, 

47 RoundtripFits, 

48 RoundtripJson, 

49 RoundtripNdf, 

50 TemporaryButler, 

51 assert_close, 

52 assert_masked_images_equal, 

53 assert_projections_equal, 

54 assert_visit_images_equal, 

55 compare_aperture_corrections_to_legacy, 

56 compare_detector_to_legacy, 

57 compare_photo_calib_to_legacy, 

58 compare_visit_image_to_legacy, 

59 make_random_projection, 

60) 

61 

62try: 

63 import h5py # noqa: F401 

64 

65 HAVE_H5PY = True 

66except ImportError: 

67 HAVE_H5PY = False 

68 

69EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

70LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data") 

71 

72 

73class VisitImageTestCase(unittest.TestCase): 

74 """Basic Tests for VisitImage.""" 

75 

76 @classmethod 

77 def setUpClass(cls) -> None: 

78 cls.rng = np.random.default_rng(500) 

79 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096]) 

80 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")]) 

81 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4, physical_filter="r1") 

82 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4) 

83 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]) 

84 cls.aperture_corrections: ApertureCorrectionMap = { 

85 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])), 

86 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])), 

87 } 

88 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json")) 

89 

90 opaque = FitsOpaqueMetadata() 

91 hdr = astropy.io.fits.Header() 

92 with warnings.catch_warnings(): 

93 # Silence warnings about long keys becoming HIERARCH. 

94 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

95 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"}) 

96 opaque.extract_legacy_primary_header(hdr) 

97 

98 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy) 

99 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy) 

100 # polygon is the lower triangle of the image. 

101 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5]) 

102 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096]) 

103 # API signature suggests projection and obs_info can be None but they 

104 # are required (unless you pass them in via the image plane). 

105 cls.visit_image = VisitImage( 

106 cls.image, 

107 variance=cls.variance, 

108 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

109 mask_schema=cls.mask_schema, 

110 projection=cls.projection, 

111 obs_info=cls.obs_info, 

112 summary_stats=cls.summary_stats, 

113 detector=cls.detector, 

114 bounds=cls.polygon, 

115 aperture_corrections=cls.aperture_corrections, 

116 band="r", 

117 ) 

118 cls.visit_image.backgrounds.add( 

119 "standard", 

120 ChebyshevField(det_frame.bbox, np.array([[2.0]])), 

121 description="Background subtracted from the image.", 

122 is_subtracted=True, 

123 ) 

124 cls.visit_image._opaque_metadata = opaque 

125 cls.simplest_visit_image = VisitImage( 

126 cls.image, 

127 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

128 mask_schema=cls.mask_schema, 

129 projection=cls.projection, 

130 detector=cls.detector, 

131 obs_info=cls.obs_info, 

132 band="r", 

133 ) 

134 

135 def test_basics(self) -> None: 

136 """Test basic constructor patterns.""" 

137 # Test default fill of variance. 

138 visit = self.simplest_visit_image 

139 self.assertEqual(visit.variance.array[0, 0], 1.0) 

140 self.assertIs(visit[...], visit) 

141 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])") 

142 self.assertEqual( 

143 repr(visit), 

144 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024))," 

145 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')]," 

146 " dtype=dtype('uint8')))", 

147 ) 

148 

149 astropy_wcs = visit.astropy_wcs 

150 self.assertIsInstance(astropy_wcs, ProjectionAstropyView) 

151 approx_wcs = visit.fits_wcs 

152 self.assertIsInstance(approx_wcs, astropy.wcs.WCS) 

153 

154 with self.assertRaises(TypeError): 

155 # Requires a PSF. 

156 VisitImage( 

157 self.image, 

158 mask_schema=self.mask_schema, 

159 projection=self.projection, 

160 obs_info=self.obs_info, 

161 detector=self.detector, 

162 band="r", 

163 ) 

164 

165 with self.assertRaises(TypeError): 

166 # Requires ObservationInfo. 

167 VisitImage( 

168 self.image, 

169 psf=self.gaussian_psf, 

170 mask_schema=self.mask_schema, 

171 projection=self.projection, 

172 detector=self.detector, 

173 band="r", 

174 ) 

175 

176 with self.assertRaises(TypeError): 

177 # Requires a projection. 

178 VisitImage( 

179 self.image, 

180 psf=self.gaussian_psf, 

181 mask_schema=self.mask_schema, 

182 obs_info=self.obs_info, 

183 detector=self.detector, 

184 band="r", 

185 ) 

186 

187 with self.assertRaises(TypeError): 

188 # Requires a detector. 

189 VisitImage( 

190 self.image, 

191 psf=self.gaussian_psf, 

192 mask_schema=self.mask_schema, 

193 projection=self.projection, 

194 obs_info=self.obs_info, 

195 band="r", 

196 ) 

197 

198 with self.assertRaises(TypeError): 

199 # Requires some form of mask. 

200 VisitImage( 

201 self.image, 

202 psf=self.gaussian_psf, 

203 projection=self.projection, 

204 obs_info=self.obs_info, 

205 detector=self.detector, 

206 band="r", 

207 ) 

208 

209 with self.assertRaises(TypeError): 

210 VisitImage( 

211 Image(42, shape=(5, 5)), 

212 psf=self.gaussian_psf, 

213 mask_schema=self.mask_schema, 

214 projection=self.projection, 

215 obs_info=self.obs_info, 

216 detector=self.detector, 

217 band="r", 

218 ) 

219 

220 # Requires a DetectorFrame. 

221 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10]) 

222 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096]) 

223 with self.assertRaises(TypeError): 

224 VisitImage( 

225 self.image, 

226 projection=tract_proj, 

227 psf=self.gaussian_psf, 

228 mask_schema=self.mask_schema, 

229 obs_info=self.obs_info, 

230 detector=self.detector, 

231 band="r", 

232 ) 

233 

234 # Variance unit mismatch. 

235 with self.assertRaises(ValueError): 

236 VisitImage( 

237 self.image, 

238 variance=self.image, 

239 psf=self.gaussian_psf, 

240 mask_schema=self.mask_schema, 

241 projection=self.projection, 

242 obs_info=self.obs_info, 

243 detector=self.detector, 

244 band="r", 

245 ) 

246 

247 def test_copy_and_slice(self) -> None: 

248 """Test that arrays and components are copied (when not immutable) by 

249 'copy' and referenced by 'slice'. 

250 """ 

251 visit = self.visit_image 

252 copy = visit.copy() 

253 copy.image.array[0, 0] = 30.0 

254 self.assertEqual(visit.image.array[0, 0], 42.0) 

255 self.assertEqual(copy.image.array[0, 0], 30.0) 

256 subvisit = visit[Box.factory[0:5, 0:5]] 

257 # Check summary stats. 

258 self.assertEqual(copy.summary_stats, visit.summary_stats) 

259 self.assertIsNot(copy.summary_stats, visit.summary_stats) 

260 self.assertEqual(subvisit.summary_stats, visit.summary_stats) 

261 self.assertIs(subvisit.summary_stats, visit.summary_stats) 

262 # Check aperture corrections. 

263 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

264 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections) 

265 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

266 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections) 

267 # Check backgrounds. 

268 self.assertEqual(copy.backgrounds.keys(), visit.backgrounds.keys()) 

269 self.assertIsNot(copy.backgrounds, visit.backgrounds) 

270 self.assertEqual(subvisit.backgrounds.keys(), visit.backgrounds.keys()) 

271 self.assertIs(subvisit.backgrounds, visit.backgrounds) 

272 # Check bounds. 

273 self.assertIs(copy.bounds, self.polygon) 

274 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox 

275 

276 def test_obs_info(self) -> None: 

277 """Check that ObservationInfo has been constructed.""" 

278 visit = self.visit_image 

279 self.assertIsNotNone(visit.obs_info) 

280 self.maxDiff = None 

281 assert visit.obs_info is not None # for mypy. 

282 self.assertEqual(visit.obs_info.instrument, "LSSTCam") 

283 

284 def test_summary_stats(self) -> None: 

285 """Test the comparisons and attributes of ObservationSummaryStats.""" 

286 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)) 

287 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5)) 

288 self.assertNotEqual( 

289 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2)) 

290 ) 

291 

292 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

293 def test_round_trip_ndf(self): 

294 """NDF round-trip for VisitImage.""" 

295 with RoundtripNdf(self, self.visit_image, "VisitImage") as roundtrip: 

296 assert_visit_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

297 

298 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

299 def test_fits_ndf_consistency(self): 

300 """FITS and NDF backends produce equal VisitImages on round-trip.""" 

301 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt: 

302 assert_visit_images_equal(self, self.visit_image, fits_rt.result, expect_view=False) 

303 assert_visit_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False) 

304 assert_visit_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

305 

306 def test_fits_json_consistency(self): 

307 """FITS and JSON backends produce equal VisitImages on round-trip.""" 

308 with ( 

309 RoundtripFits(self, self.visit_image) as fits_rt, 

310 RoundtripJson(self, self.visit_image) as json_rt, 

311 ): 

312 assert_visit_images_equal(self, self.visit_image, fits_rt.result, expect_view=False) 

313 assert_visit_images_equal(self, self.visit_image, json_rt.result, expect_view=False) 

314 assert_visit_images_equal(self, fits_rt.result, json_rt.result, expect_view=False) 

315 

316 def test_read_write(self) -> None: 

317 """Test that a visit can round trip through a FITS file.""" 

318 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

319 # Check that we're still using the right compression, and that we 

320 # wrote WCSs. 

321 fits = roundtrip.inspect() 

322 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

323 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN") 

324 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

325 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN") 

326 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

327 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN") 

328 # Check a subimage read. 

329 subbox = Box.factory[8:13, 9:30] 

330 subimage = roundtrip.get(bbox=subbox) 

331 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

332 with self.subTest(): 

333 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

334 with self.subTest(): 

335 obs_info = roundtrip.get("obs_info") 

336 self.assertIsInstance(obs_info, ObservationInfo) 

337 self.assertEqual(obs_info, self.visit_image.obs_info) 

338 with self.subTest(): 

339 summary_stats = roundtrip.get("summary_stats") 

340 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

341 self.assertEqual(summary_stats, self.visit_image.summary_stats) 

342 with self.subTest(): 

343 psf = roundtrip.get("psf") 

344 self.assertIsInstance(psf, GaussianPointSpreadFunction) 

345 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox) 

346 with self.subTest(): 

347 backgrounds = roundtrip.get("backgrounds") 

348 self.assertIsInstance(backgrounds, BackgroundMap) 

349 self.assertEqual(backgrounds.keys(), {"standard"}) 

350 self.assertIsInstance(backgrounds["standard"].field, ChebyshevField) 

351 self.assertEqual(backgrounds.subtracted.name, "standard") 

352 self.assertEqual( 

353 roundtrip.result.backgrounds.subtracted.description, 

354 "Background subtracted from the image.", 

355 ) 

356 

357 assert_visit_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

358 # Check that the round-tripped headers are the same (up to card order). 

359 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1) 

360 self.assertEqual( 

361 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

362 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

363 ) 

364 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

365 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

366 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

367 # Spot-check the concrete background contents (names, field types, 

368 # subtracted entry) against the known fixture, so the equality check 

369 # above is not vacuously satisfied by empty background maps. 

370 self.assertIsInstance(roundtrip.result.backgrounds, BackgroundMap) 

371 self.assertEqual(roundtrip.result.backgrounds.keys(), {"standard"}) 

372 self.assertIsInstance(roundtrip.result.backgrounds["standard"].field, ChebyshevField) 

373 self.assertEqual(roundtrip.result.backgrounds.subtracted.name, "standard") 

374 self.assertEqual( 

375 roundtrip.result.backgrounds.subtracted.description, "Background subtracted from the image." 

376 ) 

377 

378 

379class VisitImageLegacyTestMixin: 

380 """Tests for the VisitImage class and the basics of the archive, to be 

381 specialized for a particular test image. 

382 

383 `setUp` or `setUpClass` must be implemented to set the attributes declared 

384 in the class. 

385 """ 

386 

387 filename: str 

388 legacy_exposure: Any 

389 plane_map: dict[str, MaskPlane] 

390 visit_image: VisitImage 

391 unit: u.UnitBase 

392 

393 def test_legacy_errors(self) -> None: 

394 """Legacy read failure modes.""" 

395 with self.assertRaises(ValueError): 

396 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC") 

397 with self.assertRaises(ValueError): 

398 VisitImage.from_legacy(self.legacy_exposure, visit=123456) 

399 with self.assertRaises(ValueError): 

400 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy) 

401 visit = VisitImage.from_legacy( 

402 self.legacy_exposure, instrument="LSSTCam", unit=self.unit, visit=2025052000177 

403 ) 

404 self.assertEqual(visit.unit, self.unit) 

405 

406 with self.assertRaises(ValueError): 

407 VisitImage.read_legacy(self.filename, instrument="HSC") 

408 with self.assertRaises(ValueError): 

409 VisitImage.read_legacy(self.filename, visit=123456) 

410 

411 def test_component_reads(self) -> None: 

412 """Test reads of components from legacy file.""" 

413 visit = VisitImage.read_legacy(self.filename) 

414 proj = VisitImage.read_legacy(self.filename, component="projection") 

415 assert_projections_equal(self, proj, visit.projection, expect_identity=False) 

416 image = VisitImage.read_legacy(self.filename, component="image") 

417 self.assertEqual(image, visit.image) 

418 assert_projections_equal(self, proj, image.projection, expect_identity=False) 

419 variance = VisitImage.read_legacy(self.filename, component="variance") 

420 self.assertEqual(variance, visit.variance) 

421 assert_projections_equal(self, proj, variance.projection, expect_identity=False) 

422 mask = VisitImage.read_legacy(self.filename, component="mask") 

423 self.assertEqual(mask, visit.mask) 

424 assert_projections_equal(self, proj, mask.projection, expect_identity=False) 

425 psf = VisitImage.read_legacy(self.filename, component="psf") 

426 self.assertIsInstance(psf, PointSpreadFunction) 

427 obs_info = VisitImage.read_legacy(self.filename, component="obs_info") 

428 self.check_legacy_obs_info(obs_info) 

429 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats") 

430 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

431 self.assertEqual(summary_stats.nPsfStar, self.legacy_exposure.info.getSummaryStats().nPsfStar) 

432 compare_aperture_corrections_to_legacy( 

433 self, 

434 VisitImage.read_legacy(self.filename, component="aperture_corrections"), 

435 self.legacy_exposure.info.getApCorrMap(), 

436 visit.bbox, 

437 ) 

438 detector = VisitImage.read_legacy(self.filename, component="detector") 

439 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True) 

440 photometric_scaling = VisitImage.read_legacy(self.filename, component="photometric_scaling") 

441 compare_photo_calib_to_legacy( 

442 self, 

443 photometric_scaling, 

444 self.legacy_exposure.getPhotoCalib(), 

445 subimage_bbox=visit.bbox, 

446 ) 

447 

448 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None: 

449 """Check that an `ObservationInfo` instance is not `None`, and that it 

450 matches the one in the legacy test data file. 

451 """ 

452 self.assertIsInstance(obs_info, ObservationInfo) 

453 self.assertEqual(obs_info.instrument, "LSSTCam") 

454 self.assertEqual(obs_info.detector_num, 85, obs_info) 

455 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info) 

456 self.assertEqual(obs_info.physical_filter, "r_57", obs_info) 

457 

458 def test_obs_info(self) -> None: 

459 """Check that ObservationInfo has been constructed.""" 

460 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

461 self.assertIsNotNone(legacy.obs_info) 

462 self.maxDiff = None 

463 self.assertEqual(legacy.obs_info, self.visit_image.obs_info) 

464 assert legacy.obs_info is not None # for mypy. 

465 self.assertEqual(legacy.obs_info.instrument, "LSSTCam") 

466 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info) 

467 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info) 

468 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info) 

469 

470 def test_aperture_corrections_to_legacy(self) -> None: 

471 """Test that we can convert an aperture correction map back to a 

472 legacy `lsst.afw.image.ApCorrMap`. 

473 """ 

474 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections) 

475 compare_aperture_corrections_to_legacy( 

476 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox 

477 ) 

478 

479 def test_read_legacy_headers(self) -> None: 

480 """Test that headers were correctly stripped and interpreted in 

481 `VisitImage.read_legacy`. 

482 """ 

483 # Check that we read the units from BUNIT. 

484 self.assertEqual(self.visit_image.unit, self.unit) 

485 # Check that the primary header has the keys we want, and none of the 

486 # keys we don't want. 

487 header = self.visit_image._opaque_metadata.headers[ExtensionKey()] 

488 self.assertIn("EXPTIME", header) 

489 self.assertEqual(header["PLATFORM"], "lsstcam") 

490 self.assertNotIn("LSST BUTLER ID", header) 

491 self.assertNotIn("AR HDU", header) 

492 self.assertNotIn("A_ORDER", header) 

493 # Check that the extension HDUs do not have any custom headers. 

494 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

495 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

496 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

497 

498 def test_from_legacy_headers(self) -> None: 

499 """Test that from_legacy handles headers properly.""" 

500 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

501 header = legacy._opaque_metadata.headers[ExtensionKey()] 

502 self.assertIn("EXPTIME", header) 

503 self.assertEqual(header["PLATFORM"], "lsstcam") 

504 self.assertNotIn("LSST BUTLER ID", header) 

505 self.assertNotIn("AR HDU", header) 

506 self.assertNotIn("A_ORDER", header) 

507 # Check that the extension HDUs do not have any custom headers. 

508 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

509 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

510 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

511 

512 def test_rewrite(self) -> None: 

513 """Test that we can rewrite the visit image and preserve both 

514 lossy-compressed pixel values and components exactly. 

515 """ 

516 import lsst.afw.image 

517 

518 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

519 # Check that we're still using the right compression, and that we 

520 # wrote WCSs. 

521 fits = roundtrip.inspect() 

522 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

523 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP") 

524 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

525 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP") 

526 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

527 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP") 

528 # Check a subimage read. 

529 subbox = Box.factory[8:13, 9:30] 

530 subimage = roundtrip.get(bbox=subbox) 

531 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

532 alternates: dict[str, Any] = {} 

533 with self.subTest(): 

534 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

535 alternates = { 

536 k: roundtrip.get(k) 

537 for k in [ 

538 "projection", 

539 "image", 

540 "mask", 

541 "variance", 

542 "psf", 

543 "obs_info", 

544 "summary_stats", 

545 "aperture_corrections", 

546 "detector", 

547 "photometric_scaling", 

548 ] 

549 } 

550 # Test reading back in as an Exposure. 

551 with self.subTest(): 

552 legacy_exposure = roundtrip.get(storageClass="Exposure") 

553 self.assertIsInstance(legacy_exposure, lsst.afw.image.Exposure) 

554 # This covers most of the compnents, which have clean 1-1 

555 # mappings from legacy to new: 

556 compare_visit_image_to_legacy( 

557 self, 

558 self.visit_image, 

559 legacy_exposure, 

560 expect_view=False, 

561 plane_map=self.plane_map, 

562 **DP2_VISIT_DETECTOR_DATA_ID, 

563 ) 

564 # A few components are different enough to merit extra 

565 # attention: 

566 if self.visit_image.unit == u.nJy: 

567 self.assertTrue(legacy_exposure.getPhotoCalib()._isConstant) 

568 self.assertEqual(legacy_exposure.getPhotoCalib().getCalibrationMean(), 1.0) 

569 else: 

570 compare_photo_calib_to_legacy( 

571 self, 

572 self.visit_image.photometric_scaling, 

573 legacy_exposure.getPhotoCalib(), 

574 subimage_bbox=subbox, 

575 ) 

576 self.assertEqual(legacy_exposure.info.getId(), self.legacy_exposure.info.getId()) 

577 # Try to do a butler get of a component with storage class 

578 # override. 

579 with self.subTest(): 

580 # We have VisitInfo available. 

581 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo") 

582 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo) 

583 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam") 

584 

585 assert_visit_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

586 # Check that the round-tripped headers are the same (up to card order). 

587 self.assertEqual( 

588 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

589 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

590 ) 

591 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

592 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

593 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

594 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam") 

595 compare_visit_image_to_legacy( 

596 self, 

597 roundtrip.result, 

598 self.legacy_exposure, 

599 expect_view=False, 

600 plane_map=self.plane_map, 

601 **DP2_VISIT_DETECTOR_DATA_ID, 

602 alternates=alternates, 

603 ) 

604 # Check converting from the legacy object in-memory. 

605 compare_visit_image_to_legacy( 

606 self, 

607 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map), 

608 self.legacy_exposure, 

609 expect_view=True, 

610 plane_map=self.plane_map, 

611 **DP2_VISIT_DETECTOR_DATA_ID, 

612 ) 

613 

614 def test_butler_converters(self) -> None: 

615 """Test that we can read a VisitImage and its components from a butler 

616 dataset written as an `lsst.afw.image.Exposure`. 

617 """ 

618 if self.legacy_exposure is None: 

619 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.") 

620 with TemporaryButler(legacy="ExposureF") as helper: 

621 from lsst.daf.butler import FileDataset 

622 

623 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink") 

624 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage") 

625 with warnings.catch_warnings(): 

626 # Silence warnings about data ID and filter label disagreeing. 

627 warnings.simplefilter("ignore", category=UserWarning) 

628 visit_image = helper.butler.get(visit_image_ref) 

629 # We didn't ask for the quantization to be preserved, so it 

630 # shouldn't be. 

631 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), set()) 

632 # This time preserve the quantization 

633 visit_image = helper.butler.get(visit_image_ref, parameters={"preserve_quantization": True}) 

634 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), {"IMAGE", "VARIANCE"}) 

635 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox")) 

636 self.assertEqual(bbox, visit_image.bbox) 

637 alternates = { 

638 k: helper.butler.get(visit_image_ref.makeComponentRef(k)) 

639 # TODO: including "projection" or "obs_info" here fails because 

640 # there's code in daf_butler that expects any component to be 

641 # valid for the *internal* storage class, not the requested 

642 # one, and that's difficult to fix because it's tied up with 

643 # the data ID standardization logic. 

644 for k in ["image", "mask", "variance", "bbox", "psf", "detector"] 

645 } 

646 compare_visit_image_to_legacy( 

647 self, 

648 visit_image, 

649 self.legacy_exposure, 

650 expect_view=False, 

651 plane_map=self.plane_map, 

652 alternates=alternates, 

653 **DP2_VISIT_DETECTOR_DATA_ID, 

654 ) 

655 # Add some metadata to the new VisitImage and then do a converting 

656 # `put` that should write to the old format (we have to delete the 

657 # old one first, which just deletes a symlink). 

658 helper.butler.pruneDatasets([helper.legacy], purge=True, unstore=True, disassociate=True) 

659 visit_image.metadata["MixedCaseKey"] = 52 

660 helper.butler.put(visit_image, visit_image_ref) 

661 # Check that we can read *that* back in as a legacy exposure. 

662 legacy_exposure = helper.butler.get(helper.legacy) 

663 compare_visit_image_to_legacy( 

664 self, 

665 visit_image, 

666 legacy_exposure, 

667 expect_view=False, 

668 plane_map=self.plane_map, 

669 alternates=alternates, 

670 **DP2_VISIT_DETECTOR_DATA_ID, 

671 ) 

672 # Check that we can read it back in as a VisitImage, and that the 

673 # new metadata is preserved. 

674 visit_image_2 = helper.butler.get(visit_image_ref) 

675 compare_visit_image_to_legacy( 

676 self, 

677 visit_image_2, 

678 legacy_exposure, 

679 expect_view=False, 

680 plane_map=self.plane_map, 

681 alternates=alternates, 

682 **DP2_VISIT_DETECTOR_DATA_ID, 

683 ) 

684 self.assertEqual(visit_image_2.metadata["MixedCaseKey"], 52) 

685 

686 

687@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

688class VisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin): 

689 """Tests for the VisitImage class using a DRP-final visit_image dataset. 

690 

691 Requires legacy code. 

692 """ 

693 

694 @classmethod 

695 def setUpClass(cls) -> None: 

696 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

697 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits") 

698 try: 

699 from lsst.afw.image import ExposureFitsReader 

700 

701 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

702 except ImportError: 

703 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

704 cls.plane_map = get_legacy_visit_image_mask_planes() 

705 cls.visit_image = VisitImage.read_legacy( 

706 cls.filename, preserve_quantization=True, plane_map=cls.plane_map 

707 ) 

708 cls.unit = u.nJy 

709 

710 def test_convert_unit(self) -> None: 

711 """Test using the ``photometric_scaling`` to swap between 

712 calibrated and instrumental units. 

713 

714 This includes tests of the `VisitImage.to_legacy` logic for units that 

715 don't map directly to `lsst.afw.image.PhotoCalib` conventions. 

716 """ 

717 from lsst.afw.table import ExposureCatalog 

718 

719 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

720 # Make a copy of class state so we can modify it without breaking 

721 # other tests. 

722 original = self.visit_image.copy() 

723 # We should not be able to convert to instrumental units when there is 

724 # no photometric scaling. 

725 with self.assertRaises(u.UnitConversionError): 

726 original.convert_unit(u.electron) 

727 # Converting to the current unit should be a no-op that does not need 

728 # to copy. 

729 visit_image_nJy = original.convert_unit(u.nJy, copy=False) 

730 self.assertTrue(np.may_share_memory(visit_image_nJy.image.array, original.image.array)) 

731 self.assertTrue(np.may_share_memory(visit_image_nJy.variance.array, original.variance.array)) 

732 # Even without a photometric_scaling attached, we should be able to 

733 # convert to a compatible unit, but only if we allow copies. 

734 with self.assertRaises(u.UnitConversionError): 

735 original.convert_unit(u.mJy, copy=False) 

736 visit_image_mJy = original.convert_unit(u.mJy, copy="as-needed") 

737 self.assertEqual(visit_image_mJy.unit, u.mJy) 

738 assert_close(self, visit_image_mJy.image.array, original.image.array * 1e-6) 

739 self.assertTrue(np.may_share_memory(visit_image_nJy.mask.array, original.mask.array)) 

740 assert_close(self, visit_image_mJy.variance.array, original.variance.array * 1e-12) 

741 # Converting a mJy image to legacy should make a PhotoCalib that maps 

742 # mJy to nJy. 

743 legacy_exposure_mJy = visit_image_mJy.to_legacy() 

744 assert_close(self, legacy_exposure_mJy.getPhotoCalib().getCalibrationMean(), 1e6) 

745 legacy_masked_image_nJy = legacy_exposure_mJy.getPhotoCalib().calibrateImage( 

746 legacy_exposure_mJy.maskedImage 

747 ) 

748 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array) 

749 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array) 

750 # Test that we haven't dropped any component objects along the way, 

751 # and that they're all still the same objects or thin views. 

752 self.assertTrue(np.may_share_memory(visit_image_mJy.mask.array, original.mask.array)) 

753 self.assertIs(visit_image_mJy.projection, original.projection) 

754 self.assertIs(visit_image_mJy.obs_info, original.obs_info) 

755 self.assertIs(visit_image_mJy.summary_stats, original.summary_stats) 

756 self.assertIs(visit_image_mJy.psf, original.psf) 

757 self.assertIs(visit_image_mJy.detector, original.detector) 

758 self.assertIs(visit_image_mJy.bounds, original.bounds) 

759 self.assertIs(visit_image_mJy.aperture_corrections, original.aperture_corrections) 

760 self.assertIs(visit_image_mJy.photometric_scaling, original.photometric_scaling) 

761 # Attach the final PhotoCalib (this isn't stored with the legacy file 

762 # because that is the mapping to nJy, which is trivial). 

763 visit_summary = ExposureCatalog.readFits( 

764 os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_summary.fits") 

765 ) 

766 legacy_photo_calib = visit_summary.find(DP2_VISIT_DETECTOR_DATA_ID["detector"]).getPhotoCalib() 

767 visit_image_nJy.photometric_scaling = field_from_legacy_photo_calib( 

768 legacy_photo_calib, bounds=original.detector.bbox, instrumental_unit=u.electron 

769 ) 

770 compare_photo_calib_to_legacy( 

771 self, 

772 visit_image_nJy.photometric_scaling, 

773 self.legacy_exposure.getPhotoCalib(), 

774 applied_legacy_photo_calib=legacy_photo_calib, 

775 subimage_bbox=visit_image_nJy.bbox, 

776 ) 

777 # We still can't convert to completely unrelated units. 

778 with self.assertRaises(u.UnitConversionError): 

779 visit_image_nJy.convert_unit(u.mm) 

780 # Uncalibrating via the photometric_scaling matches what legacy code 

781 # does, and by default it copies everything. 

782 with self.assertRaises(u.UnitConversionError): 

783 visit_image_nJy.convert_unit(u.electron, copy=False) 

784 legacy_masked_image_e = legacy_photo_calib.uncalibrateImage(self.legacy_exposure.maskedImage) 

785 visit_image_e = visit_image_nJy.convert_unit(u.electron) 

786 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array) 

787 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array) 

788 self.assertFalse(np.may_share_memory(visit_image_e.mask.array, visit_image_nJy.mask.array)) 

789 # We can also uncalibrate if we start with an image that has units 

790 # that are compatible with the photometric_scaling but not identical 

791 # to it. 

792 visit_image_mJy.photometric_scaling = visit_image_nJy.photometric_scaling 

793 visit_image_e = visit_image_mJy.convert_unit(u.electron) 

794 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array) 

795 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array) 

796 # We can re-apply the scaling to go back to calibrated units. 

797 visit_image_nJy_2 = visit_image_e.convert_unit(u.nJy) 

798 assert_close(self, visit_image_nJy_2.image.array, visit_image_nJy.image.array) 

799 assert_close(self, visit_image_nJy_2.variance.array, original.variance.array) 

800 # Try calibrating an image with a scaling that has units other than 

801 # nJy in the numerator. 

802 visit_image_e.photometric_scaling = visit_image_nJy.photometric_scaling * (1e-6 * u.mJy / u.nJy) 

803 visit_image_nJy_3 = visit_image_e.convert_unit(u.nJy) 

804 assert_close(self, visit_image_nJy_3.image.array, visit_image_nJy.image.array) 

805 assert_close(self, visit_image_nJy_3.variance.array, original.variance.array) 

806 # Try converting that uncalibrated image to legacy; the extra mJy/nJy 

807 # factor should get included in the PhotoCalib to recover the original 

808 # PhotoCalib. 

809 legacy_exposure_e = visit_image_e.to_legacy() 

810 assert_close( 

811 self, 

812 legacy_exposure_e.getPhotoCalib().getCalibrationMean(), 

813 legacy_photo_calib.getCalibrationMean(), 

814 ) 

815 legacy_masked_image_nJy = legacy_exposure_e.getPhotoCalib().calibrateImage( 

816 legacy_exposure_e.maskedImage 

817 ) 

818 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array) 

819 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array) 

820 

821 

822@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

823class PreliminaryVisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin): 

824 """Tests for the VisitImage class using a DRP preliminary_visit_image 

825 dataset. 

826 

827 Requires legacy code. 

828 """ 

829 

830 @classmethod 

831 def setUpClass(cls) -> None: 

832 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

833 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "preliminary_visit_image.fits") 

834 try: 

835 from lsst.afw.image import ExposureFitsReader 

836 

837 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

838 except ImportError: 

839 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

840 cls.plane_map = get_legacy_visit_image_mask_planes() 

841 cls.visit_image = VisitImage.read_legacy( 

842 cls.filename, preserve_quantization=True, plane_map=cls.plane_map 

843 ) 

844 cls.unit = u.electron 

845 

846 

847if __name__ == "__main__": 

848 unittest.main()