Coverage for tests / test_visit_image.py: 16%

370 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 01:32 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import unittest 

16import warnings 

17from typing import Any 

18 

19import astropy.io.fits 

20import astropy.units as u 

21import astropy.wcs 

22import numpy as np 

23from astro_metadata_translator import ObservationInfo 

24 

25from lsst.images import ( 

26 BackgroundMap, 

27 Box, 

28 DetectorFrame, 

29 Image, 

30 MaskPlane, 

31 MaskSchema, 

32 ObservationSummaryStats, 

33 Polygon, 

34 ProjectionAstropyView, 

35 TractFrame, 

36 VisitImage, 

37 get_legacy_visit_image_mask_planes, 

38) 

39from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy 

40from lsst.images.cameras import Detector 

41from lsst.images.fields import ChebyshevField, field_from_legacy_photo_calib 

42from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

43from lsst.images.json import read as read_json 

44from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction 

45from lsst.images.tests import ( 

46 DP2_VISIT_DETECTOR_DATA_ID, 

47 RoundtripFits, 

48 RoundtripNdf, 

49 TemporaryButler, 

50 assert_close, 

51 assert_masked_images_equal, 

52 assert_projections_equal, 

53 compare_aperture_corrections_to_legacy, 

54 compare_detector_to_legacy, 

55 compare_photo_calib_to_legacy, 

56 compare_visit_image_to_legacy, 

57 make_random_projection, 

58) 

59 

60try: 

61 import h5py # noqa: F401 

62 

63 HAVE_H5PY = True 

64except ImportError: 

65 HAVE_H5PY = False 

66 

67EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

68LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data") 

69 

70 

71class VisitImageTestCase(unittest.TestCase): 

72 """Basic Tests for VisitImage.""" 

73 

74 @classmethod 

75 def setUpClass(cls) -> None: 

76 cls.rng = np.random.default_rng(500) 

77 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096]) 

78 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")]) 

79 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4) 

80 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4) 

81 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]) 

82 cls.aperture_corrections: ApertureCorrectionMap = { 

83 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])), 

84 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])), 

85 } 

86 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json")) 

87 

88 opaque = FitsOpaqueMetadata() 

89 hdr = astropy.io.fits.Header() 

90 with warnings.catch_warnings(): 

91 # Silence warnings about long keys becoming HIERARCH. 

92 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

93 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"}) 

94 opaque.extract_legacy_primary_header(hdr) 

95 

96 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy) 

97 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy) 

98 # polygon is the lower triangle of the image. 

99 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5]) 

100 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096]) 

101 # API signature suggests projection and obs_info can be None but they 

102 # are required (unless you pass them in via the image plane). 

103 cls.visit_image = VisitImage( 

104 cls.image, 

105 variance=cls.variance, 

106 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

107 mask_schema=cls.mask_schema, 

108 projection=cls.projection, 

109 obs_info=cls.obs_info, 

110 summary_stats=cls.summary_stats, 

111 detector=cls.detector, 

112 bounds=cls.polygon, 

113 aperture_corrections=cls.aperture_corrections, 

114 ) 

115 cls.visit_image.backgrounds.add( 

116 "standard", 

117 ChebyshevField(det_frame.bbox, np.array([[2.0]])), 

118 description="Background subtracted from the image.", 

119 is_subtracted=True, 

120 ) 

121 cls.visit_image._opaque_metadata = opaque 

122 cls.simplest_visit_image = VisitImage( 

123 cls.image, 

124 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

125 mask_schema=cls.mask_schema, 

126 projection=cls.projection, 

127 detector=cls.detector, 

128 obs_info=cls.obs_info, 

129 ) 

130 

131 def test_basics(self) -> None: 

132 """Test basic constructor patterns.""" 

133 # Test default fill of variance. 

134 visit = self.simplest_visit_image 

135 self.assertEqual(visit.variance.array[0, 0], 1.0) 

136 self.assertIs(visit[...], visit) 

137 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])") 

138 self.assertEqual( 

139 repr(visit), 

140 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024))," 

141 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')]," 

142 " dtype=dtype('uint8')))", 

143 ) 

144 

145 astropy_wcs = visit.astropy_wcs 

146 self.assertIsInstance(astropy_wcs, ProjectionAstropyView) 

147 approx_wcs = visit.fits_wcs 

148 self.assertIsInstance(approx_wcs, astropy.wcs.WCS) 

149 

150 with self.assertRaises(TypeError): 

151 # Requires a PSF. 

152 VisitImage( 

153 self.image, 

154 mask_schema=self.mask_schema, 

155 projection=self.projection, 

156 obs_info=self.obs_info, 

157 detector=self.detector, 

158 ) 

159 

160 with self.assertRaises(TypeError): 

161 # Requires ObservationInfo. 

162 VisitImage( 

163 self.image, 

164 psf=self.gaussian_psf, 

165 mask_schema=self.mask_schema, 

166 projection=self.projection, 

167 detector=self.detector, 

168 ) 

169 

170 with self.assertRaises(TypeError): 

171 # Requires a projection. 

172 VisitImage( 

173 self.image, 

174 psf=self.gaussian_psf, 

175 mask_schema=self.mask_schema, 

176 obs_info=self.obs_info, 

177 detector=self.detector, 

178 ) 

179 

180 with self.assertRaises(TypeError): 

181 # Requires a detector. 

182 VisitImage( 

183 self.image, 

184 psf=self.gaussian_psf, 

185 mask_schema=self.mask_schema, 

186 projection=self.projection, 

187 obs_info=self.obs_info, 

188 ) 

189 

190 with self.assertRaises(TypeError): 

191 # Requires some form of mask. 

192 VisitImage( 

193 self.image, 

194 psf=self.gaussian_psf, 

195 projection=self.projection, 

196 obs_info=self.obs_info, 

197 detector=self.detector, 

198 ) 

199 

200 with self.assertRaises(TypeError): 

201 VisitImage( 

202 Image(42, shape=(5, 5)), 

203 psf=self.gaussian_psf, 

204 mask_schema=self.mask_schema, 

205 projection=self.projection, 

206 obs_info=self.obs_info, 

207 detector=self.detector, 

208 ) 

209 

210 # Requires a DetectorFrame. 

211 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10]) 

212 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096]) 

213 with self.assertRaises(TypeError): 

214 VisitImage( 

215 self.image, 

216 projection=tract_proj, 

217 psf=self.gaussian_psf, 

218 mask_schema=self.mask_schema, 

219 obs_info=self.obs_info, 

220 detector=self.detector, 

221 ) 

222 

223 # Variance unit mismatch. 

224 with self.assertRaises(ValueError): 

225 VisitImage( 

226 self.image, 

227 variance=self.image, 

228 psf=self.gaussian_psf, 

229 mask_schema=self.mask_schema, 

230 projection=self.projection, 

231 obs_info=self.obs_info, 

232 detector=self.detector, 

233 ) 

234 

235 def test_copy_and_slice(self) -> None: 

236 """Test that arrays and components are copied (when not immutable) by 

237 'copy' and referenced by 'slice'. 

238 """ 

239 visit = self.visit_image 

240 copy = visit.copy() 

241 copy.image.array[0, 0] = 30.0 

242 self.assertEqual(visit.image.array[0, 0], 42.0) 

243 self.assertEqual(copy.image.array[0, 0], 30.0) 

244 subvisit = visit[Box.factory[0:5, 0:5]] 

245 # Check summary stats. 

246 self.assertEqual(copy.summary_stats, visit.summary_stats) 

247 self.assertIsNot(copy.summary_stats, visit.summary_stats) 

248 self.assertEqual(subvisit.summary_stats, visit.summary_stats) 

249 self.assertIs(subvisit.summary_stats, visit.summary_stats) 

250 # Check aperture corrections. 

251 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

252 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections) 

253 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

254 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections) 

255 # Check backgrounds. 

256 self.assertEqual(copy.backgrounds.keys(), visit.backgrounds.keys()) 

257 self.assertIsNot(copy.backgrounds, visit.backgrounds) 

258 self.assertEqual(subvisit.backgrounds.keys(), visit.backgrounds.keys()) 

259 self.assertIs(subvisit.backgrounds, visit.backgrounds) 

260 # Check bounds. 

261 self.assertIs(copy.bounds, self.polygon) 

262 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox 

263 

264 def test_obs_info(self) -> None: 

265 """Check that ObservationInfo has been constructed.""" 

266 visit = self.visit_image 

267 self.assertIsNotNone(visit.obs_info) 

268 self.maxDiff = None 

269 assert visit.obs_info is not None # for mypy. 

270 self.assertEqual(visit.obs_info.instrument, "LSSTCam") 

271 

272 def test_summary_stats(self) -> None: 

273 """Test the comparisons and attributes of ObservationSummaryStats.""" 

274 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)) 

275 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5)) 

276 self.assertNotEqual( 

277 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2)) 

278 ) 

279 

280 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

281 def test_round_trip_ndf(self): 

282 """NDF round-trip for VisitImage.""" 

283 with RoundtripNdf(self, self.visit_image) as roundtrip: 

284 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

285 self.assertEqual(roundtrip.result.summary_stats, self.visit_image.summary_stats) 

286 self.assertEqual(type(roundtrip.result.psf), type(self.visit_image.psf)) 

287 

288 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

289 def test_fits_ndf_consistency(self): 

290 """FITS and NDF backends produce equal VisitImages on round-trip.""" 

291 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt: 

292 assert_masked_images_equal(self, self.visit_image, fits_rt.result, expect_view=False) 

293 assert_masked_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False) 

294 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

295 

296 def test_read_write(self) -> None: 

297 """Test that a visit can round trip through a FITS file.""" 

298 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

299 # Check that we're still using the right compression, and that we 

300 # wrote WCSs. 

301 fits = roundtrip.inspect() 

302 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

303 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN") 

304 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

305 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN") 

306 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

307 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN") 

308 # Check a subimage read. 

309 subbox = Box.factory[8:13, 9:30] 

310 subimage = roundtrip.get(bbox=subbox) 

311 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

312 with self.subTest(): 

313 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

314 with self.subTest(): 

315 obs_info = roundtrip.get("obs_info") 

316 self.assertIsInstance(obs_info, ObservationInfo) 

317 self.assertEqual(obs_info, self.visit_image.obs_info) 

318 with self.subTest(): 

319 summary_stats = roundtrip.get("summary_stats") 

320 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

321 self.assertEqual(summary_stats, self.visit_image.summary_stats) 

322 with self.subTest(): 

323 psf = roundtrip.get("psf") 

324 self.assertIsInstance(psf, GaussianPointSpreadFunction) 

325 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox) 

326 with self.subTest(): 

327 backgrounds = roundtrip.get("backgrounds") 

328 self.assertIsInstance(backgrounds, BackgroundMap) 

329 self.assertEqual(backgrounds.keys(), {"standard"}) 

330 self.assertIsInstance(backgrounds["standard"].field, ChebyshevField) 

331 self.assertEqual(backgrounds.subtracted.name, "standard") 

332 self.assertEqual( 

333 roundtrip.result.backgrounds.subtracted.description, 

334 "Background subtracted from the image.", 

335 ) 

336 

337 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

338 # Check that the round-tripped headers are the same (up to card order). 

339 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1) 

340 self.assertEqual( 

341 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

342 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

343 ) 

344 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

345 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

346 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

347 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info) 

348 self.assertIsNotNone(roundtrip.result.summary_stats) 

349 self.assertEqual( 

350 roundtrip.result.summary_stats.psfSigma, 

351 self.visit_image.summary_stats.psfSigma, 

352 ) 

353 self.assertEqual( 

354 roundtrip.result.summary_stats.zeroPoint, 

355 self.visit_image.summary_stats.zeroPoint, 

356 ) 

357 self.assertEqual(roundtrip.result.bounds, self.polygon) 

358 self.assertIsInstance(roundtrip.result.backgrounds, BackgroundMap) 

359 self.assertEqual(roundtrip.result.backgrounds.keys(), {"standard"}) 

360 self.assertIsInstance(roundtrip.result.backgrounds["standard"].field, ChebyshevField) 

361 self.assertEqual(roundtrip.result.backgrounds.subtracted.name, "standard") 

362 self.assertEqual( 

363 roundtrip.result.backgrounds.subtracted.description, "Background subtracted from the image." 

364 ) 

365 

366 

367class VisitImageLegacyTestMixin: 

368 """Tests for the VisitImage class and the basics of the archive, to be 

369 specialized for a particular test image. 

370 

371 `setUp` or `setUpClass` must be implemented to set the attributes declared 

372 in the class. 

373 """ 

374 

375 filename: str 

376 legacy_exposure: Any 

377 plane_map: dict[str, MaskPlane] 

378 visit_image: VisitImage 

379 unit: u.UnitBase 

380 

381 def test_legacy_errors(self) -> None: 

382 """Legacy read failure modes.""" 

383 with self.assertRaises(ValueError): 

384 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC") 

385 with self.assertRaises(ValueError): 

386 VisitImage.from_legacy(self.legacy_exposure, visit=123456) 

387 with self.assertRaises(ValueError): 

388 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy) 

389 visit = VisitImage.from_legacy( 

390 self.legacy_exposure, instrument="LSSTCam", unit=self.unit, visit=2025052000177 

391 ) 

392 self.assertEqual(visit.unit, self.unit) 

393 

394 with self.assertRaises(ValueError): 

395 VisitImage.read_legacy(self.filename, instrument="HSC") 

396 with self.assertRaises(ValueError): 

397 VisitImage.read_legacy(self.filename, visit=123456) 

398 

399 def test_component_reads(self) -> None: 

400 """Test reads of components from legacy file.""" 

401 visit = VisitImage.read_legacy(self.filename) 

402 proj = VisitImage.read_legacy(self.filename, component="projection") 

403 assert_projections_equal(self, proj, visit.projection, expect_identity=False) 

404 image = VisitImage.read_legacy(self.filename, component="image") 

405 self.assertEqual(image, visit.image) 

406 assert_projections_equal(self, proj, image.projection, expect_identity=False) 

407 variance = VisitImage.read_legacy(self.filename, component="variance") 

408 self.assertEqual(variance, visit.variance) 

409 assert_projections_equal(self, proj, variance.projection, expect_identity=False) 

410 mask = VisitImage.read_legacy(self.filename, component="mask") 

411 self.assertEqual(mask, visit.mask) 

412 assert_projections_equal(self, proj, mask.projection, expect_identity=False) 

413 psf = VisitImage.read_legacy(self.filename, component="psf") 

414 self.assertIsInstance(psf, PointSpreadFunction) 

415 obs_info = VisitImage.read_legacy(self.filename, component="obs_info") 

416 self.check_legacy_obs_info(obs_info) 

417 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats") 

418 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

419 self.assertEqual(summary_stats.nPsfStar, self.legacy_exposure.info.getSummaryStats().nPsfStar) 

420 compare_aperture_corrections_to_legacy( 

421 self, 

422 VisitImage.read_legacy(self.filename, component="aperture_corrections"), 

423 self.legacy_exposure.info.getApCorrMap(), 

424 visit.bbox, 

425 ) 

426 detector = VisitImage.read_legacy(self.filename, component="detector") 

427 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True) 

428 photometric_scaling = VisitImage.read_legacy(self.filename, component="photometric_scaling") 

429 compare_photo_calib_to_legacy( 

430 self, 

431 photometric_scaling, 

432 self.legacy_exposure.getPhotoCalib(), 

433 subimage_bbox=visit.bbox, 

434 ) 

435 

436 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None: 

437 """Check that an `ObservationInfo` instance is not `None`, and that it 

438 matches the one in the legacy test data file. 

439 """ 

440 self.assertIsInstance(obs_info, ObservationInfo) 

441 self.assertEqual(obs_info.instrument, "LSSTCam") 

442 self.assertEqual(obs_info.detector_num, 85, obs_info) 

443 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info) 

444 self.assertEqual(obs_info.physical_filter, "r_57", obs_info) 

445 

446 def test_obs_info(self) -> None: 

447 """Check that ObservationInfo has been constructed.""" 

448 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

449 self.assertIsNotNone(legacy.obs_info) 

450 self.maxDiff = None 

451 self.assertEqual(legacy.obs_info, self.visit_image.obs_info) 

452 assert legacy.obs_info is not None # for mypy. 

453 self.assertEqual(legacy.obs_info.instrument, "LSSTCam") 

454 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info) 

455 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info) 

456 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info) 

457 

458 def test_aperture_corrections_to_legacy(self) -> None: 

459 """Test that we can convert an aperture correction map back to a 

460 legacy `lsst.afw.image.ApCorrMap`. 

461 """ 

462 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections) 

463 compare_aperture_corrections_to_legacy( 

464 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox 

465 ) 

466 

467 def test_read_legacy_headers(self) -> None: 

468 """Test that headers were correctly stripped and interpreted in 

469 `VisitImage.read_legacy`. 

470 """ 

471 # Check that we read the units from BUNIT. 

472 self.assertEqual(self.visit_image.unit, self.unit) 

473 # Check that the primary header has the keys we want, and none of the 

474 # keys we don't want. 

475 header = self.visit_image._opaque_metadata.headers[ExtensionKey()] 

476 self.assertIn("EXPTIME", header) 

477 self.assertEqual(header["PLATFORM"], "lsstcam") 

478 self.assertNotIn("LSST BUTLER ID", header) 

479 self.assertNotIn("AR HDU", header) 

480 self.assertNotIn("A_ORDER", header) 

481 # Check that the extension HDUs do not have any custom headers. 

482 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

483 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

484 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

485 

486 def test_from_legacy_headers(self) -> None: 

487 """Test that from_legacy handles headers properly.""" 

488 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

489 header = legacy._opaque_metadata.headers[ExtensionKey()] 

490 self.assertIn("EXPTIME", header) 

491 self.assertEqual(header["PLATFORM"], "lsstcam") 

492 self.assertNotIn("LSST BUTLER ID", header) 

493 self.assertNotIn("AR HDU", header) 

494 self.assertNotIn("A_ORDER", header) 

495 # Check that the extension HDUs do not have any custom headers. 

496 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

497 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

498 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

499 

500 def test_rewrite(self) -> None: 

501 """Test that we can rewrite the visit image and preserve both 

502 lossy-compressed pixel values and components exactly. 

503 """ 

504 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

505 # Check that we're still using the right compression, and that we 

506 # wrote WCSs. 

507 fits = roundtrip.inspect() 

508 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

509 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP") 

510 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

511 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP") 

512 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

513 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP") 

514 # Check a subimage read. 

515 subbox = Box.factory[8:13, 9:30] 

516 subimage = roundtrip.get(bbox=subbox) 

517 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

518 alternates: dict[str, Any] = {} 

519 with self.subTest(): 

520 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

521 alternates = { 

522 k: roundtrip.get(k) 

523 for k in [ 

524 "projection", 

525 "image", 

526 "mask", 

527 "variance", 

528 "psf", 

529 "obs_info", 

530 "summary_stats", 

531 "aperture_corrections", 

532 "detector", 

533 "photometric_scaling", 

534 ] 

535 } 

536 # Try to do a butler get of a component with storage class 

537 # override. 

538 with self.subTest(): 

539 if self.legacy_exposure is not None: 

540 import lsst.afw.image 

541 

542 # We have VisitInfo available. 

543 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo") 

544 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo) 

545 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam") 

546 else: 

547 raise unittest.SkipTest("Can not test VisitInfo conversion without afw") 

548 

549 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

550 # Check that the round-tripped headers are the same (up to card order). 

551 self.assertEqual( 

552 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

553 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

554 ) 

555 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

556 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

557 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

558 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam") 

559 compare_visit_image_to_legacy( 

560 self, 

561 roundtrip.result, 

562 self.legacy_exposure, 

563 expect_view=False, 

564 plane_map=self.plane_map, 

565 **DP2_VISIT_DETECTOR_DATA_ID, 

566 alternates=alternates, 

567 ) 

568 # Check converting from the legacy object in-memory. 

569 compare_visit_image_to_legacy( 

570 self, 

571 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map), 

572 self.legacy_exposure, 

573 expect_view=True, 

574 plane_map=self.plane_map, 

575 **DP2_VISIT_DETECTOR_DATA_ID, 

576 ) 

577 

578 def test_butler_converters(self) -> None: 

579 """Test that we can read a VisitImage and its components from a butler 

580 dataset written as an `lsst.afw.image.Exposure`. 

581 """ 

582 if self.legacy_exposure is None: 

583 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.") 

584 with TemporaryButler(legacy="ExposureF") as helper: 

585 from lsst.daf.butler import FileDataset 

586 

587 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink") 

588 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage") 

589 visit_image = helper.butler.get(visit_image_ref) 

590 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox")) 

591 self.assertEqual(bbox, visit_image.bbox) 

592 alternates = { 

593 k: helper.butler.get(visit_image_ref.makeComponentRef(k)) 

594 # TODO: including "projection" or "obs_info" here fails because 

595 # there's code in daf_butler that expects any component to be 

596 # valid for the *internal* storage class, not the requested 

597 # one, and that's difficult to fix because it's tied up with 

598 # the data ID standardization logic. 

599 for k in ["image", "mask", "variance", "psf", "detector"] 

600 } 

601 compare_visit_image_to_legacy( 

602 self, 

603 visit_image, 

604 self.legacy_exposure, 

605 expect_view=False, 

606 plane_map=self.plane_map, 

607 alternates=alternates, 

608 **DP2_VISIT_DETECTOR_DATA_ID, 

609 ) 

610 

611 

612@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

613class VisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin): 

614 """Tests for the VisitImage class using a DRP-final visit_image dataset. 

615 

616 Requires legacy code. 

617 """ 

618 

619 @classmethod 

620 def setUpClass(cls) -> None: 

621 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

622 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits") 

623 try: 

624 from lsst.afw.image import ExposureFitsReader 

625 

626 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

627 except ImportError: 

628 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

629 cls.plane_map = get_legacy_visit_image_mask_planes() 

630 cls.visit_image = VisitImage.read_legacy( 

631 cls.filename, preserve_quantization=True, plane_map=cls.plane_map 

632 ) 

633 cls.unit = u.nJy 

634 

635 def test_convert_unit(self) -> None: 

636 """Test using the ``photometric_scaling`` to swap between 

637 calibrated and instrumental units. 

638 """ 

639 from lsst.afw.table import ExposureCatalog 

640 

641 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

642 # We should not be able to convert to instrumental units when there is 

643 # no photometric scaling. 

644 with self.assertRaises(u.UnitConversionError): 

645 self.visit_image.convert_unit(u.electron) 

646 # Converting to the current unit should be a no-op that does not need 

647 # to copy. 

648 visit_image_nJy = self.visit_image.convert_unit(u.nJy, copy=False) 

649 self.assertTrue(np.may_share_memory(visit_image_nJy.image.array, self.visit_image.image.array)) 

650 self.assertTrue(np.may_share_memory(visit_image_nJy.variance.array, self.visit_image.variance.array)) 

651 # Even without a photometric_scaling attached, we should be able to 

652 # convert to a compatible unit, but only if we allow copies. 

653 with self.assertRaises(u.UnitConversionError): 

654 self.visit_image.convert_unit(u.mJy, copy=False) 

655 visit_image_mJy = self.visit_image.convert_unit(u.mJy, copy="as-needed") 

656 self.assertEqual(visit_image_mJy.unit, u.mJy) 

657 assert_close(self, visit_image_mJy.image.array, self.visit_image.image.array * 1e-6) 

658 self.assertTrue(np.may_share_memory(visit_image_nJy.mask.array, self.visit_image.mask.array)) 

659 assert_close(self, visit_image_mJy.variance.array, self.visit_image.variance.array * 1e-12) 

660 # Test that we haven't dropped any component objects along the way, 

661 # and that they're all still the same objects or thin views. 

662 self.assertTrue(np.may_share_memory(visit_image_mJy.mask.array, self.visit_image.mask.array)) 

663 self.assertIs(visit_image_mJy.projection, self.visit_image.projection) 

664 self.assertIs(visit_image_mJy.obs_info, self.visit_image.obs_info) 

665 self.assertIs(visit_image_mJy.summary_stats, self.visit_image.summary_stats) 

666 self.assertIs(visit_image_mJy.psf, self.visit_image.psf) 

667 self.assertIs(visit_image_mJy.detector, self.visit_image.detector) 

668 self.assertIs(visit_image_mJy.bounds, self.visit_image.bounds) 

669 self.assertIs(visit_image_mJy.aperture_corrections, self.visit_image.aperture_corrections) 

670 self.assertIs(visit_image_mJy.photometric_scaling, self.visit_image.photometric_scaling) 

671 # Attach the final PhotoCalib (this isn't stored with the legacy file 

672 # because that is the mapping to nJy, which is trivial). 

673 visit_summary = ExposureCatalog.readFits( 

674 os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_summary.fits") 

675 ) 

676 legacy_photo_calib = visit_summary.find(DP2_VISIT_DETECTOR_DATA_ID["detector"]).getPhotoCalib() 

677 visit_image_nJy.photometric_scaling = field_from_legacy_photo_calib( 

678 legacy_photo_calib, bounds=self.visit_image.detector.bbox, instrumental_unit=u.electron 

679 ) 

680 compare_photo_calib_to_legacy( 

681 self, 

682 visit_image_nJy.photometric_scaling, 

683 self.legacy_exposure.getPhotoCalib(), 

684 applied_legacy_photo_calib=legacy_photo_calib, 

685 subimage_bbox=visit_image_nJy.bbox, 

686 ) 

687 # We still can't convert to completely unrelated units. 

688 with self.assertRaises(u.UnitConversionError): 

689 visit_image_nJy.convert_unit(u.mm) 

690 # Uncalibrating via the photometric_scaling matches what legacy code 

691 # does, and by default it copies everything. 

692 with self.assertRaises(u.UnitConversionError): 

693 visit_image_nJy.convert_unit(u.electron, copy=False) 

694 legacy_masked_image_e = legacy_photo_calib.uncalibrateImage(self.legacy_exposure.maskedImage) 

695 visit_image_e = visit_image_nJy.convert_unit(u.electron) 

696 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array) 

697 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array) 

698 self.assertFalse(np.may_share_memory(visit_image_e.mask.array, visit_image_nJy.mask.array)) 

699 # We can also uncalibrate if we start with an image that has units 

700 # that are compatible with the photometric_scaling but not identical 

701 # to it. 

702 visit_image_mJy.photometric_scaling = visit_image_nJy.photometric_scaling 

703 visit_image_e = visit_image_mJy.convert_unit(u.electron) 

704 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array) 

705 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array) 

706 # We can re-apply the scaling go go back to calibrated units. 

707 visit_image_nJy_2 = visit_image_e.convert_unit(u.nJy) 

708 assert_close(self, visit_image_nJy_2.image.array, visit_image_nJy.image.array) 

709 assert_close(self, visit_image_nJy_2.variance.array, self.visit_image.variance.array) 

710 

711 

712@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

713class PreliminaryVisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin): 

714 """Tests for the VisitImage class using a DRP preliminary_visit_image 

715 dataset. 

716 

717 Requires legacy code. 

718 """ 

719 

720 @classmethod 

721 def setUpClass(cls) -> None: 

722 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

723 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "preliminary_visit_image.fits") 

724 try: 

725 from lsst.afw.image import ExposureFitsReader 

726 

727 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

728 except ImportError: 

729 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

730 cls.plane_map = get_legacy_visit_image_mask_planes() 

731 cls.visit_image = VisitImage.read_legacy( 

732 cls.filename, preserve_quantization=True, plane_map=cls.plane_map 

733 ) 

734 cls.unit = u.electron 

735 

736 

737if __name__ == "__main__": 

738 unittest.main()